mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-14 05:41:29 +00:00
fix(ec): skip re-encode when EC shards already exist for the volume (#9448)
When an earlier EC encoding succeeded but the post-encode source-delete left a regular replica behind on one of the servers, the next detection cycle proposes the same volume again. The new encode tries to redistribute shards to targets that already have them mounted, the volume server returns `ec volume %d is mounted; refusing overwrite`, the task fails, and detection re-queues the volume. The cycle repeats forever — issue #9448. The existing `metric.IsECVolume` skip catches the case where the canonical metric is reported on the EC-shard side of the heartbeat, but when the master sees BOTH a regular replica AND its EC shards in the same volume list, the canonical metric we pick is the regular replica and IsECVolume is false. Add a second guard that checks the topology directly via `findExistingECShards` (already present and indexed) and skip the volume when any shards exist, logging a warning that points the admin at the stuck source. This breaks the loop. Auto-cleanup of the orphaned replica is left as follow-up work — deleting a source replica from inside the detector is only safe with a re-verification step right before the delete, plus a config opt-in, and is best done in its own change.
This commit is contained in:
@@ -102,6 +102,27 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip when EC shards for this (volume, collection) already exist in the
|
||||
// topology. metric.IsECVolume above is set only for the EC-side metric
|
||||
// path, so when the master heartbeats BOTH a regular replica AND its
|
||||
// EC shards in parallel — the "stuck source" state from #9448 — the
|
||||
// canonical metric we picked is the regular replica with
|
||||
// IsECVolume=false. Re-proposing an encode in that state collides with
|
||||
// the already-mounted shards on the targets (the volume server returns
|
||||
// "ec volume %d is mounted; refusing overwrite") and the detector
|
||||
// re-queues forever. Skip and surface the orphaned source replica so
|
||||
// an admin can clean it up.
|
||||
if clusterInfo.ActiveTopology != nil {
|
||||
if existingShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection); len(existingShards) > 0 {
|
||||
glog.Warningf("EC Detection: Volume %d already has EC shards on %d location(s) in the topology; "+
|
||||
"skipping re-encode. Source replica on %s appears orphaned (issue #9448). "+
|
||||
"To clean up, run `volume.delete -volumeId=%d` on the source server after verifying the EC shards are healthy.",
|
||||
metric.VolumeID, len(existingShards), metric.Server, metric.VolumeID)
|
||||
skippedAlreadyEC++
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Check minimum size requirement
|
||||
if metric.Size < minSizeBytes {
|
||||
skippedTooSmall++
|
||||
|
||||
@@ -127,6 +127,75 @@ func TestECPlacementPlannerFallsBackWhenTagsInsufficient(t *testing.T) {
|
||||
assert.Less(t, taggedCount, len(selected))
|
||||
}
|
||||
|
||||
// TestDetectionSkipsWhenECShardsAlreadyExist guards against issue #9448: a
|
||||
// regular replica that survived a previous successful EC encode (source
|
||||
// delete didn't clean it up for some reason) gets re-proposed for encoding,
|
||||
// the new encode collides with the already-mounted shards on the targets
|
||||
// ("ec volume %d is mounted; refusing overwrite"), and detection loops
|
||||
// forever on the same volume. Detection must see the existing shards and
|
||||
// skip the volume so an admin can clean it up out-of-band.
|
||||
func TestDetectionSkipsWhenECShardsAlreadyExist(t *testing.T) {
|
||||
const volumeID uint32 = 42
|
||||
const collection = ""
|
||||
|
||||
activeTopology := topology.NewActiveTopology(10)
|
||||
nodes := make([]*master_pb.DataNodeInfo, 0, erasure_coding.TotalShardsCount)
|
||||
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
|
||||
nodeID := fmt.Sprintf("127.0.0.1:%d", 8080+i)
|
||||
diskInfo := &master_pb.DiskInfo{
|
||||
DiskId: 0,
|
||||
VolumeCount: 1,
|
||||
MaxVolumeCount: 100,
|
||||
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{
|
||||
Id: volumeID,
|
||||
Collection: collection,
|
||||
EcIndexBits: uint32(1) << uint(i),
|
||||
DiskId: 0,
|
||||
}},
|
||||
}
|
||||
if i == 0 {
|
||||
// The orphaned source replica that the previous encode's source
|
||||
// delete didn't remove.
|
||||
diskInfo.VolumeInfos = []*master_pb.VolumeInformationMessage{{
|
||||
Id: volumeID,
|
||||
Collection: collection,
|
||||
DiskId: 0,
|
||||
DiskType: "hdd",
|
||||
Size: 200 * 1024 * 1024,
|
||||
}}
|
||||
}
|
||||
nodes = append(nodes, &master_pb.DataNodeInfo{
|
||||
Id: nodeID,
|
||||
DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo},
|
||||
})
|
||||
}
|
||||
require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{
|
||||
DataCenterInfos: []*master_pb.DataCenterInfo{{
|
||||
Id: "dc1",
|
||||
RackInfos: []*master_pb.RackInfo{{
|
||||
Id: "rack1",
|
||||
DataNodeInfos: nodes,
|
||||
}},
|
||||
}},
|
||||
}))
|
||||
|
||||
clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology}
|
||||
metrics := []*types.VolumeHealthMetrics{{
|
||||
VolumeID: volumeID,
|
||||
Server: nodes[0].Id,
|
||||
Size: 200 * 1024 * 1024,
|
||||
Collection: collection,
|
||||
FullnessRatio: 0.9,
|
||||
LastModified: time.Now().Add(-time.Hour),
|
||||
Age: 10 * time.Minute,
|
||||
}}
|
||||
|
||||
results, hasMore, err := Detection(context.Background(), metrics, clusterInfo, NewDefaultConfig(), 0)
|
||||
require.NoError(t, err)
|
||||
require.False(t, hasMore)
|
||||
require.Empty(t, results, "stuck source replica must not produce a new EC encoding proposal")
|
||||
}
|
||||
|
||||
func TestDetectionContextCancellation(t *testing.T) {
|
||||
activeTopology := buildActiveTopology(t, 5, []string{"hdd", "ssd"}, 50, 0)
|
||||
clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology}
|
||||
|
||||
Reference in New Issue
Block a user