fix(ec): pack EC shards onto fewer disks instead of refusing the task (#9588)

The planner refused to create an EC task unless it found totalShards
distinct (server, disk_id) targets, so a cluster with fewer disks than
shards (e.g. 8 single-disk servers for a 10+4 scheme) could never encode.

A disk safely holds several distinct shards of one volume: each is its own
.ecNN file and ReceiveFile keys by that extension. Drop the strict check and
let createECTargets round-robin shards across the available disks, matching
ec.encode's "4,4,3,3" fallback. The minTotalDisks floor (ceil(total/parity))
already caps every disk at parityShards shards or fewer, so the volume still
survives losing any one disk.

Reserve capacity for the actual per-disk shard count rather than assuming
one shard each, so packing doesn't over-commit disk slots.
This commit is contained in:
Chris Lu
2026-05-20 11:50:42 -07:00
committed by GitHub
parent 5af7d12f04
commit 024b59fb31
2 changed files with 108 additions and 16 deletions

View File

@@ -282,16 +282,21 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))
// Convert shard destinations to TaskDestinationSpec
// Convert shard destinations to TaskDestinationSpec. With fewer
// disks than shards a destination holds several shards, so reserve
// capacity for the actual per-disk shard count (round-robin matches
// createECTargets) rather than assuming one shard each.
destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
shardSize := int64(expectedShardSize)
shardsPerDest := distributeECShards(dataShards+parityShards, len(shardDestinations))
for i, dest := range shardDestinations {
shardCount := len(shardsPerDest[i])
shardImpact := topology.CalculateECShardStorageImpact(int32(shardCount), int64(expectedShardSize))
destSize := int64(expectedShardSize) * int64(shardCount)
destinations[i] = topology.TaskDestinationSpec{
ServerID: dest,
DiskID: shardDiskIDs[i],
StorageImpact: &shardImpact,
EstimatedSize: &shardSize,
EstimatedSize: &destSize,
}
}
@@ -698,13 +703,19 @@ func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthM
return nil, err
}
if len(selectedDisks) < minTotalDisks {
return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), minTotalDisks)
return nil, fmt.Errorf("found %d disks, but EC %d+%d needs at least %d disks so no disk holds more than %d shards",
len(selectedDisks), dataShards, parityShards, minTotalDisks, parityShards)
}
// One shard per (server, disk_id): #9185's disk_id-aware ReceiveFile rejects
// a second shard on the same disk.
// Fewer than totalShards disks is fine: createECTargets round-robins the
// shards across the available disks, packing several distinct shards onto a
// disk when needed (matching ec.encode's "spread as 4,4,3,3" fallback for
// small clusters). A disk holding several shards of one volume is safe —
// each is a separate .ecNN file and ReceiveFile keys by that extension. The
// minTotalDisks floor above keeps any single disk at or below parityShards
// shards, so the volume still survives losing any one disk.
if len(selectedDisks) < totalShards {
return nil, fmt.Errorf("found %d disks, but EC %d+%d needs %d distinct (server, disk_id) targets",
len(selectedDisks), dataShards, parityShards, totalShards)
glog.V(1).Infof("EC volume %d: only %d disks for %d shards, packing up to %d shards per disk",
metric.VolumeID, len(selectedDisks), totalShards, (totalShards+len(selectedDisks)-1)/len(selectedDisks))
}
var plans []*topology.DestinationPlan
@@ -761,13 +772,12 @@ func planECDestinations(planner *ecPlacementPlanner, metric *types.VolumeHealthM
}, nil
}
// createECTargets builds TaskTargets with one shard per plan entry.
// planECDestinations ensures numTargets == totalShards.
func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parityShards int) []*worker_pb.TaskTarget {
var targets []*worker_pb.TaskTarget
numTargets := len(multiPlan.Plans)
totalShards := dataShards + parityShards
// distributeECShards assigns shard ids 0..totalShards-1 across numTargets
// targets round-robin, so each target holds either floor or ceil of
// totalShards/numTargets shards. When numTargets < totalShards this packs
// several shards onto a target; planECDestinations guarantees numTargets is at
// least ceil(totalShards/parityShards), so no target exceeds parityShards shards.
func distributeECShards(totalShards, numTargets int) [][]uint32 {
targetShards := make([][]uint32, numTargets)
for i := range targetShards {
targetShards[i] = make([]uint32, 0)
@@ -776,6 +786,17 @@ func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parit
targetIndex := shardId % numTargets
targetShards[targetIndex] = append(targetShards[targetIndex], uint32(shardId))
}
return targetShards
}
// createECTargets builds TaskTargets, round-robining shards across the plan
// entries. With fewer disks than shards a target receives several shard ids.
func createECTargets(multiPlan *topology.MultiDestinationPlan, dataShards, parityShards int) []*worker_pb.TaskTarget {
var targets []*worker_pb.TaskTarget
numTargets := len(multiPlan.Plans)
totalShards := dataShards + parityShards
targetShards := distributeECShards(totalShards, numTargets)
for i, plan := range multiPlan.Plans {
target := &worker_pb.TaskTarget{

View File

@@ -402,6 +402,77 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) {
require.Error(t, err)
}
// #9586: with fewer single-disk servers than total shards, EC must still plan
// by packing several shards onto a disk (ec.encode's "4,4,3,3" fallback) rather
// than refusing. The reporter has 8 single-disk servers across 3 racks and a
// 10+4 scheme — 8 disks for 14 shards. minTotalDisks (ceil(14/4)=4) keeps any
// disk under parityShards shards, so durability holds.
func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) {
const numServers = 8
// rack3 holds 4 servers, rack1 and rack2 hold 2 each, mirroring the report.
racks := []string{"rack3", "rack3", "rack3", "rack3", "rack1", "rack1", "rack2", "rack2"}
activeTopology := topology.NewActiveTopology(10)
rackNodes := make(map[string][]*master_pb.DataNodeInfo)
for i := 0; i < numServers; i++ {
nodeID := fmt.Sprintf("192.168.1.%d:%d", 143+i/3, 8080+i)
rackNodes[racks[i]] = append(rackNodes[racks[i]], &master_pb.DataNodeInfo{
Id: nodeID,
DiskInfos: map[string]*master_pb.DiskInfo{
"hdd": {
DiskId: 0,
VolumeCount: 1,
MaxVolumeCount: 200,
VolumeInfos: []*master_pb.VolumeInformationMessage{{
Id: uint32(i + 1),
DiskId: 0,
DiskType: "hdd",
}},
},
},
})
}
rackInfos := make([]*master_pb.RackInfo, 0, len(rackNodes))
for _, rackID := range []string{"rack1", "rack2", "rack3"} {
rackInfos = append(rackInfos, &master_pb.RackInfo{Id: rackID, DataNodeInfos: rackNodes[rackID]})
}
require.NoError(t, activeTopology.UpdateTopology(&master_pb.TopologyInfo{
DataCenterInfos: []*master_pb.DataCenterInfo{{Id: "dc1", RackInfos: rackInfos}},
}))
planner := newECPlacementPlanner(activeTopology, nil)
require.NotNil(t, planner)
metric := &types.VolumeHealthMetrics{
VolumeID: 4569,
Server: "192.168.1.145:8081",
Size: 100 * 1024 * 1024,
Collection: "",
}
plan, err := planECDestinations(planner, metric, NewDefaultConfig(), erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
// One plan entry per available disk; fewer than the 14 shards.
require.Equal(t, numServers, len(plan.Plans))
// createECTargets must cover all 14 shards exactly once, packing onto the
// available disks without any disk exceeding parityShards shards.
targets := createECTargets(plan, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.Equal(t, numServers, len(targets))
seenShards := make(map[uint32]bool)
for _, target := range targets {
require.LessOrEqual(t, len(target.ShardIds), erasure_coding.ParityShardsCount,
"no disk may hold more than parityShards shards, else losing it loses the volume")
for _, shardId := range target.ShardIds {
require.False(t, seenShards[shardId], "shard %d assigned to more than one target", shardId)
seenShards[shardId] = true
}
}
require.Len(t, seenShards, erasure_coding.TotalShardsCount, "every shard must be placed exactly once")
}
func buildVolumeMetricsForIDs(count int) []*types.VolumeHealthMetrics {
metrics := make([]*types.VolumeHealthMetrics, 0, count)
now := time.Now()