mirror of
https://github.com/seaweedfs/seaweedfs.git
synced 2026-05-23 18:21:28 +00:00
EC placement: shared replica-placement resolver, snapshot + Place core, capacity fixes, tiering (#9621)
* Add shared super_block.ResolveReplicaPlacement; use it in ec_balance * Add ecbalancer.FromActiveTopology snapshot constructor for EC encode/repair * Add ecbalancer.Place greenfield/repair placement core (strict + durability-first) * topology: add GetEffectiveAvailableEcShardSlots; FromActiveTopology uses shard-granular free slots GetDisksWithEffectiveCapacity flattens reserved shard slots into volume slots via integer truncation, so an in-flight EC task reserving a non-multiple-of- DataShardsCount number of shards was lost from the snapshot and freeSlots was over-reported. GetEffectiveAvailableEcShardSlots subtracts the full reservation impact at shard granularity. * ecbalancer.Place: reject nodes without a free disk of the requested type FromActiveTopology keeps all disk types in the snapshot, so an SSD-only request could be routed to a node with only HDD capacity (pickBestDiskOnNode then returns disk 0 on the wrong tier). Filter rack/node selection to those with a free disk of the requested type. * ecbalancer.Place: enforce ReplicaPlacement DiffDataCenterCount (per-DC shard cap) * ecbalancer: enforce DiffDataCenterCount in balance (cross-DC phase + cross-rack DC cap) Adds a cross-DC corrective phase that drains data centers holding more than DiffDataCenterCount shards of a volume, and a per-DC cap on cross-rack move targets. Both are no-ops when DiffDataCenterCount is unset, so balance output is unchanged for non-DC placements. * topology: ratio-aware EC shard slots and provisional empty-disk slot GetEffectiveAvailableEcShardSlots now takes the target collection's data-shard count, so a 4+2 volume's larger shards are not over-counted at 10 per volume slot; and it keeps the one provisional slot for freshly started empty servers that report max=0, matching getEffectiveAvailableCapacityUnsafe. FromActiveTopology threads the ratio through. * ecbalancer.Place: explicit disk-type filter signal (fix HDD vs any ambiguity) HardDriveType normalizes to "", which collided with "" meaning any disk. Add Constraints.FilterDiskType and normalize both sides so a hdd request matches disks reported as "" and never leaks to SSD, while filter=false still means any. * ecbalancer: add clearShardAccounting for repair snapshot reconciliation Clears one disk's copy of a shard from per-domain accounting and recomputes the node-level union (preserving a kept copy on another disk of the same node), without crediting capacity. Repair uses it to drop to-be-deleted copies before placing missing shards. * ecbalancer: don't cap cross-DC target racks when DiffRackCount is unset len(racks)+1 wrongly limited each target rack (3 in a 2-rack cluster), so draining a DC could stop short of the DiffDataCenterCount cap. Use MaxShardCount+1 as the effectively-unlimited default. * topology/ecbalancer: ratio-correct EC capacity accounting Reservation shard slots (default ShardsPerVolumeSlot units) are now converted to the target ratio before subtracting, and existing EC shards are charged by size (targetDataShards/shardDataShards) so a 2+1 shard isn't counted as one 10+4 slot. Per-shard ratio lookup is behind shardDataShards (OSS uses the standard ratio). * ecbalancer.Place: candidate tiering and eligible-rack caps Adds a per-disk eligibility/preference abstraction so Place supports: - preferred-tag whole-plan retry (try disks carrying the earliest tags first, widen to all only if a tier cannot place every shard; reports SpilledOutsidePreferredTags), - soft disk-type spill via DiskTypePolicy (Any/Prefer/Require): Prefer fills the preferred type then spills, reporting SpilledToOtherDiskType; Require filters, - even per-rack caps that divide by racks holding an eligible disk, so a tiered cluster (e.g. SSDs in 2 of 4 racks) isn't capped impossibly low. Disk tags carried via Node.AddDiskTags + FromActiveTopology. * ecbalancer: export ClearShardAccounting for repair snapshot reconciliation * ecbalancer: address review feedback (ratio rounding, bitmap walk, same-DC moves) - topology/ecbalancer: round shard-reservation and existing-shard footprint up when converting to target-ratio shard slots, so a sub-slot reservation is not truncated to zero and free capacity is not overstated for low-data-shard layouts (targetDataShards < ds). - erasure_coding: add ShardBits.All iterator and use it across the balancer, cross-DC phase, and placement scoring instead of scanning 0..MaxShardCount and probing Has on every id. - ecbalancer: allow same-DC cross-rack moves when a DC already sits at its DiffDataCenterCount cap; a same-DC move leaves the DC total unchanged. Add a regression test that fails without the guard. - ecbalancer cross-DC phase: pick targets via the eligible-aware pickNodeInRackEligible/pickBestDiskEligible helpers so the disk-type filter is honored and a 0 disk id is not mistaken for a valid selection. * ecbalancer: test ecShardSlotsOnDisk fractional round-up Cover the mixed-ratio path (targetDataShards < existing data shards) so a shard's fractional footprint is never floored to zero and free capacity is not overstated. Exercises the round-up via the targetDataShards parameter; OSS uses the standard ratio at runtime while the enterprise build hits it with real per-volume ratios. * ecbalancer: assert node B rack in TestFromActiveTopology * ecbalancer: split Destination into separate DataCenter and bare Rack Replace the composite "dc:rack" Rack field on Destination with separate DataCenter and bare Rack values, matching topology.DiskInfo and the worker-task convention. Callers (and tests) read the data center directly instead of parsing the composite with strings.SplitN. * shell ec.balance: use utilization-based global balancing (parity with worker) The shell's global rebalance phase balanced by raw shard count; switch it to fractional fullness (shards/capacity), as the worker already does. On uniform capacity the two agree; on heterogeneous capacity it fills nodes proportionally instead of driving small-capacity nodes toward full. Updates the heterogeneous-capacity regression test to assert even fullness (~equal shards/capacity per node) rather than even shard count. * ecbalancer: bounded-proportional per-DC shard spread DiffDataCenterCount was enforced only as a ceiling (drain-to-cap), which could leave a within-cap-but-lopsided DC distribution under a loose cap (e.g. 10/4 of 14 with cap=10). Now the cross-DC phase, the cross-rack DC guard, and Place all target boundedMaxPerDC = min(DiffDataCenterCount, max(ceil(total/numDCs), parityShards)): shards spread proportionally across DCs, but no tighter than the durability floor (once each DC holds <= parityShards a DC loss is recoverable, so further spreading only adds cross-DC/WAN traffic). No-op when DiffDataCenterCount is 0; identical to before when the cap is the binding constraint. * ecbalancer: drop DiffDataCenterCount enforcement for EC placement The 1-byte volume ReplicaPlacement packs xyz into x*100+y*10+z<=255, so the DC digit can only be 0-2 -- far too small to be a meaningful per-DC EC shard cap (a cap of 1-2 would demand 7-14 DCs for a 10+4 volume). It's volume replica-placement, not an EC spec. Removes the cross-DC balance phase, the DC guard in the cross-rack phase, and the per-DC cap in Place (and the just-added bounded-proportional logic); EC relies on the RP-independent rack/node even spread instead. Rack/node caps (DiffRackCount/SameRackCount) are unchanged. Per-domain EC caps are left for a real EC placement spec. * ecbalancer: enforce per-disk durability cap; symmetric reserve/release Place now refuses to put more than parityShards shards of a volume on a single disk (pickBestDiskEligible skips a disk once it holds parityShards of the volume, a hard cap not relaxed even in durability-first). Previously Place assigned by free capacity, so a skewed near-full cluster could pile >parityShards onto one disk -> losing it loses the volume; only distinct-disk count was checked. This covers encode and repair (both route through Place); the caller skips/leaves the volume rather than minting an unrecoverable layout. Also makes reserveShard decrement freeSlots unconditionally, symmetric with releaseShard's unconditional increment (the old guarded decrement could credit a phantom slot on release if a shard were ever reserved onto a full disk). * ecbalancer: add Topology.ReleaseVolumeShards (clear + credit) for greenfield encode Releases all of a volume's shards from the snapshot and credits the freed disk capacity, so a greenfield encode can plan as if stale EC shards from a prior failed attempt are gone. Safe to credit because the encode task deletes stale shards (cleanupStaleEcShards) before distributing the new ones. Distinct from ClearShardAccounting (repair), which does not credit. * ecbalancer: ReleaseVolumeShards credits node freeSlots, not just disks releaseShard only increments per-disk freeSlots, but rack capacity is summed from node freeSlots (buildRacks) and node freeSlots gates node eligibility. Crediting only disks left a node/rack looking full after releasing stale shards, so a greenfield encode still couldn't use the freed capacity. Now credits the node by the total disk-slots freed. * ecbalancer: correct PlacementMode docs (encode uses durability-first) PlaceStrict was labeled '(encode)' but encode uses PlaceDurabilityFirst. Clarify that durability-first is used by both encode and repair, reports relaxations in PlaceResult.Relaxed, and never relaxes the per-disk durability cap. * ecbalancer: treat SameRackCount as a direct per-node shard cap The 3rd ReplicaPlacement digit now caps shards per node at exactly the digit value, matching how DiffRackCount (2nd digit) caps per rack, instead of allowing digit+1 per node. This makes the per-rack and per-node caps consistent and matches the documented "digits cap EC shards per rack and per node" semantics; e.g. 011 now means at most one shard per rack and one per node.
This commit is contained in:
@@ -54,6 +54,70 @@ func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, d
|
||||
return at.getEffectiveAvailableCapacityUnsafe(disk)
|
||||
}
|
||||
|
||||
// GetEffectiveAvailableEcShardSlots returns a disk's free EC shard slots,
|
||||
// accounting for in-flight task reservations at shard granularity. Unlike the
|
||||
// volume-slot views (GetDisksWithEffectiveCapacity / GetEffectiveAvailableCapacity),
|
||||
// this does not truncate sub-volume shard reservations: it subtracts the full
|
||||
// reservation impact (volume slots converted to shard slots, plus the raw shard
|
||||
// slots) so a reservation that is not a whole multiple of ShardsPerVolumeSlot is
|
||||
// not lost. It does NOT subtract the EC shards already persisted on the disk;
|
||||
// callers that track those (from EcShardInfos) subtract them separately.
|
||||
//
|
||||
// shardsPerVolume is the number of EC shards of the target collection that fit in
|
||||
// one volume slot (i.e. its data-shard count): a 4+2 volume's shards are ~1/4 of a
|
||||
// volume each, so one volume slot holds 4 of them, not the default
|
||||
// ShardsPerVolumeSlot. Pass <= 0 to use the default. Using the target ratio keeps
|
||||
// Place from over-filling a disk for low-data-shard layouts.
|
||||
func (at *ActiveTopology) GetEffectiveAvailableEcShardSlots(nodeID string, diskID uint32, shardsPerVolume int) int {
|
||||
if shardsPerVolume <= 0 {
|
||||
shardsPerVolume = ShardsPerVolumeSlot
|
||||
}
|
||||
|
||||
at.mutex.RLock()
|
||||
defer at.mutex.RUnlock()
|
||||
|
||||
diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
|
||||
disk, exists := at.disks[diskKey]
|
||||
if !exists || disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
info := disk.DiskInfo.DiskInfo
|
||||
base := info.MaxVolumeCount - info.VolumeCount
|
||||
if base <= 0 && info.MaxVolumeCount == 0 && info.VolumeCount == 0 &&
|
||||
len(info.VolumeInfos) == 0 && len(info.EcShardInfos) == 0 {
|
||||
// Freshly started empty servers can report max=0 before publishing concrete
|
||||
// limits; keep one provisional slot so EC placement still sees the disk,
|
||||
// mirroring getEffectiveAvailableCapacityUnsafe.
|
||||
base = 1
|
||||
}
|
||||
if base < 0 {
|
||||
base = 0
|
||||
}
|
||||
// calculateTaskStorageImpact reports consumption as positive, so subtract it.
|
||||
// Volume-slot reservations scale by the target ratio; the sub-volume shard-slot
|
||||
// remainder is in default units and subtracted as-is (a small approximation).
|
||||
impact := at.getEffectiveCapacityUnsafe(disk)
|
||||
// impact.ShardSlots is recorded in default ShardsPerVolumeSlot units; convert it
|
||||
// to the target ratio's shard slots before subtracting (identity when
|
||||
// shardsPerVolume == ShardsPerVolumeSlot). Round a positive reservation up so a
|
||||
// sub-slot reservation (e.g. 1 default slot against a 4-shard target) is not
|
||||
// truncated to zero and wrongly counted as free.
|
||||
scaledShardImpact := int64(impact.ShardSlots) * int64(shardsPerVolume)
|
||||
if scaledShardImpact > 0 {
|
||||
scaledShardImpact = (scaledShardImpact + int64(ShardsPerVolumeSlot) - 1) / int64(ShardsPerVolumeSlot)
|
||||
} else {
|
||||
scaledShardImpact /= int64(ShardsPerVolumeSlot)
|
||||
}
|
||||
free := base*int64(shardsPerVolume) -
|
||||
int64(impact.VolumeSlots)*int64(shardsPerVolume) -
|
||||
scaledShardImpact
|
||||
if free < 0 {
|
||||
free = 0
|
||||
}
|
||||
return int(free)
|
||||
}
|
||||
|
||||
// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk
|
||||
// This shows the net impact from all pending and assigned tasks
|
||||
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
|
||||
|
||||
@@ -781,6 +781,10 @@ func (ecb *ecBalancer) balance(collections []string) error {
|
||||
ImbalanceThreshold: 0, // the shell balances to an even distribution
|
||||
ReplicaPlacement: ecb.replicaPlacement,
|
||||
Ratio: shellECRatio,
|
||||
// Balance the global phase by fractional fullness so heterogeneous-capacity
|
||||
// nodes fill proportionally (matching the worker). This is identical to raw
|
||||
// shard count when capacities are uniform.
|
||||
GlobalUtilizationBased: true,
|
||||
})
|
||||
return ecb.executeMoves(moves)
|
||||
}
|
||||
|
||||
@@ -340,7 +340,11 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) {
|
||||
// Simulate 22 EC volumes across 14 nodes (each volume has 14 shards, 1 per node).
|
||||
// Give nodes 0-3 an extra volume each (vol 23-26, all 14 shards) to create imbalance.
|
||||
// Before balancing: nodes 0-3 have 22+14=36 shards each, nodes 4-13 have 22 shards each.
|
||||
// Total = 4*36 + 10*22 = 144+220 = 364. Average = ceil(364/14) = 26.
|
||||
// Total = 4*36 + 10*22 = 144+220 = 364. Capacities are heterogeneous (max 80 vs
|
||||
// 33), so the utilization-based global phase balances by fullness, not count:
|
||||
// 364 shards over 9*80+5*33=885 slots is ~41% full, so large nodes settle near
|
||||
// 33 shards and small nodes near 14 (all ~41% full) rather than ~26 each, which
|
||||
// would drive the small nodes to ~79%.
|
||||
|
||||
type nodeSpec struct {
|
||||
id string
|
||||
@@ -396,8 +400,17 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) {
|
||||
|
||||
ecb.balance([]string{"cldata"})
|
||||
|
||||
// Verify: no node should exceed the average
|
||||
// Verify even FULLNESS (shards/capacity), not even count: with heterogeneous
|
||||
// capacities the utilization-based global phase fills nodes proportionally, so
|
||||
// large nodes hold more shards than small ones while every node ends near the
|
||||
// overall fullness. (An even-count result would over-fill the small nodes.)
|
||||
capacityByID := make(map[string]int, len(nodes))
|
||||
for _, ns := range nodes {
|
||||
capacityByID[ns.id] = ns.maxSlot
|
||||
}
|
||||
|
||||
totalShards := 0
|
||||
totalCapacity := 0
|
||||
shardCounts := make(map[string]int)
|
||||
for _, node := range ecb.ecNodes {
|
||||
count := 0
|
||||
@@ -408,14 +421,22 @@ func TestCommandEcBalanceIssue8793Topology(t *testing.T) {
|
||||
}
|
||||
shardCounts[node.info.Id] = count
|
||||
totalShards += count
|
||||
totalCapacity += capacityByID[node.info.Id]
|
||||
}
|
||||
avg := ceilDivide(totalShards, len(ecNodes))
|
||||
overallFullness := float64(totalShards) / float64(totalCapacity)
|
||||
|
||||
// Tolerance well below the gap a count-even result would show (small nodes
|
||||
// would sit ~38 points above overall), but above integer-rounding skew.
|
||||
const tolerance = 0.05
|
||||
for _, node := range ecb.ecNodes {
|
||||
count := shardCounts[node.info.Id]
|
||||
t.Logf("AFTER node %s: %d shards (avg %d)", node.info.Id, count, avg)
|
||||
if count > avg {
|
||||
t.Errorf("node %s has %d shards, expected at most %d (avg)", node.info.Id, count, avg)
|
||||
capacity := capacityByID[node.info.Id]
|
||||
fullness := float64(count) / float64(capacity)
|
||||
t.Logf("AFTER node %s: %d/%d shards (%.0f%% full, overall %.0f%%)",
|
||||
node.info.Id, count, capacity, fullness*100, overallFullness*100)
|
||||
if diff := fullness - overallFullness; diff > tolerance || diff < -tolerance {
|
||||
t.Errorf("node %s fullness %.1f%% deviates from overall %.1f%% by more than %.0f points",
|
||||
node.info.Id, fullness*100, overallFullness*100, tolerance*100)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package erasure_coding
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"iter"
|
||||
"math/bits"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -40,6 +41,20 @@ func (sb ShardBits) Count() int {
|
||||
return bits.OnesCount32(uint32(sb))
|
||||
}
|
||||
|
||||
// All iterates the shard ids present in the bitmap, in ascending order. It walks
|
||||
// only the set bits (trailing-zero scan), so cost scales with the number of
|
||||
// shards present rather than the full id range. Prefer this over scanning
|
||||
// 0..MaxShardCount and calling Has on each id.
|
||||
func (sb ShardBits) All() iter.Seq[ShardId] {
|
||||
return func(yield func(ShardId) bool) {
|
||||
for b := uint32(sb); b != 0; b &= b - 1 {
|
||||
if !yield(ShardId(bits.TrailingZeros32(b))) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ShardsInfo encapsulates information for EC shards with memory-efficient storage
|
||||
type ShardsInfo struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
@@ -43,6 +43,7 @@ type Node struct {
|
||||
type disk struct {
|
||||
diskID uint32
|
||||
diskType string
|
||||
tags []string // placement tags, for preferred-tag tiering
|
||||
freeSlots int
|
||||
shardCount int // total EC shards on this disk across all volumes
|
||||
}
|
||||
@@ -88,8 +89,8 @@ type Options struct {
|
||||
GlobalMaxMovesPerRack int
|
||||
// GlobalUtilizationBased selects the global phase's balance metric: when true,
|
||||
// nodes are balanced by fractional fullness (shards/capacity), which suits
|
||||
// heterogeneous-capacity racks; when false, by raw shard count. The worker
|
||||
// uses utilization; the shell uses raw count.
|
||||
// heterogeneous-capacity racks; when false, by raw shard count. Both the worker
|
||||
// and the shell enable it; the two metrics agree when capacities are uniform.
|
||||
GlobalUtilizationBased bool
|
||||
}
|
||||
|
||||
@@ -132,6 +133,14 @@ func (n *Node) AddDisk(diskID uint32, diskType string, freeSlots, shardCount int
|
||||
n.disks[diskID] = &disk{diskID: diskID, diskType: diskType, freeSlots: freeSlots, shardCount: shardCount}
|
||||
}
|
||||
|
||||
// AddDiskTags records placement tags (e.g. "ssd","fast") for a disk, used by
|
||||
// preferred-tag tiering in Place. Call after AddDisk; a no-op if the disk is unknown.
|
||||
func (n *Node) AddDiskTags(diskID uint32, tags []string) {
|
||||
if d, ok := n.disks[diskID]; ok {
|
||||
d.tags = append([]string(nil), tags...)
|
||||
}
|
||||
}
|
||||
|
||||
// AddShards records that the volume's shards in bits live on diskID. Call it
|
||||
// only for the volumes that should be balanced; the disk's overall occupancy is
|
||||
// reported separately via AddDisk.
|
||||
@@ -251,10 +260,8 @@ func detectDuplicateShards(vk volKey, nodes map[string]*Node) []*move {
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ {
|
||||
if info.shardBits.Has(erasure_coding.ShardId(shardID)) {
|
||||
shardLocations[shardID] = append(shardLocations[shardID], node)
|
||||
}
|
||||
for sid := range info.shardBits.All() {
|
||||
shardLocations[int(sid)] = append(shardLocations[int(sid)], node)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -354,8 +361,11 @@ func balanceShardTypeAcrossRacks(vk volKey, nodes map[string]*Node, racks map[st
|
||||
destRack, ok := pickTarget(rackKeys, shardsPerRack, maxPerRack, antiAffinity,
|
||||
func(r string) bool { return racks[r].freeSlots > 0 },
|
||||
func(r string) bool {
|
||||
if rp != nil && rp.DiffRackCount > 0 {
|
||||
return rackShardCount[r] < rp.DiffRackCount
|
||||
if rp == nil {
|
||||
return true
|
||||
}
|
||||
if rp.DiffRackCount > 0 && rackShardCount[r] >= rp.DiffRackCount {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
@@ -403,7 +413,7 @@ func pickNodeInRack(r *rack, vk volKey, rp *super_block.ReplicaPlacement) *Node
|
||||
continue
|
||||
}
|
||||
count := volumeShardCount(node, vk)
|
||||
if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount+1 {
|
||||
if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount {
|
||||
continue
|
||||
}
|
||||
if best == nil || count < bestCount {
|
||||
@@ -479,7 +489,7 @@ func balanceShardTypeAcrossNodes(vk volKey, r *rack, diskType string, dataShards
|
||||
func(n string) bool { return n != pm.src.id && r.nodes[n].freeSlots > 0 },
|
||||
func(n string) bool {
|
||||
if rp != nil && rp.SameRackCount > 0 {
|
||||
return nodeShardCount[n] < rp.SameRackCount+1
|
||||
return nodeShardCount[n] < rp.SameRackCount
|
||||
}
|
||||
return true
|
||||
})
|
||||
@@ -610,13 +620,10 @@ func detectGlobalImbalance(nodes map[string]*Node, racks map[string]*rack, diskT
|
||||
if pass == 1 && !volumeOnMin {
|
||||
continue // pass 1: only volumes already on the destination
|
||||
}
|
||||
// Iterate the full shard-id space so custom ratios with more than
|
||||
// the standard total (ids 14..MaxShardCount-1) are candidates too.
|
||||
for shardID := 0; shardID < erasure_coding.MaxShardCount; shardID++ {
|
||||
sid := erasure_coding.ShardId(shardID)
|
||||
if !info.shardBits.Has(sid) {
|
||||
continue
|
||||
}
|
||||
// Walk the volume's actual shard bitmap so custom ratios with more
|
||||
// than the standard total (ids 14..MaxShardCount-1) are candidates too.
|
||||
for sid := range info.shardBits.All() {
|
||||
shardID := int(sid)
|
||||
if minInfo != nil && minInfo.shardBits.Has(sid) {
|
||||
continue
|
||||
}
|
||||
@@ -669,10 +676,8 @@ func shardsByGroup(vk volKey, nodes map[string]*Node, dataShards int, key func(*
|
||||
continue
|
||||
}
|
||||
k := key(node)
|
||||
for s := 0; s < erasure_coding.MaxShardCount; s++ {
|
||||
if !info.shardBits.Has(erasure_coding.ShardId(s)) {
|
||||
continue
|
||||
}
|
||||
for sid := range info.shardBits.All() {
|
||||
s := int(sid)
|
||||
if s < dataShards {
|
||||
dataPer[k] = append(dataPer[k], s)
|
||||
} else {
|
||||
@@ -747,11 +752,8 @@ func pickBestDiskOnNode(node *Node, vk volKey, diskType string, shardID, dataSha
|
||||
bits := info.diskShardBits[diskID]
|
||||
existingShards = bits.Count()
|
||||
if dataShardCount > 0 {
|
||||
for s := 0; s < erasure_coding.MaxShardCount; s++ {
|
||||
if !bits.Has(erasure_coding.ShardId(s)) {
|
||||
continue
|
||||
}
|
||||
if s < dataShardCount {
|
||||
for sid := range bits.All() {
|
||||
if int(sid) < dataShardCount {
|
||||
hasData = true
|
||||
} else {
|
||||
hasParity = true
|
||||
@@ -807,9 +809,11 @@ func reserveShard(node *Node, vk volKey, shardID int, diskID uint32) {
|
||||
info.diskShardBits[diskID] = info.diskShardBits[diskID].Set(sid)
|
||||
if d, ok := node.disks[diskID]; ok {
|
||||
d.shardCount++
|
||||
if d.freeSlots > 0 {
|
||||
d.freeSlots--
|
||||
}
|
||||
// Decrement unconditionally so reserve/release stay symmetric (releaseShard
|
||||
// credits a slot unconditionally). Callers only reserve onto disks
|
||||
// pickBestDisk* already vetted as having free slots, so this won't go
|
||||
// negative; if it ever did, freeSlots<=0 correctly reads as full.
|
||||
d.freeSlots--
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
558
weed/storage/erasure_coding/ecbalancer/place.go
Normal file
558
weed/storage/erasure_coding/ecbalancer/place.go
Normal file
@@ -0,0 +1,558 @@
|
||||
package ecbalancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
storagetypes "github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
)
|
||||
|
||||
// Constraints configures a Place call. Ratio resolves a collection's
|
||||
// (dataShards, parityShards); nil uses the standard scheme. ReplicaPlacement,
|
||||
// when non-nil, caps shards per rack (DiffRackCount = max shards/rack) and per
|
||||
// node within a rack (SameRackCount = max shards/node); both digits are direct
|
||||
// hard caps. The data-center digit (DiffDataCenterCount) is not honored:
|
||||
// the 1-byte volume ReplicaPlacement can only encode 0-2 there, too small to be a
|
||||
// meaningful per-DC EC shard cap, so EC relies on the rack/node even spread instead.
|
||||
//
|
||||
// DiskTypePolicy controls how DiskType constrains placement (Any / Prefer /
|
||||
// Require). PreferredTags drives whole-plan tag tiering: Place tries disks
|
||||
// carrying the earliest tags first and widens to all disks only if a tier cannot
|
||||
// place every shard.
|
||||
type Constraints struct {
|
||||
DiskType string
|
||||
DiskTypePolicy DiskTypePolicy
|
||||
PreferredTags []string
|
||||
ReplicaPlacement *super_block.ReplicaPlacement
|
||||
Ratio func(collection string) (dataShards, parityShards int)
|
||||
}
|
||||
|
||||
// DiskTypePolicy controls how Constraints.DiskType constrains placement.
|
||||
type DiskTypePolicy int
|
||||
|
||||
const (
|
||||
DiskTypeAny DiskTypePolicy = iota // any disk type
|
||||
DiskTypePrefer // prefer DiskType, spill to other types if needed
|
||||
DiskTypeRequire // only DiskType (HardDriveType when "")
|
||||
)
|
||||
|
||||
// diskTypeEqual compares disk types after normalization, so "" and "hdd" (both
|
||||
// HardDriveType) are equal.
|
||||
func diskTypeEqual(a, b string) bool {
|
||||
return storagetypes.ToDiskType(a).String() == storagetypes.ToDiskType(b).String()
|
||||
}
|
||||
|
||||
// diskHasAnyTag reports whether the disk carries any of the given tags.
|
||||
func diskHasAnyTag(d *disk, tags []string) bool {
|
||||
for _, want := range tags {
|
||||
for _, have := range d.tags {
|
||||
if have == want {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Destination is a chosen target for one shard. DataCenter and Rack are kept as
|
||||
// separate values (matching topology.DiskInfo) rather than a "dc:rack" composite,
|
||||
// so callers read them directly instead of parsing.
|
||||
type Destination struct {
|
||||
Node string
|
||||
DiskID uint32
|
||||
DataCenter string
|
||||
Rack string // bare rack id within DataCenter
|
||||
}
|
||||
|
||||
// PlaceResult holds the chosen destinations, which constraints had to be relaxed
|
||||
// (durability-first only), and whether placement spilled outside the preferred
|
||||
// disk type or tag tiers (for parity with today's logging).
|
||||
type PlaceResult struct {
|
||||
Destinations map[int]Destination
|
||||
Relaxed []string
|
||||
SpilledToOtherDiskType bool
|
||||
SpilledOutsidePreferredTags bool
|
||||
}
|
||||
|
||||
// PlacementMode selects the strictness/relaxation policy.
|
||||
type PlacementMode int
|
||||
|
||||
const (
|
||||
// PlaceStrict: caps and ReplicaPlacement are hard. Place fails rather than
|
||||
// violate them, so the caller can defer (leave the volume as-is and retry).
|
||||
PlaceStrict PlacementMode = iota
|
||||
// PlaceDurabilityFirst (used by both encode and repair): relax per-type caps ->
|
||||
// data/parity anti-affinity -> ReplicaPlacement, in that order, until each shard
|
||||
// lands, reporting what was relaxed in PlaceResult.Relaxed. The per-disk
|
||||
// durability cap (<= parityShards per disk) is never relaxed. Fails only if no
|
||||
// disk has free capacity. Encode places best-effort this way and rebalancing
|
||||
// tightens the spread afterward.
|
||||
PlaceDurabilityFirst
|
||||
)
|
||||
|
||||
// relaxation controls which placement-quality constraints are enforced on an
|
||||
// attempt. preferring fresh nodes (repair's "avoid surviving-shard nodes") is not
|
||||
// listed: pickNodeInRack already selects the node with the fewest shards of the
|
||||
// volume, so survivors are deprioritized with built-in fallback.
|
||||
type relaxation struct {
|
||||
caps bool
|
||||
antiAffinity bool
|
||||
rp bool
|
||||
}
|
||||
|
||||
func (r relaxation) relaxedNames() []string {
|
||||
var n []string
|
||||
if !r.caps {
|
||||
n = append(n, "caps")
|
||||
}
|
||||
if !r.antiAffinity {
|
||||
n = append(n, "anti-affinity")
|
||||
}
|
||||
if !r.rp {
|
||||
n = append(n, "replica-placement")
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
var strictAttempts = []relaxation{{caps: true, antiAffinity: true, rp: true}}
|
||||
|
||||
var durabilityAttempts = []relaxation{
|
||||
{caps: true, antiAffinity: true, rp: true},
|
||||
{caps: false, antiAffinity: true, rp: true},
|
||||
{caps: false, antiAffinity: false, rp: true},
|
||||
{caps: false, antiAffinity: false, rp: false},
|
||||
}
|
||||
|
||||
type placedEntry struct {
|
||||
node *Node
|
||||
sid int
|
||||
rackKey string
|
||||
}
|
||||
|
||||
// Place assigns destinations for the `need` shard ids of volume (collection,vid),
|
||||
// reading the volume's already-placed shards from the snapshot (so encode passes
|
||||
// an empty-for-this-volume snapshot, repair passes one seeded with the surviving
|
||||
// shards).
|
||||
//
|
||||
// Tag tiering (whole-plan retry): it tries the preferred-tag tiers in order, each
|
||||
// a complete candidate set, and returns the first tier that places every shard;
|
||||
// only when it falls through to the no-tag tier does it set
|
||||
// SpilledOutsidePreferredTags. Within a tier, disk-type Prefer spills to other
|
||||
// types per shard (SpilledToOtherDiskType); Require filters strictly.
|
||||
func (t *Topology) Place(vid uint32, collection string, need []int, c Constraints, mode PlacementMode) (*PlaceResult, error) {
|
||||
if len(need) == 0 {
|
||||
return &PlaceResult{Destinations: map[int]Destination{}}, nil
|
||||
}
|
||||
|
||||
vk := volKey{collection: collection, vid: vid}
|
||||
dataShards, parityShards := erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount
|
||||
if c.Ratio != nil {
|
||||
if d, p := c.Ratio(collection); d > 0 && p > 0 {
|
||||
dataShards, parityShards = d, p
|
||||
}
|
||||
}
|
||||
|
||||
racks := buildRacks(t.nodes)
|
||||
if len(racks) == 0 {
|
||||
return nil, fmt.Errorf("no racks available for EC placement")
|
||||
}
|
||||
rackKeys := sortedKeys(racks)
|
||||
|
||||
// Disk-type eligibility (Require filters; Any/Prefer admit all) and the soft
|
||||
// type preference applied in scoring under Prefer.
|
||||
typeEligible := func(d *disk) bool {
|
||||
if c.DiskTypePolicy == DiskTypeRequire {
|
||||
return diskTypeEqual(d.diskType, c.DiskType)
|
||||
}
|
||||
return true
|
||||
}
|
||||
var prefer func(*disk) bool
|
||||
if c.DiskTypePolicy == DiskTypePrefer {
|
||||
prefer = func(d *disk) bool { return diskTypeEqual(d.diskType, c.DiskType) }
|
||||
}
|
||||
|
||||
// Whole-plan retry over preferred-tag tiers; the first tier that places every
|
||||
// shard wins. Reaching the no-tag tier means we spilled outside the tags.
|
||||
tiers := tagTiers(c.PreferredTags)
|
||||
var lastErr error
|
||||
for _, tierTags := range tiers {
|
||||
tt := tierTags
|
||||
eligible := func(d *disk) bool {
|
||||
return typeEligible(d) && (len(tt) == 0 || diskHasAnyTag(d, tt))
|
||||
}
|
||||
res, err := t.tryPlace(vk, need, dataShards, parityShards, racks, rackKeys, mode, c.ReplicaPlacement, eligible, prefer)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
if len(c.PreferredTags) > 0 && len(tierTags) == 0 {
|
||||
res.SpilledOutsidePreferredTags = true
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
// tagTiers returns the eligibility tag-sets in increasing breadth, ending with an
|
||||
// empty set ("any disk"). Empty preferredTags yields a single any-disk tier.
|
||||
func tagTiers(preferredTags []string) [][]string {
|
||||
if len(preferredTags) == 0 {
|
||||
return [][]string{nil}
|
||||
}
|
||||
tiers := make([][]string, 0, len(preferredTags)+1)
|
||||
for k := range preferredTags {
|
||||
tiers = append(tiers, append([]string(nil), preferredTags[:k+1]...))
|
||||
}
|
||||
return append(tiers, nil)
|
||||
}
|
||||
|
||||
// tryPlace runs one whole-plan placement attempt restricted to disks satisfying
|
||||
// `eligible`, with `prefer` (may be nil) ranking soft-preferred disks first. It
|
||||
// journals reservations and rolls them all back if any shard cannot be placed, so
|
||||
// a failed tier leaves the snapshot unchanged for the next attempt.
|
||||
func (t *Topology) tryPlace(vk volKey, need []int, dataShards, parityShards int, racks map[string]*rack, rackKeys []string, mode PlacementMode, rp *super_block.ReplicaPlacement, eligible func(*disk) bool, prefer func(*disk) bool) (*PlaceResult, error) {
|
||||
result := &PlaceResult{Destinations: make(map[int]Destination, len(need))}
|
||||
|
||||
// Per-type shard ids per rack (even caps), total shard count per rack
|
||||
// (DiffRackCount), and the racks bearing each type (anti-affinity) — all seeded
|
||||
// from the volume's existing shards.
|
||||
shardsPerRack := map[bool]map[string][]int{true: {}, false: {}}
|
||||
rackShardCount := map[string]int{}
|
||||
bearing := map[bool]map[string]bool{true: {}, false: {}}
|
||||
for _, n := range t.nodes {
|
||||
info, ok := n.shards[vk]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
for sid := range info.shardBits.All() {
|
||||
s := int(sid)
|
||||
isData := s < dataShards
|
||||
shardsPerRack[isData][n.rack] = append(shardsPerRack[isData][n.rack], s)
|
||||
rackShardCount[n.rack]++
|
||||
bearing[isData][n.rack] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Even per-rack caps divide by racks that actually have an eligible free disk,
|
||||
// not all racks (the snapshot keeps every disk type/tag), so a valid tiered
|
||||
// cluster — e.g. SSDs in only 2 of 4 racks — is not capped impossibly low.
|
||||
numEligibleRacks := 0
|
||||
for _, rk := range rackKeys {
|
||||
if rackHasFreeDisk(racks[rk], eligible) {
|
||||
numEligibleRacks++
|
||||
}
|
||||
}
|
||||
if numEligibleRacks < 1 {
|
||||
numEligibleRacks = 1
|
||||
}
|
||||
|
||||
attempts := strictAttempts
|
||||
if mode == PlaceDurabilityFirst {
|
||||
attempts = durabilityAttempts
|
||||
}
|
||||
|
||||
var journal []placedEntry
|
||||
relaxedSeen := map[string]bool{}
|
||||
spilledType := false
|
||||
|
||||
placeShard := func(sid int, isData bool) bool {
|
||||
typeTotal := dataShards
|
||||
if !isData {
|
||||
typeTotal = parityShards
|
||||
}
|
||||
for _, rl := range attempts {
|
||||
node, diskID, spilled, ok := chooseShardDest(vk, sid, isData, dataShards, typeTotal, numEligibleRacks, parityShards, racks, rackKeys, rp, eligible, prefer, shardsPerRack[isData], rackShardCount, bearing, rl)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
reserveShard(node, vk, sid, diskID)
|
||||
node.freeSlots--
|
||||
racks[node.rack].freeSlots--
|
||||
shardsPerRack[isData][node.rack] = append(shardsPerRack[isData][node.rack], sid)
|
||||
rackShardCount[node.rack]++
|
||||
bearing[isData][node.rack] = true
|
||||
journal = append(journal, placedEntry{node: node, sid: sid, rackKey: node.rack})
|
||||
result.Destinations[sid] = Destination{
|
||||
Node: node.id,
|
||||
DiskID: diskID,
|
||||
DataCenter: node.dc,
|
||||
Rack: strings.TrimPrefix(node.rack, node.dc+":"),
|
||||
}
|
||||
if spilled {
|
||||
spilledType = true
|
||||
}
|
||||
for _, name := range rl.relaxedNames() {
|
||||
relaxedSeen[name] = true
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// Data shards first, then parity, so parity can avoid data-bearing racks.
|
||||
for _, isData := range []bool{true, false} {
|
||||
for _, sid := range shardsOfType(need, isData, dataShards) {
|
||||
if placeShard(sid, isData) {
|
||||
continue
|
||||
}
|
||||
for _, e := range journal {
|
||||
releaseShard(e.node, vk, e.sid)
|
||||
e.node.freeSlots++
|
||||
racks[e.rackKey].freeSlots++
|
||||
}
|
||||
return nil, fmt.Errorf("cannot place EC shard %d of volume %d (collection %q)", sid, vk.vid, vk.collection)
|
||||
}
|
||||
}
|
||||
|
||||
result.SpilledToOtherDiskType = spilledType
|
||||
for name := range relaxedSeen {
|
||||
result.Relaxed = append(result.Relaxed, name)
|
||||
}
|
||||
sort.Strings(result.Relaxed)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// chooseShardDest selects a (node, disk) for one shard at the given relaxation
|
||||
// level: pick a rack (even per-type cap + ReplicaPlacement caps + two-pass
|
||||
// anti-affinity to the opposite type), then the least-loaded eligible node, then
|
||||
// the best eligible disk. The third return reports whether the disk spilled off
|
||||
// the soft-preferred type. ok=false when no rack/node/disk fits.
|
||||
func chooseShardDest(vk volKey, sid int, isData bool, dataShards, typeTotal, numEligibleRacks, maxPerDisk int, racks map[string]*rack, rackKeys []string, rp *super_block.ReplicaPlacement, eligible func(*disk) bool, prefer func(*disk) bool, shardsPerRackType map[string][]int, rackShardCount map[string]int, bearing map[bool]map[string]bool, rl relaxation) (*Node, uint32, bool, bool) {
|
||||
maxPerRack := numEligibleRacks*typeTotal + 1 // effectively unlimited when caps are relaxed
|
||||
if rl.caps {
|
||||
if maxPerRack = ceilDivide(typeTotal, numEligibleRacks); maxPerRack < 1 {
|
||||
maxPerRack = 1
|
||||
}
|
||||
}
|
||||
|
||||
var anti map[string]bool
|
||||
if rl.antiAffinity {
|
||||
anti = bearing[!isData] // racks already holding the opposite shard type
|
||||
}
|
||||
|
||||
if !rl.rp {
|
||||
rp = nil
|
||||
}
|
||||
// A rack is eligible only if it is under the per-rack shard cap (DiffRackCount),
|
||||
// enforced only when set (and relaxed with rp).
|
||||
withinLimit := func(r string) bool {
|
||||
if rp == nil {
|
||||
return true
|
||||
}
|
||||
if rp.DiffRackCount > 0 && rackShardCount[r] >= rp.DiffRackCount {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
destRack, ok := pickTarget(rackKeys, shardsPerRackType, maxPerRack, anti,
|
||||
func(r string) bool { return racks[r].freeSlots > 0 && rackHasFreeDisk(racks[r], eligible) },
|
||||
withinLimit)
|
||||
if !ok {
|
||||
return nil, 0, false, false
|
||||
}
|
||||
node := pickNodeInRackEligible(racks[destRack], vk, rp, eligible)
|
||||
if node == nil {
|
||||
return nil, 0, false, false
|
||||
}
|
||||
diskID, ok, spilled := pickBestDiskEligible(node, vk, eligible, prefer, sid, dataShards, maxPerDisk)
|
||||
if !ok {
|
||||
return nil, 0, false, false
|
||||
}
|
||||
return node, diskID, spilled, true
|
||||
}
|
||||
|
||||
// nodeHasFreeDisk reports whether the node has a free disk satisfying eligible.
|
||||
func nodeHasFreeDisk(n *Node, eligible func(*disk) bool) bool {
|
||||
for _, d := range n.disks {
|
||||
if d.freeSlots > 0 && eligible(d) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// rackHasFreeDisk reports whether any node in the rack has a free eligible disk.
|
||||
func rackHasFreeDisk(r *rack, eligible func(*disk) bool) bool {
|
||||
for _, n := range r.nodes {
|
||||
if n.freeSlots > 0 && nodeHasFreeDisk(n, eligible) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// pickNodeInRackEligible is pickNodeInRack restricted to nodes that have a free
|
||||
// eligible disk. FromActiveTopology keeps all disk types/tags in the snapshot, so
|
||||
// without this a node with free volume slots but no eligible disk could be chosen.
|
||||
func pickNodeInRackEligible(r *rack, vk volKey, rp *super_block.ReplicaPlacement, eligible func(*disk) bool) *Node {
|
||||
var best *Node
|
||||
bestCount := -1
|
||||
for _, id := range sortedNodeKeys(r.nodes) {
|
||||
node := r.nodes[id]
|
||||
if node.freeSlots <= 0 {
|
||||
continue
|
||||
}
|
||||
if !nodeHasFreeDisk(node, eligible) {
|
||||
continue
|
||||
}
|
||||
count := volumeShardCount(node, vk)
|
||||
if rp != nil && rp.SameRackCount > 0 && count >= rp.SameRackCount {
|
||||
continue
|
||||
}
|
||||
if best == nil || count < bestCount {
|
||||
best, bestCount = node, count
|
||||
}
|
||||
}
|
||||
return best
|
||||
}
|
||||
|
||||
// pickBestDiskEligible chooses the best eligible disk on a node, ranking
|
||||
// soft-preferred disks (prefer != nil && prefer(d)) ahead of others so disk-type
|
||||
// Prefer uses the preferred type when available but spills otherwise. Returns the
|
||||
// disk id, whether one was found, and whether the chosen disk spilled off the
|
||||
// preferred type.
|
||||
func pickBestDiskEligible(node *Node, vk volKey, eligible func(*disk) bool, prefer func(*disk) bool, shardID, dataShardCount, maxPerDisk int) (uint32, bool, bool) {
|
||||
isDataShard := dataShardCount > 0 && shardID < dataShardCount
|
||||
info := node.shards[vk]
|
||||
var bestDiskID uint32
|
||||
bestScore := -1
|
||||
bestPreferred := false
|
||||
for _, diskID := range sortedDiskKeys(node.disks) {
|
||||
d := node.disks[diskID]
|
||||
if !eligible(d) || d.freeSlots <= 0 {
|
||||
continue
|
||||
}
|
||||
existingShards := 0
|
||||
hasData := false
|
||||
hasParity := false
|
||||
if info != nil {
|
||||
bits := info.diskShardBits[diskID]
|
||||
existingShards = bits.Count()
|
||||
if dataShardCount > 0 {
|
||||
for sid := range bits.All() {
|
||||
if int(sid) < dataShardCount {
|
||||
hasData = true
|
||||
} else {
|
||||
hasParity = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Durability: never put more than maxPerDisk (parityShards) shards of this
|
||||
// volume on one disk, or losing that disk would lose more than EC can
|
||||
// recover. Hard cap, enforced even under durability-first relaxation.
|
||||
if maxPerDisk > 0 && existingShards >= maxPerDisk {
|
||||
continue
|
||||
}
|
||||
score := d.shardCount*10 + existingShards*100
|
||||
if dataShardCount > 0 {
|
||||
if isDataShard && hasParity {
|
||||
score += 1000
|
||||
} else if !isDataShard && hasData {
|
||||
score += 1000
|
||||
}
|
||||
}
|
||||
preferred := prefer == nil || prefer(d)
|
||||
if !preferred {
|
||||
score += 100000 // strongly deprioritize spilling to a non-preferred type
|
||||
}
|
||||
if bestScore == -1 || score < bestScore {
|
||||
bestScore = score
|
||||
bestDiskID = diskID
|
||||
bestPreferred = preferred
|
||||
}
|
||||
}
|
||||
if bestScore == -1 {
|
||||
return 0, false, false
|
||||
}
|
||||
return bestDiskID, true, prefer != nil && !bestPreferred
|
||||
}
|
||||
|
||||
// clearShardAccounting removes one shard copy of a volume from the snapshot's
|
||||
// per-domain accounting (the volume's shard bits) WITHOUT crediting disk capacity.
|
||||
// It clears only the given physical disk's bit, then recomputes the node-level
|
||||
// union from the remaining disk bits, so a kept copy of the same shard on another
|
||||
// disk of the same node still counts toward caps / ReplicaPlacement / anti-affinity.
|
||||
//
|
||||
// Repair uses this to drop the duplicate/mismatched copies it plans to delete
|
||||
// before placing missing shards, so those copies do not inflate placement
|
||||
// accounting. Capacity is deliberately NOT credited: the deletes run only after
|
||||
// the rebuilt shards are distributed, so the slots are not free at plan time. This
|
||||
// is distinct from releaseShard, which credits freeSlots and clears the union.
|
||||
func clearShardAccounting(node *Node, vk volKey, shardID int, diskID uint32) {
|
||||
info, ok := node.shards[vk]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
sid := erasure_coding.ShardId(shardID)
|
||||
if bits, ok := info.diskShardBits[diskID]; ok {
|
||||
info.diskShardBits[diskID] = bits.Clear(sid)
|
||||
}
|
||||
var union erasure_coding.ShardBits
|
||||
for _, b := range info.diskShardBits {
|
||||
union |= b
|
||||
}
|
||||
info.shardBits = union
|
||||
}
|
||||
|
||||
// ClearShardAccounting drops one shard copy of a volume from placement accounting
|
||||
// without crediting capacity (see clearShardAccounting). Repair calls it for each
|
||||
// copy it plans to delete before placing missing shards, so those copies do not
|
||||
// inflate caps/RP/anti-affinity. No-op for an unknown node.
|
||||
func (t *Topology) ClearShardAccounting(nodeID, collection string, vid uint32, shardID int, diskID uint32) {
|
||||
n, ok := t.nodes[nodeID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
clearShardAccounting(n, volKey{collection: collection, vid: vid}, shardID, diskID)
|
||||
}
|
||||
|
||||
// ReleaseVolumeShards removes every shard of a volume from the snapshot and
|
||||
// credits the freed disk capacity. A greenfield encode calls this so any stale
|
||||
// EC shards left by a prior failed attempt (which the encode task deletes before
|
||||
// distributing the new shards) neither occupy capacity nor skew anti-affinity /
|
||||
// per-disk caps during planning. Unlike repair's ClearShardAccounting, it credits
|
||||
// freeSlots because the deletes run before the new writes.
|
||||
func (t *Topology) ReleaseVolumeShards(collection string, vid uint32) {
|
||||
vk := volKey{collection: collection, vid: vid}
|
||||
for _, n := range t.nodes {
|
||||
info, ok := n.shards[vk]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// freed is the total disk-slots the volume occupies on this node (a shard may
|
||||
// sit on more than one disk). releaseShard credits each disk's freeSlots;
|
||||
// credit the node's freeSlots by the same total, since rack capacity is summed
|
||||
// from node freeSlots (buildRacks) and node freeSlots gates node eligibility.
|
||||
freed := 0
|
||||
for _, bits := range info.diskShardBits {
|
||||
freed += bits.Count()
|
||||
}
|
||||
sids := make([]int, 0, info.shardBits.Count())
|
||||
for sid := range info.shardBits.All() {
|
||||
sids = append(sids, int(sid))
|
||||
}
|
||||
for _, sid := range sids {
|
||||
releaseShard(n, vk, sid)
|
||||
}
|
||||
n.freeSlots += freed
|
||||
delete(n.shards, vk)
|
||||
}
|
||||
}
|
||||
|
||||
// shardsOfType returns the sorted subset of need that are data shards (id <
|
||||
// dataShards) when isData, else the parity subset.
|
||||
func shardsOfType(need []int, isData bool, dataShards int) []int {
|
||||
var out []int
|
||||
for _, s := range need {
|
||||
if (s < dataShards) == isData {
|
||||
out = append(out, s)
|
||||
}
|
||||
}
|
||||
sort.Ints(out)
|
||||
return out
|
||||
}
|
||||
422
weed/storage/erasure_coding/ecbalancer/place_test.go
Normal file
422
weed/storage/erasure_coding/ecbalancer/place_test.go
Normal file
@@ -0,0 +1,422 @@
|
||||
package ecbalancer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
)
|
||||
|
||||
// buildPlaceTopo makes a topology of racks x nodesPerRack, each node one disk with
|
||||
// perDiskFree free EC shard slots.
|
||||
func buildPlaceTopo(racks, nodesPerRack, perDiskFree int) *Topology {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < racks; r++ {
|
||||
rackKey := fmt.Sprintf("dc1:rack%d", r)
|
||||
for n := 0; n < nodesPerRack; n++ {
|
||||
id := fmt.Sprintf("10.0.%d.%d:8080", r, n)
|
||||
node := topo.AddNode(id, "dc1", rackKey, perDiskFree)
|
||||
node.AddDisk(0, "", perDiskFree, 0)
|
||||
}
|
||||
}
|
||||
return topo
|
||||
}
|
||||
|
||||
func allShards() []int {
|
||||
out := make([]int, erasure_coding.TotalShardsCount)
|
||||
for i := range out {
|
||||
out[i] = i
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// TestPlaceStrictSpreadAndCaps places a fresh 10+4 volume and checks every shard
|
||||
// lands on a distinct node and no rack exceeds the even per-type cap.
|
||||
func TestPlaceStrictSpreadAndCaps(t *testing.T) {
|
||||
const racks = 4
|
||||
topo := buildPlaceTopo(racks, 4, 50)
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
|
||||
usedNodes := map[string]bool{}
|
||||
dataPerRack := map[string]int{}
|
||||
parityPerRack := map[string]int{}
|
||||
for sid, d := range res.Destinations {
|
||||
if usedNodes[d.Node] {
|
||||
t.Errorf("node %s reused for shard %d (expected distinct nodes with ample capacity)", d.Node, sid)
|
||||
}
|
||||
usedNodes[d.Node] = true
|
||||
if sid < erasure_coding.DataShardsCount {
|
||||
dataPerRack[d.Rack]++
|
||||
} else {
|
||||
parityPerRack[d.Rack]++
|
||||
}
|
||||
}
|
||||
dataCap := ceilDivide(erasure_coding.DataShardsCount, racks)
|
||||
parityCap := ceilDivide(erasure_coding.ParityShardsCount, racks)
|
||||
for rk, n := range dataPerRack {
|
||||
if n > dataCap {
|
||||
t.Errorf("rack %s holds %d data shards, cap %d", rk, n, dataCap)
|
||||
}
|
||||
}
|
||||
for rk, n := range parityPerRack {
|
||||
if n > parityCap {
|
||||
t.Errorf("rack %s holds %d parity shards, cap %d", rk, n, parityCap)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceStrictFailsAndRollsBack: a single tiny disk cannot hold 14 shards, so
|
||||
// strict Place fails and leaves the snapshot untouched.
|
||||
func TestPlaceStrictFailsAndRollsBack(t *testing.T) {
|
||||
topo := buildPlaceTopo(1, 1, 2) // one node, room for 2 shards
|
||||
node := topo.nodes["10.0.0.0:8080"]
|
||||
freeBefore := node.freeSlots
|
||||
diskFreeBefore := node.disks[0].freeSlots
|
||||
|
||||
_, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceStrict)
|
||||
if err == nil {
|
||||
t.Fatal("expected Place to fail on insufficient capacity")
|
||||
}
|
||||
if info, ok := node.shards[volKey{collection: "c1", vid: 1}]; ok && info.shardBits.Count() != 0 {
|
||||
t.Errorf("volume shard bits left on node after failed strict Place (rollback incomplete): %b", info.shardBits)
|
||||
}
|
||||
if node.freeSlots != freeBefore {
|
||||
t.Errorf("node freeSlots = %d after rollback, want %d", node.freeSlots, freeBefore)
|
||||
}
|
||||
if node.disks[0].freeSlots != diskFreeBefore {
|
||||
t.Errorf("disk freeSlots = %d after rollback, want %d", node.disks[0].freeSlots, diskFreeBefore)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceDurabilityFirstRelaxesRP: a ReplicaPlacement rack limit too tight for
|
||||
// the shard count makes strict fail, while durability-first relaxes RP to place
|
||||
// everything and reports the relaxation.
|
||||
func TestPlaceDurabilityFirstRelaxesRP(t *testing.T) {
|
||||
rp := &super_block.ReplicaPlacement{DiffRackCount: 3} // <=3 shards per rack
|
||||
topo := buildPlaceTopo(2, 8, 50) // 2 racks: 2*3=6 < 14 under RP
|
||||
|
||||
if _, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceStrict); err == nil {
|
||||
t.Fatal("strict Place should fail when RP rack limit cannot fit all shards")
|
||||
}
|
||||
|
||||
topo = buildPlaceTopo(2, 8, 50)
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceDurabilityFirst)
|
||||
if err != nil {
|
||||
t.Fatalf("durability-first Place: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
relaxedRP := false
|
||||
for _, r := range res.Relaxed {
|
||||
if r == "replica-placement" {
|
||||
relaxedRP = true
|
||||
}
|
||||
}
|
||||
if !relaxedRP {
|
||||
t.Errorf("expected replica-placement relaxation, got %v", res.Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceSameRackCountIsDirectPerNodeCap: the 3rd ReplicaPlacement digit
|
||||
// (SameRackCount) caps shards per node directly (max == digit), matching the
|
||||
// per-rack DiffRackCount cap rather than allowing digit+1 per node.
|
||||
func TestPlaceSameRackCountIsDirectPerNodeCap(t *testing.T) {
|
||||
rp := &super_block.ReplicaPlacement{SameRackCount: 2} // <=2 shards per node
|
||||
|
||||
// 5 single-node racks: 5 nodes * 2 = 10 < 14, so a strict 10+4 placement
|
||||
// cannot satisfy the per-node cap and must fail. Under the old digit+1 reading
|
||||
// the cap would be 3/node => 15 slots and this would have wrongly succeeded.
|
||||
topo := buildPlaceTopo(5, 1, 50)
|
||||
if _, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceStrict); err == nil {
|
||||
t.Fatal("strict Place should fail: 5 nodes cannot hold 14 shards at <=2 per node")
|
||||
}
|
||||
|
||||
// Durability-first relaxes the unsatisfiable per-node cap, still places every
|
||||
// shard, and reports the relaxation so it isn't silently weakened.
|
||||
topo = buildPlaceTopo(5, 1, 50)
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{ReplicaPlacement: rp}, PlaceDurabilityFirst)
|
||||
if err != nil {
|
||||
t.Fatalf("durability-first Place: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d shards, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
relaxedRP := false
|
||||
for _, r := range res.Relaxed {
|
||||
if r == "replica-placement" {
|
||||
relaxedRP = true
|
||||
}
|
||||
}
|
||||
if !relaxedRP {
|
||||
t.Errorf("expected replica-placement relaxation, got %v", res.Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceDiskTypeHardFilter: with DiskType set, shards land only on disks of
|
||||
// that type, even though the snapshot also contains other-typed disks.
|
||||
func TestPlaceDiskTypeHardFilter(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 4; r++ {
|
||||
rackKey := fmt.Sprintf("dc1:rack%d", r)
|
||||
ssd := topo.AddNode(fmt.Sprintf("ssd-%d:8080", r), "dc1", rackKey, 50)
|
||||
ssd.AddDisk(0, "ssd", 50, 0)
|
||||
hdd := topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", rackKey, 50)
|
||||
hdd.AddDisk(0, "hdd", 50, 0)
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place ssd: %v", err)
|
||||
}
|
||||
for sid, d := range res.Destinations {
|
||||
node := topo.nodes[d.Node]
|
||||
disk := node.disks[d.DiskID]
|
||||
if disk == nil || disk.diskType != "ssd" {
|
||||
t.Errorf("shard %d placed on non-ssd disk: node=%s diskID=%d", sid, d.Node, d.DiskID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceDiskTypeUnavailableFails: a request for a disk type with no matching
|
||||
// disks fails rather than silently placing on the wrong tier.
|
||||
func TestPlaceDiskTypeUnavailableFails(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 4; r++ {
|
||||
n := topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 50)
|
||||
n.AddDisk(0, "hdd", 50, 0)
|
||||
}
|
||||
if _, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict); err == nil {
|
||||
t.Fatal("expected Place to fail when no disks of the requested type exist")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceHDDRequestMatchesEmptyTypeDisks: a "hdd" request normalizes to
|
||||
// HardDriveType ("") and must land on the HDD disk (reported as ""), never the SSD
|
||||
// disk, even on nodes that have both.
|
||||
func TestPlaceHDDRequestMatchesEmptyTypeDisks(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 6; r++ {
|
||||
n := topo.AddNode(fmt.Sprintf("n%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 100)
|
||||
n.AddDisk(0, "", 50, 0) // HDD (HardDriveType, reported as "")
|
||||
n.AddDisk(1, "ssd", 50, 0) // SSD
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "hdd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place hdd: %v", err)
|
||||
}
|
||||
for sid, d := range res.Destinations {
|
||||
if d.DiskID != 0 { // disk 0 is the HDD disk on every node
|
||||
t.Errorf("shard %d placed on disk %d (expected HDD disk 0) on node %s", sid, d.DiskID, d.Node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceDurabilityCapRejectsSkewed: in a near-full cluster where only one disk
|
||||
// has spare room, Place must not pile more than parityShards shards onto it (losing
|
||||
// it would then lose more than EC can recover). It fails instead, so the caller
|
||||
// leaves the volume unencoded rather than minting an unrecoverable layout.
|
||||
func TestPlaceDurabilityCapRejectsSkewed(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
// One spacious node plus four nearly-full ones, all in a single rack.
|
||||
a := topo.AddNode("a:8080", "dc1", "dc1:rack0", 100)
|
||||
a.AddDisk(0, "", 100, 0)
|
||||
for i := 0; i < 4; i++ {
|
||||
n := topo.AddNode(fmt.Sprintf("b%d:8080", i), "dc1", "dc1:rack0", 1)
|
||||
n.AddDisk(0, "", 1, 0)
|
||||
}
|
||||
|
||||
// 14 shards, parity 4: node a is capped at 4, the others hold 1 each -> at most
|
||||
// 4+4=8 placeable without exceeding parityShards on a disk, so Place must fail.
|
||||
// (Without the per-disk cap, a would greedily absorb 10 shards and "succeed".)
|
||||
if _, err := topo.Place(1, "c1", allShards(), Constraints{}, PlaceDurabilityFirst); err == nil {
|
||||
t.Fatal("expected Place to fail rather than pile >parityShards shards on one disk")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReleaseVolumeShards: removes all of a volume's shards from the snapshot and
|
||||
// credits the freed capacity at BOTH disk and node level (rack capacity sums node
|
||||
// freeSlots), mirroring how FromActiveTopology accounts stale shards.
|
||||
func TestReleaseVolumeShards(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
// Node total 20 = two disks of 10. Two stale shards occupy one slot each, so the
|
||||
// snapshot would show disk0/disk1 at 9 and node at 18 (as FromActiveTopology does).
|
||||
n := topo.AddNode("n0:8080", "dc1", "dc1:rack0", 20)
|
||||
n.AddDisk(0, "", 10, 0)
|
||||
n.AddDisk(1, "", 10, 0)
|
||||
vk := volKey{collection: "c1", vid: 1}
|
||||
n.AddShards(1, "c1", 0, erasure_coding.ShardBits(uint32(1)<<3))
|
||||
n.AddShards(1, "c1", 1, erasure_coding.ShardBits(uint32(1)<<7))
|
||||
n.disks[0].freeSlots = 9
|
||||
n.disks[1].freeSlots = 9
|
||||
n.freeSlots = 18
|
||||
|
||||
topo.ReleaseVolumeShards("c1", 1)
|
||||
|
||||
if _, ok := n.shards[vk]; ok {
|
||||
t.Error("volume shards should be gone after ReleaseVolumeShards")
|
||||
}
|
||||
if n.disks[0].freeSlots != 10 || n.disks[1].freeSlots != 10 {
|
||||
t.Errorf("disk freeSlots not restored: disk0=%d, disk1=%d (want 10, 10)", n.disks[0].freeSlots, n.disks[1].freeSlots)
|
||||
}
|
||||
if n.freeSlots != 20 {
|
||||
t.Errorf("node freeSlots = %d, want 20 (must be credited at node level too)", n.freeSlots)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClearShardAccounting: dropping one disk's copy of a shard preserves a kept
|
||||
// copy of the same shard on another disk of the same node, and credits no capacity.
|
||||
func TestClearShardAccounting(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
n := topo.AddNode("n0:8080", "dc1", "dc1:rack0", 50)
|
||||
n.AddDisk(0, "", 50, 0)
|
||||
n.AddDisk(1, "", 50, 0)
|
||||
vk := volKey{collection: "c1", vid: 1}
|
||||
// Shard 3 lives on disk 0 (keep) and disk 1 (duplicate to delete).
|
||||
n.AddShards(1, "c1", 0, erasure_coding.ShardBits(uint32(1)<<3))
|
||||
n.AddShards(1, "c1", 1, erasure_coding.ShardBits(uint32(1)<<3))
|
||||
if got := n.shards[vk].shardBits.Count(); got != 1 {
|
||||
t.Fatalf("union count = %d, want 1", got)
|
||||
}
|
||||
freeBefore := n.disks[1].freeSlots
|
||||
|
||||
clearShardAccounting(n, vk, 3, 1)
|
||||
|
||||
if !n.shards[vk].shardBits.Has(erasure_coding.ShardId(3)) {
|
||||
t.Error("kept copy of shard 3 (disk 0) lost from the node-level union")
|
||||
}
|
||||
if n.shards[vk].diskShardBits[1].Has(erasure_coding.ShardId(3)) {
|
||||
t.Error("disk-1 copy of shard 3 was not cleared")
|
||||
}
|
||||
if n.disks[1].freeSlots != freeBefore {
|
||||
t.Errorf("freeSlots changed %d -> %d; clearShardAccounting must not credit capacity", freeBefore, n.disks[1].freeSlots)
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceDiskTypePreferSpills: DiskTypePrefer fills the preferred type first and
|
||||
// spills the remainder to other types, reporting SpilledToOtherDiskType. SSD is
|
||||
// scarce (one tiny SSD per node) so the volume must spill to HDD, but there are
|
||||
// enough disks to keep each within the parityShards durability cap.
|
||||
func TestPlaceDiskTypePreferSpills(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 8; r++ {
|
||||
n := topo.AddNode(fmt.Sprintf("n%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 100)
|
||||
n.AddDisk(0, "ssd", 1, 0) // tiny SSD: 1 shard
|
||||
n.AddDisk(1, "", 50, 0) // roomy HDD
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypePrefer}, PlaceDurabilityFirst)
|
||||
if err != nil {
|
||||
t.Fatalf("Place: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
ssd, hdd := 0, 0
|
||||
for _, d := range res.Destinations {
|
||||
if topo.nodes[d.Node].disks[d.DiskID].diskType == "ssd" {
|
||||
ssd++
|
||||
} else {
|
||||
hdd++
|
||||
}
|
||||
}
|
||||
if ssd == 0 || hdd == 0 {
|
||||
t.Errorf("expected prefer-then-spill: some shards on SSD and some on HDD, got ssd=%d hdd=%d", ssd, hdd)
|
||||
}
|
||||
if !res.SpilledToOtherDiskType {
|
||||
t.Error("expected SpilledToOtherDiskType when SSD cannot hold every shard")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlacePreferredTagsUseTaggedDisks: when tagged disks can hold the whole plan,
|
||||
// every shard lands on a tagged disk and no spill is reported.
|
||||
func TestPlacePreferredTagsUseTaggedDisks(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 4; r++ {
|
||||
rackKey := fmt.Sprintf("dc1:rack%d", r)
|
||||
fast := topo.AddNode(fmt.Sprintf("fast-%d:8080", r), "dc1", rackKey, 50)
|
||||
fast.AddDisk(0, "", 50, 0)
|
||||
fast.AddDiskTags(0, []string{"fast"})
|
||||
topo.AddNode(fmt.Sprintf("slow-%d:8080", r), "dc1", rackKey, 50).AddDisk(0, "", 50, 0)
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{PreferredTags: []string{"fast"}}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place: %v", err)
|
||||
}
|
||||
for sid, d := range res.Destinations {
|
||||
if !diskHasAnyTag(topo.nodes[d.Node].disks[d.DiskID], []string{"fast"}) {
|
||||
t.Errorf("shard %d placed on an untagged disk (node %s)", sid, d.Node)
|
||||
}
|
||||
}
|
||||
if res.SpilledOutsidePreferredTags {
|
||||
t.Error("did not expect tag spill when tagged disks suffice")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlacePreferredTagsSpillWhenInsufficient: when the tagged tier cannot hold the
|
||||
// whole plan, Place falls back to all disks and reports SpilledOutsidePreferredTags.
|
||||
func TestPlacePreferredTagsSpillWhenInsufficient(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
for r := 0; r < 4; r++ {
|
||||
n := topo.AddNode(fmt.Sprintf("n-%d:8080", r), "dc1", fmt.Sprintf("dc1:rack%d", r), 50)
|
||||
if r == 0 {
|
||||
n.AddDisk(0, "", 5, 0) // the only tagged disk, too small for 14 shards
|
||||
n.AddDiskTags(0, []string{"fast"})
|
||||
} else {
|
||||
n.AddDisk(0, "", 50, 0)
|
||||
}
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{PreferredTags: []string{"fast"}}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
if !res.SpilledOutsidePreferredTags {
|
||||
t.Error("expected SpilledOutsidePreferredTags when the single fast disk cannot hold the plan")
|
||||
}
|
||||
}
|
||||
|
||||
// TestPlaceStrictCapsCountEligibleRacks: with DiskTypeRequire, the even per-rack
|
||||
// cap divides by racks that have a matching disk, not all racks, so SSDs in only
|
||||
// some racks still place successfully.
|
||||
func TestPlaceStrictCapsCountEligibleRacks(t *testing.T) {
|
||||
topo := NewTopology()
|
||||
// SSDs live in only 2 of 4 racks, with several SSD nodes per rack so 14 shards
|
||||
// fit at <= parityShards per disk. The even per-rack cap must divide by the 2
|
||||
// eligible racks (ceil(10/2)=5 data/rack), not all 4 (ceil(10/4)=3 -> infeasible).
|
||||
for r := 0; r < 4; r++ {
|
||||
rackKey := fmt.Sprintf("dc1:rack%d", r)
|
||||
topo.AddNode(fmt.Sprintf("hdd-%d:8080", r), "dc1", rackKey, 50).AddDisk(0, "", 50, 0)
|
||||
if r < 2 {
|
||||
for n := 0; n < 4; n++ {
|
||||
topo.AddNode(fmt.Sprintf("ssd-%d-%d:8080", r, n), "dc1", rackKey, 50).AddDisk(0, "ssd", 50, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
res, err := topo.Place(1, "c1", allShards(), Constraints{DiskType: "ssd", DiskTypePolicy: DiskTypeRequire}, PlaceStrict)
|
||||
if err != nil {
|
||||
t.Fatalf("Place ssd in 2/4 racks: %v", err)
|
||||
}
|
||||
if len(res.Destinations) != erasure_coding.TotalShardsCount {
|
||||
t.Fatalf("placed %d, want %d", len(res.Destinations), erasure_coding.TotalShardsCount)
|
||||
}
|
||||
for sid, d := range res.Destinations {
|
||||
if disk := topo.nodes[d.Node].disks[d.DiskID]; disk == nil || disk.diskType != "ssd" {
|
||||
t.Errorf("shard %d not on an SSD disk: node=%s disk=%d", sid, d.Node, d.DiskID)
|
||||
}
|
||||
}
|
||||
}
|
||||
14
weed/storage/erasure_coding/ecbalancer/shard_ratio.go
Normal file
14
weed/storage/erasure_coding/ecbalancer/shard_ratio.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package ecbalancer
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
)
|
||||
|
||||
// shardDataShards returns the data-shard count of the volume an EC shard belongs
|
||||
// to, used to size the shard's disk footprint. OSS uses the standard ratio for
|
||||
// every volume; custom per-volume ratios are an enterprise feature, so the
|
||||
// enterprise build overrides this to read the per-shard ratio.
|
||||
func shardDataShards(eci *master_pb.VolumeEcShardInformationMessage) int {
|
||||
return erasure_coding.DataShardsCount
|
||||
}
|
||||
113
weed/storage/erasure_coding/ecbalancer/snapshot.go
Normal file
113
weed/storage/erasure_coding/ecbalancer/snapshot.go
Normal file
@@ -0,0 +1,113 @@
|
||||
package ecbalancer
|
||||
|
||||
import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
)
|
||||
|
||||
// FromActiveTopology builds a Topology snapshot from the cluster's ActiveTopology
|
||||
// using the reservation-aware effective-capacity view that EC encode and repair
|
||||
// rely on. It collects ALL EC-eligible disks with no hard disk-type filter;
|
||||
// disk-type preference is applied later by callers (Place). Per-disk free EC shard
|
||||
// slots come from GetEffectiveAvailableEcShardSlots (shard-granular, so in-flight
|
||||
// task reservations that are not whole-volume multiples are not lost) minus the EC
|
||||
// shards already persisted on the disk. Rack keys are composite "dc:rack".
|
||||
//
|
||||
// dataShards is the target collection's data-shard count, used to size free EC
|
||||
// shard slots correctly for custom ratios (a 4+2 volume's shards are larger, so
|
||||
// fewer fit per volume slot). Pass <= 0 for the default scheme. Because of this,
|
||||
// the snapshot is ratio-specific; build one per collection ratio being placed.
|
||||
//
|
||||
// This is the encode/repair-side constructor; balance keeps its own raw-topology
|
||||
// builder (buildBalancerTopology) until the snapshot sources are reconciled.
|
||||
func FromActiveTopology(at *topology.ActiveTopology, dataShards int) *Topology {
|
||||
topo := NewTopology()
|
||||
if at == nil {
|
||||
return topo
|
||||
}
|
||||
|
||||
disks := at.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 0)
|
||||
|
||||
// Accumulate node-level free slots and group the node's disks together.
|
||||
nodeFree := make(map[string]int)
|
||||
nodeDC := make(map[string]string)
|
||||
nodeRack := make(map[string]string)
|
||||
byNode := make(map[string][]*topology.DiskInfo)
|
||||
for _, d := range disks {
|
||||
if d == nil || d.DiskInfo == nil {
|
||||
continue
|
||||
}
|
||||
if free := perDiskFreeECSlots(at, d, dataShards); free > 0 {
|
||||
nodeFree[d.NodeID] += free
|
||||
}
|
||||
nodeDC[d.NodeID] = d.DataCenter
|
||||
nodeRack[d.NodeID] = d.DataCenter + ":" + d.Rack
|
||||
byNode[d.NodeID] = append(byNode[d.NodeID], d)
|
||||
}
|
||||
|
||||
for nodeID, ds := range byNode {
|
||||
node := topo.AddNode(nodeID, nodeDC[nodeID], nodeRack[nodeID], nodeFree[nodeID])
|
||||
for _, d := range ds {
|
||||
free := perDiskFreeECSlots(at, d, dataShards)
|
||||
if free < 0 {
|
||||
free = 0
|
||||
}
|
||||
node.AddDisk(d.DiskID, d.DiskType, free, ecShardCountOnDisk(d))
|
||||
node.AddDiskTags(d.DiskID, d.DiskInfo.Tags)
|
||||
for _, eci := range d.DiskInfo.EcShardInfos {
|
||||
if eci.DiskId != d.DiskID {
|
||||
continue
|
||||
}
|
||||
node.AddShards(eci.Id, eci.Collection, d.DiskID, erasure_coding.ShardBits(eci.EcIndexBits))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return topo
|
||||
}
|
||||
|
||||
// perDiskFreeECSlots returns the disk's free EC shard slots for a volume with
|
||||
// dataShards data shards: the topology's shard-granular effective availability
|
||||
// (sized by the target ratio) minus the EC shards already on the disk, also
|
||||
// expressed in the target ratio's shard slots so mixed-ratio disks are charged by
|
||||
// size rather than raw count.
|
||||
func perDiskFreeECSlots(at *topology.ActiveTopology, d *topology.DiskInfo, dataShards int) int {
|
||||
return at.GetEffectiveAvailableEcShardSlots(d.NodeID, d.DiskID, dataShards) - ecShardSlotsOnDisk(d, dataShards)
|
||||
}
|
||||
|
||||
// ecShardCountOnDisk counts the EC shards physically on this disk (matching
|
||||
// eci.DiskId), across all volumes. Used as a per-disk load metric for scoring.
|
||||
func ecShardCountOnDisk(d *topology.DiskInfo) int {
|
||||
count := 0
|
||||
for _, eci := range d.DiskInfo.EcShardInfos {
|
||||
if eci.DiskId == d.DiskID {
|
||||
count += erasure_coding.GetShardCount(eci)
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
// ecShardSlotsOnDisk returns the EC shards already on the disk expressed in the
|
||||
// TARGET collection's shard slots. A shard of a collection with d data shards
|
||||
// occupies ~1/d of a volume, i.e. targetDataShards/d target slots, so a 2+1 shard
|
||||
// counted against a 10+4 snapshot consumes ~5 slots, not 1.
|
||||
func ecShardSlotsOnDisk(d *topology.DiskInfo, targetDataShards int) int {
|
||||
if targetDataShards <= 0 {
|
||||
targetDataShards = erasure_coding.DataShardsCount
|
||||
}
|
||||
total := 0
|
||||
for _, eci := range d.DiskInfo.EcShardInfos {
|
||||
if eci.DiskId != d.DiskID {
|
||||
continue
|
||||
}
|
||||
ds := shardDataShards(eci)
|
||||
if ds <= 0 {
|
||||
ds = erasure_coding.DataShardsCount
|
||||
}
|
||||
// Round up so an existing shard always consumes at least its fractional
|
||||
// footprint; flooring lets a low-data-shard volume (targetDataShards < ds)
|
||||
// count as zero target slots and overstate the disk's free capacity.
|
||||
total += (erasure_coding.GetShardCount(eci)*targetDataShards + ds - 1) / ds
|
||||
}
|
||||
return total
|
||||
}
|
||||
137
weed/storage/erasure_coding/ecbalancer/snapshot_test.go
Normal file
137
weed/storage/erasure_coding/ecbalancer/snapshot_test.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package ecbalancer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
)
|
||||
|
||||
// TestFromActiveTopology verifies the encode/repair-side snapshot constructor maps
|
||||
// nodes, per-disk EC shard counts, per-volume shard bits, and free slots from an
|
||||
// ActiveTopology. Shard accounting is asserted exactly; free slots are asserted to
|
||||
// be positive (their exact value depends on effective-capacity internals).
|
||||
func TestFromActiveTopology(t *testing.T) {
|
||||
const vid uint32 = 7
|
||||
at := topology.NewActiveTopology(10)
|
||||
|
||||
// Node A holds one EC shard (id 3) of volume 7 on disk 0; node B is empty.
|
||||
nodeA := &master_pb.DataNodeInfo{
|
||||
Id: "10.0.0.1:8080",
|
||||
DiskInfos: map[string]*master_pb.DiskInfo{
|
||||
"hdd": {
|
||||
DiskId: 0,
|
||||
MaxVolumeCount: 100,
|
||||
VolumeCount: 1,
|
||||
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{
|
||||
Id: vid,
|
||||
Collection: "c1",
|
||||
EcIndexBits: uint32(1) << 3,
|
||||
DiskId: 0,
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
nodeB := &master_pb.DataNodeInfo{
|
||||
Id: "10.0.0.2:8080",
|
||||
DiskInfos: map[string]*master_pb.DiskInfo{
|
||||
"hdd": {DiskId: 0, MaxVolumeCount: 100, VolumeCount: 0},
|
||||
},
|
||||
}
|
||||
if err := at.UpdateTopology(&master_pb.TopologyInfo{
|
||||
DataCenterInfos: []*master_pb.DataCenterInfo{{
|
||||
Id: "dc1",
|
||||
RackInfos: []*master_pb.RackInfo{{
|
||||
Id: "rack1",
|
||||
DataNodeInfos: []*master_pb.DataNodeInfo{nodeA, nodeB},
|
||||
}},
|
||||
}},
|
||||
}); err != nil {
|
||||
t.Fatalf("UpdateTopology: %v", err)
|
||||
}
|
||||
|
||||
topo := FromActiveTopology(at, 0)
|
||||
|
||||
if got := len(topo.nodes); got != 2 {
|
||||
t.Fatalf("node count = %d, want 2", got)
|
||||
}
|
||||
|
||||
a := topo.nodes["10.0.0.1:8080"]
|
||||
if a == nil {
|
||||
t.Fatal("node A missing from snapshot")
|
||||
}
|
||||
if a.rack != "dc1:rack1" {
|
||||
t.Errorf("node A rack = %q, want dc1:rack1", a.rack)
|
||||
}
|
||||
diskA := a.disks[0]
|
||||
if diskA == nil {
|
||||
t.Fatal("node A disk 0 missing")
|
||||
}
|
||||
if diskA.shardCount != 1 {
|
||||
t.Errorf("node A disk 0 shardCount = %d, want 1", diskA.shardCount)
|
||||
}
|
||||
vs := a.shards[volKey{collection: "c1", vid: vid}]
|
||||
if vs == nil {
|
||||
t.Fatal("volume 7 shards not recorded on node A")
|
||||
}
|
||||
if !vs.shardBits.Has(erasure_coding.ShardId(3)) {
|
||||
t.Errorf("node A volume 7 shardBits %b missing shard 3", vs.shardBits)
|
||||
}
|
||||
if vs.shardBits.Count() != 1 {
|
||||
t.Errorf("node A volume 7 shard count = %d, want 1", vs.shardBits.Count())
|
||||
}
|
||||
|
||||
b := topo.nodes["10.0.0.2:8080"]
|
||||
if b == nil {
|
||||
t.Fatal("node B missing from snapshot")
|
||||
}
|
||||
if b.rack != "dc1:rack1" {
|
||||
t.Errorf("node B rack = %q, want dc1:rack1", b.rack)
|
||||
}
|
||||
if diskB := b.disks[0]; diskB == nil || diskB.shardCount != 0 {
|
||||
t.Errorf("node B disk 0 shardCount = %v, want 0", diskB)
|
||||
}
|
||||
if len(b.shards) != 0 {
|
||||
t.Errorf("node B should hold no volume shards, got %d", len(b.shards))
|
||||
}
|
||||
|
||||
// Free slots should be positive on both near-empty disks.
|
||||
if a.freeSlots <= 0 || b.freeSlots <= 0 {
|
||||
t.Errorf("free slots not positive: A=%d B=%d", a.freeSlots, b.freeSlots)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEcShardSlotsOnDiskRoundsUp covers the mixed-ratio (targetDataShards <
|
||||
// existingDataShards) conversion: an existing shard's fractional footprint must
|
||||
// round up so it is never floored to zero, which would overstate free capacity.
|
||||
// OSS always uses the standard ratio at runtime, but ecShardSlotsOnDisk takes the
|
||||
// target data-shard count as a parameter, so the fractional path is exercised
|
||||
// directly here; the enterprise build reaches it with real per-volume ratios.
|
||||
func TestEcShardSlotsOnDiskRoundsUp(t *testing.T) {
|
||||
// A single shard (id 3) of a standard 10-data-shard volume on disk 0.
|
||||
disk := &topology.DiskInfo{
|
||||
DiskID: 0,
|
||||
DiskInfo: &master_pb.DiskInfo{
|
||||
DiskId: 0,
|
||||
EcShardInfos: []*master_pb.VolumeEcShardInformationMessage{{
|
||||
Id: 7,
|
||||
Collection: "c1",
|
||||
EcIndexBits: uint32(1) << 3,
|
||||
DiskId: 0,
|
||||
}},
|
||||
},
|
||||
}
|
||||
|
||||
// Against a 2-data-shard target the shard occupies 2/10 of a slot, which must
|
||||
// round up to 1 rather than floor to 0.
|
||||
if got := ecShardSlotsOnDisk(disk, 2); got != 1 {
|
||||
t.Errorf("ecShardSlotsOnDisk(target=2) = %d, want 1 (rounded up from 0.2)", got)
|
||||
}
|
||||
|
||||
// Identity case: target equals the existing data-shard count, so the shard
|
||||
// consumes exactly its whole-number footprint.
|
||||
if got := ecShardSlotsOnDisk(disk, erasure_coding.DataShardsCount); got != 1 {
|
||||
t.Errorf("ecShardSlotsOnDisk(target=%d) = %d, want 1", erasure_coding.DataShardsCount, got)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,8 @@ package super_block
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
)
|
||||
|
||||
type ReplicaPlacement struct {
|
||||
@@ -77,3 +79,27 @@ func (rp *ReplicaPlacement) String() string {
|
||||
func (rp *ReplicaPlacement) GetCopyCount() int {
|
||||
return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
|
||||
}
|
||||
|
||||
// ResolveReplicaPlacement picks the EC shard replica placement constraint: an
|
||||
// explicit spec wins; otherwise the cluster default (typically the master's
|
||||
// configured default replication). A missing, invalid, or zero-replication value
|
||||
// yields nil, meaning even spread / no constraint. Shared by EC encode, repair,
|
||||
// and balance so the three resolve replica placement identically.
|
||||
func ResolveReplicaPlacement(explicitSpec, clusterDefault string) *ReplicaPlacement {
|
||||
spec := explicitSpec
|
||||
if spec == "" {
|
||||
spec = clusterDefault
|
||||
}
|
||||
if spec == "" {
|
||||
return nil
|
||||
}
|
||||
rp, err := NewReplicaPlacementFromString(spec)
|
||||
if err != nil {
|
||||
glog.Warningf("ignoring invalid replica placement %q: %v", spec, err)
|
||||
return nil
|
||||
}
|
||||
if !rp.HasReplication() {
|
||||
return nil
|
||||
}
|
||||
return rp
|
||||
}
|
||||
|
||||
@@ -242,22 +242,11 @@ func resolveECRatio(_ *types.ClusterInfo, _ string) (int, int) {
|
||||
// replication (matching the shell ec.balance default). A missing, invalid, or
|
||||
// zero-replication value yields nil, meaning even spread / no constraint.
|
||||
func resolveReplicaPlacement(ecConfig *Config, clusterInfo *types.ClusterInfo) *super_block.ReplicaPlacement {
|
||||
spec := ecConfig.ReplicaPlacement
|
||||
if spec == "" && clusterInfo != nil {
|
||||
spec = clusterInfo.DefaultReplicaPlacement
|
||||
clusterDefault := ""
|
||||
if clusterInfo != nil {
|
||||
clusterDefault = clusterInfo.DefaultReplicaPlacement
|
||||
}
|
||||
if spec == "" {
|
||||
return nil
|
||||
}
|
||||
rp, err := super_block.NewReplicaPlacementFromString(spec)
|
||||
if err != nil {
|
||||
glog.Warningf("EC balance: ignoring invalid replica placement %q: %v", spec, err)
|
||||
return nil
|
||||
}
|
||||
if !rp.HasReplication() {
|
||||
return nil
|
||||
}
|
||||
return rp
|
||||
return super_block.ResolveReplicaPlacement(ecConfig.ReplicaPlacement, clusterDefault)
|
||||
}
|
||||
|
||||
func normalizeECShardCounts(dataShards, parityShards int) (int, int) {
|
||||
|
||||
Reference in New Issue
Block a user