mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-29 13:10:21 +00:00
* fix(volume): keep vacuum running past dangling .idx entries Vacuum compaction aborted entirely on the first .idx entry whose offset pointed past the end of the .dat file, surfacing as `cannot hydrate needle from file: EOF` and stalling progress on every other volume. In both Go and Rust: - During compaction, skip an unreadable needle and continue. The bytes it pointed at were already unreachable via reads, so dropping the index reference makes the post-vacuum volume consistent. Real EIO still bails out so a disk fault is not silently papered over. - At volume load, do a single linear scan of the .idx and confirm every (offset + actual size) fits inside .dat. The pre-existing integrity check only looked at the last 10 entries, so deeper corruption (e.g. left over from a crashed batched write) went undetected and only surfaced later as a vacuum EOF. A failure now marks the volume read-only at load time so an operator can react. Refs #8928 * fix(volume): only skip permanent-corruption needle reads during vacuum Address PR review feedback (gemini-code-assist + coderabbit): The original patch skipped any non-EIO read failure, which would silently drop needles on transient errors — Windows hardware bad-sector errors (ERROR_CRC etc.) never surface as syscall.EIO; tiered-storage network timeouts and EROFS would also slip through and shrink the volume. Switch to an explicit whitelist of permanent-corruption shapes: - Add needle.ErrorCorrupted sentinel and wrap CRC and "index out of range" errors with %w so callers can match via errors.Is. - copyDataBasedOnIndexFile now skips only when the read failure is io.EOF, io.ErrUnexpectedEOF, ErrorSizeMismatch, ErrorSizeInvalid, or ErrorCorrupted. Anything else (real disk faults, environmental errors, Windows hardware codes) aborts the compaction so an operator notices. 
- Mirror the same whitelist in the Rust volume server, matching on io::ErrorKind::UnexpectedEof and the NeedleError corruption variants (SizeMismatch, CrcMismatch, IndexOutOfRange, TailTooShort). Also add `defer v.Close()` in TestVerifyIndexFitsInDat so Windows t.TempDir() cleanup can release the .dat/.idx handles. Refs #8928 * fix(volume): wrap entry-not-found size-mismatch with ErrorSizeMismatch Address PR review: the fallback branch in ReadBytes returned an unwrapped fmt.Errorf, so isSkippableNeedleReadError (and any caller using errors.Is(..., ErrorSizeMismatch)) could not match it. Wrap with %w so the whitelist applies, while leaving the existing direct sentinel return for the OffsetSize==4 / offset<MaxPossibleVolumeSize retry path unchanged so ReadData's `err == ErrorSizeMismatch` retry still triggers. Refs #8928 * fix(volume): integrate dangling-idx check into existing index load walk Address PR review (gemini-code-assist, medium): the structural .idx check used to do a second linear scan of the index file at every volume load, doubling the disk-I/O cost on servers managing many volumes. Track the largest (offset + actual size) seen during the existing needle-map load walks (`LoadCompactNeedleMap`, `NewLevelDbNeedleMap`, `NewSortedFileNeedleMap`'s `newNeedleMapMetricFromIndexFile`, `DoOffsetLoading`) on a new `MaximumNeedleEnd` field on `mapMetric`, exposed as `MaxNeedleEnd()` on the NeedleMapper interface. `volume.load()` then compares `nm.MaxNeedleEnd()` to the .dat size after the load is complete — pure numeric comparison, no extra I/O. The standalone `verifyIndexFitsInDat` helper and its caller in `CheckVolumeDataIntegrity` are removed; the test that used to drive the helper directly now exercises the new path via `LoadCompactNeedleMap`. Mirror the same change in the Rust volume server: track `max_needle_end` on `NeedleMapMetric`, expose via `max_needle_end()` on `CompactNeedleMap`, `RedbNeedleMap`, and the `NeedleMap` enum. 
The Rust load walk already happens in `load_from_idx` for both map kinds, so the structural check becomes free. Refs #8928
258 lines
7.3 KiB
Go
258 lines
7.3 KiB
Go
package chunk_cache
|
|
|
|
import (
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
)
|
|
|
|
// ChunkCacheVolume implements an on-disk cache volume.
// The entries are a FIFO with a size limit. NOTE(review): eviction is not
// implemented in this file; the size limit is presumably enforced by callers.
//
// Record bytes are appended to <fileName>.dat and located through a needle
// map persisted in <fileName>.idx and the <fileName>.ldb LevelDB directory.
type ChunkCacheVolume struct {
	// DataBackend is the opened <fileName>.dat file that records are
	// appended to and read from; nil after Shutdown.
	DataBackend backend.BackendStorageFile
	// nm maps a needle id to the offset and size of its record in the .dat
	// file (LevelDB-backed, opened in LoadOrCreateChunkCacheVolume); nil
	// after Shutdown.
	nm storage.NeedleMapper
	// fileName is the base path shared by the .dat, .idx and .ldb files.
	fileName string
	// smallBuffer holds NeedlePaddingSize zero bytes used to pad each record
	// up to the next NeedlePaddingSize boundary.
	smallBuffer []byte
	// sizeLimit is the preallocate size the volume was opened with; Reset
	// reopens the volume with the same value.
	sizeLimit int64
	// lastModTime is the .dat modification time when an existing file was
	// loaded, or the creation time when a new volume was created.
	lastModTime time.Time
	// fileSize is the append position: the next record is written at this
	// byte offset in the .dat file.
	fileSize int64
}
|
|
|
|
func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
|
|
|
|
v := &ChunkCacheVolume{
|
|
smallBuffer: make([]byte, types.NeedlePaddingSize),
|
|
fileName: fileName,
|
|
sizeLimit: preallocate,
|
|
}
|
|
|
|
var err error
|
|
|
|
if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
|
|
if !canRead {
|
|
return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
|
|
}
|
|
if !canWrite {
|
|
return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
|
|
}
|
|
if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
|
return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
|
|
} else {
|
|
v.DataBackend = backend.NewDiskFile(dataFile)
|
|
v.lastModTime = modTime
|
|
v.fileSize = fileSize
|
|
}
|
|
} else {
|
|
if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
|
|
return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
|
|
}
|
|
v.lastModTime = time.Now()
|
|
}
|
|
|
|
var indexFile *os.File
|
|
if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
|
|
return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
|
|
}
|
|
|
|
glog.V(1).Infoln("loading leveldb", v.fileName+".ldb")
|
|
opts := &opt.Options{
|
|
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
|
|
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
|
|
CompactionTableSizeMultiplier: 10, // default value is 1
|
|
}
|
|
if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts, 0, needle.GetCurrentVersion()); err != nil {
|
|
return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
|
|
}
|
|
|
|
return v, nil
|
|
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) Shutdown() {
|
|
if v.DataBackend != nil {
|
|
v.DataBackend.Close()
|
|
v.DataBackend = nil
|
|
}
|
|
if v.nm != nil {
|
|
v.nm.Close()
|
|
v.nm = nil
|
|
}
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) doReset() {
|
|
v.Shutdown()
|
|
fn := v.fileName + ".dat"
|
|
err := os.Truncate(fn, 0)
|
|
if err != nil {
|
|
glog.Errorf("ChunkCacheVolume.doReset: truncate %q failed: %s", fn, err)
|
|
}
|
|
fn = v.fileName + ".idx"
|
|
err = os.Truncate(fn, 0)
|
|
if err != nil {
|
|
glog.Errorf("ChunkCacheVolume.doReset: truncate %q failed: %s", fn, err)
|
|
}
|
|
fn = v.fileName + ".ldb"
|
|
err = os.RemoveAll(fn)
|
|
if err != nil {
|
|
glog.Errorf("ChunkCacheVolume.doReset: remove %q failed: %s", fn, err)
|
|
} else {
|
|
glog.V(4).Infof("cache removed %s", fn)
|
|
}
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
|
|
v.doReset()
|
|
return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
|
|
}
|
|
|
|
// minFadviseSize is the smallest read, in bytes, for which the page cache is
// explicitly dropped afterwards (fadvise DONTNEED). Below this size the extra
// syscall costs more than it saves, and the kernel's page cache may well serve
// the same bytes again soon.
const minFadviseSize = 1024 * 1024 // 1 MiB
|
|
|
|
// dropReadCache advises the kernel to drop page cache for the byte range
|
|
// just read. This is best-effort; failures are logged at V(4).
|
|
// Only applied for reads >= minFadviseSize to avoid syscall overhead on
|
|
// small needle reads where the kernel page cache is more beneficial.
|
|
func (v *ChunkCacheVolume) dropReadCache(offset int64, length int64) {
|
|
if length < minFadviseSize {
|
|
return
|
|
}
|
|
type fdProvider interface {
|
|
Fd() uintptr
|
|
}
|
|
if fp, ok := v.DataBackend.(fdProvider); ok {
|
|
fd := int(fp.Fd())
|
|
if fd < 0 {
|
|
return
|
|
}
|
|
if err := util.DropOSPageCache(fd, offset, length); err != nil {
|
|
glog.V(4).Infof("fadvise DONTNEED %s offset %d len %d: %v", v.fileName, offset, length, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
|
|
|
|
nv, ok := v.nm.Get(key)
|
|
if !ok {
|
|
return nil, storage.ErrorNotFound
|
|
}
|
|
data := make([]byte, nv.Size)
|
|
readOffset := nv.Offset.ToActualOffset()
|
|
if readSize, readErr := v.DataBackend.ReadAt(data, readOffset); readErr != nil {
|
|
if readSize != int(nv.Size) {
|
|
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
|
|
v.fileName, readOffset, readOffset+int64(nv.Size), readErr)
|
|
}
|
|
} else {
|
|
if readSize != int(nv.Size) {
|
|
return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
|
|
}
|
|
}
|
|
|
|
v.dropReadCache(readOffset, int64(nv.Size))
|
|
return data, nil
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uint64) ([]byte, error) {
|
|
nv, ok := v.nm.Get(key)
|
|
if !ok {
|
|
return nil, storage.ErrorNotFound
|
|
}
|
|
wanted := min(int(length), int(nv.Size)-int(offset))
|
|
if wanted < 0 {
|
|
// should never happen, but better than panicking
|
|
return nil, ErrorOutOfBounds
|
|
}
|
|
data := make([]byte, wanted)
|
|
readOffset := nv.Offset.ToActualOffset() + int64(offset)
|
|
var readSize int
|
|
var readErr error
|
|
if readSize, readErr = v.DataBackend.ReadAt(data, readOffset); readErr != nil {
|
|
if readSize != wanted {
|
|
return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
|
|
v.fileName, readOffset, int64(readOffset)+int64(wanted), readErr)
|
|
}
|
|
} else {
|
|
if readSize != wanted {
|
|
return nil, fmt.Errorf("read %d, expected %d", readSize, wanted)
|
|
}
|
|
}
|
|
if readErr != nil && readSize == wanted {
|
|
readErr = nil
|
|
}
|
|
if readSize > 0 {
|
|
v.dropReadCache(readOffset, int64(readSize))
|
|
}
|
|
return data, readErr
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) {
|
|
nv, ok := v.nm.Get(key)
|
|
if !ok {
|
|
return 0, storage.ErrorNotFound
|
|
}
|
|
wanted := min(len(data), int(nv.Size)-int(offset))
|
|
if wanted < 0 {
|
|
// should never happen, but better than panicking
|
|
return 0, ErrorOutOfBounds
|
|
}
|
|
readOffset := nv.Offset.ToActualOffset() + int64(offset)
|
|
if n, err = v.DataBackend.ReadAt(data, readOffset); err != nil {
|
|
if n != wanted {
|
|
return n, fmt.Errorf("read %s.dat [%d,%d): %v",
|
|
v.fileName, readOffset, int64(readOffset)+int64(wanted), err)
|
|
}
|
|
} else {
|
|
if n != wanted {
|
|
return n, fmt.Errorf("read %d, expected %d", n, wanted)
|
|
}
|
|
}
|
|
if err != nil && n == wanted {
|
|
err = nil
|
|
}
|
|
if n > 0 {
|
|
v.dropReadCache(readOffset, int64(n))
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
|
|
|
|
offset := v.fileSize
|
|
|
|
written, err := v.DataBackend.WriteAt(data, offset)
|
|
if err != nil {
|
|
return err
|
|
} else if written != len(data) {
|
|
return fmt.Errorf("partial written %d, expected %d", written, len(data))
|
|
}
|
|
|
|
v.fileSize += int64(written)
|
|
extraSize := written % types.NeedlePaddingSize
|
|
if extraSize != 0 {
|
|
_, err = v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v.fileSize += int64(types.NeedlePaddingSize - extraSize)
|
|
}
|
|
|
|
if err := v.nm.Put(key, types.ToOffset(offset), types.Size(len(data))); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|