From 024b59fb3157a564b32c996cf2f0c2de3eb9c46f Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 20 May 2026 11:50:42 -0700 Subject: [PATCH] fix(ec): pack EC shards onto fewer disks instead of refusing the task (#9588) The planner refused to create an EC task unless it found totalShards distinct (server, disk_id) targets, so a cluster with fewer disks than shards (e.g. 8 single-disk servers for a 10+4 scheme) could never encode. A disk safely holds several distinct shards of one volume: each is its own .ecNN file and ReceiveFile keys by that extension. Drop the strict check and let createECTargets round-robin shards across the available disks, matching ec.encode's "4,4,3,3" fallback. The minTotalDisks floor (ceil(total/parity)) already caps every disk at parityShards shards, so the volume still survives losing any one disk. Reserve capacity for the actual per-disk shard count rather than assuming one shard each, so packing doesn't over-commit disk slots. --- weed/worker/tasks/erasure_coding/detection.go | 53 +++++++++----- .../tasks/erasure_coding/detection_test.go | 71 +++++++++++++++++++ 2 files changed, 108 insertions(+), 16 deletions(-) diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index a962159c9..a9134022c 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -282,16 +282,21 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)", len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources)) - // Convert shard destinations to TaskDestinationSpec + // Convert shard destinations to TaskDestinationSpec. 
With fewer + // disks than shards a destination holds several shards, so reserve + // capacity for the actual per-disk shard count (round-robin matches + // createECTargets) rather than assuming one shard each. destinations := make([]topology.TaskDestinationSpec, len(shardDestinations)) - shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination - shardSize := int64(expectedShardSize) + shardsPerDest := distributeECShards(dataShards+parityShards, len(shardDestinations)) for i, dest := range shardDestinations { + shardCount := len(shardsPerDest[i]) + shardImpact := topology.CalculateECShardStorageImpact(int32(shardCount), int64(expectedShardSize)) + destSize := int64(expectedShardSize) * int64(shardCount) destinations[i] = topology.TaskDestinationSpec{ ServerID: dest, DiskID: shardDiskIDs[i], StorageImpact: &shardImpact, - EstimatedSize: &shardSize, + EstimatedSize: &destSize, } } @@ -698,13 +703,19 @@ func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthM return nil, err } if len(selectedDisks) < minTotalDisks { - return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks) + return nil, fmt.Errorf("found %d disks, but EC %d+%d needs at least %d disks so no disk holds more than %d shards", + len(selectedDisks), dataShards, parityShards, minTotalDisks, parityShards) } - // One shard per (server, disk_id): #9185's disk_id-aware ReceiveFile rejects - // a second shard on the same disk. + // Fewer than totalShards disks is fine: createECTargets round-robins the + // shards across the available disks, packing several distinct shards onto a + // disk when needed (matching ec.encode's "spread as 4,4,3,3" fallback for + // small clusters). A disk holding several shards of one volume is safe — + // each is a separate .ecNN file and ReceiveFile keys by that extension. 
The + // minTotalDisks floor above keeps any single disk under parityShards shards, + // so the volume still survives losing any one disk. if len(selectedDisks) < totalShards { - return nil, fmt.Errorf("found %d disks, but EC %d+%d needs %d distinct (server, disk_id) targets", - len(selectedDisks), dataShards, parityShards, totalShards) + glog.V(1).Infof("EC volume %d: only %d disks for %d shards, packing up to %d shards per disk", + metric.VolumeID, len(selectedDisks), totalShards, (totalShards+len(selectedDisks)-1)/len(selectedDisks)) } var plans []*topology.DestinationPlan @@ -761,13 +772,12 @@ func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthM }, nil } -// createECTargets builds TaskTargets with one shard per plan entry. -// planECDestinations ensures numTargets == totalShards. -func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parityShards int) []*worker_pb.TaskTarget { - var targets []*worker_pb.TaskTarget - numTargets := len(multiPlan.Plans) - totalShards := dataShards + parityShards - +// distributeECShards assigns shard ids 0..totalShards-1 across numTargets +// targets round-robin, so each target holds either floor or ceil of +// totalShards/numTargets shards. When numTargets < totalShards this packs +// several shards onto a target; planECDestinations guarantees numTargets is at +// least ceil(totalShards/parityShards), so no target exceeds parityShards shards. +func distributeECShards(totalShards, numTargets int) [][]uint32 { targetShards := make([][]uint32, numTargets) for i := range targetShards { targetShards[i] = make([]uint32, 0) @@ -776,6 +786,17 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parit targetIndex := shardId % numTargets targetShards[targetIndex] = append(targetShards[targetIndex], uint32(shardId)) } + return targetShards +} + +// createECTargets builds TaskTargets, round-robining shards across the plan +// entries. 
With fewer disks than shards a target receives several shard ids. +func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parityShards int) []*worker_pb.TaskTarget { + var targets []*worker_pb.TaskTarget + numTargets := len(multiPlan.Plans) + totalShards := dataShards + parityShards + + targetShards := distributeECShards(totalShards, numTargets) for i, plan := range multiPlan.Plans { target := &worker_pb.TaskTarget{ diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 91ca8aab5..11e5676f4 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -402,6 +402,77 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) { require.Error(t, err) } +// #9586: with fewer single-disk servers than total shards, EC must still plan +// by packing several shards onto a disk (ec.encode's "4,4,3,3" fallback) rather +// than refusing. The reporter has 8 single-disk servers across 3 racks and a +// 10+4 scheme — 8 disks for 14 shards. minTotalDisks (ceil(14/4)=4) keeps any +// disk under parityShards shards, so durability holds. +func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) { + const numServers = 8 + // rack3 holds 4 servers, rack1 and rack2 hold 2 each, mirroring the report. 
+ racks := []string{"rack3", "rack3", "rack3", "rack3", "rack1", "rack1", "rack2", "rack2"} + + activeTopology := topology.NewActiveTopology(10) + rackNodes := make(map[string][]*master_pb.DataNodeInfo) + for i := 0; i < numServers; i++ { + nodeID := fmt.Sprintf("192.168.1.%d:%d", 143+i/3, 8080+i) + rackNodes[racks[i]] = append(rackNodes[racks[i]], &master_pb.DataNodeInfo{ + Id: nodeID, + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + DiskId: 0, + VolumeCount: 1, + MaxVolumeCount: 200, + VolumeInfos: []*master_pb.VolumeInformationMessage{{ + Id: uint32(i + 1), + DiskId: 0, + DiskType: "hdd", + }}, + }, + }, + }) + } + rackInfos := make([]*master_pb.RackInfo, 0, len(rackNodes)) + for _, rackID := range []string{"rack1", "rack2", "rack3"} { + rackInfos = append(rackInfos, &master_pb.RackInfo{Id: rackID, DataNodeInfos: rackNodes[rackID]}) + } + require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{{Id: "dc1", RackInfos: rackInfos}}, + })) + + planner := newECPlacementPlanner(activeTopology, nil) + require.NotNil(t, planner) + + metric := &types.VolumeHealthMetrics{ + VolumeID: 4569, + Server: "192.168.1.145:8081", + Size: 100 * 1024 * 1024, + Collection: "", + } + + plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + require.NoError(t, err) + require.NotNil(t, plan) + // One plan entry per available disk; fewer than the 14 shards. + require.Equal(t, numServers, len(plan.Plans)) + + // createECTargets must cover all 14 shards exactly once, packing onto the + // available disks without any disk exceeding parityShards shards. 
+ targets := createECTargets(plan, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + require.Equal(t, numServers, len(targets)) + + seenShards := make(map[uint32]bool) + for _, target := range targets { + require.LessOrEqual(t, len(target.ShardIds), erasure_coding.ParityShardsCount, + "no disk may hold more than parityShards shards, else losing it loses the volume") + for _, shardId := range target.ShardIds { + require.False(t, seenShards[shardId], "shard %d assigned to more than one target", shardId) + seenShards[shardId] = true + } + } + require.Len(t, seenShards, erasure_coding.TotalShardsCount, "every shard must be placed exactly once") +} + func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics { metrics := make([]*types.VolumeHealthMetrics, 0, count) now := time.Now()