diff --git a/weed/shell/command_volume_merge.go b/weed/shell/command_volume_merge.go index 6000409f2..06c4407d8 100644 --- a/weed/shell/command_volume_merge.go +++ b/weed/shell/command_volume_merge.go @@ -52,8 +52,9 @@ This command: 1) marks the volume readonly on replicas (if not already) 2) allocates a temporary copy on a third location 3) merges replicas in append timestamp order, skipping duplicates - 4) replaces the original replicas with the merged volume - 5) restores writable state if it was writable before + 4) verifies the merged copy is not short before touching the replicas + 5) replaces the original replicas with the merged volume + 6) restores writable state if it was writable before ` } @@ -173,17 +174,27 @@ func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io return mergeErr } - for _, replica := range replicas { + // Verify the merged copy before overwriting any replica. A short or empty + // merge stamped over every replica at once is unrecoverable; on failure the + // originals are left intact and the bad merged copy is cleaned up. + if err = verifyMergedVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, replicas); err != nil { + return err + } + + // The merged copy is now the only authoritative copy. Keep it until every + // replica is rebuilt from it so a mid-loop failure can still be finished. + cleanupTarget = false + + for i, replica := range replicas { sourceServer := pb.NewServerAddressFromDataNode(replica.location.dataNode) if _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, targetServer, sourceServer, "", 0, false); err != nil { - return err + return fmt.Errorf("rebuild replica %d/%d on %s from merged volume %d: %w; merged copy kept on %s, re-run to finish", i+1, len(replicas), sourceServer, volumeId, err, targetServer) } } if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); err != nil { return err } - cleanupTarget = false fmt.Fprintf(writer, "merged volume %d from %d replicas via %s\n", volumeId, len(replicas), targetServer) return nil @@ -469,19 +480,13 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([] var writableReplicaIndices []int for i, replica := range replicas { server := pb.NewServerAddressFromDataNode(replica.location.dataNode) - err := operation.WithVolumeServerClient(false, server, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - resp, err := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: replica.info.Id}) - if err != nil { - return err - } - if !resp.IsReadOnly { - writableReplicaIndices = append(writableReplicaIndices, i) - } - return nil - }) + status, err := readVolumeStatus(commandEnv.option.GrpcDialOption, server, needle.VolumeId(replica.info.Id)) if err != nil { return nil, err } + if !status.IsReadOnly { + writableReplicaIndices = append(writableReplicaIndices, i) + } } if len(writableReplicaIndices) > 0 { if err := markReplicasWritable(commandEnv.option.GrpcDialOption, replicas, false, false); err != nil { @@ -491,6 +496,69 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([] return writableReplicaIndices, nil } +// verifyMergedVolume checks the freshly merged copy is at least as complete as the +// most complete source replica before the originals are overwritten. +func verifyMergedVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetServer pb.ServerAddress, replicas []*VolumeReplica) error { + merged, err := readVolumeStatus(grpcDialOption, targetServer, volumeId) + if err != nil { + return fmt.Errorf("read merged volume %d on %s: %w", volumeId, targetServer, err) + } + replicaLive := make(map[pb.ServerAddress]uint64, len(replicas)) + for _, replica := range replicas { + server := pb.NewServerAddressFromDataNode(replica.location.dataNode) + status, err := readVolumeStatus(grpcDialOption, server, volumeId) + if err != nil { + return fmt.Errorf("read replica volume %d on %s: %w", volumeId, server, err) + } + replicaLive[server] = liveNeedleCount(status) + } + return evaluateMergedVolume(volumeId, liveNeedleCount(merged), replicaLive) +} + +// evaluateMergedVolume reports why a merged copy with mergedLive live needles is +// unsafe to overwrite replicas whose live counts are replicaLive. The merge is the +// union of every replica's needles, so the merged copy must be non-empty and hold +// at least as many live needles as the most complete replica. +func evaluateMergedVolume(volumeId needle.VolumeId, mergedLive uint64, replicaLive map[pb.ServerAddress]uint64) error { + if mergedLive == 0 { + return fmt.Errorf("merged volume %d is empty; keeping original replicas", volumeId) + } + var maxLive uint64 + var maxFrom pb.ServerAddress + for server, live := range replicaLive { + if live > maxLive { + maxLive, maxFrom = live, server + } + } + if mergedLive < maxLive { + return fmt.Errorf("merged volume %d has %d live needles, fewer than replica %s with %d; refusing to overwrite replicas", volumeId, mergedLive, maxFrom, maxLive) + } + return nil +} + +func liveNeedleCount(status *volume_server_pb.VolumeStatusResponse) uint64 { + if status == nil || status.FileCount < status.FileDeletedCount { + return 0 + } + return status.FileCount - status.FileDeletedCount +} + +func readVolumeStatus(grpcDialOption grpc.DialOption, server pb.ServerAddress, volumeId needle.VolumeId) (*volume_server_pb.VolumeStatusResponse, error) { + var resp *volume_server_pb.VolumeStatusResponse + err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + r, statusErr := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: uint32(volumeId)}) + if statusErr != nil { + return statusErr + } + resp = r + return nil + }) + if err == nil && resp == nil { + return nil, fmt.Errorf("empty volume status response from %s", server) + } + return resp, err +} + func isReplicaServer(target pb.ServerAddress, replicas []*VolumeReplica) bool { for _, replica := range replicas { if pb.NewServerAddressFromDataNode(replica.location.dataNode).Equals(target) { diff --git a/weed/shell/command_volume_merge_test.go b/weed/shell/command_volume_merge_test.go index 0ba72b752..d1caac273 100644 --- a/weed/shell/command_volume_merge_test.go +++ b/weed/shell/command_volume_merge_test.go @@ -4,6 +4,7 @@ import ( "reflect" "testing" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/needle" ) @@ -25,6 +26,24 @@ func (s *sliceNeedleStream) Err() error { return nil } +func TestEvaluateMergedVolume(t *testing.T) { + const vid = needle.VolumeId(7) + a, b := pb.ServerAddress("a:8080"), pb.ServerAddress("b:8080") + + if err := evaluateMergedVolume(vid, 0, map[pb.ServerAddress]uint64{a: 5}); err == nil { + t.Fatal("expected empty merged volume to be rejected") + } + if err := evaluateMergedVolume(vid, 4, map[pb.ServerAddress]uint64{a: 5, b: 3}); err == nil { + t.Fatal("expected short merged volume to be rejected") + } + if err := evaluateMergedVolume(vid, 5, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil { + t.Fatalf("expected merged volume matching largest replica to pass: %v", err) + } + if err := evaluateMergedVolume(vid, 9, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil { + t.Fatalf("expected larger merged volume to pass: %v", err) + } +} + func TestMergeNeedleStreamsOrdersByTimestamp(t *testing.T) { streamA := &sliceNeedleStream{needles: []*needle.Needle{ {Id: 1, AppendAtNs: 10_000_000_100},