mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-09 18:32:43 +00:00
7c252e1f16
* fix(volume): reopen .idx writable after MarkVolumeWritable When .vif has ReadOnly=true, load() opens .idx as O_RDONLY and builds a SortedFileNeedleMap whose Put returns os.ErrInvalid. MarkVolumeWritable only flipped noWriteOrDelete back to false and rewrote .vif, so writes still failed at v.nm.Put. Reopen .idx in O_RDWR and rebuild v.nm in its writable form (in-memory or leveldb small/medium/large) before flipping the flag. Mirror the same fix in seaweed-volume: the Rust load path leaves CompactNeedleMap/RedbNeedleMap with no idx_file writer when the volume boots read-only, so post-MarkVolumeWritable puts silently succeeded in-memory only and were lost on the next restart. set_writable now reattaches an append-mode writer when one is missing. * fix(volume): keep old needle map until replacement is built; defer writable flag Go: build the writable needle map into a local before swapping. A construction failure now leaves v.nm pointing at the original SortedFileNeedleMap so MarkVolumeWritable can roll back, instead of stranding the volume with v.nm == nil. Rust: attach the .idx writer before flipping no_write_or_delete to false. A transient open/metadata failure used to leave the volume marked writable with no writer attached, and subsequent puts would silently skip the on-disk append.
382 lines
15 KiB
Go
382 lines
15 KiB
Go
package storage
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
// Caps on goleveldb's per-DB cache of open SST files.
//
// goleveldb defaults to 500 open files per DB. A volume server hosts one DB
// per volume, and thousands of volumes are common, so the default adds up to
// file-descriptor exhaustion (`open .../00000N.log: too many open files`)
// even under generous ulimits, particularly while leveldb rotates its WAL.
//
// Trade-off: a larger cache means fewer re-opens on cold reads, a smaller
// cache bounds total FD usage. With CompactionTableSizeMultiplier=10 the SSTs
// are ~10x larger and therefore ~10x fewer, so even the smallest cap keeps
// the working set hot while leaving headroom for thousands of co-resident
// DBs.
const (
	// Cap used with the NeedleMapLevelDb kind.
	LevelDbOpenFilesCacheCapacity = 16
	// Cap used with the NeedleMapLevelDbMedium kind.
	LevelDbMediumOpenFilesCacheCapacity = 32
	// Cap used with the NeedleMapLevelDbLarge kind.
	LevelDbLargeOpenFilesCacheCapacity = 64
)
|
|
|
|
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) {
|
|
v = &Volume{dir: dirname, Collection: collection, Id: id}
|
|
v.SuperBlock = super_block.SuperBlock{}
|
|
v.needleMapKind = needleMapKind
|
|
err = v.load(false, false, needleMapKind, 0, ver)
|
|
return
|
|
}
|
|
|
|
func loadVolumeWithoutWorker(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ldbTimeout int64) (v *Volume, err error) {
|
|
v = &Volume{
|
|
dir: dirname,
|
|
dirIdx: dirIdx,
|
|
Collection: collection,
|
|
Id: id,
|
|
needleMapKind: needleMapKind,
|
|
ldbTimeout: ldbTimeout,
|
|
}
|
|
v.SuperBlock = super_block.SuperBlock{}
|
|
err = v.load(true, false, needleMapKind, 0, needle.GetCurrentVersion())
|
|
return
|
|
}
|
|
|
|
// reopenIdxForWrite swaps the read-only SortedFileNeedleMap (loaded when the
|
|
// volume booted with .vif ReadOnly=true) for the writable needle map matching
|
|
// v.needleMapKind. Without this, MarkVolumeWritable flips noWriteOrDelete back
|
|
// to false but leaves .idx opened O_RDONLY and v.nm as a SortedFileNeedleMap
|
|
// whose Put returns os.ErrInvalid, so subsequent writes still fail.
|
|
//
|
|
// No-op when v.nm is already a writable form.
|
|
func (v *Volume) reopenIdxForWrite() error {
|
|
v.dataFileAccessLock.Lock()
|
|
defer v.dataFileAccessLock.Unlock()
|
|
|
|
oldNm, isSorted := v.nm.(*SortedFileNeedleMap)
|
|
if !isSorted {
|
|
return nil
|
|
}
|
|
|
|
indexFile, err := os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644)
|
|
if err != nil {
|
|
return fmt.Errorf("reopen %s read-write: %v", v.FileName(".idx"), err)
|
|
}
|
|
|
|
// Build the replacement first; only swap once we have a live writable
|
|
// map. A construction failure must leave v.nm pointing at the original
|
|
// SortedFileNeedleMap so the caller (MarkVolumeWritable) can roll back
|
|
// cleanly instead of stranding the volume with v.nm == nil.
|
|
var newNm NeedleMapper
|
|
switch v.needleMapKind {
|
|
case NeedleMapInMemory:
|
|
if newNm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil {
|
|
indexFile.Close()
|
|
return fmt.Errorf("rebuild memory needle map for volume %d: %v", v.Id, err)
|
|
}
|
|
case NeedleMapLevelDb:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 2 * 1024 * 1024,
|
|
WriteBuffer: 1 * 1024 * 1024,
|
|
CompactionTableSizeMultiplier: 10,
|
|
OpenFilesCacheCapacity: LevelDbOpenFilesCacheCapacity,
|
|
}
|
|
if newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
indexFile.Close()
|
|
return fmt.Errorf("rebuild leveldb needle map for volume %d: %v", v.Id, err)
|
|
}
|
|
case NeedleMapLevelDbMedium:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 4 * 1024 * 1024,
|
|
WriteBuffer: 2 * 1024 * 1024,
|
|
CompactionTableSizeMultiplier: 10,
|
|
OpenFilesCacheCapacity: LevelDbMediumOpenFilesCacheCapacity,
|
|
}
|
|
if newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
indexFile.Close()
|
|
return fmt.Errorf("rebuild leveldb medium needle map for volume %d: %v", v.Id, err)
|
|
}
|
|
case NeedleMapLevelDbLarge:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 8 * 1024 * 1024,
|
|
WriteBuffer: 4 * 1024 * 1024,
|
|
CompactionTableSizeMultiplier: 10,
|
|
OpenFilesCacheCapacity: LevelDbLargeOpenFilesCacheCapacity,
|
|
}
|
|
if newNm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
indexFile.Close()
|
|
return fmt.Errorf("rebuild leveldb large needle map for volume %d: %v", v.Id, err)
|
|
}
|
|
default:
|
|
indexFile.Close()
|
|
return fmt.Errorf("unsupported needle map kind %v for volume %d", v.needleMapKind, v.Id)
|
|
}
|
|
|
|
if err := oldNm.Sync(); err != nil {
|
|
glog.Warningf("volume %d: sync sorted needle map before reopen: %v", v.Id, err)
|
|
}
|
|
oldNm.Close() // closes the O_RDONLY .idx handle held inside
|
|
v.nm = newNm
|
|
return nil
|
|
}
|
|
|
|
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) {
|
|
alreadyHasSuperBlock := false
|
|
|
|
hasLoadedVolume := false
|
|
defer func() {
|
|
if !hasLoadedVolume {
|
|
if v.nm != nil {
|
|
v.nm.Close()
|
|
v.nm = nil
|
|
}
|
|
if v.DataBackend != nil {
|
|
v.DataBackend.Close()
|
|
v.DataBackend = nil
|
|
}
|
|
}
|
|
}()
|
|
|
|
hasVolumeInfoFile := v.maybeLoadVolumeInfo()
|
|
|
|
if v.volumeInfo.ReadOnly && !v.HasRemoteFile() {
|
|
// this covers the case where the volume is marked as read-only and has no remote file
|
|
v.noWriteOrDelete = true
|
|
}
|
|
|
|
if v.HasRemoteFile() {
|
|
v.noWriteCanDelete = true
|
|
v.noWriteOrDelete = false
|
|
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
|
|
if err := v.LoadRemoteFile(); err != nil {
|
|
return fmt.Errorf("load remote file %v: %w", v.volumeInfo, err)
|
|
}
|
|
// Set lastModifiedTsSeconds from remote file to prevent premature expiry on startup
|
|
if len(v.volumeInfo.GetFiles()) > 0 {
|
|
remoteFileModifiedTime := v.volumeInfo.GetFiles()[0].GetModifiedTime()
|
|
if remoteFileModifiedTime > 0 {
|
|
v.lastModifiedTsSeconds = remoteFileModifiedTime
|
|
} else {
|
|
// Fallback: use .vif file's modification time
|
|
if exists, _, _, modifiedTime, _ := util.CheckFile(v.FileName(".vif")); exists {
|
|
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
|
|
}
|
|
}
|
|
glog.V(1).Infof("volume %d remote file lastModifiedTsSeconds set to %d", v.Id, v.lastModifiedTsSeconds)
|
|
}
|
|
alreadyHasSuperBlock = true
|
|
} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
|
|
// open dat file
|
|
if !canRead {
|
|
return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
|
|
}
|
|
var dataFile *os.File
|
|
if canWrite {
|
|
dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
|
|
} else {
|
|
glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
|
|
dataFile, err = os.Open(v.FileName(".dat"))
|
|
v.noWriteOrDelete = true
|
|
}
|
|
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
|
|
if fileSize >= super_block.SuperBlockSize {
|
|
alreadyHasSuperBlock = true
|
|
}
|
|
v.DataBackend = backend.NewDiskFile(dataFile)
|
|
} else {
|
|
if createDatIfMissing {
|
|
v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
|
|
} else {
|
|
return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
if !os.IsPermission(err) {
|
|
return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
|
|
} else {
|
|
return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
|
|
}
|
|
}
|
|
|
|
if alreadyHasSuperBlock {
|
|
err = v.readSuperBlock()
|
|
if err == nil {
|
|
if !needle.IsSupportedVersion(v.SuperBlock.Version) {
|
|
glog.Fatalf("Unsupported volume %d version %v", v.Id, v.SuperBlock.Version)
|
|
}
|
|
v.volumeInfo.Version = uint32(v.SuperBlock.Version)
|
|
}
|
|
glog.V(2).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
|
|
if v.HasRemoteFile() {
|
|
// maybe temporary network problem
|
|
glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
|
|
err = nil
|
|
}
|
|
} else {
|
|
if !v.SuperBlock.Initialized() {
|
|
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
|
|
}
|
|
err = v.maybeWriteSuperBlock(ver)
|
|
}
|
|
if err == nil && alsoLoadIndex {
|
|
// adjust for existing volumes with .idx together with .dat files
|
|
if v.dirIdx != v.dir {
|
|
if util.FileExists(v.DataFileName() + ".idx") {
|
|
v.dirIdx = v.dir
|
|
}
|
|
}
|
|
// check volume idx files
|
|
if err := v.checkIdxFile(); err != nil {
|
|
// A remote-tiered volume with a stray .vif but no .idx must not
|
|
// take the whole server down; skip just this volume.
|
|
if v.HasRemoteFile() {
|
|
glog.Errorf("skip remote volume %d (idx: %s): %v", v.Id, v.FileName(".idx"), err)
|
|
return fmt.Errorf("check volume idx file %s: %w", v.FileName(".idx"), err)
|
|
}
|
|
glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
|
|
}
|
|
var indexFile *os.File
|
|
if v.noWriteOrDelete {
|
|
glog.V(0).Infoln("open to read file", v.FileName(".idx"))
|
|
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
|
|
return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
|
|
}
|
|
} else {
|
|
glog.V(1).Infoln("open to write file", v.FileName(".idx"))
|
|
if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
|
return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
|
|
}
|
|
}
|
|
// Do not need to check the data integrity for remote volumes,
|
|
// since the remote storage tier may have larger capacity, the volume
|
|
// data read will trigger the ReadAt() function to read from the remote
|
|
// storage tier, and download to local storage, which may cause the
|
|
// capactiy overloading.
|
|
if !v.HasRemoteFile() {
|
|
glog.V(2).Infof("checking volume data integrity for volume %d", v.Id)
|
|
if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
|
|
v.noWriteOrDelete = true
|
|
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
|
|
}
|
|
}
|
|
|
|
// The post-load structural check below uses the in-memory needle map
|
|
// to verify that no .idx entry references bytes past the end of .dat
|
|
// (issue #8928). The check piggybacks on MaxNeedleEnd, which the load
|
|
// walks below populate without a second linear scan.
|
|
|
|
if v.noWriteOrDelete || v.noWriteCanDelete {
|
|
if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile, v.Version()); err != nil {
|
|
glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
|
|
}
|
|
} else {
|
|
switch needleMapKind {
|
|
case NeedleMapInMemory:
|
|
if v.tmpNm != nil {
|
|
glog.V(2).Infof("updating memory compact index %s ", v.FileName(".idx"))
|
|
err = v.tmpNm.UpdateNeedleMap(v, indexFile, nil, 0)
|
|
} else {
|
|
glog.V(2).Infoln("loading memory index", v.FileName(".idx"), "to memory")
|
|
if v.nm, err = LoadCompactNeedleMap(indexFile, v.Version()); err != nil {
|
|
glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
|
|
}
|
|
}
|
|
case NeedleMapLevelDb:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
|
|
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
|
OpenFilesCacheCapacity: LevelDbOpenFilesCacheCapacity, // see package-level docs
|
|
}
|
|
if v.tmpNm != nil {
|
|
glog.V(0).Infoln("updating leveldb index", v.FileName(".ldb"))
|
|
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
|
} else {
|
|
glog.V(0).Infoln("loading leveldb index", v.FileName(".ldb"))
|
|
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
|
}
|
|
}
|
|
case NeedleMapLevelDbMedium:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
|
|
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
|
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
|
OpenFilesCacheCapacity: LevelDbMediumOpenFilesCacheCapacity, // see package-level docs
|
|
}
|
|
if v.tmpNm != nil {
|
|
glog.V(0).Infoln("updating leveldb medium index", v.FileName(".ldb"))
|
|
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
|
} else {
|
|
glog.V(0).Infoln("loading leveldb medium index", v.FileName(".ldb"))
|
|
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
|
}
|
|
}
|
|
case NeedleMapLevelDbLarge:
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
|
|
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
|
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
|
OpenFilesCacheCapacity: LevelDbLargeOpenFilesCacheCapacity, // see package-level docs
|
|
}
|
|
if v.tmpNm != nil {
|
|
glog.V(0).Infoln("updating leveldb large index", v.FileName(".ldb"))
|
|
err = v.tmpNm.UpdateNeedleMap(v, indexFile, opts, v.ldbTimeout)
|
|
} else {
|
|
glog.V(0).Infoln("loading leveldb large index", v.FileName(".ldb"))
|
|
if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts, v.ldbTimeout, v.Version()); err != nil {
|
|
glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Structural check: no .idx entry may reference bytes past the end of
|
|
// the .dat. The needle map's load walk above already populated
|
|
// MaximumNeedleEnd, so this is just a numeric comparison — no extra
|
|
// disk I/O. A violation marks the volume read-only so a corrupt
|
|
// .idx left over from a crashed batched write does not silently
|
|
// power vacuum to drop reachable data. See issue #8928.
|
|
if !v.HasRemoteFile() && v.nm != nil && v.DataBackend != nil {
|
|
if datSize, _, statErr := v.DataBackend.GetStat(); statErr == nil && datSize > 0 {
|
|
if maxEnd := v.nm.MaxNeedleEnd(); maxEnd > datSize {
|
|
v.noWriteOrDelete = true
|
|
glog.V(0).Infof("volume %d: idx references end=%d but .dat is %d bytes; marking readonly",
|
|
v.Id, maxEnd, datSize)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !hasVolumeInfoFile {
|
|
v.volumeInfo.Version = uint32(v.SuperBlock.Version)
|
|
v.volumeInfo.BytesOffset = uint32(types.OffsetSize)
|
|
if err := v.SaveVolumeInfo(); err != nil {
|
|
glog.Warningf("volume %d failed to save file info: %v", v.Id, err)
|
|
}
|
|
}
|
|
|
|
stats.VolumeServerVolumeGauge.WithLabelValues(v.Collection, "volume").Inc()
|
|
|
|
if err == nil {
|
|
hasLoadedVolume = true
|
|
}
|
|
|
|
return err
|
|
}
|