Files
seaweedfs/weed/storage/volume_loading.go
Chris Lu 654292b57d fix(volume): cap leveldb OpenFilesCacheCapacity per index DB (#9139) (#9223)
* fix(volume): cap leveldb OpenFilesCacheCapacity per index DB (#9139)

The leveldb opt.Options for NeedleMapLevelDb / Medium / Large never
set OpenFilesCacheCapacity, so each leveldb instance defaulted to
goleveldb's 500. On servers with thousands of volumes, that ceiling
stacks across DBs and exhausts even high ulimits, starving WAL
rotation:

  failed to write leveldb: open .../000006.log: too many open files

CompactionTableSizeMultiplier=10 already keeps the SST count low,
so a small per-DB cache is sufficient. Cap at 16 / 32 / 64 for the
small / medium / large variants so per-DB FD usage is bounded.

* storage: hoist leveldb FD-cap values into named constants

Per review feedback: replace the inline 16/32/64 literals with
LevelDb{,Medium,Large}OpenFilesCacheCapacity, and move the rationale
(why 500 is too high per-DB on busy servers, what the tradeoff is)
into a package-level comment so future readers see the memory vs.
performance picture at the constant declaration instead of inline.
2026-04-26 02:15:15 -07:00

298 lines
12 KiB
Go

package storage
import (
"fmt"
"os"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// Per-DB caps on goleveldb's open SST file cache. The library default is 500
// per DB, but a volume server hosts one DB per volume — easily thousands —
// so the per-DB default sums into FD exhaustion (`open .../00000N.log: too
// many open files`) even with generous ulimits, especially when leveldb is
// rotating its WAL.
//
// The trade-off: a larger cache lowers re-open overhead on cold reads, a
// smaller cache bounds total FD usage. CompactionTableSizeMultiplier=10
// already keeps SST counts low (~10x larger SSTs => ~10x fewer files), so
// even the small-volume cap is enough to keep the working set hot while
// leaving headroom for thousands of co-resident DBs.
const (
LevelDbOpenFilesCacheCapacity = 16
LevelDbMediumOpenFilesCacheCapacity = 32
LevelDbLargeOpenFilesCacheCapacity = 64
)
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
err = v.load(false, false, needleMapKind, 0, ver)
return
}
func loadVolumeWithoutWorker(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ldbTimeout int64) (v *Volume, err error) {
v = &Volume{
dir: dirname,
dirIdx: dirIdx,
Collection: collection,
Id: id,
needleMapKind: needleMapKind,
ldbTimeout: ldbTimeout,
}
v.SuperBlock = super_block.SuperBlock{}
err = v.load(true, false, needleMapKind, 0, needle.GetCurrentVersion())
return
}
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) {
alreadyHasSuperBlock := false
hasLoadedVolume := false
defer func() {
if !hasLoadedVolume {
if v.nm != nil {
v.nm.Close()
v.nm = nil
}
if v.DataBackend != nil {
v.DataBackend.Close()
v.DataBackend = nil
}
}
}()
hasVolumeInfoFile := v.maybeLoadVolumeInfo()
if v.volumeInfo.ReadOnly && !v.HasRemoteFile() {
// this covers the case where the volume is marked as read-only and has no remote file
v.noWriteOrDelete = true
}
if v.HasRemoteFile() {
v.noWriteCanDelete = true
v.noWriteOrDelete = false
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
if err := v.LoadRemoteFile(); err != nil {
return fmt.Errorf("load remote file %v: %w", v.volumeInfo, err)
}
// Set lastModifiedTsSeconds from remote file to prevent premature expiry on startup
if len(v.volumeInfo.GetFiles()) > 0 {
remoteFileModifiedTime := v.volumeInfo.GetFiles()[0].GetModifiedTime()
if remoteFileModifiedTime > 0 {
v.lastModifiedTsSeconds = remoteFileModifiedTime
} else {
// Fallback: use .vif file's modification time
if exists, _, _, modifiedTime, _ := util.CheckFile(v.FileName(".vif")); exists {
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
}
}
glog.V(1).Infof("volume %d remote file lastModifiedTsSeconds set to %d", v.Id, v.lastModifiedTsSeconds)
}
alreadyHasSuperBlock = true
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
// open dat file
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
}
var dataFile *os.File
if canWrite {
dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
} else {
glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
dataFile, err = os.Open(v.FileName(".dat"))
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
if fileSize >= super_block.SuperBlockSize {
alreadyHasSuperBlock = true
}
v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
} else {
return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
}
}
if err != nil {
if !os.IsPermission(err) {
return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
} else {
return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
}
}
if alreadyHasSuperBlock {
err = v.readSuperBlock()
if err == nil {
if !needle.IsSupportedVersion(v.SuperBlock.Version) {
glog.Fatalf("Unsupported volume %d version %v", v.Id, v.SuperBlock.Version)
}
v.volumeInfo.Version = uint32(v.SuperBlock.Version)
}
glog.V(2).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
if v.HasRemoteFile() {
// maybe temporary network problem
glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
err = nil
}
} else {
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
}
err = v.maybeWriteSuperBlock(ver)
}
if err == nil && alsoLoadIndex {
// adjust for existing volumes with .idx together with .dat files
if v.dirIdx != v.dir {
if util.FileExists(v.DataFileName() + ".idx") {
v.dirIdx = v.dir
}
}
// check volume idx files
if err := v.checkIdxFile(); err != nil {
glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
}
var indexFile *os.File
if v.noWriteOrDelete {
glog.V(0).Infoln("open to read file", v.FileName(".idx"))
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
}
} else {
glog.V(1).Infoln("open to write file", v.FileName(".idx"))
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
}
}
// Do not need to check the data integrity for remote volumes,
// since the remote storage tier may have larger capacity, the volume
// data read will trigger the ReadAt() function to read from the remote
// storage tier, and download to local storage, which may cause the
// capactiy overloading.
if !v.HasRemoteFile() {
glog.V(2).Infof("checking volume data integrity for volume %d", v.Id)
if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
}
// The post-load structural check below uses the in-memory needle map
// to verify that no .idx entry references bytes past the end of .dat
// (issue #8928). The check piggybacks on MaxNeedleEnd, which the load
// walks below populate without a second linear scan.
if v.noWriteOrDelete || v.noWriteCanDelete {
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile, v.Version()); err != nil {
glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
}
} else {
switch needleMapKind {
case NeedleMapInMemory:
if v.tmpNm != nil {
glog.V(2).Infof("updating memory compact index %s ", v.FileName(".idx"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
} else {
glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory")
if v.nm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil {
glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
}
}
case NeedleMapLevelDb:
opts := &opt.Options{
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
OpenFilesCacheCapacity: LevelDbOpenFilesCacheCapacity, // see package-level docs
}
if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else {
glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
}
case NeedleMapLevelDbMedium:
opts := &opt.Options{
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
OpenFilesCacheCapacity: LevelDbMediumOpenFilesCacheCapacity, // see package-level docs
}
if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else {
glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
}
case NeedleMapLevelDbLarge:
opts := &opt.Options{
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
OpenFilesCacheCapacity: LevelDbLargeOpenFilesCacheCapacity, // see package-level docs
}
if v.tmpNm != nil {
glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb"))
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
} else {
glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
}
}
}
// Structural check: no .idx entry may reference bytes past the end of
// the .dat. The needle map's load walk above already populated
// MaximumNeedleEnd, so this is just a numeric comparison — no extra
// disk I/O. A violation marks the volume read-only so a corrupt
// .idx left over from a crashed batched write does not silently
// power vacuum to drop reachable data. See issue #8928.
if !v.HasRemoteFile() && v.nm != nil && v.DataBackend != nil {
if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 {
if maxEnd := v.nm.MaxNeedleEnd(); maxEnd > datSize {
v.noWriteOrDelete = true
glog.V(0).Infof("volume %d: idx references end=%d but .dat is %d bytes; marking readonly",
v.Id, maxEnd, datSize)
}
}
}
}
if !hasVolumeInfoFile {
v.volumeInfo.Version = uint32(v.SuperBlock.Version)
v.volumeInfo.BytesOffset = uint32(types.OffsetSize)
if err := v.SaveVolumeInfo(); err != nil {
glog.Warningf("volume %d failed to save file info: %v", v.Id, err)
}
}
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc()
if err == nil {
hasLoadedVolume = true
}
return err
}