From d4e39b499b66f9125803485f412e0556cd913583 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 22 May 2026 20:22:09 -0700 Subject: [PATCH] EC placement: shared replica-placement resolver, snapshot + Place core, capacity fixes, tiering (#9621) * Add shared super_block.ResolveReplicaPlacement; use it in ec_balance * Add ecbalancer.FromActiveTopology snapshot constructor for EC encode/repair * Add ecbalancer.Place greenfield/repair placement core (strict + durability-first) * topology: add GetEffectiveAvailableEcShardSlots; FromActiveTopology uses shard-granular free slots GetDisksWithEffectiveCapacity flattens reserved shard slots into volume slots via integer truncation, so an in-flight EC task reserving a non-multiple-of- DataShardsCount number of shards was lost from the snapshot and freeSlots was over-reported. GetEffectiveAvailableEcShardSlots subtracts the full reservation impact at shard granularity. * ecbalancer.Place: reject nodes without a free disk of the requested type FromActiveTopology keeps all disk types in the snapshot, so an SSD-only request could be routed to a node with only HDD capacity (pickBestDiskOnNode then returns disk 0 on the wrong tier). Filter rack/node selection to those with a free disk of the requested type. * ecbalancer.Place: enforce ReplicaPlacement DiffDataCenterCount (per-DC shard cap) * ecbalancer: enforce DiffDataCenterCount in balance (cross-DC phase + cross-rack DC cap) Adds a cross-DC corrective phase that drains data centers holding more than DiffDataCenterCount shards of a volume, and a per-DC cap on cross-rack move targets. Both are no-ops when DiffDataCenterCount is unset, so balance output is unchanged for non-DC placements. * topology: ratio-aware EC shard slots and provisional empty-disk slot GetEffectiveAvailableEcShardSlots now takes the target collection's data-shard count, so a 4+2 volume's larger shards are not over-counted at 10 per volume slot; and it keeps the one provisional slot for freshly started empty servers that report max=0, matching getEffectiveAvailableCapacityUnsafe. FromActiveTopology threads the ratio through. * ecbalancer.Place: explicit disk-type filter signal (fix HDD vs any ambiguity) HardDriveType normalizes to "", which collided with "" meaning any disk. Add Constraints.FilterDiskType and normalize both sides so a hdd request matches disks reported as "" and never leaks to SSD, while filter=false still means any. * ecbalancer: add clearShardAccounting for repair snapshot reconciliation Clears one disk's copy of a shard from per-domain accounting and recomputes the node-level union (preserving a kept copy on another disk of the same node), without crediting capacity. Repair uses it to drop to-be-deleted copies before placing missing shards. * ecbalancer: don't cap cross-DC target racks when DiffRackCount is unset len(racks)+1 wrongly limited each target rack (3 in a 2-rack cluster), so draining a DC could stop short of the DiffDataCenterCount cap. Use MaxShardCount+1 as the effectively-unlimited default. * topology/ecbalancer: ratio-correct EC capacity accounting Reservation shard slots (default ShardsPerVolumeSlot units) are now converted to the target ratio before subtracting, and existing EC shards are charged by size (targetDataShards/shardDataShards) so a 2+1 shard isn't counted as one 10+4 slot. Per-shard ratio lookup is behind shardDataShards (OSS uses the standard ratio). * ecbalancer.Place: candidate tiering and eligible-rack caps Adds a per-disk eligibility/preference abstraction so Place supports: - preferred-tag whole-plan retry (try disks carrying the earliest tags first, widen to all only if a tier cannot place every shard; reports SpilledOutsidePreferredTags), - soft disk-type spill via DiskTypePolicy (Any/Prefer/Require): Prefer fills the preferred type then spills, reporting SpilledToOtherDiskType; Require filters, - even per-rack caps that divide by racks holding an eligible disk, so a tiered cluster (e.g. SSDs in 2 of 4 racks) isn't capped impossibly low. Disk tags carried via Node.AddDiskTags + FromActiveTopology. * ecbalancer: export ClearShardAccounting for repair snapshot reconciliation * ecbalancer: address review feedback (ratio rounding, bitmap walk, same-DC moves) - topology/ecbalancer: round shard-reservation and existing-shard footprint up when converting to target-ratio shard slots, so a sub-slot reservation is not truncated to zero and free capacity is not overstated for low-data-shard layouts (targetDataShards < ds). - erasure_coding: add ShardBits.All iterator and use it across the balancer, cross-DC phase, and placement scoring instead of scanning 0..MaxShardCount and probing Has on every id. - ecbalancer: allow same-DC cross-rack moves when a DC already sits at its DiffDataCenterCount cap; a same-DC move leaves the DC total unchanged. Add a regression test that fails without the guard. - ecbalancer cross-DC phase: pick targets via the eligible-aware pickNodeInRackEligible/pickBestDiskEligible helpers so the disk-type filter is honored and a 0 disk id is not mistaken for a valid selection. * ecbalancer: test ecShardSlotsOnDisk fractional round-up Cover the mixed-ratio path (targetDataShards < existing data shards) so a shard's fractional footprint is never floored to zero and free capacity is not overstated. Exercises the round-up via the targetDataShards parameter; OSS uses the standard ratio at runtime while the enterprise build hits it with real per-volume ratios. * ecbalancer: assert node B rack in TestFromActiveTopology * ecbalancer: split Destination into separate DataCenter and bare Rack Replace the composite "dc:rack" Rack field on Destination with separate DataCenter and bare Rack values, matching topology.DiskInfo and the worker-task convention. Callers (and tests) read the data center directly instead of parsing the composite with strings.SplitN. * shell ec.balance: use utilization-based global balancing (parity with worker) The shell's global rebalance phase balanced by raw shard count; switch it to fractional fullness (shards/capacity), as the worker already does. On uniform capacity the two agree; on heterogeneous capacity it fills nodes proportionally instead of driving small-capacity nodes toward full. Updates the heterogeneous-capacity regression test to assert even fullness (~equal shards/capacity per node) rather than even shard count. * ecbalancer: bounded-proportional per-DC shard spread DiffDataCenterCount was enforced only as a ceiling (drain-to-cap), which could leave a within-cap-but-lopsided DC distribution under a loose cap (e.g. 10/4 of 14 with cap=10). Now the cross-DC phase, the cross-rack DC guard, and Place all target boundedMaxPerDC = min(DiffDataCenterCount, max(ceil(total/numDCs), parityShards)): shards spread proportionally across DCs, but no tighter than the durability floor (once each DC holds <= parityShards a DC loss is recoverable, so further spreading only adds cross-DC/WAN traffic). No-op when DiffDataCenterCount is 0; identical to before when the cap is the binding constraint. * ecbalancer: drop DiffDataCenterCount enforcement for EC placement The 1-byte volume ReplicaPlacement packs xyz into x*100+y*10+z<=255, so the DC digit can only be 0-2 -- far too small to be a meaningful per-DC EC shard cap (a cap of 1-2 would demand 7-14 DCs for a 10+4 volume). It's volume replica-placement, not an EC spec. Removes the cross-DC balance phase, the DC guard in the cross-rack phase, and the per-DC cap in Place (and the just-added bounded-proportional logic); EC relies on the RP-independent rack/node even spread instead. Rack/node caps (DiffRackCount/SameRackCount) are unchanged. Per-domain EC caps are left for a real EC placement spec. * ecbalancer: enforce per-disk durability cap; symmetric reserve/release Place now refuses to put more than parityShards shards of a volume on a single disk (pickBestDiskEligible skips a disk once it holds parityShards of the volume, a hard cap not relaxed even in durability-first). Previously Place assigned by free capacity, so a skewed near-full cluster could pile >parityShards onto one disk -> losing it loses the volume; only distinct-disk count was checked. This covers encode and repair (both route through Place); the caller skips/leaves the volume rather than minting an unrecoverable layout. Also makes reserveShard decrement freeSlots unconditionally, symmetric with releaseShard's unconditional increment (the old guarded decrement could credit a phantom slot on release if a shard were ever reserved onto a full disk). * ecbalancer: add Topology.ReleaseVolumeShards (clear + credit) for greenfield encode Releases all of a volume's shards from the snapshot and credits the freed disk capacity, so a greenfield encode can plan as if stale EC shards from a prior failed attempt are gone. Safe to credit because the encode task deletes stale shards (cleanupStaleEcShards) before distributing the new ones. Distinct from ClearShardAccounting (repair), which does not credit. * ecbalancer: ReleaseVolumeShards credits node freeSlots, not just disks releaseShard only increments per-disk freeSlots, but rack capacity is summed from node freeSlots (buildRacks) and node freeSlots gates node eligibility. Crediting only disks left a node/rack looking full after releasing stale shards, so a greenfield encode still couldn't use the freed capacity. Now credits the node by the total disk-slots freed. * ecbalancer: correct PlacementMode docs (encode uses durability-first) PlaceStrict was labeled '(encode)' but encode uses PlaceDurabilityFirst. Clarify that durability-first is used by both encode and repair, reports relaxations in PlaceResult.Relaxed, and never relaxes the per-disk durability cap. * ecbalancer: treat SameRackCount as a direct per-node shard cap The 3rd ReplicaPlacement digit now caps shards per node at exactly the digit value, matching how DiffRackCount (2nd digit) caps per rack, instead of allowing digit+1 per node. This makes the per-rack and per-node caps consistent and matches the documented "digits cap EC shards per rack and per node" semantics; e.g. 011 now means at most one shard per rack and one per node. --- weed/admin/topology/capacity.go | 64 ++ weed/shell/command_ec_common.go | 4 + weed/shell/command_ec_test.go | 33 +- weed/storage/erasure_coding/ec_shards_info.go | 15 + .../erasure_coding/ecbalancer/balancer.go | 62 +- .../erasure_coding/ecbalancer/place.go | 558 ++++++++++++++++++ .../erasure_coding/ecbalancer/place_test.go | 422 +++++++++++++ .../erasure_coding/ecbalancer/shard_ratio.go | 14 + .../erasure_coding/ecbalancer/snapshot.go | 113 ++++ .../ecbalancer/snapshot_test.go | 137 +++++ weed/storage/super_block/replica_placement.go | 26 + weed/worker/tasks/ec_balance/detection.go | 19 +- 12 files changed, 1417 insertions(+), 50 deletions(-) create mode 100644 weed/storage/erasure_coding/ecbalancer/place.go create mode 100644 weed/storage/erasure_coding/ecbalancer/place_test.go create mode 100644 weed/storage/erasure_coding/ecbalancer/shard_ratio.go create mode 100644 weed/storage/erasure_coding/ecbalancer/snapshot.go create mode 100644 weed/storage/erasure_coding/ecbalancer/snapshot_test.go diff --git a/weed/admin/topology/capacity.go b/weed/admin/topology/capacity.go index e59494974..a4ad150b4 100644 --- a/weed/admin/topology/capacity.go +++ b/weed/admin/topology/capacity.go @@ -54,6 +54,70 @@ func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, d return at.getEffectiveAvailableCapacityUnsafe(disk) } +// GetEffectiveAvailableEcShardSlots returns a disk's free EC shard slots, +// accounting for in-flight task reservations at shard granularity. Unlike the +// volume-slot views (GetDisksWithEffectiveCapacity / GetEffectiveAvailableCapacity), +// this does not truncate sub-volume shard reservations: it subtracts the full +// reservation impact (volume slots converted to shard slots, plus the raw shard +// slots) so a reservation that is not a whole multiple of ShardsPerVolumeSlot is +// not lost. It does NOT subtract the EC shards already persisted on the disk; +// callers that track those (from EcShardInfos) subtract them separately. +// +// shardsPerVolume is the number of EC shards of the target collection that fit in +// one volume slot (i.e. its data-shard count): a 4+2 volume's shards are ~1/4 of a +// volume each, so one volume slot holds 4 of them, not the default +// ShardsPerVolumeSlot. Pass <= 0 to use the default. Using the target ratio keeps +// Place from over-filling a disk for low-data-shard layouts. +func (at *ActiveTopology) GetEffectiveAvailableEcShardSlots(nodeID string, diskID uint32, shardsPerVolume int) int { + if shardsPerVolume <= 0 { + shardsPerVolume = ShardsPerVolumeSlot + } + + at.mutex.RLock() + defer at.mutex.RUnlock() + + diskKey := fmt.Sprintf("%s:%d", nodeID, diskID) + disk, exists := at.disks[diskKey] + if !exists || disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil { + return 0 + } + + info := disk.DiskInfo.DiskInfo + base := info.MaxVolumeCount - info.VolumeCount + if base <= 0 && info.MaxVolumeCount == 0 && info.VolumeCount == 0 && + len(info.VolumeInfos) == 0 && len(info.EcShardInfos) == 0 { + // Freshly started empty servers can report max=0 before publishing concrete + // limits; keep one provisional slot so EC placement still sees the disk, + // mirroring getEffectiveAvailableCapacityUnsafe. + base = 1 + } + if base < 0 { + base = 0 + } + // calculateTaskStorageImpact reports consumption as positive, so subtract it. + // Volume-slot reservations scale by the target ratio; the sub-volume shard-slot + // remainder is in default units and subtracted as-is (a small approximation). + impact := at.getEffectiveCapacityUnsafe(disk) + // impact.ShardSlots is recorded in default ShardsPerVolumeSlot units; convert it + // to the target ratio's shard slots before subtracting (identity when + // shardsPerVolume == ShardsPerVolumeSlot). Round a positive reservation up so a + // sub-slot reservation (e.g. 1 default slot against a 4-shard target) is not + // truncated to zero and wrongly counted as free. + scaledShardImpact := int64(impact.ShardSlots) * int64(shardsPerVolume) + if scaledShardImpact > 0 { + scaledShardImpact = (scaledShardImpact + int64(ShardsPerVolumeSlot) - 1) / int64(ShardsPerVolumeSlot) + } else { + scaledShardImpact /= int64(ShardsPerVolumeSlot) + } + free := base*int64(shardsPerVolume) - + int64(impact.VolumeSlots)*int64(shardsPerVolume) - + scaledShardImpact + if free < 0 { + free = 0 + } + return int(free) +} + // GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk // This shows the net impact from all pending and assigned tasks func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange { diff --git a/weed/shell/command_ec_common.go b/weed/shell/command_ec_common.go index 9f3cbbfb8..1690e9801 100644 --- a/weed/shell/command_ec_common.go +++ b/weed/shell/command_ec_common.go @@ -781,6 +781,10 @@ func (ecb *ecBalancer) balance(collections []string) error { ImbalanceThreshold: 0, // the shell balances to an even distribution ReplicaPlacement: ecb.replicaPlacement, Ratio: shellECRatio, + // Balance the global phase by fractional fullness so heterogeneous-capacity + // nodes fill proportionally (matching the worker). This is identical to raw + // shard count when capacities are uniform. + GlobalUtilizationBased: true, }) return ecb.executeMoves(moves) } diff --git a/weed/shell/command_ec_test.go b/weed/shell/command_ec_test.go index af9ee3da9..05db28149 100644 --- a/weed/shell/command_ec_test.go +++ b/weed/shell/command_ec_test.go @@ -340,7 +340,11 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) { // Simulate 22 EC volumes across 14 nodes (each volume has 14 shards, 1 per node). // Give nodes 0-3 an extra volume each (vol 23-26, all 14 shards) to create imbalance. // Before balancing: nodes 0-3 have 22+14=36 shards each, nodes 4-13 have 22 shards each. - // Total = 4*36 + 10*22 = 144+220 = 364. Average = ceil(364/14) = 26. + // Total = 4*36 + 10*22 = 144+220 = 364. Capacities are heterogeneous (max 80 vs + // 33), so the utilization-based global phase balances by fullness, not count: + // 364 shards over 9*80+5*33=885 slots is ~41% full, so large nodes settle near + // 33 shards and small nodes near 14 (all ~41% full) rather than ~26 each, which + // would drive the small nodes to ~79%. type nodeSpec struct { id string @@ -396,8 +400,17 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) { ecb.balance([]string{"cldata"}) - // Verify: no node should exceed the average + // Verify even FULLNESS (shards/capacity), not even count: with heterogeneous + // capacities the utilization-based global phase fills nodes proportionally, so + // large nodes hold more shards than small ones while every node ends near the + // overall fullness. (An even-count result would over-fill the small nodes.) + capacityByID := make(map[string]int, len(nodes)) + for _, ns := range nodes { + capacityByID[ns.id] = ns.maxSlot + } + totalShards := 0 + totalCapacity := 0 shardCounts := make(map[string]int) for _, node := range ecb.ecNodes { count := 0 @@ -408,14 +421,22 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) { } shardCounts[node.info.Id] = count totalShards += count + totalCapacity += capacityByID[node.info.Id] } - avg := ceilDivide(totalShards, len(ecNodes)) + overallFullness := float64(totalShards) / float64(totalCapacity) + // Tolerance well below the gap a count-even result would show (small nodes + // would sit ~38 points above overall), but above integer-rounding skew. + const tolerance = 0.05 for _, node := range ecb.ecNodes { count := shardCounts[node.info.Id] - t.Logf("AFTER node %s: %d shards (avg %d)", node.info.Id, count, avg) - if count > avg { - t.Errorf("node %s has %d shards, expected at most %d (avg)", node.info.Id, count, avg) + capacity := capacityByID[node.info.Id] + fullness := float64(count) / float64(capacity) + t.Logf("AFTER node %s: %d/%d shards (%.0f%% full, overall %.0f%%)", + node.info.Id, count, capacity, fullness*100, overallFullness*100) + if diff := fullness - overallFullness; diff > tolerance || diff < -tolerance { + t.Errorf("node %s fullness %.1f%% deviates from overall %.1f%% by more than %.0f points", + node.info.Id, fullness*100, overallFullness*100, tolerance*100) } } } diff --git a/weed/storage/erasure_coding/ec_shards_info.go b/weed/storage/erasure_coding/ec_shards_info.go index 04126230d..87a62a249 100644 --- a/weed/storage/erasure_coding/ec_shards_info.go +++ b/weed/storage/erasure_coding/ec_shards_info.go @@ -2,6 +2,7 @@ package erasure_coding import ( "fmt" + "iter" "math/bits" "sort" "strings" @@ -40,6 +41,20 @@ func (sb ShardBits) Count() int { return bits.OnesCount32(uint32(sb)) } +// All iterates the shard ids present in the bitmap, in ascending order. It walks +// only the set bits (trailing-zero scan), so cost scales with the number of +// shards present rather than the full id range. Prefer this over scanning +// 0..MaxShardCount and calling Has on each id. +func (sb ShardBits) All() iter.Seq[ShardId] { + return func(yield func(ShardId) bool) { + for b := uint32(sb); b != 0; b &= b - 1 { + if !yield(ShardId(bits.TrailingZeros32(b))) { + return + } + } + } +} + // ShardsInfo encapsulates information for EC shards with memory-efficient storage type ShardsInfo struct { mu sync.RWMutex diff --git a/weed/storage/erasure_coding/ecbalancer/balancer.go b/weed/storage/erasure_coding/ecbalancer/balancer.go index 3cf7aaca9..3793df83c 100644 --- a/weed/storage/erasure_coding/ecbalancer/balancer.go +++ b/weed/storage/erasure_coding/ecbalancer/balancer.go @@ -43,6 +43,7 @@ type Node struct { type disk struct { diskID uint32 diskType string + tags []string // placement tags, for preferred-tag tiering freeSlots int shardCount int // total EC shards on this disk across all volumes } @@ -88,8 +89,8 @@ type Options struct { GlobalMaxMovesPerRack int // GlobalUtilizationBased selects the global phase's balance metric: when true, // nodes are balanced by fractional fullness (shards/capacity), which suits - // heterogeneous-capacity racks; when false, by raw shard count. The worker - // uses utilization; the shell uses raw count. + // heterogeneous-capacity racks; when false, by raw shard count. Both the worker + // and the shell enable it; the two metrics agree when capacities are uniform. GlobalUtilizationBased bool } @@ -132,6 +133,14 @@ func (n *Node) AddDisk(diskID uint32, diskType string, freeSlots, shardCount int n.disks[diskID] = &disk{diskID: diskID, diskType: diskType, freeSlots: freeSlots, shardCount: shardCount} } +// AddDiskTags records placement tags (e.g. "ssd","fast") for a disk, used by +// preferred-tag tiering in Place. Call after AddDisk; a no-op if the disk is unknown. +func (n *Node) AddDiskTags(diskID uint32, tags []string) { + if d, ok := n.disks[diskID]; ok { + d.tags = append([]string(nil), tags...) + } +} + // AddShards records that the volume's shards in bits live on diskID. Call it // only for the volumes that should be balanced; the disk's overall occupancy is // reported separately via AddDisk. @@ -251,10 +260,8 @@ func detectDuplicateShards(vk volKey, nodes map[string]*Node) []*move { if !ok { continue } - for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ { - if info.shardBits.Has(erasure_coding.ShardId(shardID)) { - shardLocations[shardID] = append(shardLocations[shardID], node) - } + for sid := range info.shardBits.All() { + shardLocations[int(sid)] = append(shardLocations[int(sid)], node) } } @@ -354,8 +361,11 @@ func balanceShardTypeAcrossRacks(vk volKey, nodes map[string]*Node, racks map[st destRack, ok := pickTarget(rackKeys, shardsPerRack, maxPerRack, antiAffinity, func(r string) bool { return racks[r].freeSlots > 0 }, func(r string) bool { - if rp != nil && rp.DiffRackCount > 0 { - return rackShardCount[r] < rp.DiffRackCount + if rp == nil { + return true + } + if rp.DiffRackCount > 0 && rackShardCount[r] >= rp.DiffRackCount { + return false } return true }) @@ -403,7 +413,7 @@ func pickNodeInRack(r *rack, vk volKey, rp *super_block.ReplicaPlacement) *Node continue } count := volumeShardCount(node, vk) - if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount+1 { + if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount { continue } if best == nil || count < bestCount { @@ -479,7 +489,7 @@ func balanceShardTypeAcrossNodes(vk volKey, r *rack, diskType string, dataShards func(n string) bool { return n != pm.src.id && r.nodes[n].freeSlots > 0 }, func(n string) bool { if rp != nil && rp.SameRackCount > 0 { - return nodeShardCount[n] < rp.SameRackCount+1 + return nodeShardCount[n] < rp.SameRackCount } return true }) @@ -610,13 +620,10 @@ func detectGlobalImbalance(nodes map[string]*Node, racks map[string]*rack, diskT if pass == 1 && !volumeOnMin { continue // pass 1: only volumes already on the destination } - // Iterate the full shard-id space so custom ratios with more than - // the standard total (ids 14..MaxShardCount-1) are candidates too. - for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ { - sid := erasure_coding.ShardId(shardID) - if !info.shardBits.Has(sid) { - continue - } + // Walk the volume's actual shard bitmap so custom ratios with more + // than the standard total (ids 14..MaxShardCount-1) are candidates too. + for sid := range info.shardBits.All() { + shardID := int(sid) if minInfo != nil && minInfo.shardBits.Has(sid) { continue } @@ -669,10 +676,8 @@ func shardsByGroup(vk volKey, nodes map[string]*Node, dataShards int, key func(* continue } k := key(node) - for s := 0; s < erasure_coding.MaxShardCount; s++ { - if !info.shardBits.Has(erasure_coding.ShardId(s)) { - continue - } + for sid := range info.shardBits.All() { + s := int(sid) if s < dataShards { dataPer[k] = append(dataPer[k], s) } else { @@ -747,11 +752,8 @@ func pickBestDiskOnNode(node *Node, vk volKey, diskType string, shardID, dataSha bits := info.diskShardBits[diskID] existingShards = bits.Count() if dataShardCount > 0 { - for s := 0; s < erasure_coding.MaxShardCount; s++ { - if !bits.Has(erasure_coding.ShardId(s)) { - continue - } - if s < dataShardCount { + for sid := range bits.All() { + if int(sid) < dataShardCount { hasData = true } else { hasParity = true @@ -807,9 +809,11 @@ func reserveShard(node *Node, vk volKey, shardID int, diskID uint32) { info.diskShardBits[diskID] = info.diskShardBits[diskID].Set(sid) if d, ok := node.disks[diskID]; ok { d.shardCount++ - if d.freeSlots > 0 { - d.freeSlots-- - } + // Decrement unconditionally so reserve/release stay symmetric (releaseShard + // credits a slot unconditionally). Callers only reserve onto disks + // pickBestDisk* already vetted as having free slots, so this won't go + // negative; if it ever did, freeSlots<=0 correctly reads as full. + d.freeSlots-- } } diff --git a/weed/storage/erasure_coding/ecbalancer/place.go b/weed/storage/erasure_coding/ecbalancer/place.go new file mode 100644 index 000000000..9f98f18ba --- /dev/null +++ b/weed/storage/erasure_coding/ecbalancer/place.go @@ -0,0 +1,558 @@ +package ecbalancer + +import ( + "fmt" + "sort" + "strings" + + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types" +) + +// Constraints configures a Place call. Ratio resolves a collection's +// (dataShards, parityShards); nil uses the standard scheme. ReplicaPlacement, +// when non-nil, caps shards per rack (DiffRackCount = max shards/rack) and per +// node within a rack (SameRackCount = max shards/node); both digits are direct +// hard caps. The data-center digit (DiffDataCenterCount) is not honored: +// the 1-byte volume ReplicaPlacement can only encode 0-2 there, too small to be a +// meaningful per-DC EC shard cap, so EC relies on the rack/node even spread instead. +// +// DiskTypePolicy controls how DiskType constrains placement (Any / Prefer / +// Require). PreferredTags drives whole-plan tag tiering: Place tries disks +// carrying the earliest tags first and widens to all disks only if a tier cannot +// place every shard. +type Constraints struct { + DiskType string + DiskTypePolicy DiskTypePolicy + PreferredTags []string + ReplicaPlacement *super_block.ReplicaPlacement + Ratio func(collection string) (dataShards, parityShards int) +} + +// DiskTypePolicy controls how Constraints.DiskType constrains placement. +type DiskTypePolicy int + +const ( + DiskTypeAny DiskTypePolicy = iota // any disk type + DiskTypePrefer // prefer DiskType, spill to other types if needed + DiskTypeRequire // only DiskType (HardDriveType when "") +) + +// diskTypeEqual compares disk types after normalization, so "" and "hdd" (both +// HardDriveType) are equal. +func diskTypeEqual(a, b string) bool { + return storagetypes.ToDiskType(a).String() == storagetypes.ToDiskType(b).String() +} + +// diskHasAnyTag reports whether the disk carries any of the given tags. +func diskHasAnyTag(d *disk, tags []string) bool { + for _, want := range tags { + for _, have := range d.tags { + if have == want { + return true + } + } + } + return false +} + +// Destination is a chosen target for one shard. DataCenter and Rack are kept as +// separate values (matching topology.DiskInfo) rather than a "dc:rack" composite, +// so callers read them directly instead of parsing. +type Destination struct { + Node string + DiskID uint32 + DataCenter string + Rack string // bare rack id within DataCenter +} + +// PlaceResult holds the chosen destinations, which constraints had to be relaxed +// (durability-first only), and whether placement spilled outside the preferred +// disk type or tag tiers (for parity with today's logging). +type PlaceResult struct { + Destinations map[int]Destination + Relaxed []string + SpilledToOtherDiskType bool + SpilledOutsidePreferredTags bool +} + +// PlacementMode selects the strictness/relaxation policy. +type PlacementMode int + +const ( + // PlaceStrict: caps and ReplicaPlacement are hard. Place fails rather than + // violate them, so the caller can defer (leave the volume as-is and retry). + PlaceStrict PlacementMode = iota + // PlaceDurabilityFirst (used by both encode and repair): relax per-type caps -> + // data/parity anti-affinity -> ReplicaPlacement, in that order, until each shard + // lands, reporting what was relaxed in PlaceResult.Relaxed. The per-disk + // durability cap (<= parityShards per disk) is never relaxed. Fails only if no + // disk has free capacity. Encode places best-effort this way and rebalancing + // tightens the spread afterward. + PlaceDurabilityFirst +) + +// relaxation controls which placement-quality constraints are enforced on an +// attempt. preferring fresh nodes (repair's "avoid surviving-shard nodes") is not +// listed: pickNodeInRack already selects the node with the fewest shards of the +// volume, so survivors are deprioritized with built-in fallback. +type relaxation struct { + caps bool + antiAffinity bool + rp bool +} + +func (r relaxation) relaxedNames() []string { + var n []string + if !r.caps { + n = append(n, "caps") + } + if !r.antiAffinity { + n = append(n, "anti-affinity") + } + if !r.rp { + n = append(n, "replica-placement") + } + return n +} + +var strictAttempts = []relaxation{{caps: true, antiAffinity: true, rp: true}} + +var durabilityAttempts = []relaxation{ + {caps: true, antiAffinity: true, rp: true}, + {caps: false, antiAffinity: true, rp: true}, + {caps: false, antiAffinity: false, rp: true}, + {caps: false, antiAffinity: false, rp: false}, +} + +type placedEntry struct { + node *Node + sid int + rackKey string +} + +// Place assigns destinations for the `need` shard ids of volume (collection,vid), +// reading the volume's already-placed shards from the snapshot (so encode passes +// an empty-for-this-volume snapshot, repair passes one seeded with the surviving +// shards). +// +// Tag tiering (whole-plan retry): it tries the preferred-tag tiers in order, each +// a complete candidate set, and returns the first tier that places every shard; +// only when it falls through to the no-tag tier does it set +// SpilledOutsidePreferredTags. Within a tier, disk-type Prefer spills to other +// types per shard (SpilledToOtherDiskType); Require filters strictly. +func (t *Topology) Place(vid uint32, collection string, need []int, c Constraints, mode PlacementMode) (*PlaceResult, error) { + if len(need) == 0 { + return &PlaceResult{Destinations: map[int]Destination{}}, nil + } + + vk := volKey{collection: collection, vid: vid} + dataShards, parityShards := erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount + if c.Ratio != nil { + if d, p := c.Ratio(collection); d > 0 && p > 0 { + dataShards, parityShards = d, p + } + } + + racks := buildRacks(t.nodes) + if len(racks) == 0 { + return nil, fmt.Errorf("no racks available for EC placement") + } + rackKeys := sortedKeys(racks) + + // Disk-type eligibility (Require filters; Any/Prefer admit all) and the soft + // type preference applied in scoring under Prefer. + typeEligible := func(d *disk) bool { + if c.DiskTypePolicy == DiskTypeRequire { + return diskTypeEqual(d.diskType, c.DiskType) + } + return true + } + var prefer func(*disk) bool + if c.DiskTypePolicy == DiskTypePrefer { + prefer = func(d *disk) bool { return diskTypeEqual(d.diskType, c.DiskType) } + } + + // Whole-plan retry over preferred-tag tiers; the first tier that places every + // shard wins. Reaching the no-tag tier means we spilled outside the tags. + tiers := tagTiers(c.PreferredTags) + var lastErr error + for _, tierTags := range tiers { + tt := tierTags + eligible := func(d *disk) bool { + return typeEligible(d) && (len(tt) == 0 || diskHasAnyTag(d, tt)) + } + res, err := t.tryPlace(vk, need, dataShards, parityShards, racks, rackKeys, mode, c.ReplicaPlacement, eligible, prefer) + if err != nil { + lastErr = err + continue + } + if len(c.PreferredTags) > 0 && len(tierTags) == 0 { + res.SpilledOutsidePreferredTags = true + } + return res, nil + } + return nil, lastErr +} + +// tagTiers returns the eligibility tag-sets in increasing breadth, ending with an +// empty set ("any disk"). Empty preferredTags yields a single any-disk tier. +func tagTiers(preferredTags []string) [][]string { + if len(preferredTags) == 0 { + return [][]string{nil} + } + tiers := make([][]string, 0, len(preferredTags)+1) + for k := range preferredTags { + tiers = append(tiers, append([]string(nil), preferredTags[:k+1]...)) + } + return append(tiers, nil) +} + +// tryPlace runs one whole-plan placement attempt restricted to disks satisfying +// `eligible`, with `prefer` (may be nil) ranking soft-preferred disks first. It +// journals reservations and rolls them all back if any shard cannot be placed, so +// a failed tier leaves the snapshot unchanged for the next attempt. +func (t *Topology) tryPlace(vk volKey, need []int, dataShards, parityShards int, racks map[string]*rack, rackKeys []string, mode PlacementMode, rp *super_block.ReplicaPlacement, eligible func(*disk) bool, prefer func(*disk) bool) (*PlaceResult, error) { + result := &PlaceResult{Destinations: make(map[int]Destination, len(need))} + + // Per-type shard ids per rack (even caps), total shard count per rack + // (DiffRackCount), and the racks bearing each type (anti-affinity) — all seeded + // from the volume's existing shards. + shardsPerRack := map[bool]map[string][]int{true: {}, false: {}} + rackShardCount := map[string]int{} + bearing := map[bool]map[string]bool{true: {}, false: {}} + for _, n := range t.nodes { + info, ok := n.shards[vk] + if !ok { + continue + } + for sid := range info.shardBits.All() { + s := int(sid) + isData := s < dataShards + shardsPerRack[isData][n.rack] = append(shardsPerRack[isData][n.rack], s) + rackShardCount[n.rack]++ + bearing[isData][n.rack] = true + } + } + + // Even per-rack caps divide by racks that actually have an eligible free disk, + // not all racks (the snapshot keeps every disk type/tag), so a valid tiered + // cluster — e.g. SSDs in only 2 of 4 racks — is not capped impossibly low. + numEligibleRacks := 0 + for _, rk := range rackKeys { + if rackHasFreeDisk(racks[rk], eligible) { + numEligibleRacks++ + } + } + if numEligibleRacks < 1 { + numEligibleRacks = 1 + } + + attempts := strictAttempts + if mode == PlaceDurabilityFirst { + attempts = durabilityAttempts + } + + var journal []placedEntry + relaxedSeen := map[string]bool{} + spilledType := false + + placeShard := func(sid int, isData bool) bool { + typeTotal := dataShards + if !isData { + typeTotal = parityShards + } + for _, rl := range attempts { + node, diskID, spilled, ok := chooseShardDest(vk, sid, isData, dataShards, typeTotal, numEligibleRacks, parityShards, racks, rackKeys, rp, eligible, prefer, shardsPerRack[isData], rackShardCount, bearing, rl) + if !ok { + continue + } + reserveShard(node, vk, sid, diskID) + node.freeSlots-- + racks[node.rack].freeSlots-- + shardsPerRack[isData][node.rack] = append(shardsPerRack[isData][node.rack], sid) + rackShardCount[node.rack]++ + bearing[isData][node.rack] = true + journal = append(journal, placedEntry{node: node, sid: sid, rackKey: node.rack}) + result.Destinations[sid] = Destination{ + Node: node.id, + DiskID: diskID, + DataCenter: node.dc, + Rack: strings.TrimPrefix(node.rack, node.dc+":"), + } + if spilled { + spilledType = true + } + for _, name := range rl.relaxedNames() { + relaxedSeen[name] = true + } + return true + } + return false + } + + // Data shards first, then parity, so parity can avoid data-bearing racks. + for _, isData := range []bool{true, false} { + for _, sid := range shardsOfType(need, isData, dataShards) { + if placeShard(sid, isData) { + continue + } + for _, e := range journal { + releaseShard(e.node, vk, e.sid) + e.node.freeSlots++ + racks[e.rackKey].freeSlots++ + } + return nil, fmt.Errorf("cannot place EC shard %d of volume %d (collection %q)", sid, vk.vid, vk.collection) + } + } + + result.SpilledToOtherDiskType = spilledType + for name := range relaxedSeen { + result.Relaxed = append(result.Relaxed, name) + } + sort.Strings(result.Relaxed) + return result, nil +} + +// chooseShardDest selects a (node, disk) for one shard at the given relaxation +// level: pick a rack (even per-type cap + ReplicaPlacement caps + two-pass +// anti-affinity to the opposite type), then the least-loaded eligible node, then +// the best eligible disk. The third return reports whether the disk spilled off +// the soft-preferred type. ok=false when no rack/node/disk fits. +func chooseShardDest(vk volKey, sid int, isData bool, dataShards, typeTotal, numEligibleRacks, maxPerDisk int, racks map[string]*rack, rackKeys []string, rp *super_block.ReplicaPlacement, eligible func(*disk) bool, prefer func(*disk) bool, shardsPerRackType map[string][]int, rackShardCount map[string]int, bearing map[bool]map[string]bool, rl relaxation) (*Node, uint32, bool, bool) { + maxPerRack := numEligibleRacks*typeTotal + 1 // effectively unlimited when caps are relaxed + if rl.caps { + if maxPerRack = ceilDivide(typeTotal, numEligibleRacks); maxPerRack < 1 { + maxPerRack = 1 + } + } + + var anti map[string]bool + if rl.antiAffinity { + anti = bearing[!isData] // racks already holding the opposite shard type + } + + if !rl.rp { + rp = nil + } + // A rack is eligible only if it is under the per-rack shard cap (DiffRackCount), + // enforced only when set (and relaxed with rp). + withinLimit := func(r string) bool { + if rp == nil { + return true + } + if rp.DiffRackCount > 0 && rackShardCount[r] >= rp.DiffRackCount { + return false + } + return true + } + + destRack, ok := pickTarget(rackKeys, shardsPerRackType, maxPerRack, anti, + func(r string) bool { return racks[r].freeSlots > 0 && rackHasFreeDisk(racks[r], eligible) }, + withinLimit) + if !ok { + return nil, 0, false, false + } + node := pickNodeInRackEligible(racks[destRack], vk, rp, eligible) + if node == nil { + return nil, 0, false, false + } + diskID, ok, spilled := pickBestDiskEligible(node, vk, eligible, prefer, sid, dataShards, maxPerDisk) + if !ok { + return nil, 0, false, false + } + return node, diskID, spilled, true +} + +// nodeHasFreeDisk reports whether the node has a free disk satisfying eligible. +func nodeHasFreeDisk(n *Node, eligible func(*disk) bool) bool { + for _, d := range n.disks { + if d.freeSlots > 0 && eligible(d) { + return true + } + } + return false +} + +// rackHasFreeDisk reports whether any node in the rack has a free eligible disk. +func rackHasFreeDisk(r *rack, eligible func(*disk) bool) bool { + for _, n := range r.nodes { + if n.freeSlots > 0 && nodeHasFreeDisk(n, eligible) { + return true + } + } + return false +} + +// pickNodeInRackEligible is pickNodeInRack restricted to nodes that have a free +// eligible disk. FromActiveTopology keeps all disk types/tags in the snapshot, so +// without this a node with free volume slots but no eligible disk could be chosen. +func pickNodeInRackEligible(r *rack, vk volKey, rp *super_block.ReplicaPlacement, eligible func(*disk) bool) *Node { + var best *Node + bestCount := -1 + for _, id := range sortedNodeKeys(r.nodes) { + node := r.nodes[id] + if node.freeSlots <= 0 { + continue + } + if !nodeHasFreeDisk(node, eligible) { + continue + } + count := volumeShardCount(node, vk) + if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount { + continue + } + if best == nil || count < bestCount { + best, bestCount = node, count + } + } + return best +} + +// pickBestDiskEligible chooses the best eligible disk on a node, ranking +// soft-preferred disks (prefer != nil && prefer(d)) ahead of others so disk-type +// Prefer uses the preferred type when available but spills otherwise. Returns the +// disk id, whether one was found, and whether the chosen disk spilled off the +// preferred type. +func pickBestDiskEligible(node *Node, vk volKey, eligible func(*disk) bool, prefer func(*disk) bool, shardID, dataShardCount, maxPerDisk int) (uint32, bool, bool) { + isDataShard := dataShardCount > 0 && shardID < dataShardCount + info := node.shards[vk] + var bestDiskID uint32 + bestScore := -1 + bestPreferred := false + for _, diskID := range sortedDiskKeys(node.disks) { + d := node.disks[diskID] + if !eligible(d) || d.freeSlots <= 0 { + continue + } + existingShards := 0 + hasData := false + hasParity := false + if info != nil { + bits := info.diskShardBits[diskID] + existingShards = bits.Count() + if dataShardCount > 0 { + for sid := range bits.All() { + if int(sid) < dataShardCount { + hasData = true + } else { + hasParity = true + } + } + } + } + // Durability: never put more than maxPerDisk (parityShards) shards of this + // volume on one disk, or losing that disk would lose more than EC can + // recover. Hard cap, enforced even under durability-first relaxation. + if maxPerDisk > 0 && existingShards >= maxPerDisk { + continue + } + score := d.shardCount*10 + existingShards*100 + if dataShardCount > 0 { + if isDataShard && hasParity { + score += 1000 + } else if !isDataShard && hasData { + score += 1000 + } + } + preferred := prefer == nil || prefer(d) + if !preferred { + score += 100000 // strongly deprioritize spilling to a non-preferred type + } + if bestScore == -1 || score < bestScore { + bestScore = score + bestDiskID = diskID + bestPreferred = preferred + } + } + if bestScore == -1 { + return 0, false, false + } + return bestDiskID, true, prefer != nil && !bestPreferred +} + +// clearShardAccounting removes one shard copy of a volume from the snapshot's +// per-domain accounting (the volume's shard bits) WITHOUT crediting disk capacity. +// It clears only the given physical disk's bit, then recomputes the node-level +// union from the remaining disk bits, so a kept copy of the same shard on another +// disk of the same node still counts toward caps / ReplicaPlacement / anti-affinity. +// +// Repair uses this to drop the duplicate/mismatched copies it plans to delete +// before placing missing shards, so those copies do not inflate placement +// accounting. Capacity is deliberately NOT credited: the deletes run only after +// the rebuilt shards are distributed, so the slots are not free at plan time. This +// is distinct from releaseShard, which credits freeSlots and clears the union. +func clearShardAccounting(node *Node, vk volKey, shardID int, diskID uint32) { + info, ok := node.shards[vk] + if !ok { + return + } + sid := erasure_coding.ShardId(shardID) + if bits, ok := info.diskShardBits[diskID]; ok { + info.diskShardBits[diskID] = bits.Clear(sid) + } + var union erasure_coding.ShardBits + for _, b := range info.diskShardBits { + union |= b + } + info.shardBits = union +} + +// ClearShardAccounting drops one shard copy of a volume from placement accounting +// without crediting capacity (see clearShardAccounting). Repair calls it for each +// copy it plans to delete before placing missing shards, so those copies do not +// inflate caps/RP/anti-affinity. No-op for an unknown node. +func (t *Topology) ClearShardAccounting(nodeID, collection string, vid uint32, shardID int, diskID uint32) { + n, ok := t.nodes[nodeID] + if !ok { + return + } + clearShardAccounting(n, volKey{collection: collection, vid: vid}, shardID, diskID) +} + +// ReleaseVolumeShards removes every shard of a volume from the snapshot and +// credits the freed disk capacity. A greenfield encode calls this so any stale +// EC shards left by a prior failed attempt (which the encode task deletes before +// distributing the new shards) neither occupy capacity nor skew anti-affinity / +// per-disk caps during planning. Unlike repair's ClearShardAccounting, it credits +// freeSlots because the deletes run before the new writes. +func (t *Topology) ReleaseVolumeShards(collection string, vid uint32) { + vk := volKey{collection: collection, vid: vid} + for _, n := range t.nodes { + info, ok := n.shards[vk] + if !ok { + continue + } + // freed is the total disk-slots the volume occupies on this node (a shard may + // sit on more than one disk). releaseShard credits each disk's freeSlots; + // credit the node's freeSlots by the same total, since rack capacity is summed + // from node freeSlots (buildRacks) and node freeSlots gates node eligibility. + freed := 0 + for _, bits := range info.diskShardBits { + freed += bits.Count() + } + sids := make([]int, 0, info.shardBits.Count()) + for sid := range info.shardBits.All() { + sids = append(sids, int(sid)) + } + for _, sid := range sids { + releaseShard(n, vk, sid) + } + n.freeSlots += freed + delete(n.shards, vk) + } +} + +// shardsOfType returns the sorted subset of need that are data shards (id < +// dataShards) when isData, else the parity subset. +func shardsOfType(need []int, isData bool, dataShards int) []int { + var out []int + for _, s := range need { + if (s < dataShards) == isData { + out = append(out, s) + } + } + sort.Ints(out) + return out +} diff --git a/weed/storage/erasure_coding/ecbalancer/place_test.go b/weed/storage/erasure_coding/ecbalancer/place_test.go new file mode 100644 index 000000000..6af01a331 --- /dev/null +++ b/weed/storage/erasure_coding/ecbalancer/place_test.go @@ -0,0 +1,422 @@ +package ecbalancer + +import ( + "fmt" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +// buildPlaceTopo makes a topology of racks x nodesPerRack, each node one disk with +// perDiskFree free EC shard slots. +func buildPlaceTopo(racks, nodesPerRack, perDiskFree int) *Topology { + topo := NewTopology() + for r := 0; r < racks; r++ { + rackKey := fmt.Sprintf("dc1:rack%d", r) + for n := 0; n < nodesPerRack; n++ { + id := fmt.Sprintf("10.0.%d.%d:8080", r, n) + node := topo.AddNode(id, "dc1", rackKey, perDiskFree) + node.AddDisk(0, "", perDiskFree, 0) + } + } + return topo +} + +func allShards() []int { + out := make([]int, erasure_coding.TotalShardsCount) + for i := range out { + out[i] = i + } + return out +} + +// TestPlaceStrictSpreadAndCaps places a fresh 10+4 volume and checks every shard +// lands on a distinct node and no rack exceeds the even per-type cap. +func TestPlaceStrictSpreadAndCaps(t *testing.T) { + const racks = 4 + topo := buildPlaceTopo(racks, 4, 50) + + res, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceStrict) + if err != nil { + t.Fatalf("Place: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + + usedNodes := map[string]bool{} + dataPerRack := map[string]int{} + parityPerRack := map[string]int{} + for sid, d := range res.Destinations { + if usedNodes[d.Node] { + t.Errorf("node %s reused for shard %d (expected distinct nodes with ample capacity)", d.Node, sid) + } + usedNodes[d.Node] = true + if sid < erasure_coding.DataShardsCount { + dataPerRack[d.Rack]++ + } else { + parityPerRack[d.Rack]++ + } + } + dataCap := ceilDivide(erasure_coding.DataShardsCount, racks) + parityCap := ceilDivide(erasure_coding.ParityShardsCount, racks) + for rk, n := range dataPerRack { + if n > dataCap { + t.Errorf("rack %s holds %d data shards, cap %d", rk, n, dataCap) + } + } + for rk, n := range parityPerRack { + if n > parityCap { + t.Errorf("rack %s holds %d parity shards, cap %d", rk, n, parityCap) + } + } +} + +// TestPlaceStrictFailsAndRollsBack: a single tiny disk cannot hold 14 shards, so +// strict Place fails and leaves the snapshot untouched. +func TestPlaceStrictFailsAndRollsBack(t *testing.T) { + topo := buildPlaceTopo(1, 1, 2) // one node, room for 2 shards + node := topo.nodes["10.0.0.0:8080"] + freeBefore := node.freeSlots + diskFreeBefore := node.disks[0].freeSlots + + _, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceStrict) + if err == nil { + t.Fatal("expected Place to fail on insufficient capacity") + } + if info, ok := node.shards[volKey{collection: "c1", vid: 1}]; ok && info.shardBits.Count() != 0 { + t.Errorf("volume shard bits left on node after failed strict Place (rollback incomplete): %b", info.shardBits) + } + if node.freeSlots != freeBefore { + t.Errorf("node freeSlots = %d after rollback, want %d", node.freeSlots, freeBefore) + } + if node.disks[0].freeSlots != diskFreeBefore { + t.Errorf("disk freeSlots = %d after rollback, want %d", node.disks[0].freeSlots, diskFreeBefore) + } +} + +// TestPlaceDurabilityFirstRelaxesRP: a ReplicaPlacement rack limit too tight for +// the shard count makes strict fail, while durability-first relaxes RP to place +// everything and reports the relaxation. +func TestPlaceDurabilityFirstRelaxesRP(t *testing.T) { + rp := &super_block.ReplicaPlacement{DiffRackCount: 3} // <=3 shards per rack + topo := buildPlaceTopo(2, 8, 50) // 2 racks: 2*3=6 < 14 under RP + + if _, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceStrict); err == nil { + t.Fatal("strict Place should fail when RP rack limit cannot fit all shards") + } + + topo = buildPlaceTopo(2, 8, 50) + res, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceDurabilityFirst) + if err != nil { + t.Fatalf("durability-first Place: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + relaxedRP := false + for _, r := range res.Relaxed { + if r == "replica-placement" { + relaxedRP = true + } + } + if !relaxedRP { + t.Errorf("expected replica-placement relaxation, got %v", res.Relaxed) + } +} + +// TestPlaceSameRackCountIsDirectPerNodeCap: the 3rd ReplicaPlacement digit +// (SameRackCount) caps shards per node directly (max == digit), matching the +// per-rack DiffRackCount cap rather than allowing digit+1 per node. +func TestPlaceSameRackCountIsDirectPerNodeCap(t *testing.T) { + rp := &super_block.ReplicaPlacement{SameRackCount: 2} // <=2 shards per node + + // 5 single-node racks: 5 nodes * 2 = 10 < 14, so a strict 10+4 placement + // cannot satisfy the per-node cap and must fail. Under the old digit+1 reading + // the cap would be 3/node => 15 slots and this would have wrongly succeeded. + topo := buildPlaceTopo(5, 1, 50) + if _, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceStrict); err == nil { + t.Fatal("strict Place should fail: 5 nodes cannot hold 14 shards at <=2 per node") + } + + // Durability-first relaxes the unsatisfiable per-node cap, still places every + // shard, and reports the relaxation so it isn't silently weakened. + topo = buildPlaceTopo(5, 1, 50) + res, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceDurabilityFirst) + if err != nil { + t.Fatalf("durability-first Place: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + relaxedRP := false + for _, r := range res.Relaxed { + if r == "replica-placement" { + relaxedRP = true + } + } + if !relaxedRP { + t.Errorf("expected replica-placement relaxation, got %v", res.Relaxed) + } +} + +// TestPlaceDiskTypeHardFilter: with DiskType set, shards land only on disks of +// that type, even though the snapshot also contains other-typed disks. +func TestPlaceDiskTypeHardFilter(t *testing.T) { + topo := NewTopology() + for r := 0; r < 4; r++ { + rackKey := fmt.Sprintf("dc1:rack%d", r) + ssd := topo.AddNode(fmt.Sprintf("ssd-%d:8080", r), "dc1", rackKey, 50) + ssd.AddDisk(0, "ssd", 50, 0) + hdd := topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", rackKey, 50) + hdd.AddDisk(0, "hdd", 50, 0) + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict) + if err != nil { + t.Fatalf("Place ssd: %v", err) + } + for sid, d := range res.Destinations { + node := topo.nodes[d.Node] + disk := node.disks[d.DiskID] + if disk == nil || disk.diskType != "ssd" { + t.Errorf("shard %d placed on non-ssd disk: node=%s diskID=%d", sid, d.Node, d.DiskID) + } + } +} + +// TestPlaceDiskTypeUnavailableFails: a request for a disk type with no matching +// disks fails rather than silently placing on the wrong tier. +func TestPlaceDiskTypeUnavailableFails(t *testing.T) { + topo := NewTopology() + for r := 0; r < 4; r++ { + n := topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 50) + n.AddDisk(0, "hdd", 50, 0) + } + if _, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict); err == nil { + t.Fatal("expected Place to fail when no disks of the requested type exist") + } +} + +// TestPlaceHDDRequestMatchesEmptyTypeDisks: a "hdd" request normalizes to +// HardDriveType ("") and must land on the HDD disk (reported as ""), never the SSD +// disk, even on nodes that have both. +func TestPlaceHDDRequestMatchesEmptyTypeDisks(t *testing.T) { + topo := NewTopology() + for r := 0; r < 6; r++ { + n := topo.AddNode(fmt.Sprintf("n%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 100) + n.AddDisk(0, "", 50, 0) // HDD (HardDriveType, reported as "") + n.AddDisk(1, "ssd", 50, 0) // SSD + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "hdd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict) + if err != nil { + t.Fatalf("Place hdd: %v", err) + } + for sid, d := range res.Destinations { + if d.DiskID != 0 { // disk 0 is the HDD disk on every node + t.Errorf("shard %d placed on disk %d (expected HDD disk 0) on node %s", sid, d.DiskID, d.Node) + } + } +} + +// TestPlaceDurabilityCapRejectsSkewed: in a near-full cluster where only one disk +// has spare room, Place must not pile more than parityShards shards onto it (losing +// it would then lose more than EC can recover). It fails instead, so the caller +// leaves the volume unencoded rather than minting an unrecoverable layout. +func TestPlaceDurabilityCapRejectsSkewed(t *testing.T) { + topo := NewTopology() + // One spacious node plus four nearly-full ones, all in a single rack. + a := topo.AddNode("a:8080", "dc1", "dc1:rack0", 100) + a.AddDisk(0, "", 100, 0) + for i := 0; i < 4; i++ { + n := topo.AddNode(fmt.Sprintf("b%d:8080", i), "dc1", "dc1:rack0", 1) + n.AddDisk(0, "", 1, 0) + } + + // 14 shards, parity 4: node a is capped at 4, the others hold 1 each -> at most + // 4+4=8 placeable without exceeding parityShards on a disk, so Place must fail. + // (Without the per-disk cap, a would greedily absorb 10 shards and "succeed".) + if _, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceDurabilityFirst); err == nil { + t.Fatal("expected Place to fail rather than pile >parityShards shards on one disk") + } +} + +// TestReleaseVolumeShards: removes all of a volume's shards from the snapshot and +// credits the freed capacity at BOTH disk and node level (rack capacity sums node +// freeSlots), mirroring how FromActiveTopology accounts stale shards. +func TestReleaseVolumeShards(t *testing.T) { + topo := NewTopology() + // Node total 20 = two disks of 10. Two stale shards occupy one slot each, so the + // snapshot would show disk0/disk1 at 9 and node at 18 (as FromActiveTopology does). + n := topo.AddNode("n0:8080", "dc1", "dc1:rack0", 20) + n.AddDisk(0, "", 10, 0) + n.AddDisk(1, "", 10, 0) + vk := volKey{collection: "c1", vid: 1} + n.AddShards(1, "c1", 0, erasure_coding.ShardBits(uint32(1)<<3)) + n.AddShards(1, "c1", 1, erasure_coding.ShardBits(uint32(1)<<7)) + n.disks[0].freeSlots = 9 + n.disks[1].freeSlots = 9 + n.freeSlots = 18 + + topo.ReleaseVolumeShards("c1", 1) + + if _, ok := n.shards[vk]; ok { + t.Error("volume shards should be gone after ReleaseVolumeShards") + } + if n.disks[0].freeSlots != 10 || n.disks[1].freeSlots != 10 { + t.Errorf("disk freeSlots not restored: disk0=%d, disk1=%d (want 10, 10)", n.disks[0].freeSlots, n.disks[1].freeSlots) + } + if n.freeSlots != 20 { + t.Errorf("node freeSlots = %d, want 20 (must be credited at node level too)", n.freeSlots) + } +} + +// TestClearShardAccounting: dropping one disk's copy of a shard preserves a kept +// copy of the same shard on another disk of the same node, and credits no capacity. +func TestClearShardAccounting(t *testing.T) { + topo := NewTopology() + n := topo.AddNode("n0:8080", "dc1", "dc1:rack0", 50) + n.AddDisk(0, "", 50, 0) + n.AddDisk(1, "", 50, 0) + vk := volKey{collection: "c1", vid: 1} + // Shard 3 lives on disk 0 (keep) and disk 1 (duplicate to delete). + n.AddShards(1, "c1", 0, erasure_coding.ShardBits(uint32(1)<<3)) + n.AddShards(1, "c1", 1, erasure_coding.ShardBits(uint32(1)<<3)) + if got := n.shards[vk].shardBits.Count(); got != 1 { + t.Fatalf("union count = %d, want 1", got) + } + freeBefore := n.disks[1].freeSlots + + clearShardAccounting(n, vk, 3, 1) + + if !n.shards[vk].shardBits.Has(erasure_coding.ShardId(3)) { + t.Error("kept copy of shard 3 (disk 0) lost from the node-level union") + } + if n.shards[vk].diskShardBits[1].Has(erasure_coding.ShardId(3)) { + t.Error("disk-1 copy of shard 3 was not cleared") + } + if n.disks[1].freeSlots != freeBefore { + t.Errorf("freeSlots changed %d -> %d; clearShardAccounting must not credit capacity", freeBefore, n.disks[1].freeSlots) + } +} + +// TestPlaceDiskTypePreferSpills: DiskTypePrefer fills the preferred type first and +// spills the remainder to other types, reporting SpilledToOtherDiskType. SSD is +// scarce (one tiny SSD per node) so the volume must spill to HDD, but there are +// enough disks to keep each within the parityShards durability cap. +func TestPlaceDiskTypePreferSpills(t *testing.T) { + topo := NewTopology() + for r := 0; r < 8; r++ { + n := topo.AddNode(fmt.Sprintf("n%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 100) + n.AddDisk(0, "ssd", 1, 0) // tiny SSD: 1 shard + n.AddDisk(1, "", 50, 0) // roomy HDD + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypePrefer}, PlaceDurabilityFirst) + if err != nil { + t.Fatalf("Place: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + ssd, hdd := 0, 0 + for _, d := range res.Destinations { + if topo.nodes[d.Node].disks[d.DiskID].diskType == "ssd" { + ssd++ + } else { + hdd++ + } + } + if ssd == 0 || hdd == 0 { + t.Errorf("expected prefer-then-spill: some shards on SSD and some on HDD, got ssd=%d hdd=%d", ssd, hdd) + } + if !res.SpilledToOtherDiskType { + t.Error("expected SpilledToOtherDiskType when SSD cannot hold every shard") + } +} + +// TestPlacePreferredTagsUseTaggedDisks: when tagged disks can hold the whole plan, +// every shard lands on a tagged disk and no spill is reported. +func TestPlacePreferredTagsUseTaggedDisks(t *testing.T) { + topo := NewTopology() + for r := 0; r < 4; r++ { + rackKey := fmt.Sprintf("dc1:rack%d", r) + fast := topo.AddNode(fmt.Sprintf("fast-%d:8080", r), "dc1", rackKey, 50) + fast.AddDisk(0, "", 50, 0) + fast.AddDiskTags(0, []string{"fast"}) + topo.AddNode(fmt.Sprintf("slow-%d:8080", r), "dc1", rackKey, 50).AddDisk(0, "", 50, 0) + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{PreferredTags: []string{"fast"}}, PlaceStrict) + if err != nil { + t.Fatalf("Place: %v", err) + } + for sid, d := range res.Destinations { + if !diskHasAnyTag(topo.nodes[d.Node].disks[d.DiskID], []string{"fast"}) { + t.Errorf("shard %d placed on an untagged disk (node %s)", sid, d.Node) + } + } + if res.SpilledOutsidePreferredTags { + t.Error("did not expect tag spill when tagged disks suffice") + } +} + +// TestPlacePreferredTagsSpillWhenInsufficient: when the tagged tier cannot hold the +// whole plan, Place falls back to all disks and reports SpilledOutsidePreferredTags. +func TestPlacePreferredTagsSpillWhenInsufficient(t *testing.T) { + topo := NewTopology() + for r := 0; r < 4; r++ { + n := topo.AddNode(fmt.Sprintf("n-%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 50) + if r == 0 { + n.AddDisk(0, "", 5, 0) // the only tagged disk, too small for 14 shards + n.AddDiskTags(0, []string{"fast"}) + } else { + n.AddDisk(0, "", 50, 0) + } + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{PreferredTags: []string{"fast"}}, PlaceStrict) + if err != nil { + t.Fatalf("Place: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + if !res.SpilledOutsidePreferredTags { + t.Error("expected SpilledOutsidePreferredTags when the single fast disk cannot hold the plan") + } +} + +// TestPlaceStrictCapsCountEligibleRacks: with DiskTypeRequire, the even per-rack +// cap divides by racks that have a matching disk, not all racks, so SSDs in only +// some racks still place successfully. +func TestPlaceStrictCapsCountEligibleRacks(t *testing.T) { + topo := NewTopology() + // SSDs live in only 2 of 4 racks, with several SSD nodes per rack so 14 shards + // fit at <= parityShards per disk. The even per-rack cap must divide by the 2 + // eligible racks (ceil(10/2)=5 data/rack), not all 4 (ceil(10/4)=3 -> infeasible). + for r := 0; r < 4; r++ { + rackKey := fmt.Sprintf("dc1:rack%d", r) + topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", rackKey, 50).AddDisk(0, "", 50, 0) + if r < 2 { + for n := 0; n < 4; n++ { + topo.AddNode(fmt.Sprintf("ssd-%d-%d:8080", r, n), "dc1", rackKey, 50).AddDisk(0, "ssd", 50, 0) + } + } + } + + res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict) + if err != nil { + t.Fatalf("Place ssd in 2/4 racks: %v", err) + } + if len(res.Destinations) != erasure_coding.TotalShardsCount { + t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount) + } + for sid, d := range res.Destinations { + if disk := topo.nodes[d.Node].disks[d.DiskID]; disk == nil || disk.diskType != "ssd" { + t.Errorf("shard %d not on an SSD disk: node=%s disk=%d", sid, d.Node, d.DiskID) + } + } +} diff --git a/weed/storage/erasure_coding/ecbalancer/shard_ratio.go b/weed/storage/erasure_coding/ecbalancer/shard_ratio.go new file mode 100644 index 000000000..08cebb336 --- /dev/null +++ b/weed/storage/erasure_coding/ecbalancer/shard_ratio.go @@ -0,0 +1,14 @@ +package ecbalancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// shardDataShards returns the data-shard count of the volume an EC shard belongs +// to, used to size the shard's disk footprint. OSS uses the standard ratio for +// every volume; custom per-volume ratios are an enterprise feature, so the +// enterprise build overrides this to read the per-shard ratio. +func shardDataShards(eci *master_pb.VolumeEcShardInformationMessage) int { + return erasure_coding.DataShardsCount +} diff --git a/weed/storage/erasure_coding/ecbalancer/snapshot.go b/weed/storage/erasure_coding/ecbalancer/snapshot.go new file mode 100644 index 000000000..9d5c81103 --- /dev/null +++ b/weed/storage/erasure_coding/ecbalancer/snapshot.go @@ -0,0 +1,113 @@ +package ecbalancer + +import ( + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// FromActiveTopology builds a Topology snapshot from the cluster's ActiveTopology +// using the reservation-aware effective-capacity view that EC encode and repair +// rely on. It collects ALL EC-eligible disks with no hard disk-type filter; +// disk-type preference is applied later by callers (Place). Per-disk free EC shard +// slots come from GetEffectiveAvailableEcShardSlots (shard-granular, so in-flight +// task reservations that are not whole-volume multiples are not lost) minus the EC +// shards already persisted on the disk. Rack keys are composite "dc:rack". +// +// dataShards is the target collection's data-shard count, used to size free EC +// shard slots correctly for custom ratios (a 4+2 volume's shards are larger, so +// fewer fit per volume slot). Pass <= 0 for the default scheme. Because of this, +// the snapshot is ratio-specific; build one per collection ratio being placed. +// +// This is the encode/repair-side constructor; balance keeps its own raw-topology +// builder (buildBalancerTopology) until the snapshot sources are reconciled. +func FromActiveTopology(at *topology.ActiveTopology, dataShards int) *Topology { + topo := NewTopology() + if at == nil { + return topo + } + + disks := at.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 0) + + // Accumulate node-level free slots and group the node's disks together. + nodeFree := make(map[string]int) + nodeDC := make(map[string]string) + nodeRack := make(map[string]string) + byNode := make(map[string][]*topology.DiskInfo) + for _, d := range disks { + if d == nil || d.DiskInfo == nil { + continue + } + if free := perDiskFreeECSlots(at, d, dataShards); free > 0 { + nodeFree[d.NodeID] += free + } + nodeDC[d.NodeID] = d.DataCenter + nodeRack[d.NodeID] = d.DataCenter + ":" + d.Rack + byNode[d.NodeID] = append(byNode[d.NodeID], d) + } + + for nodeID, ds := range byNode { + node := topo.AddNode(nodeID, nodeDC[nodeID], nodeRack[nodeID], nodeFree[nodeID]) + for _, d := range ds { + free := perDiskFreeECSlots(at, d, dataShards) + if free < 0 { + free = 0 + } + node.AddDisk(d.DiskID, d.DiskType, free, ecShardCountOnDisk(d)) + node.AddDiskTags(d.DiskID, d.DiskInfo.Tags) + for _, eci := range d.DiskInfo.EcShardInfos { + if eci.DiskId != d.DiskID { + continue + } + node.AddShards(eci.Id, eci.Collection, d.DiskID, erasure_coding.ShardBits(eci.EcIndexBits)) + } + } + } + + return topo +} + +// perDiskFreeECSlots returns the disk's free EC shard slots for a volume with +// dataShards data shards: the topology's shard-granular effective availability +// (sized by the target ratio) minus the EC shards already on the disk, also +// expressed in the target ratio's shard slots so mixed-ratio disks are charged by +// size rather than raw count. +func perDiskFreeECSlots(at *topology.ActiveTopology, d *topology.DiskInfo, dataShards int) int { + return at.GetEffectiveAvailableEcShardSlots(d.NodeID, d.DiskID, dataShards) - ecShardSlotsOnDisk(d, dataShards) +} + +// ecShardCountOnDisk counts the EC shards physically on this disk (matching +// eci.DiskId), across all volumes. Used as a per-disk load metric for scoring. +func ecShardCountOnDisk(d *topology.DiskInfo) int { + count := 0 + for _, eci := range d.DiskInfo.EcShardInfos { + if eci.DiskId == d.DiskID { + count += erasure_coding.GetShardCount(eci) + } + } + return count +} + +// ecShardSlotsOnDisk returns the EC shards already on the disk expressed in the +// TARGET collection's shard slots. A shard of a collection with d data shards +// occupies ~1/d of a volume, i.e. targetDataShards/d target slots, so a 2+1 shard +// counted against a 10+4 snapshot consumes ~5 slots, not 1. +func ecShardSlotsOnDisk(d *topology.DiskInfo, targetDataShards int) int { + if targetDataShards <= 0 { + targetDataShards = erasure_coding.DataShardsCount + } + total := 0 + for _, eci := range d.DiskInfo.EcShardInfos { + if eci.DiskId != d.DiskID { + continue + } + ds := shardDataShards(eci) + if ds <= 0 { + ds = erasure_coding.DataShardsCount + } + // Round up so an existing shard always consumes at least its fractional + // footprint; flooring lets a low-data-shard volume (targetDataShards < ds) + // count as zero target slots and overstate the disk's free capacity. + total += (erasure_coding.GetShardCount(eci)*targetDataShards + ds - 1) / ds + } + return total +} diff --git a/weed/storage/erasure_coding/ecbalancer/snapshot_test.go b/weed/storage/erasure_coding/ecbalancer/snapshot_test.go new file mode 100644 index 000000000..a7e54dbcf --- /dev/null +++ b/weed/storage/erasure_coding/ecbalancer/snapshot_test.go @@ -0,0 +1,137 @@ +package ecbalancer + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/admin/topology" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" +) + +// TestFromActiveTopology verifies the encode/repair-side snapshot constructor maps +// nodes, per-disk EC shard counts, per-volume shard bits, and free slots from an +// ActiveTopology. Shard accounting is asserted exactly; free slots are asserted to +// be positive (their exact value depends on effective-capacity internals). +func TestFromActiveTopology(t *testing.T) { + const vid uint32 = 7 + at := topology.NewActiveTopology(10) + + // Node A holds one EC shard (id 3) of volume 7 on disk 0; node B is empty. + nodeA := &master_pb.DataNodeInfo{ + Id: "10.0.0.1:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": { + DiskId: 0, + MaxVolumeCount: 100, + VolumeCount: 1, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{ + Id: vid, + Collection: "c1", + EcIndexBits: uint32(1) << 3, + DiskId: 0, + }}, + }, + }, + } + nodeB := &master_pb.DataNodeInfo{ + Id: "10.0.0.2:8080", + DiskInfos: map[string]*master_pb.DiskInfo{ + "hdd": {DiskId: 0, MaxVolumeCount: 100, VolumeCount: 0}, + }, + } + if err := at.UpdateTopology(&master_pb.TopologyInfo{ + DataCenterInfos: []*master_pb.DataCenterInfo{{ + Id: "dc1", + RackInfos: []*master_pb.RackInfo{{ + Id: "rack1", + DataNodeInfos: []*master_pb.DataNodeInfo{nodeA, nodeB}, + }}, + }}, + }); err != nil { + t.Fatalf("UpdateTopology: %v", err) + } + + topo := FromActiveTopology(at, 0) + + if got := len(topo.nodes); got != 2 { + t.Fatalf("node count = %d, want 2", got) + } + + a := topo.nodes["10.0.0.1:8080"] + if a == nil { + t.Fatal("node A missing from snapshot") + } + if a.rack != "dc1:rack1" { + t.Errorf("node A rack = %q, want dc1:rack1", a.rack) + } + diskA := a.disks[0] + if diskA == nil { + t.Fatal("node A disk 0 missing") + } + if diskA.shardCount != 1 { + t.Errorf("node A disk 0 shardCount = %d, want 1", diskA.shardCount) + } + vs := a.shards[volKey{collection: "c1", vid: vid}] + if vs == nil { + t.Fatal("volume 7 shards not recorded on node A") + } + if !vs.shardBits.Has(erasure_coding.ShardId(3)) { + t.Errorf("node A volume 7 shardBits %b missing shard 3", vs.shardBits) + } + if vs.shardBits.Count() != 1 { + t.Errorf("node A volume 7 shard count = %d, want 1", vs.shardBits.Count()) + } + + b := topo.nodes["10.0.0.2:8080"] + if b == nil { + t.Fatal("node B missing from snapshot") + } + if b.rack != "dc1:rack1" { + t.Errorf("node B rack = %q, want dc1:rack1", b.rack) + } + if diskB := b.disks[0]; diskB == nil || diskB.shardCount != 0 { + t.Errorf("node B disk 0 shardCount = %v, want 0", diskB) + } + if len(b.shards) != 0 { + t.Errorf("node B should hold no volume shards, got %d", len(b.shards)) + } + + // Free slots should be positive on both near-empty disks. + if a.freeSlots <= 0 || b.freeSlots <= 0 { + t.Errorf("free slots not positive: A=%d B=%d", a.freeSlots, b.freeSlots) + } +} + +// TestEcShardSlotsOnDiskRoundsUp covers the mixed-ratio (targetDataShards < +// existingDataShards) conversion: an existing shard's fractional footprint must +// round up so it is never floored to zero, which would overstate free capacity. +// OSS always uses the standard ratio at runtime, but ecShardSlotsOnDisk takes the +// target data-shard count as a parameter, so the fractional path is exercised +// directly here; the enterprise build reaches it with real per-volume ratios. +func TestEcShardSlotsOnDiskRoundsUp(t *testing.T) { + // A single shard (id 3) of a standard 10-data-shard volume on disk 0. + disk := &topology.DiskInfo{ + DiskID: 0, + DiskInfo: &master_pb.DiskInfo{ + DiskId: 0, + EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{ + Id: 7, + Collection: "c1", + EcIndexBits: uint32(1) << 3, + DiskId: 0, + }}, + }, + } + + // Against a 2-data-shard target the shard occupies 2/10 of a slot, which must + // round up to 1 rather than floor to 0. + if got := ecShardSlotsOnDisk(disk, 2); got != 1 { + t.Errorf("ecShardSlotsOnDisk(target=2) = %d, want 1 (rounded up from 0.2)", got) + } + + // Identity case: target equals the existing data-shard count, so the shard + // consumes exactly its whole-number footprint. + if got := ecShardSlotsOnDisk(disk, erasure_coding.DataShardsCount); got != 1 { + t.Errorf("ecShardSlotsOnDisk(target=%d) = %d, want 1", erasure_coding.DataShardsCount, got) + } +} diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index 8ec44d48c..1562882a6 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -2,6 +2,8 @@ package super_block import ( "fmt" + + "github.com/seaweedfs/seaweedfs/weed/glog" ) type ReplicaPlacement struct { @@ -77,3 +79,27 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } + +// ResolveReplicaPlacement picks the EC shard replica placement constraint: an +// explicit spec wins; otherwise the cluster default (typically the master's +// configured default replication). A missing, invalid, or zero-replication value +// yields nil, meaning even spread / no constraint. Shared by EC encode, repair, +// and balance so the three resolve replica placement identically. +func ResolveReplicaPlacement(explicitSpec, clusterDefault string) *ReplicaPlacement { + spec := explicitSpec + if spec == "" { + spec = clusterDefault + } + if spec == "" { + return nil + } + rp, err := NewReplicaPlacementFromString(spec) + if err != nil { + glog.Warningf("ignoring invalid replica placement %q: %v", spec, err) + return nil + } + if !rp.HasReplication() { + return nil + } + return rp +} diff --git a/weed/worker/tasks/ec_balance/detection.go b/weed/worker/tasks/ec_balance/detection.go index d472b8936..d62d394db 100644 --- a/weed/worker/tasks/ec_balance/detection.go +++ b/weed/worker/tasks/ec_balance/detection.go @@ -242,22 +242,11 @@ func resolveECRatio(_ *types.ClusterInfo, _ string) (int, int) { // replication (matching the shell ec.balance default). A missing, invalid, or // zero-replication value yields nil, meaning even spread / no constraint. func resolveReplicaPlacement(ecConfig *Config, clusterInfo *types.ClusterInfo) *super_block.ReplicaPlacement { - spec := ecConfig.ReplicaPlacement - if spec == "" && clusterInfo != nil { - spec = clusterInfo.DefaultReplicaPlacement + clusterDefault := "" + if clusterInfo != nil { + clusterDefault = clusterInfo.DefaultReplicaPlacement } - if spec == "" { - return nil - } - rp, err := super_block.NewReplicaPlacementFromString(spec) - if err != nil { - glog.Warningf("EC balance: ignoring invalid replica placement %q: %v", spec, err) - return nil - } - if !rp.HasReplication() { - return nil - } - return rp + return super_block.ResolveReplicaPlacement(ecConfig.ReplicaPlacement, clusterDefault) } func normalizeECShardCounts(dataShards, parityShards int) (int, int) {