diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index a2c045ff8..d69840c4c 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -102,6 +102,27 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste continue } + // Skip when EC shards for this (volume, collection) already exist in the + // topology. metric.IsECVolume above is set only for the EC-side metric + // path, so when the master heartbeats BOTH a regular replica AND its + // EC shards in parallel — the "stuck source" state from #9448 — the + // canonical metric we picked is the regular replica with + // IsECVolume=false. Re-proposing an encode in that state collides with + // the already-mounted shards on the targets (the volume server returns + // "ec volume %d is mounted; refusing overwrite") and the detector + // re-queues forever. Skip and surface the orphaned source replica so + // an admin can clean it up. + if clusterInfo.ActiveTopology != nil { + if existingShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection); len(existingShards) > 0 { + glog.Warningf("EC Detection: Volume %d already has EC shards on %d location(s) in the topology; "+ + "skipping re-encode. Source replica on %s appears orphaned (issue #9448). "+ + "To clean up, run `volume.delete -volumeId=%d` on the source server after verifying the EC shards are healthy.", + metric.VolumeID, len(existingShards), metric.Server, metric.VolumeID) + skippedAlreadyEC++ + continue + } + } + // Check minimum size requirement if metric.Size < minSizeBytes { skippedTooSmall++ diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 4ded8e2bc..36b6ee56c 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -127,6 +127,75 @@ func TestECPlacementPlannerFallsBackWhenTagsInsufficient(t *testing.T) { assert.Less(t, taggedCount, len(selected)) } +// TestDetectionSkipsWhenECShardsAlreadyExist guards against issue #9448: a +// regular replica that survived a previous successful EC encode (source +// delete didn't clean it up for some reason) gets re-proposed for encoding, +// the new encode collides with the already-mounted shards on the targets +// ("ec volume %d is mounted; refusing overwrite"), and detection loops +// forever on the same volume. Detection must see the existing shards and +// skip the volume so an admin can clean it up out-of-band. +func TestDetectionSkipsWhenECShardsAlreadyExist(t *testing.T) { + const volumeID uint32 = 42 + const collection = "" + + activeTopology := topology.NewActiveTopology(10) + nodes := make([]*master_pb.DataNodeInfo, 0, erasure_coding.TotalShardsCount) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + nodeID := fmt.Sprintf("127.0.0.1:%d", 8080+i) + diskInfo := &master_pb.DiskInfo{ + DiskId: 0, + VolumeCount: 1, + MaxVolumeCount: 100, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{ + Id: volumeID, + Collection: collection, + EcIndexBits: uint32(1) << uint(i), + DiskId: 0, + }}, + } + if i == 0 { + // The orphaned source replica that the previous encode's source + // delete didn't remove. + diskInfo.VolumeInfos = []*master_pb.VolumeInformationMessage{{ + Id: volumeID, + Collection: collection, + DiskId: 0, + DiskType: "hdd", + Size: 200 * 1024 * 1024, + }} + } + nodes = append(nodes, &master_pb.DataNodeInfo{ + Id: nodeID, + DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo}, + }) + } + require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{{ + Id: "dc1", + RackInfos: []*master_pb.RackInfo{{ + Id: "rack1", + DataNodeInfos: nodes, + }}, + }}, + })) + + clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology} + metrics := []*types.VolumeHealthMetrics{{ + VolumeID: volumeID, + Server: nodes[0].Id, + Size: 200 * 1024 * 1024, + Collection: collection, + FullnessRatio: 0.9, + LastModified: time.Now().Add(-time.Hour), + Age: 10 * time.Minute, + }} + + results, hasMore, err := Detection(context.Background(), metrics, clusterInfo, NewDefaultConfig(), 0) + require.NoError(t, err) + require.False(t, hasMore) + require.Empty(t, results, "stuck source replica must not produce a new EC encoding proposal") +} + func TestDetectionContextCancellation(t *testing.T) { activeTopology := buildActiveTopology(t, 5, []string{"hdd", "ssd"}, 50, 0) clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology}