From 5955972fe6f1889fd96db00ebec94a18e459171c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 28 May 2026 19:29:25 -0700 Subject: [PATCH] fix(shell): verify volume.merge output before overwriting replicas (#9731) * fix(shell): verify volume.merge output before overwriting replicas volume.merge overwrote every replica with the merged copy without checking it was complete. Read back the merged copy and refuse to overwrite unless it holds at least as many live needles as the most complete source replica, leaving the originals intact on a short or empty merge. * fix(shell): keep merged volume until all replicas are rebuilt On a copy failure partway through the overwrite loop, the temporary merged copy was deleted along with the half-rebuilt replicas. Stop deleting it until every replica has been rebuilt; on failure the verified copy is kept so the merge can be re-run to completion. * refactor(shell): reuse readVolumeStatus in ensureVolumeReadonly * fix(shell): guard against nil volume status response --- weed/shell/command_volume_merge.go | 98 +++++++++++++++++++++---- weed/shell/command_volume_merge_test.go | 19 +++++ 2 files changed, 102 insertions(+), 15 deletions(-) diff --git a/weed/shell/command_volume_merge.go b/weed/shell/command_volume_merge.go index 6000409f2..06c4407d8 100644 --- a/weed/shell/command_volume_merge.go +++ b/weed/shell/command_volume_merge.go @@ -52,8 +52,9 @@ This command: 1) marks the volume readonly on replicas (if not already) 2) allocates a temporary copy on a third location 3) merges replicas in append timestamp order, skipping duplicates - 4) replaces the original replicas with the merged volume - 5) restores writable state if it was writable before + 4) verifies the merged copy is not short before touching the replicas + 5) replaces the original replicas with the merged volume + 6) restores writable state if it was writable before ` } @@ -173,17 +174,27 @@ func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io return mergeErr } - for _, replica := range replicas { + // Verify the merged copy before overwriting any replica. A short or empty + // merge stamped over every replica at once is unrecoverable; on failure the + // originals are left intact and the bad merged copy is cleaned up. + if err = verifyMergedVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, replicas); err != nil { + return err + } + + // The merged copy is now the only authoritative copy. Keep it until every + // replica is rebuilt from it so a mid-loop failure can still be finished. + cleanupTarget = false + + for i, replica := range replicas { sourceServer := pb.NewServerAddressFromDataNode(replica.location.dataNode) if _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, targetServer, sourceServer, "", 0, false); err != nil { - return err + return fmt.Errorf("rebuild replica %d/%d on %s from merged volume %d: %w; merged copy kept on %s, re-run to finish", i+1, len(replicas), sourceServer, volumeId, err, targetServer) } } if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); err != nil { return err } - cleanupTarget = false fmt.Fprintf(writer, "merged volume %d from %d replicas via %s\n", volumeId, len(replicas), targetServer) return nil @@ -469,19 +480,13 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([] var writableReplicaIndices []int for i, replica := range replicas { server := pb.NewServerAddressFromDataNode(replica.location.dataNode) - err := operation.WithVolumeServerClient(false, server, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: replica.info.Id}) - if err != nil { - return err - } - if !resp.IsReadOnly { - writableReplicaIndices = append(writableReplicaIndices, i) - } - return nil - }) + status, err := readVolumeStatus(commandEnv.option.GrpcDialOption, server, needle.VolumeId(replica.info.Id)) if err != nil { return nil, err } + if !status.IsReadOnly { + writableReplicaIndices = append(writableReplicaIndices, i) + } } if len(writableReplicaIndices) > 0 { if err := markReplicasWritable(commandEnv.option.GrpcDialOption, replicas, false, false); err != nil { @@ -491,6 +496,69 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([] return writableReplicaIndices, nil } +// verifyMergedVolume checks the freshly merged copy is at least as complete as the +// most complete source replica before the originals are overwritten. +func verifyMergedVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetServer pb.ServerAddress, replicas []*VolumeReplica) error { + merged, err := readVolumeStatus(grpcDialOption, targetServer, volumeId) + if err != nil { + return fmt.Errorf("read merged volume %d on %s: %w", volumeId, targetServer, err) + } + replicaLive := make(map[pb.ServerAddress]uint64, len(replicas)) + for _, replica := range replicas { + server := pb.NewServerAddressFromDataNode(replica.location.dataNode) + status, err := readVolumeStatus(grpcDialOption, server, volumeId) + if err != nil { + return fmt.Errorf("read replica volume %d on %s: %w", volumeId, server, err) + } + replicaLive[server] = liveNeedleCount(status) + } + return evaluateMergedVolume(volumeId, liveNeedleCount(merged), replicaLive) +} + +// evaluateMergedVolume reports why a merged copy with mergedLive live needles is +// unsafe to overwrite replicas whose live counts are replicaLive. The merge is the +// union of every replica's needles, so the merged copy must be non-empty and hold +// at least as many live needles as the most complete replica. +func evaluateMergedVolume(volumeId needle.VolumeId, mergedLive uint64, replicaLive map[pb.ServerAddress]uint64) error { + if mergedLive == 0 { + return fmt.Errorf("merged volume %d is empty; keeping original replicas", volumeId) + } + var maxLive uint64 + var maxFrom pb.ServerAddress + for server, live := range replicaLive { + if live > maxLive { + maxLive, maxFrom = live, server + } + } + if mergedLive < maxLive { + return fmt.Errorf("merged volume %d has %d live needles, fewer than replica %s with %d; refusing to overwrite replicas", volumeId, mergedLive, maxFrom, maxLive) + } + return nil +} + +func liveNeedleCount(status *volume_server_pb.VolumeStatusResponse) uint64 { + if status == nil || status.FileCount < status.FileDeletedCount { + return 0 + } + return status.FileCount - status.FileDeletedCount +} + +func readVolumeStatus(grpcDialOption grpc.DialOption, server pb.ServerAddress, volumeId needle.VolumeId) (*volume_server_pb.VolumeStatusResponse, error) { + var resp *volume_server_pb.VolumeStatusResponse + err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + r, statusErr := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: uint32(volumeId)}) + if statusErr != nil { + return statusErr + } + resp = r + return nil + }) + if err == nil && resp == nil { + return nil, fmt.Errorf("empty volume status response from %s", server) + } + return resp, err +} + func isReplicaServer(target pb.ServerAddress, replicas []*VolumeReplica) bool { for _, replica := range replicas { if pb.NewServerAddressFromDataNode(replica.location.dataNode).Equals(target) { diff --git a/weed/shell/command_volume_merge_test.go b/weed/shell/command_volume_merge_test.go index 0ba72b752..d1caac273 100644 --- a/weed/shell/command_volume_merge_test.go +++ b/weed/shell/command_volume_merge_test.go @@ -4,6 +4,7 @@ import ( "reflect" "testing" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -25,6 +26,24 @@ func (s *sliceNeedleStream) Err() error { return nil } +func TestEvaluateMergedVolume(t *testing.T) { + const vid = needle.VolumeId(7) + a, b := pb.ServerAddress("a:8080"), pb.ServerAddress("b:8080") + + if err := evaluateMergedVolume(vid, 0, map[pb.ServerAddress]uint64{a: 5}); err == nil { + t.Fatal("expected empty merged volume to be rejected") + } + if err := evaluateMergedVolume(vid, 4, map[pb.ServerAddress]uint64{a: 5, b: 3}); err == nil { + t.Fatal("expected short merged volume to be rejected") + } + if err := evaluateMergedVolume(vid, 5, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil { + t.Fatalf("expected merged volume matching largest replica to pass: %v", err) + } + if err := evaluateMergedVolume(vid, 9, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil { + t.Fatalf("expected larger merged volume to pass: %v", err) + } +} + func TestMergeNeedleStreamsOrdersByTimestamp(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 10_000_000_100},