mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-06-09 18:32:43 +00:00
EC bitrot follow-ups: protect destination sidecar on optional copy; cap sidecar block_size (#9763)
* fix(ec_bitrot): cap sidecar block_size in ValidateBitrotManifest A sidecar loaded from disk (or supplied via a backfill/peer RPC) could carry a huge power-of-two block_size that passed validation, then force a multi-GiB scratch-buffer allocation in scrub/verify. Add a shared MaxBitrotBlockSize (64 MiB) constant, enforce it as an upper bound in isPow2MultipleOf1MiB, and derive the volume flag cap from the same constant so they cannot drift. * fix(ec_bitrot): don't destroy a valid destination sidecar on an optional copy writeToFile opened the destination with O_TRUNC before knowing whether the source had the file, so an optional copy (ignoreSourceFileNotFound) from a source that lacks the .ecsum truncated and then removed a valid pre-existing destination sidecar. Stage the optional copy into a temp sibling and commit it with an atomic rename only when the source actually delivered the file; a missing source is now a no-op. Mandatory copies keep their in-place behavior.
This commit is contained in:
@@ -177,12 +177,10 @@ func runVolume(cmd *Command, args []string) bool {
|
||||
|
||||
// Apply EC bitrot checksum settings.
|
||||
erasure_coding.BitrotProtectionEnabled = *ecBitrotChecksum
|
||||
// Validate the block size before multiplying so an absurd MiB value cannot
|
||||
// overflow int64 and slip a bogus size past the power-of-two check. The
|
||||
// block size also becomes the per-shard scratch buffer the scrub/backfill
|
||||
// path allocates, so the upper bound caps that allocation (64 MiB per
|
||||
// concurrent scrub worker) and keeps a typo from taking the server down.
|
||||
const maxBitrotBlockSizeMB = 64
|
||||
// Bound-check before the multiply so a huge value cannot overflow int64 past
|
||||
// the power-of-two check. Cap = shared MaxBitrotBlockSize, kept in sync with
|
||||
// ValidateBitrotManifest.
|
||||
const maxBitrotBlockSizeMB = erasure_coding.MaxBitrotBlockSize / (1024 * 1024)
|
||||
if mb := *ecBitrotBlockSizeMB; mb >= 1 && mb <= maxBitrotBlockSizeMB {
|
||||
if blockSize := int64(mb) * 1024 * 1024; blockSize&(blockSize-1) == 0 {
|
||||
erasure_coding.BitrotBlockSize = blockSize
|
||||
|
||||
@@ -233,7 +233,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe
|
||||
return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
|
||||
}
|
||||
|
||||
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn)
|
||||
modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, ignoreSourceFileNotFound, progressFn)
|
||||
if err != nil {
|
||||
return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
|
||||
}
|
||||
@@ -332,15 +332,25 @@ func findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName string, version
|
||||
return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize]), nil
|
||||
}
|
||||
|
||||
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
|
||||
func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
|
||||
glog.V(4).Infof("writing to %s", fileName)
|
||||
|
||||
// For an optional copy (ignoreSourceFileNotFound), stage into a temp sibling
|
||||
// and atomically rename on success, so a source that lacks the file cannot
|
||||
// truncate a valid pre-existing destination. Mandatory copies write in place.
|
||||
writePath := fileName
|
||||
stageThenCommit := ignoreSourceFileNotFound && !isAppend
|
||||
if stageThenCommit {
|
||||
writePath = fileName + ".copying"
|
||||
}
|
||||
|
||||
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
|
||||
if isAppend {
|
||||
flags = os.O_WRONLY | os.O_CREATE
|
||||
}
|
||||
dst, err := os.OpenFile(fileName, flags, 0644)
|
||||
dst, err := os.OpenFile(writePath, flags, 0644)
|
||||
if err != nil {
|
||||
return modifiedTsNs, fmt.Errorf("open file %s: %w", fileName, err)
|
||||
return modifiedTsNs, fmt.Errorf("open file %s: %w", writePath, err)
|
||||
}
|
||||
// Track the destination handle through a closer that runs at most once.
|
||||
// On Windows os.Remove fails while the file is still open, so any path
|
||||
@@ -367,10 +377,10 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
|
||||
return
|
||||
}
|
||||
closeDst()
|
||||
if removeErr := os.Remove(fileName); removeErr != nil && !os.IsNotExist(removeErr) {
|
||||
glog.Warningf("failed to remove incomplete file %s after %s: %v", fileName, reason, removeErr)
|
||||
if removeErr := os.Remove(writePath); removeErr != nil && !os.IsNotExist(removeErr) {
|
||||
glog.Warningf("failed to remove incomplete file %s after %s: %v", writePath, reason, removeErr)
|
||||
} else if removeErr == nil {
|
||||
glog.V(1).Infof("removed incomplete file %s after %s", fileName, reason)
|
||||
glog.V(1).Infof("removed incomplete file %s after %s", writePath, reason)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -406,10 +416,20 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
|
||||
// is valid and should result in an empty destination file.
|
||||
if modifiedTsNs == 0 && !isAppend {
|
||||
closeDst()
|
||||
if removeErr := os.Remove(fileName); removeErr != nil {
|
||||
glog.V(1).Infof("failed to remove empty file %s: %v", fileName, removeErr)
|
||||
} else {
|
||||
glog.V(1).Infof("removed empty file %s (source file not found)", fileName)
|
||||
if removeErr := os.Remove(writePath); removeErr != nil && !os.IsNotExist(removeErr) {
|
||||
glog.V(1).Infof("failed to remove empty file %s: %v", writePath, removeErr)
|
||||
} else if removeErr == nil {
|
||||
glog.V(1).Infof("removed empty file %s (source file not found)", writePath)
|
||||
}
|
||||
return modifiedTsNs, nil
|
||||
}
|
||||
|
||||
// Commit the staged temp into place.
|
||||
if stageThenCommit {
|
||||
closeDst()
|
||||
if renameErr := os.Rename(writePath, fileName); renameErr != nil {
|
||||
os.Remove(writePath)
|
||||
return modifiedTsNs, fmt.Errorf("commit copied file %s: %w", fileName, renameErr)
|
||||
}
|
||||
}
|
||||
return modifiedTsNs, nil
|
||||
|
||||
@@ -60,7 +60,7 @@ func TestWriteToFile_RemovesPartialFileOnStreamError(t *testing.T) {
|
||||
finalErr: errors.New("simulated mid-stream failure"),
|
||||
}
|
||||
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil)
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("writeToFile should propagate the stream error")
|
||||
}
|
||||
@@ -82,7 +82,7 @@ func TestWriteToFile_RemovesEmptyFileOnImmediateStreamError(t *testing.T) {
|
||||
finalErr: errors.New("simulated immediate failure"),
|
||||
}
|
||||
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil)
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("writeToFile should propagate the stream error")
|
||||
}
|
||||
@@ -106,7 +106,7 @@ func TestWriteToFile_PreservesAppendModeOnError(t *testing.T) {
|
||||
finalErr: errors.New("simulated failure"),
|
||||
}
|
||||
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), true, nil)
|
||||
_, err := writeToFile(stream, dst, util.NewWriteThrottler(0), true, false, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("writeToFile should propagate the stream error")
|
||||
}
|
||||
@@ -131,7 +131,7 @@ func TestWriteToFile_SucceedsOnCleanStream(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil); err != nil {
|
||||
if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil); err != nil {
|
||||
t.Fatalf("writeToFile failed on clean stream: %v", err)
|
||||
}
|
||||
|
||||
@@ -143,3 +143,62 @@ func TestWriteToFile_SucceedsOnCleanStream(t *testing.T) {
|
||||
t.Errorf("contents = %q, want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// An optional copy whose source lacks the file must leave a valid destination intact.
|
||||
func TestWriteToFile_PreservesDestinationWhenOptionalSourceMissing(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dst := filepath.Join(dir, "vol_42.ecsum")
|
||||
|
||||
original := []byte("valid existing ecsum sidecar")
|
||||
if err := os.WriteFile(dst, original, 0o644); err != nil {
|
||||
t.Fatalf("seed file: %v", err)
|
||||
}
|
||||
|
||||
stream := &fakeCopyFileStream{} // no responses -> source file absent
|
||||
|
||||
if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, true, nil); err != nil {
|
||||
t.Fatalf("writeToFile should not error when an optional source is absent: %v", err)
|
||||
}
|
||||
|
||||
got, err := os.ReadFile(dst)
|
||||
if err != nil {
|
||||
t.Fatalf("destination sidecar should be preserved; read: %v", err)
|
||||
}
|
||||
if string(got) != string(original) {
|
||||
t.Errorf("destination overwritten/truncated: got %q, want %q", got, original)
|
||||
}
|
||||
if _, statErr := os.Stat(dst + ".copying"); !os.IsNotExist(statErr) {
|
||||
t.Errorf("temp .copying file should be cleaned up; stat err = %v", statErr)
|
||||
}
|
||||
}
|
||||
|
||||
// An optional copy whose source has the file commits it over any stale destination.
|
||||
func TestWriteToFile_CommitsOptionalCopyWhenSourcePresent(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
dst := filepath.Join(dir, "vol_42.ecsum")
|
||||
|
||||
if err := os.WriteFile(dst, []byte("stale"), 0o644); err != nil {
|
||||
t.Fatalf("seed file: %v", err)
|
||||
}
|
||||
want := []byte("fresh ecsum from source")
|
||||
stream := &fakeCopyFileStream{
|
||||
responses: []*volume_server_pb.CopyFileResponse{
|
||||
{FileContent: want, ModifiedTsNs: 42},
|
||||
},
|
||||
}
|
||||
|
||||
if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, true, nil); err != nil {
|
||||
t.Fatalf("writeToFile failed on present optional source: %v", err)
|
||||
}
|
||||
|
||||
got, err := os.ReadFile(dst)
|
||||
if err != nil {
|
||||
t.Fatalf("read back: %v", err)
|
||||
}
|
||||
if string(got) != string(want) {
|
||||
t.Errorf("contents = %q, want %q", got, want)
|
||||
}
|
||||
if _, statErr := os.Stat(dst + ".copying"); !os.IsNotExist(statErr) {
|
||||
t.Errorf("temp .copying file should be renamed away; stat err = %v", statErr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,6 +47,10 @@ const (
|
||||
// to a 16 MiB region.
|
||||
DefaultBitrotBlockSize = 16 * 1024 * 1024
|
||||
|
||||
// MaxBitrotBlockSize caps the block granularity so a loaded sidecar cannot
|
||||
// force a huge scrub/verify scratch buffer. Power-of-two multiple of 1 MiB.
|
||||
MaxBitrotBlockSize = 64 * 1024 * 1024
|
||||
|
||||
bitrotMagic uint32 = 0x45435355 // "ECSU"
|
||||
bitrotFormatVersion uint16 = 1
|
||||
bitrotHeaderSize = 14 // magic(4)+version(2)+payload_len(4)+payload_crc32c(4)
|
||||
@@ -117,10 +121,10 @@ func NewEncodeUUID() []byte {
|
||||
return b
|
||||
}
|
||||
|
||||
// isPow2MultipleOf1MiB reports whether block_size is a power-of-two multiple of
|
||||
// 1 MiB (i.e. a power of two that is at least 1 MiB).
|
||||
// isPow2MultipleOf1MiB reports whether block_size is a power of two in
|
||||
// [1 MiB, MaxBitrotBlockSize].
|
||||
func isPow2MultipleOf1MiB(blockSize uint32) bool {
|
||||
return blockSize >= (1<<20) && bits.OnesCount32(blockSize) == 1
|
||||
return blockSize >= (1<<20) && blockSize <= MaxBitrotBlockSize && bits.OnesCount32(blockSize) == 1
|
||||
}
|
||||
|
||||
// shardChecksumBuilder accumulates the per-block CRC32C of a single shard's byte
|
||||
@@ -300,7 +304,7 @@ func ValidateBitrotManifest(prot *volume_server_pb.EcBitrotProtection, dataShard
|
||||
}
|
||||
bs := int64(prot.BlockSize)
|
||||
if !isPow2MultipleOf1MiB(prot.BlockSize) {
|
||||
return fmt.Errorf("invalid block_size %d (must be a power-of-two multiple of 1 MiB)", prot.BlockSize)
|
||||
return fmt.Errorf("invalid block_size %d (must be a power-of-two multiple of 1 MiB, at most %d)", prot.BlockSize, MaxBitrotBlockSize)
|
||||
}
|
||||
total := dataShards + parityShards
|
||||
if total <= 0 || total > MaxShardCount {
|
||||
|
||||
@@ -108,12 +108,13 @@ func TestValidateBitrotManifest(t *testing.T) {
|
||||
t.Fatalf("valid manifest rejected: %v", err)
|
||||
}
|
||||
bad := map[string]func(*volume_server_pb.EcBitrotProtection){
|
||||
"missing shard": func(p *volume_server_pb.EcBitrotProtection) { p.Shards = p.Shards[:2] },
|
||||
"out of range id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 9 },
|
||||
"duplicate id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 0 },
|
||||
"zero covered": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].CoveredSize = 0 },
|
||||
"bad crc count": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].BlockCrc32C = []byte{1, 2, 3} },
|
||||
"bad block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 3 << 20 }, // not power of two
|
||||
"missing shard": func(p *volume_server_pb.EcBitrotProtection) { p.Shards = p.Shards[:2] },
|
||||
"out of range id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 9 },
|
||||
"duplicate id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 0 },
|
||||
"zero covered": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].CoveredSize = 0 },
|
||||
"bad crc count": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].BlockCrc32C = []byte{1, 2, 3} },
|
||||
"bad block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 3 << 20 }, // not power of two
|
||||
"oversized block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 128 << 20 }, // power of two but > MaxBitrotBlockSize
|
||||
"bad algorithm": func(p *volume_server_pb.EcBitrotProtection) {
|
||||
p.Algorithm = volume_server_pb.ChecksumAlgorithm_CHECKSUM_NONE
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user