From 691e601e6f21078d074e4cc39a9922d8017bc411 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 28 May 2026 13:06:21 -0700 Subject: [PATCH] fix(ec): prefer credible replica as canonical metric in EC detection (#9717) * fix(ec): prefer credible replica as canonical metric in EC detection An interrupted encode can leave a 0-byte .dat replica behind. When that stub sits on a lower-sorting server than the real replica, the lowest-server canonical pick reported Size=0, tripped the min-size gate, and the volume was stranded in skippedTooSmall: detection never proposed an encode, so the partial EC shards were never cleared and re-distribute kept hitting the mounted-volume guard. selectCanonicalMetric now prefers the lowest-server credible replica (data-bearing, not already EC), falling back to the lowest-server metric only when nothing is credible so the downstream gates skip as before. A leftover EC shard set on a lower server no longer short-circuits the volume at the IsECVolume guard either, so the orphan-source cleanup and re-encode paths get their chance. * fix(ec): treat a bare superblock .dat as a stub too An interrupted encode or copy can write the 8-byte superblock and then fail, leaving an 8-byte .dat with no data. isStubReplica used a strict <, so that file slipped through as credible, could win the canonical pick on a low server, and re-tripped the min-size gate. Compare with <= against the superblock size so a data-less .dat never shadows a real replica. 
--- weed/worker/tasks/erasure_coding/detection.go | 11 +- weed/worker/tasks/erasure_coding/planning.go | 43 ++++++ .../tasks/erasure_coding/planning_test.go | 140 ++++++++++++++++++ 3 files changed, 187 insertions(+), 7 deletions(-) create mode 100644 weed/worker/tasks/erasure_coding/planning.go create mode 100644 weed/worker/tasks/erasure_coding/planning_test.go diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index 666a2d7ac..c22197625 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -123,13 +123,10 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste groupMetrics := volumeGroups[volumeID] - // Find canonical metric (lowest Server ID) to ensure consistent task deduplication - metric := groupMetrics[0] - for _, m := range groupMetrics { - if m.Server < metric.Server { - metric = m - } - } + // Prefer the lowest-server credible replica so a 0-byte stub or a + // leftover EC shard set on a lower server can't become canonical and + // strand the volume in skippedTooSmall / skippedAlreadyEC. + metric := selectCanonicalMetric(groupMetrics) // Skip if already EC volume if metric.IsECVolume { diff --git a/weed/worker/tasks/erasure_coding/planning.go b/weed/worker/tasks/erasure_coding/planning.go new file mode 100644 index 000000000..0d6eb6b99 --- /dev/null +++ b/weed/worker/tasks/erasure_coding/planning.go @@ -0,0 +1,43 @@ +package erasure_coding + +import ( + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/worker/types" +) + +// isStubReplica reports whether a regular replica's .dat is too small to hold +// any data — at most a bare superblock. An interrupted encode or copy can +// leave a 0-byte .dat, or write the 8-byte superblock and then fail; either +// way the file is not an encode source and must not be counted toward +// size/fullness checks. 
+func isStubReplica(size uint64) bool { + return size <= uint64(super_block.SuperBlockSize) +} + +// selectCanonicalMetric picks the metric that drives the encode checks and +// names the encode source server. It prefers the lowest-server credible +// replica — data-bearing and not already EC — so a 0-byte stub or a leftover +// EC shard set sharing the volume id cannot become canonical and strand the +// volume (a stub trips the min-size gate; an EC metric trips the IsECVolume +// guard, hiding the volume from both orphan-source cleanup and re-encode). +// When nothing is credible there is nothing to encode, so the lowest-server +// metric is returned unchanged and the downstream gates make the skip +// decision as before. +func selectCanonicalMetric(group []*types.VolumeHealthMetrics) *types.VolumeHealthMetrics { + var lowest, credible *types.VolumeHealthMetrics + for _, m := range group { + if lowest == nil || m.Server < lowest.Server { + lowest = m + } + if m.IsECVolume || isStubReplica(m.Size) { + continue + } + if credible == nil || m.Server < credible.Server { + credible = m + } + } + if credible != nil { + return credible + } + return lowest +} diff --git a/weed/worker/tasks/erasure_coding/planning_test.go b/weed/worker/tasks/erasure_coding/planning_test.go new file mode 100644 index 000000000..87c72e36d --- /dev/null +++ b/weed/worker/tasks/erasure_coding/planning_test.go @@ -0,0 +1,140 @@ +package erasure_coding + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/worker/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIsStubReplica(t *testing.T) { + assert.True(t, isStubReplica(0), "0-byte .dat is a stub") + assert.True(t, 
isStubReplica(uint64(super_block.SuperBlockSize)-1), "below a superblock is a stub") + assert.True(t, isStubReplica(uint64(super_block.SuperBlockSize)), "a bare superblock holds no data — a stub") + assert.False(t, isStubReplica(uint64(super_block.SuperBlockSize)+1), "data beyond the superblock is a real replica") + assert.False(t, isStubReplica(200*1024*1024), "a data-bearing replica is not a stub") +} + +// A 0-byte stub left by an interrupted encode often sorts to a lower server id +// than the real replica. The old lowest-server canonical pick then reported +// Size=0, tripped the min-size gate, and the volume was stranded in +// skippedTooSmall forever. selectCanonicalMetric must skip the stub and return +// the data-bearing replica. +func TestSelectCanonicalMetricPrefersCredibleOverLowServerStub(t *testing.T) { + stub := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.1:8080", Size: 0} + real := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.4:8080", Size: 200 * 1024 * 1024} + + got := selectCanonicalMetric([]*types.VolumeHealthMetrics{stub, real}) + require.Same(t, real, got) +} + +// An EC-side metric (the partial shards from a failed encode) can also sort +// below the regular replica. Picking it would short-circuit at the +// IsECVolume guard and skip the volume, hiding it from both the orphan-source +// cleanup and the re-encode path. The credible regular replica must win. +func TestSelectCanonicalMetricSkipsECMetrics(t *testing.T) { + ecMetric := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.1:8080", Size: 50 * 1024 * 1024, IsECVolume: true} + real := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.4:8080", Size: 200 * 1024 * 1024} + + got := selectCanonicalMetric([]*types.VolumeHealthMetrics{ecMetric, real}) + require.Same(t, real, got) +} + +// When nothing is credible there is nothing to encode. 
Fall back to the +// lowest-server metric so the downstream gates (min-size / IsECVolume) make +// the skip decision exactly as before — selectCanonicalMetric must not invent +// a source. +func TestSelectCanonicalMetricAllStubsFallsBackToLowestServer(t *testing.T) { + stubHi := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.4:8080", Size: 0} + stubLo := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.1:8080", Size: 0} + + got := selectCanonicalMetric([]*types.VolumeHealthMetrics{stubHi, stubLo}) + require.Same(t, stubLo, got) +} + +// Among several credible replicas the lowest server id still wins, preserving +// the deterministic canonical choice the task-dedup logic relies on. +func TestSelectCanonicalMetricTieBreaksByServerAmongCredible(t *testing.T) { + hi := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.7:8080", Size: 200 * 1024 * 1024} + lo := &types.VolumeHealthMetrics{VolumeID: 13, Server: "10.0.0.2:8080", Size: 200 * 1024 * 1024} + + got := selectCanonicalMetric([]*types.VolumeHealthMetrics{hi, lo}) + require.Same(t, lo, got) +} + +func TestSelectCanonicalMetricEmpty(t *testing.T) { + require.Nil(t, selectCanonicalMetric(nil)) + require.Nil(t, selectCanonicalMetric([]*types.VolumeHealthMetrics{})) +} + +// End-to-end regression for the stranded-volume bug: a volume whose lowest +// server holds a 0-byte stub and whose real replica is on a higher server must +// still be proposed for EC encoding, not silently dropped as too-small. +func TestDetectionEncodesDespiteLowServerStub(t *testing.T) { + const volumeID uint32 = 13 + activeTopology := buildStubReplicaTopology(t, volumeID) + clusterInfo := &types.ClusterInfo{ActiveTopology: activeTopology} + + lastModified := time.Now().Add(-2 * time.Hour) + metrics := []*types.VolumeHealthMetrics{ + // Stub on the lowest server: an interrupted encode left a 0-byte .dat. 
+ {VolumeID: volumeID, Server: "127.0.0.1:8080", Size: 0, FullnessRatio: 0, LastModified: lastModified, Age: time.Since(lastModified)}, + // The real replica on a higher server. + {VolumeID: volumeID, Server: "127.0.0.1:8081", Size: 200 * 1024 * 1024, FullnessRatio: 0.96, LastModified: lastModified, Age: time.Since(lastModified)}, + } + + results, _, err := Detection(context.Background(), metrics, clusterInfo, NewDefaultConfig(), 0) + require.NoError(t, err) + require.Len(t, results, 1, "volume with a real replica must be proposed for EC despite a low-server stub") + assert.Equal(t, types.TaskTypeErasureCoding, results[0].TaskType) + assert.Equal(t, volumeID, results[0].VolumeID) +} + +// buildStubReplicaTopology builds a cluster with TotalShardsCount single-disk +// nodes (enough targets to place every shard) where node 0 holds a 0-byte stub +// replica of volumeID and node 1 holds the real replica. +func buildStubReplicaTopology(t *testing.T, volumeID uint32) *topology.ActiveTopology { + t.Helper() + activeTopology := topology.NewActiveTopology(10) + nodes := make([]*master_pb.DataNodeInfo, 0, erasure_coding.TotalShardsCount) + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + diskInfo := &master_pb.DiskInfo{ + DiskId: 0, + MaxVolumeCount: 200, + } + switch i { + case 0: + diskInfo.VolumeInfos = []*master_pb.VolumeInformationMessage{{ + Id: volumeID, DiskId: 0, DiskType: "hdd", Size: 0, + }} + diskInfo.VolumeCount = 1 + case 1: + diskInfo.VolumeInfos = []*master_pb.VolumeInformationMessage{{ + Id: volumeID, DiskId: 0, DiskType: "hdd", Size: 200 * 1024 * 1024, + }} + diskInfo.VolumeCount = 1 + } + nodes = append(nodes, &master_pb.DataNodeInfo{ + Id: fmt.Sprintf("127.0.0.1:%d", 8080+i), + DiskInfos: map[string]*master_pb.DiskInfo{"hdd": diskInfo}, + }) + } + require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{{ + Id: "dc1", + RackInfos: []*master_pb.RackInfo{{ + Id: "rack1", + 
DataNodeInfos: nodes, + }}, + }}, + })) + return activeTopology +}