diff --git a/weed/command/volume.go b/weed/command/volume.go index c2969c9c8..4895cd103 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -177,12 +177,10 @@ func runVolume(cmd *Command, args []string) bool { // Apply EC bitrot checksum settings. erasure_coding.BitrotProtectionEnabled = *ecBitrotChecksum - // Validate the block size before multiplying so an absurd MiB value cannot - // overflow int64 and slip a bogus size past the power-of-two check. The - // block size also becomes the per-shard scratch buffer the scrub/backfill - // path allocates, so the upper bound caps that allocation (64 MiB per - // concurrent scrub worker) and keeps a typo from taking the server down. - const maxBitrotBlockSizeMB = 64 + // Bound-check before the multiply so a huge value cannot overflow int64 past + // the power-of-two check. Cap = shared MaxBitrotBlockSize, kept in sync with + // ValidateBitrotManifest. + const maxBitrotBlockSizeMB = erasure_coding.MaxBitrotBlockSize / (1024 * 1024) if mb := *ecBitrotBlockSizeMB; mb >= 1 && mb <= maxBitrotBlockSizeMB { if blockSize := int64(mb) * 1024 * 1024; blockSize&(blockSize-1) == 0 { erasure_coding.BitrotBlockSize = blockSize diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index a8cc155a1..7049c031b 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -233,7 +233,7 @@ func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeSe return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, progressFn) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, throttler, isAppend, ignoreSourceFileNotFound, progressFn) if err != nil { return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } @@ -332,15 +332,25 @@ func findLastAppendAtNsFromCopiedFiles(idxFileName, datFileName string, version return util.BytesToUint64(tail[needle.NeedleChecksumSize : needle.NeedleChecksumSize+types.TimestampSize]), nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { glog.V(4).Infof("writing to %s", fileName) + + // For an optional copy (ignoreSourceFileNotFound), stage into a temp sibling + // and atomically rename on success, so a source that lacks the file cannot + // truncate a valid pre-existing destination. Mandatory copies write in place. + writePath := fileName + stageThenCommit := ignoreSourceFileNotFound && !isAppend + if stageThenCommit { + writePath = fileName + ".copying" + } + flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if isAppend { flags = os.O_WRONLY | os.O_CREATE } - dst, err := os.OpenFile(fileName, flags, 0644) + dst, err := os.OpenFile(writePath, flags, 0644) if err != nil { - return modifiedTsNs, fmt.Errorf("open file %s: %w", fileName, err) + return modifiedTsNs, fmt.Errorf("open file %s: %w", writePath, err) } // Track the destination handle through a closer that runs at most once. // On Windows os.Remove fails while the file is still open, so any path @@ -367,10 +377,10 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s return } closeDst() - if removeErr := os.Remove(fileName); removeErr != nil && !os.IsNotExist(removeErr) { - glog.Warningf("failed to remove incomplete file %s after %s: %v", fileName, reason, removeErr) + if removeErr := os.Remove(writePath); removeErr != nil && !os.IsNotExist(removeErr) { + glog.Warningf("failed to remove incomplete file %s after %s: %v", writePath, reason, removeErr) } else if removeErr == nil { - glog.V(1).Infof("removed incomplete file %s after %s", fileName, reason) + glog.V(1).Infof("removed incomplete file %s after %s", writePath, reason) } } @@ -406,10 +416,20 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s // is valid and should result in an empty destination file. if modifiedTsNs == 0 && !isAppend { closeDst() - if removeErr := os.Remove(fileName); removeErr != nil { - glog.V(1).Infof("failed to remove empty file %s: %v", fileName, removeErr) - } else { - glog.V(1).Infof("removed empty file %s (source file not found)", fileName) + if removeErr := os.Remove(writePath); removeErr != nil && !os.IsNotExist(removeErr) { + glog.V(1).Infof("failed to remove empty file %s: %v", writePath, removeErr) + } else if removeErr == nil { + glog.V(1).Infof("removed empty file %s (source file not found)", writePath) + } + return modifiedTsNs, nil + } + + // Commit the staged temp into place. + if stageThenCommit { + closeDst() + if renameErr := os.Rename(writePath, fileName); renameErr != nil { + os.Remove(writePath) + return modifiedTsNs, fmt.Errorf("commit copied file %s: %w", fileName, renameErr) } } return modifiedTsNs, nil diff --git a/weed/server/volume_grpc_copy_writefile_test.go b/weed/server/volume_grpc_copy_writefile_test.go index 3dcb03011..c2144de06 100644 --- a/weed/server/volume_grpc_copy_writefile_test.go +++ b/weed/server/volume_grpc_copy_writefile_test.go @@ -60,7 +60,7 @@ func TestWriteToFile_RemovesPartialFileOnStreamError(t *testing.T) { finalErr: errors.New("simulated mid-stream failure"), } - _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil) + _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil) if err == nil { t.Fatalf("writeToFile should propagate the stream error") } @@ -82,7 +82,7 @@ func TestWriteToFile_RemovesEmptyFileOnImmediateStreamError(t *testing.T) { finalErr: errors.New("simulated immediate failure"), } - _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil) + _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil) if err == nil { t.Fatalf("writeToFile should propagate the stream error") } @@ -106,7 +106,7 @@ func TestWriteToFile_PreservesAppendModeOnError(t *testing.T) { finalErr: errors.New("simulated failure"), } - _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), true, nil) + _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), true, false, nil) if err == nil { t.Fatalf("writeToFile should propagate the stream error") } @@ -131,7 +131,7 @@ func TestWriteToFile_SucceedsOnCleanStream(t *testing.T) { }, } - if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, nil); err != nil { + if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, false, nil); err != nil { t.Fatalf("writeToFile failed on clean stream: %v", err) } @@ -143,3 +143,62 @@ func TestWriteToFile_SucceedsOnCleanStream(t *testing.T) { t.Errorf("contents = %q, want %q", got, want) } } + +// An optional copy whose source lacks the file must leave a valid destination intact. +func TestWriteToFile_PreservesDestinationWhenOptionalSourceMissing(t *testing.T) { + dir := t.TempDir() + dst := filepath.Join(dir, "vol_42.ecsum") + + original := []byte("valid existing ecsum sidecar") + if err := os.WriteFile(dst, original, 0o644); err != nil { + t.Fatalf("seed file: %v", err) + } + + stream := &fakeCopyFileStream{} // no responses -> source file absent + + if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, true, nil); err != nil { + t.Fatalf("writeToFile should not error when an optional source is absent: %v", err) + } + + got, err := os.ReadFile(dst) + if err != nil { + t.Fatalf("destination sidecar should be preserved; read: %v", err) + } + if string(got) != string(original) { + t.Errorf("destination overwritten/truncated: got %q, want %q", got, original) + } + if _, statErr := os.Stat(dst + ".copying"); !os.IsNotExist(statErr) { + t.Errorf("temp .copying file should be cleaned up; stat err = %v", statErr) + } +} + +// An optional copy whose source has the file commits it over any stale destination. +func TestWriteToFile_CommitsOptionalCopyWhenSourcePresent(t *testing.T) { + dir := t.TempDir() + dst := filepath.Join(dir, "vol_42.ecsum") + + if err := os.WriteFile(dst, []byte("stale"), 0o644); err != nil { + t.Fatalf("seed file: %v", err) + } + want := []byte("fresh ecsum from source") + stream := &fakeCopyFileStream{ + responses: []*volume_server_pb.CopyFileResponse{ + {FileContent: want, ModifiedTsNs: 42}, + }, + } + + if _, err := writeToFile(stream, dst, util.NewWriteThrottler(0), false, true, nil); err != nil { + t.Fatalf("writeToFile failed on present optional source: %v", err) + } + + got, err := os.ReadFile(dst) + if err != nil { + t.Fatalf("read back: %v", err) + } + if string(got) != string(want) { + t.Errorf("contents = %q, want %q", got, want) + } + if _, statErr := os.Stat(dst + ".copying"); !os.IsNotExist(statErr) { + t.Errorf("temp .copying file should be renamed away; stat err = %v", statErr) + } +} diff --git a/weed/storage/erasure_coding/ec_bitrot.go b/weed/storage/erasure_coding/ec_bitrot.go index e4f820fe4..5aef042c7 100644 --- a/weed/storage/erasure_coding/ec_bitrot.go +++ b/weed/storage/erasure_coding/ec_bitrot.go @@ -47,6 +47,10 @@ const ( // to a 16 MiB region. DefaultBitrotBlockSize = 16 * 1024 * 1024 + // MaxBitrotBlockSize caps the block granularity so a loaded sidecar cannot + // force a huge scrub/verify scratch buffer. Power-of-two multiple of 1 MiB. + MaxBitrotBlockSize = 64 * 1024 * 1024 + bitrotMagic uint32 = 0x45435355 // "ECSU" bitrotFormatVersion uint16 = 1 bitrotHeaderSize = 14 // magic(4)+version(2)+payload_len(4)+payload_crc32c(4) @@ -117,10 +121,10 @@ func NewEncodeUUID() []byte { return b } -// isPow2MultipleOf1MiB reports whether block_size is a power-of-two multiple of -// 1 MiB (i.e. a power of two that is at least 1 MiB). +// isPow2MultipleOf1MiB reports whether block_size is a power of two in +// [1 MiB, MaxBitrotBlockSize]. func isPow2MultipleOf1MiB(blockSize uint32) bool { - return blockSize >= (1<<20) && bits.OnesCount32(blockSize) == 1 + return blockSize >= (1<<20) && blockSize <= MaxBitrotBlockSize && bits.OnesCount32(blockSize) == 1 } // shardChecksumBuilder accumulates the per-block CRC32C of a single shard's byte @@ -300,7 +304,7 @@ func ValidateBitrotManifest(prot *volume_server_pb.EcBitrotProtection, dataShard } bs := int64(prot.BlockSize) if !isPow2MultipleOf1MiB(prot.BlockSize) { - return fmt.Errorf("invalid block_size %d (must be a power-of-two multiple of 1 MiB)", prot.BlockSize) + return fmt.Errorf("invalid block_size %d (must be a power-of-two multiple of 1 MiB, at most %d)", prot.BlockSize, MaxBitrotBlockSize) } total := dataShards + parityShards if total <= 0 || total > MaxShardCount { diff --git a/weed/storage/erasure_coding/ec_bitrot_test.go b/weed/storage/erasure_coding/ec_bitrot_test.go index 62b6504d7..b8cf6758a 100644 --- a/weed/storage/erasure_coding/ec_bitrot_test.go +++ b/weed/storage/erasure_coding/ec_bitrot_test.go @@ -108,12 +108,13 @@ func TestValidateBitrotManifest(t *testing.T) { t.Fatalf("valid manifest rejected: %v", err) } bad := map[string]func(*volume_server_pb.EcBitrotProtection){ - "missing shard": func(p *volume_server_pb.EcBitrotProtection) { p.Shards = p.Shards[:2] }, - "out of range id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 9 }, - "duplicate id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 0 }, - "zero covered": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].CoveredSize = 0 }, - "bad crc count": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].BlockCrc32C = []byte{1, 2, 3} }, - "bad block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 3 << 20 }, // not power of two + "missing shard": func(p *volume_server_pb.EcBitrotProtection) { p.Shards = p.Shards[:2] }, + "out of range id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 9 }, + "duplicate id": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[2].ShardId = 0 }, + "zero covered": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].CoveredSize = 0 }, + "bad crc count": func(p *volume_server_pb.EcBitrotProtection) { p.Shards[0].BlockCrc32C = []byte{1, 2, 3} }, + "bad block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 3 << 20 }, // not power of two + "oversized block size": func(p *volume_server_pb.EcBitrotProtection) { p.BlockSize = 128 << 20 }, // power of two but > MaxBitrotBlockSize "bad algorithm": func(p *volume_server_pb.EcBitrotProtection) { p.Algorithm = volume_server_pb.ChecksumAlgorithm_CHECKSUM_NONE },