Files
seaweedfs/weed/storage/volume_loading.go
Chris Lu 7c252e1f16 fix(volume): reopen .idx writable after MarkVolumeWritable (fixes #9515) (#9526)
* fix(volume): reopen .idx writable after MarkVolumeWritable

When .vif has ReadOnly=true, load() opens .idx as O_RDONLY and builds a
SortedFileNeedleMap whose Put returns os.ErrInvalid. MarkVolumeWritable
only flipped noWriteOrDelete back to false and rewrote .vif, so writes
still failed at v.nm.Put. Reopen .idx in O_RDWR and rebuild v.nm in its
writable form (in-memory or leveldb small/medium/large) before flipping
the flag.

Mirror the same fix in seaweed-volume: the Rust load path leaves
CompactNeedleMap/RedbNeedleMap with no idx_file writer when the volume
boots read-only, so post-MarkVolumeWritable puts silently succeeded
in-memory only and were lost on the next restart. set_writable now
reattaches an append-mode writer when one is missing.

* fix(volume): keep old needle map until replacement is built; defer writable flag

Go: build the writable needle map into a local before swapping. A
construction failure now leaves v.nm pointing at the original
SortedFileNeedleMap so MarkVolumeWritable can roll back, instead of
stranding the volume with v.nm == nil.

Rust: attach the .idx writer before flipping no_write_or_delete to
false. A transient open/metadata failure used to leave the volume
marked writable with no writer attached, and subsequent puts would
silently skip the on-disk append.
2026-05-18 20:51:04 -07:00

382 lines
15 KiB
Go

package storage
import (
"fmt"
"os"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// Per-DB caps on goleveldb's open SST file cache. The library default is 500
// per DB, but a volume server hosts one DB per volume — easily thousands —
// so the per-DB default sums into FD exhaustion (`open .../00000N.log: too
// many open files`) even with generous ulimits, especially when leveldb is
// rotating its WAL.
//
// The trade-off: a larger cache lowers re-open overhead on cold reads, a
// smaller cache bounds total FD usage. CompactionTableSizeMultiplier=10
// already keeps SST counts low (~10x larger SSTs => ~10x fewer files), so
// even the small-volume cap is enough to keep the working set hot while
// leaving headroom for thousands of co-resident DBs.
const (
// LevelDbOpenFilesCacheCapacity is the OpenFilesCacheCapacity used for
// NeedleMapLevelDb volumes.
LevelDbOpenFilesCacheCapacity = 16
// LevelDbMediumOpenFilesCacheCapacity is the OpenFilesCacheCapacity used
// for NeedleMapLevelDbMedium volumes.
LevelDbMediumOpenFilesCacheCapacity = 32
// LevelDbLargeOpenFilesCacheCapacity is the OpenFilesCacheCapacity used
// for NeedleMapLevelDbLarge volumes.
LevelDbLargeOpenFilesCacheCapacity = 64
)
// loadVolumeWithoutIndex builds a Volume for the given directory, collection
// and id and loads only its data file: load is invoked with
// alsoLoadIndex=false and createDatIfMissing=false, so no needle map is built
// and a missing .dat is reported as an error. ver is forwarded to load.
//
// The Volume pointer is returned even when load fails; callers must check err
// before using it.
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) {
	v = &Volume{
		Id:            id,
		Collection:    collection,
		dir:           dirname,
		needleMapKind: needleMapKind,
	}
	v.SuperBlock = super_block.SuperBlock{}

	if loadErr := v.load(false, false, needleMapKind, 0, ver); loadErr != nil {
		return v, loadErr
	}
	return v, nil
}
// loadVolumeWithoutWorker builds a Volume for the given data directory, index
// directory, collection and id, then loads both its data file and its index
// (load is invoked with alsoLoadIndex=true and createDatIfMissing=false)
// using the current needle version. ldbTimeout is stored on the volume for
// the leveldb-backed needle map kinds.
//
// The Volume pointer is returned even when load fails; callers must check err
// before using it.
func loadVolumeWithoutWorker(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ldbTimeout int64) (v *Volume, err error) {
	v = new(Volume)
	v.dir, v.dirIdx = dirname, dirIdx
	v.Collection, v.Id = collection, id
	v.needleMapKind = needleMapKind
	v.ldbTimeout = ldbTimeout
	v.SuperBlock = super_block.SuperBlock{}

	err = v.load(true, false, needleMapKind, 0, needle.GetCurrentVersion())
	return v, err
}
// reopenIdxForWrite swaps the read-only SortedFileNeedleMap (loaded when the
// volume booted with .vif ReadOnly=true) for the writable needle map matching
// v.needleMapKind. Without this, MarkVolumeWritable flips noWriteOrDelete back
// to false but leaves .idx opened O_RDONLY and v.nm as a SortedFileNeedleMap
// whose Put returns os.ErrInvalid, so subsequent writes still fail.
//
// The method holds v.dataFileAccessLock for its whole duration. It is a no-op
// (returns nil) when v.nm is not a *SortedFileNeedleMap, i.e. it is already a
// writable form.
//
// On failure the freshly opened .idx handle is closed, v.nm is left untouched,
// and the returned error wraps the underlying cause so callers can inspect it
// with errors.Is / errors.As.
func (v *Volume) reopenIdxForWrite() error {
	v.dataFileAccessLock.Lock()
	defer v.dataFileAccessLock.Unlock()

	oldNm, isSorted := v.nm.(*SortedFileNeedleMap)
	if !isSorted {
		return nil
	}

	idxFileName := v.FileName(".idx")
	indexFile, err := os.OpenFile(idxFileName, os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return fmt.Errorf("reopen %s read-write: %w", idxFileName, err)
	}

	// Build the replacement first; only swap once we have a live writable
	// map. A construction failure must leave v.nm pointing at the original
	// SortedFileNeedleMap so the caller (MarkVolumeWritable) can roll back
	// cleanly instead of stranding the volume with v.nm == nil.
	var (
		newNm NeedleMapper
		label string // needle map flavour, used only in the error message
	)
	switch v.needleMapKind {
	case NeedleMapInMemory:
		label = "memory"
		newNm, err = LoadCompactNeedleMap(indexFile, v.Version())
	case NeedleMapLevelDb:
		label = "leveldb"
		opts := &opt.Options{
			BlockCacheCapacity:            2 * 1024 * 1024,
			WriteBuffer:                   1 * 1024 * 1024,
			CompactionTableSizeMultiplier: 10,
			OpenFilesCacheCapacity:        LevelDbOpenFilesCacheCapacity,
		}
		newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version())
	case NeedleMapLevelDbMedium:
		label = "leveldb medium"
		opts := &opt.Options{
			BlockCacheCapacity:            4 * 1024 * 1024,
			WriteBuffer:                   2 * 1024 * 1024,
			CompactionTableSizeMultiplier: 10,
			OpenFilesCacheCapacity:        LevelDbMediumOpenFilesCacheCapacity,
		}
		newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version())
	case NeedleMapLevelDbLarge:
		label = "leveldb large"
		opts := &opt.Options{
			BlockCacheCapacity:            8 * 1024 * 1024,
			WriteBuffer:                   4 * 1024 * 1024,
			CompactionTableSizeMultiplier: 10,
			OpenFilesCacheCapacity:        LevelDbLargeOpenFilesCacheCapacity,
		}
		newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version())
	default:
		indexFile.Close()
		return fmt.Errorf("unsupported needle map kind %v for volume %d", v.needleMapKind, v.Id)
	}
	if err != nil {
		// Single failure path for every kind: release the new handle and
		// report which flavour could not be rebuilt.
		indexFile.Close()
		return fmt.Errorf("rebuild %s needle map for volume %d: %w", label, v.Id, err)
	}

	// A failed sync of the old map is logged but does not abort the swap.
	if err := oldNm.Sync(); err != nil {
		glog.Warningf("volume %d: sync sorted needle map before reopen: %v", v.Id, err)
	}
	oldNm.Close() // closes the O_RDONLY .idx handle held inside
	v.nm = newNm
	return nil
}
// load opens the volume's data (a remote-tier file or the local .dat), reads
// or writes the super block, and optionally opens .idx and builds the needle
// map.
//
// Parameters:
//   - alsoLoadIndex: when true (and the super block step succeeded), .idx is
//     checked, opened and loaded into v.nm.
//   - createDatIfMissing: when the local .dat does not exist, create it via
//     backend.CreateVolumeFile instead of returning an error.
//   - needleMapKind: selects the needle map implementation for volumes that
//     are neither noWriteOrDelete nor noWriteCanDelete; those two states
//     always get a SortedFileNeedleMap.
//   - preallocate: forwarded to backend.CreateVolumeFile.
//   - ver: forwarded to maybeWriteSuperBlock when no super block exists yet.
//
// If load does not finish with a nil error, the deferred cleanup closes and
// clears v.nm and v.DataBackend. Several failures are handled by marking the
// volume read-only (v.noWriteOrDelete = true) rather than by returning early:
// an unwritable .dat, a failed data-integrity check, and an .idx entry that
// points past the end of .dat.
//
// An unsupported super block version, or a failed idx check on a non-remote
// volume, terminates the process through glog.Fatalf.
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) {
	alreadyHasSuperBlock := false

	hasLoadedVolume := false
	defer func() {
		if !hasLoadedVolume {
			if v.nm != nil {
				v.nm.Close()
				v.nm = nil
			}
			if v.DataBackend != nil {
				v.DataBackend.Close()
				v.DataBackend = nil
			}
		}
	}()

	hasVolumeInfoFile := v.maybeLoadVolumeInfo()

	if v.volumeInfo.ReadOnly && !v.HasRemoteFile() {
		// this covers the case where the volume is marked as read-only and has no remote file
		v.noWriteOrDelete = true
	}

	if v.HasRemoteFile() {
		v.noWriteCanDelete = true
		v.noWriteOrDelete = false
		glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
		if err := v.LoadRemoteFile(); err != nil {
			return fmt.Errorf("load remote file %v: %w", v.volumeInfo, err)
		}
		// Set lastModifiedTsSeconds from remote file to prevent premature expiry on startup
		if len(v.volumeInfo.GetFiles()) > 0 {
			remoteFileModifiedTime := v.volumeInfo.GetFiles()[0].GetModifiedTime()
			if remoteFileModifiedTime > 0 {
				v.lastModifiedTsSeconds = remoteFileModifiedTime
			} else {
				// Fallback: use .vif file's modification time
				if exists, _, _, modifiedTime, _ := util.CheckFile(v.FileName(".vif")); exists {
					v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
				}
			}
			glog.V(1).Infof("volume %d remote file lastModifiedTsSeconds set to %d", v.Id, v.lastModifiedTsSeconds)
		}
		alreadyHasSuperBlock = true
	} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
		// open dat file
		if !canRead {
			return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
		}
		var dataFile *os.File
		if canWrite {
			dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
		} else {
			glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
			dataFile, err = os.Open(v.FileName(".dat"))
			v.noWriteOrDelete = true
		}
		v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
		if fileSize >= super_block.SuperBlockSize {
			alreadyHasSuperBlock = true
		}
		// Only wrap the file once the open is known to have succeeded; on
		// failure dataFile is nil and the error is reported just below.
		if err == nil {
			v.DataBackend = backend.NewDiskFile(dataFile)
		}
	} else {
		if createDatIfMissing {
			v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
		} else {
			return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
		}
	}

	if err != nil {
		if !os.IsPermission(err) {
			return fmt.Errorf("cannot load volume data %s: %w", v.FileName(".dat"), err)
		}
		return fmt.Errorf("load data file %s: %w", v.FileName(".dat"), err)
	}

	if alreadyHasSuperBlock {
		err = v.readSuperBlock()
		if err == nil {
			if !needle.IsSupportedVersion(v.SuperBlock.Version) {
				glog.Fatalf("Unsupported volume %d version %v", v.Id, v.SuperBlock.Version)
			}
			v.volumeInfo.Version = uint32(v.SuperBlock.Version)
		}
		glog.V(2).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
		if v.HasRemoteFile() {
			// A super block read failure on a remote volume is tolerated
			// (maybe a temporary network problem); log it only when there
			// actually was an error.
			if err != nil {
				glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
			}
			err = nil
		}
	} else {
		if !v.SuperBlock.Initialized() {
			return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
		}
		err = v.maybeWriteSuperBlock(ver)
	}

	if err == nil && alsoLoadIndex {
		// adjust for existing volumes with .idx together with .dat files
		if v.dirIdx != v.dir {
			if util.FileExists(v.DataFileName() + ".idx") {
				v.dirIdx = v.dir
			}
		}
		// check volume idx files
		if err := v.checkIdxFile(); err != nil {
			// A remote-tiered volume with a stray .vif but no .idx must not
			// take the whole server down; skip just this volume.
			if v.HasRemoteFile() {
				glog.Errorf("skip remote volume %d (idx: %s): %v", v.Id, v.FileName(".idx"), err)
				return fmt.Errorf("check volume idx file %s: %w", v.FileName(".idx"), err)
			}
			glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
		}
		var indexFile *os.File
		if v.noWriteOrDelete {
			glog.V(0).Infoln("open to read file", v.FileName(".idx"))
			if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
				return fmt.Errorf("cannot read Volume Index %s: %w", v.FileName(".idx"), err)
			}
		} else {
			glog.V(1).Infoln("open to write file", v.FileName(".idx"))
			if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
				return fmt.Errorf("cannot write Volume Index %s: %w", v.FileName(".idx"), err)
			}
		}
		// Do not need to check the data integrity for remote volumes,
		// since the remote storage tier may have larger capacity, the volume
		// data read will trigger the ReadAt() function to read from the remote
		// storage tier, and download to local storage, which may cause the
		// capacity overloading.
		if !v.HasRemoteFile() {
			glog.V(2).Infof("checking volume data integrity for volume %d", v.Id)
			if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
				v.noWriteOrDelete = true
				glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
			}
		}

		// The post-load structural check below uses the in-memory needle map
		// to verify that no .idx entry references bytes past the end of .dat
		// (issue #8928). The check piggybacks on MaxNeedleEnd, which the load
		// walks below populate without a second linear scan.
		if v.noWriteOrDelete || v.noWriteCanDelete {
			if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile, v.Version()); err != nil {
				glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
			}
		} else {
			switch needleMapKind {
			case NeedleMapInMemory:
				if v.tmpNm != nil {
					glog.V(2).Infof("updating memory compact index %s ", v.FileName(".idx"))
					err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
				} else {
					glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory")
					if v.nm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil {
						glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
					}
				}
			case NeedleMapLevelDb:
				opts := &opt.Options{
					BlockCacheCapacity:            2 * 1024 * 1024,               // default value is 8MiB
					WriteBuffer:                   1 * 1024 * 1024,               // default value is 4MiB
					CompactionTableSizeMultiplier: 10,                            // default value is 1
					OpenFilesCacheCapacity:        LevelDbOpenFilesCacheCapacity, // see package-level docs
				}
				if v.tmpNm != nil {
					glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb"))
					err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
				} else {
					glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
					if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
						glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
					}
				}
			case NeedleMapLevelDbMedium:
				opts := &opt.Options{
					BlockCacheCapacity:            4 * 1024 * 1024,                     // default value is 8MiB
					WriteBuffer:                   2 * 1024 * 1024,                     // default value is 4MiB
					CompactionTableSizeMultiplier: 10,                                  // default value is 1
					OpenFilesCacheCapacity:        LevelDbMediumOpenFilesCacheCapacity, // see package-level docs
				}
				if v.tmpNm != nil {
					glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb"))
					err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
				} else {
					glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
					if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
						glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
					}
				}
			case NeedleMapLevelDbLarge:
				opts := &opt.Options{
					BlockCacheCapacity:            8 * 1024 * 1024,                    // default value is 8MiB
					WriteBuffer:                   4 * 1024 * 1024,                    // default value is 4MiB
					CompactionTableSizeMultiplier: 10,                                 // default value is 1
					OpenFilesCacheCapacity:        LevelDbLargeOpenFilesCacheCapacity, // see package-level docs
				}
				if v.tmpNm != nil {
					glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb"))
					err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
				} else {
					glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
					if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
						glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
					}
				}
			}
		}

		// Structural check: no .idx entry may reference bytes past the end of
		// the .dat. The needle map's load walk above already populated
		// MaximumNeedleEnd, so this is just a numeric comparison — no extra
		// disk I/O. A violation marks the volume read-only so a corrupt
		// .idx left over from a crashed batched write does not silently
		// power vacuum to drop reachable data. See issue #8928.
		if !v.HasRemoteFile() && v.nm != nil && v.DataBackend != nil {
			if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 {
				if maxEnd := v.nm.MaxNeedleEnd(); maxEnd > datSize {
					v.noWriteOrDelete = true
					glog.V(0).Infof("volume %d: idx references end=%d but .dat is %d bytes; marking readonly",
						v.Id, maxEnd, datSize)
				}
			}
		}
	}

	// No .vif was found earlier: persist one carrying the super block version
	// and the offset size. A save failure is logged, not returned.
	if !hasVolumeInfoFile {
		v.volumeInfo.Version = uint32(v.SuperBlock.Version)
		v.volumeInfo.BytesOffset = uint32(types.OffsetSize)
		if err := v.SaveVolumeInfo(); err != nil {
			glog.Warningf("volume %d failed to save file info: %v", v.Id, err)
		}
	}

	stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc()

	if err == nil {
		hasLoadedVolume = true
	}

	return err
}