fix(shell): verify volume.merge output before overwriting replicas (#9731)

* fix(shell): verify volume.merge output before overwriting replicas

volume.merge overwrote every replica with the merged copy without checking it was complete. Read back the merged copy and refuse to overwrite unless it holds at least as many live needles as the most complete source replica, leaving the originals intact on a short or empty merge.

* fix(shell): keep merged volume until all replicas are rebuilt

On a copy failure partway through the overwrite loop, the temporary merged copy was deleted along with the half-rebuilt replicas. Stop deleting it until every replica has been rebuilt; on failure the verified copy is kept so the merge can be re-run to completion.

* refactor(shell): reuse readVolumeStatus in ensureVolumeReadonly

* fix(shell): guard against nil volume status response
This commit is contained in:
Chris Lu
2026-05-28 19:29:25 -07:00
committed by GitHub
parent 16717b0bf4
commit 5955972fe6
2 changed files with 102 additions and 15 deletions

View File

@@ -52,8 +52,9 @@ This command:
1) marks the volume readonly on replicas (if not already)
2) allocates a temporary copy on a third location
3) merges replicas in append timestamp order, skipping duplicates
4) replaces the original replicas with the merged volume
5) restores writable state if it was writable before
4) verifies the merged copy is not short before touching the replicas
5) replaces the original replicas with the merged volume
6) restores writable state if it was writable before
`
}
@@ -173,17 +174,27 @@ func (c *commandVolumeMerge) Do(args []string, commandEnv *CommandEnv, writer io
return mergeErr
}
for _, replica := range replicas {
// Verify the merged copy before overwriting any replica. A short or empty
// merge stamped over every replica at once is unrecoverable; on failure the
// originals are left intact and the bad merged copy is cleaned up.
if err = verifyMergedVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, replicas); err != nil {
return err
}
// The merged copy is now the only authoritative copy. Keep it until every
// replica is rebuilt from it so a mid-loop failure can still be finished.
cleanupTarget = false
for i, replica := range replicas {
sourceServer := pb.NewServerAddressFromDataNode(replica.location.dataNode)
if _, err = copyVolume(commandEnv.option.GrpcDialOption, writer, volumeId, targetServer, sourceServer, "", 0, false); err != nil {
return err
return fmt.Errorf("rebuild replica %d/%d on %s from merged volume %d: %w; merged copy kept on %s, re-run to finish", i+1, len(replicas), sourceServer, volumeId, err, targetServer)
}
}
if err = deleteVolume(commandEnv.option.GrpcDialOption, volumeId, targetServer, false, false); err != nil {
return err
}
cleanupTarget = false
fmt.Fprintf(writer, "merged volume %d from %d replicas via %s\n", volumeId, len(replicas), targetServer)
return nil
@@ -469,19 +480,13 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([]
var writableReplicaIndices []int
for i, replica := range replicas {
server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
err := operation.WithVolumeServerClient(false, server, commandEnv.option.GrpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
resp, err := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: replica.info.Id})
if err != nil {
return err
}
if !resp.IsReadOnly {
writableReplicaIndices = append(writableReplicaIndices, i)
}
return nil
})
status, err := readVolumeStatus(commandEnv.option.GrpcDialOption, server, needle.VolumeId(replica.info.Id))
if err != nil {
return nil, err
}
if !status.IsReadOnly {
writableReplicaIndices = append(writableReplicaIndices, i)
}
}
if len(writableReplicaIndices) > 0 {
if err := markReplicasWritable(commandEnv.option.GrpcDialOption, replicas, false, false); err != nil {
@@ -491,6 +496,69 @@ func ensureVolumeReadonly(commandEnv *CommandEnv, replicas []*VolumeReplica) ([]
return writableReplicaIndices, nil
}
// verifyMergedVolume checks the freshly merged copy is at least as complete as the
// most complete source replica before the originals are overwritten.
func verifyMergedVolume(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, targetServer pb.ServerAddress, replicas []*VolumeReplica) error {
merged, err := readVolumeStatus(grpcDialOption, targetServer, volumeId)
if err != nil {
return fmt.Errorf("read merged volume %d on %s: %w", volumeId, targetServer, err)
}
replicaLive := make(map[pb.ServerAddress]uint64, len(replicas))
for _, replica := range replicas {
server := pb.NewServerAddressFromDataNode(replica.location.dataNode)
status, err := readVolumeStatus(grpcDialOption, server, volumeId)
if err != nil {
return fmt.Errorf("read replica volume %d on %s: %w", volumeId, server, err)
}
replicaLive[server] = liveNeedleCount(status)
}
return evaluateMergedVolume(volumeId, liveNeedleCount(merged), replicaLive)
}
// evaluateMergedVolume reports why a merged copy with mergedLive live needles is
// unsafe to overwrite replicas whose live counts are replicaLive. The merge is the
// union of every replica's needles, so the merged copy must be non-empty and hold
// at least as many live needles as the most complete replica.
func evaluateMergedVolume(volumeId needle.VolumeId, mergedLive uint64, replicaLive map[pb.ServerAddress]uint64) error {
if mergedLive == 0 {
return fmt.Errorf("merged volume %d is empty; keeping original replicas", volumeId)
}
var maxLive uint64
var maxFrom pb.ServerAddress
for server, live := range replicaLive {
if live > maxLive {
maxLive, maxFrom = live, server
}
}
if mergedLive < maxLive {
return fmt.Errorf("merged volume %d has %d live needles, fewer than replica %s with %d; refusing to overwrite replicas", volumeId, mergedLive, maxFrom, maxLive)
}
return nil
}
func liveNeedleCount(status *volume_server_pb.VolumeStatusResponse) uint64 {
if status == nil || status.FileCount < status.FileDeletedCount {
return 0
}
return status.FileCount - status.FileDeletedCount
}
func readVolumeStatus(grpcDialOption grpc.DialOption, server pb.ServerAddress, volumeId needle.VolumeId) (*volume_server_pb.VolumeStatusResponse, error) {
var resp *volume_server_pb.VolumeStatusResponse
err := operation.WithVolumeServerClient(false, server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
r, statusErr := client.VolumeStatus(context.Background(), &volume_server_pb.VolumeStatusRequest{VolumeId: uint32(volumeId)})
if statusErr != nil {
return statusErr
}
resp = r
return nil
})
if err == nil && resp == nil {
return nil, fmt.Errorf("empty volume status response from %s", server)
}
return resp, err
}
func isReplicaServer(target pb.ServerAddress, replicas []*VolumeReplica) bool {
for _, replica := range replicas {
if pb.NewServerAddressFromDataNode(replica.location.dataNode).Equals(target) {

View File

@@ -4,6 +4,7 @@ import (
"reflect"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
@@ -25,6 +26,24 @@ func (s *sliceNeedleStream) Err() error {
return nil
}
func TestEvaluateMergedVolume(t *testing.T) {
const vid = needle.VolumeId(7)
a, b := pb.ServerAddress("a:8080"), pb.ServerAddress("b:8080")
if err := evaluateMergedVolume(vid, 0, map[pb.ServerAddress]uint64{a: 5}); err == nil {
t.Fatal("expected empty merged volume to be rejected")
}
if err := evaluateMergedVolume(vid, 4, map[pb.ServerAddress]uint64{a: 5, b: 3}); err == nil {
t.Fatal("expected short merged volume to be rejected")
}
if err := evaluateMergedVolume(vid, 5, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil {
t.Fatalf("expected merged volume matching largest replica to pass: %v", err)
}
if err := evaluateMergedVolume(vid, 9, map[pb.ServerAddress]uint64{a: 5, b: 3}); err != nil {
t.Fatalf("expected larger merged volume to pass: %v", err)
}
}
func TestMergeNeedleStreamsOrdersByTimestamp(t *testing.T) {
streamA := &sliceNeedleStream{needles: []*needle.Needle{
{Id: 1, AppendAtNs: 10_000_000_100},