fix(ec): skip re-encode when EC shards already exist for the volume (#9448)

When an earlier EC encoding succeeded but the post-encode source-delete
left a regular replica behind on one of the servers, the next detection
cycle proposes the same volume again. The new encode tries to redistribute
shards to targets that already have them mounted, the volume server
returns `ec volume %d is mounted; refusing overwrite`, the task fails,
and detection re-queues the volume. The cycle repeats forever — issue
#9448.
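
As a standalone sketch of the failure mode (hypothetical names throughout;
only the error string mirrors the real volume server):

package main

import "fmt"

// volumeServer models just the state that matters for the loop: the set of
// volume IDs whose EC shards are already mounted. Hypothetical type, not
// SeaweedFS's actual volume server.
type volumeServer struct {
	mountedEC map[uint32]bool
}

// receiveECShards refuses to overwrite shards of an already-mounted EC
// volume; only the error string matches the real server's.
func (vs *volumeServer) receiveECShards(vid uint32) error {
	if vs.mountedEC[vid] {
		return fmt.Errorf("ec volume %d is mounted; refusing overwrite", vid)
	}
	vs.mountedEC[vid] = true
	return nil
}

func main() {
	vs := &volumeServer{mountedEC: map[uint32]bool{42: true}}
	// Each detection cycle re-proposes volume 42 and hits the same refusal.
	for attempt := 1; attempt <= 3; attempt++ {
		fmt.Printf("attempt %d: %v\n", attempt, vs.receiveECShards(42))
	}
}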

The existing `metric.IsECVolume` skip catches the case where the canonical
metric is reported on the EC-shard side of the heartbeat, but when the
master sees BOTH a regular replica AND its EC shards in the same volume
list, the canonical metric we pick is the regular replica and
`IsECVolume` is false. Add a second guard that checks the topology
directly via `findExistingECShards` (already present and indexed) and
skips the volume when any shards exist, logging a warning that points
the admin at the stuck source.
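
The shape of the two-guard check, as a self-contained sketch (illustrative
types only; the real metric type and `findExistingECShards` live in the
detector's packages):

package main

import "fmt"

// metric is an illustrative stand-in for the detector's canonical
// per-volume metric; only the two fields that matter here are modeled.
type metric struct {
	VolumeID   uint32
	IsECVolume bool // true only when the canonical metric came from the EC side
}

// shouldSkip layers the new topology guard behind the existing metric guard.
// hasECShards stands in for findExistingECShards against the live topology.
func shouldSkip(m metric, hasECShards func(uint32) bool) (bool, string) {
	if m.IsECVolume {
		return true, "canonical metric is already EC"
	}
	// The #9448 state: canonical metric is a regular replica, yet shards
	// for the same volume are already mounted somewhere in the topology.
	if hasECShards(m.VolumeID) {
		return true, "EC shards already exist in topology"
	}
	return false, ""
}

func main() {
	shards := map[uint32]bool{42: true}
	skip, why := shouldSkip(metric{VolumeID: 42}, func(v uint32) bool { return shards[v] })
	fmt.Println(skip, why) // prints: true EC shards already exist in topology
}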

This breaks the loop. Auto-cleanup of the orphaned replica is left as
follow-up work — deleting a source replica from inside the detector is
only safe with a re-verification step right before the delete, plus a
config opt-in, and is best done in its own change.
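
For the record, that follow-up would need roughly this ordering; everything
below is hypothetical and deliberately not shipped in this change (no such
config flag or helpers exist):

package main

import (
	"errors"
	"fmt"
)

// All names below are hypothetical. The point is the required ordering:
// explicit opt-in, fresh re-verification immediately before the delete,
// then the delete itself.
const dataShardsCount = 10 // SeaweedFS uses 10 data + 4 parity shards

type config struct {
	AutoDeleteOrphanedECSource bool // opt-in, off by default
}

func maybeCleanupOrphanedSource(cfg config, vid uint32,
	reverifyShardCount func(uint32) (int, error),
	deleteReplica func(uint32) error) error {
	if !cfg.AutoDeleteOrphanedECSource {
		return nil // default behavior: just the warning this commit adds
	}
	// Re-verify right before acting; the detector's topology snapshot may
	// be stale by the time the task runs.
	n, err := reverifyShardCount(vid)
	if err != nil || n < dataShardsCount {
		return errors.New("refusing delete: EC shards not verifiably healthy")
	}
	return deleteReplica(vid)
}

func main() {
	err := maybeCleanupOrphanedSource(
		config{AutoDeleteOrphanedECSource: true}, 42,
		func(uint32) (int, error) { return 14, nil }, // all 14 shards present
		func(vid uint32) error { fmt.Printf("deleted source replica for volume %d\n", vid); return nil },
	)
	fmt.Println("cleanup err:", err)
}
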
Chris Lu
2026-05-11 20:30:04 -07:00
parent 532b088262
commit af09e1ec7e
2 changed files with 90 additions and 0 deletions


@@ -102,6 +102,27 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
			continue
		}

		// Skip when EC shards for this (volume, collection) already exist in the
		// topology. metric.IsECVolume above is set only for the EC-side metric
		// path, so when the master heartbeats BOTH a regular replica AND its
		// EC shards in parallel — the "stuck source" state from #9448 — the
		// canonical metric we picked is the regular replica with
		// IsECVolume=false. Re-proposing an encode in that state collides with
		// the already-mounted shards on the targets (the volume server returns
		// "ec volume %d is mounted; refusing overwrite") and the detector
		// re-queues forever. Skip and surface the orphaned source replica so
		// an admin can clean it up.
		if clusterInfo.ActiveTopology != nil {
			if existingShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection); len(existingShards) > 0 {
				glog.Warningf("EC Detection: Volume %d already has EC shards on %d location(s) in the topology; "+
					"skipping re-encode. Source replica on %s appears orphaned (issue #9448). "+
					"To clean up, run `volume.delete -volumeId=%d` on the source server after verifying the EC shards are healthy.",
					metric.VolumeID, len(existingShards), metric.Server, metric.VolumeID)
				skippedAlreadyEC++
				continue
			}
		}

		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++

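For reference, with the fixture from the test below (volume 42, 14 shard
locations, source 127.0.0.1:8080) the warning renders roughly as:

    EC Detection: Volume 42 already has EC shards on 14 location(s) in the topology; skipping re-encode. Source replica on 127.0.0.1:8080 appears orphaned (issue #9448). To clean up, run `volume.delete -volumeId=42` on the source server after verifying the EC shards are healthy.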

@@ -127,6 +127,75 @@ func TestECPlacementPlannerFallsBackWhenTagsInsufficient(t *testing.T) {
	assert.Less(t, taggedCount, len(selected))
}

// TestDetectionSkipsWhenECShardsAlreadyExist guards against issue #9448: a
// regular replica that survived a previous successful EC encode (source
// delete didn't clean it up for some reason) gets re-proposed for encoding,
// the new encode collides with the already-mounted shards on the targets
// ("ec volume %d is mounted; refusing overwrite"), and detection loops
// forever on the same volume. Detection must see the existing shards and
// skip the volume so an admin can clean it up out-of-band.
func TestDetectionSkipsWhenECShardsAlreadyExist(t *testing.T) {
	const volumeID uint32 = 42
	const collection = ""

	activeTopology := topology.NewActiveTopology(10)
	nodes := make([]*master_pb.DataNodeInfo, 0, erasure_coding.TotalShardsCount)
	for i := 0; i < erasure_coding.TotalShardsCount; i++ {
		nodeID := fmt.Sprintf("127.0.0.1:%d", 8080+i)
		diskInfo := &master_pb.DiskInfo{
			DiskId:         0,
			VolumeCount:    1,
			MaxVolumeCount: 100,
			EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{
				Id:          volumeID,
				Collection:  collection,
				EcIndexBits: uint32(1) << uint(i),
				DiskId:      0,
			}},
		}
		if i == 0 {
			// The orphaned source replica that the previous encode's source
			// delete didn't remove.
			diskInfo.VolumeInfos = []*master_pb.VolumeInformationMessage{{
				Id:         volumeID,
				Collection: collection,
				DiskId:     0,
				DiskType:   "hdd",
				Size:       200 * 1024 * 1024,
			}}
		}
		nodes = append(nodes, &master_pb.DataNodeInfo{
			Id:        nodeID,
			DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo},
		})
	}
	require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{
		DataCenterInfos: []*master_pb.DataCenterInfo{{
			Id: "dc1",
			RackInfos: []*master_pb.RackInfo{{
				Id:            "rack1",
				DataNodeInfos: nodes,
			}},
		}},
	}))

	clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology}
	metrics := []*types.VolumeHealthMetrics{{
		VolumeID:      volumeID,
		Server:        nodes[0].Id,
		Size:          200 * 1024 * 1024,
		Collection:    collection,
		FullnessRatio: 0.9,
		LastModified:  time.Now().Add(-time.Hour),
		Age:           10 * time.Minute,
	}}

	results, hasMore, err := Detection(context.Background(), metrics, clusterInfo, NewDefaultConfig(), 0)
	require.NoError(t, err)
	require.False(t, hasMore)
	require.Empty(t, results, "stuck source replica must not produce a new EC encoding proposal")
}

func TestDetectionContextCancellation(t *testing.T) {
	activeTopology := buildActiveTopology(t, 5, []string{"hdd", "ssd"}, 50, 0)
	clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology}