diff --git a/weed/worker/tasks/balance/detection.go b/weed/worker/tasks/balance/detection.go
index ac4181a00..811c0e9f8 100644
--- a/weed/worker/tasks/balance/detection.go
+++ b/weed/worker/tasks/balance/detection.go
@@ -279,6 +279,38 @@ func detectForDiskType(diskType string, diskMetrics []*types.VolumeHealthMetrics
 			break
 		}
 
+		// Per-move convergence guard: mirrors weed/shell/command_volume_balance.go
+		// to prevent oscillation and destination overshoot when MaxVolumeCount
+		// values are heterogeneous. Without this guard, the greedy max→min
+		// algorithm can schedule moves that flip which server is most-utilized
+		// (source becomes min, destination becomes max), producing A→B, B→A
+		// oscillation within a single detection cycle.
+		//
+		// Check: after the move, the destination's utilization must not strictly
+		// exceed the source's utilization. If it would, no single move can
+		// improve balance — stop here. This also handles heterogeneous capacity
+		// correctly by comparing post-move utilization ratios rather than raw
+		// counts. The integer discretization is handled automatically: when
+		// counts cannot match the ideal exactly, the check still admits moves
+		// that reduce the max/min gap without flipping it.
+		maxCap := serverMaxVolumes[maxServer]
+		minCap := serverMaxVolumes[minServer]
+		if allServersHaveMaxInfo && maxCap > 0 && minCap > 0 {
+			newSrcUtil := float64(effectiveCounts[maxServer]-1) / float64(maxCap)
+			newDstUtil := float64(effectiveCounts[minServer]+1) / float64(minCap)
+			if newDstUtil > newSrcUtil {
+				if len(results) == 0 {
+					glog.Infof("BALANCE [%s]: No tasks created - no beneficial move available. After move, dest %s util would be %.1f%% vs source %s util %.1f%%",
+						diskType, minServer, newDstUtil*100, maxServer, newSrcUtil*100)
+				} else {
+					glog.Infof("BALANCE [%s]: Created %d task(s), no more beneficial moves available. After move, dest %s util would be %.1f%% vs source %s util %.1f%%",
+						diskType, len(results), minServer, newDstUtil*100, maxServer, newSrcUtil*100)
+				}
+				balanced = true
+				break
+			}
+		}
+
 		// Select a volume from the overloaded server using per-server cursor
 		var selectedVolume *types.VolumeHealthMetrics
 		serverVols := volumesByServer[maxServer]
@@ -507,47 +539,58 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
 
 // resolveBalanceDestination resolves the destination for a balance operation
 // when the target server is already known (chosen by the detection loop's
-// effective volume counts). It finds the appropriate disk and address for the
-// target server in the topology.
+// effective volume counts). It finds a disk on the target server that can
+// accept another volume after accounting for ALL pending and assigned tasks
+// on that disk (via ActiveTopology.GetEffectiveAvailableCapacity) — not the
+// static VolumeCount from the topology snapshot. This keeps destination
+// planning consistent with the loop's effective-count bookkeeping and
+// prevents over-scheduling when multiple moves are planned within the same
+// detection cycle.
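+//
+// For example, a disk that reports VolumeCount=8 and MaxVolumeCount=10 in the
+// snapshot but already has two planned moves targeting it has an effective
+// available capacity of 0 and is rejected (see
+// TestResolveBalanceDestination_UsesEffectiveCapacity in detection_test.go).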
 func resolveBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics, targetServer string) (*topology.DestinationPlan, error) {
-	topologyInfo := activeTopology.GetTopologyInfo()
-	if topologyInfo == nil {
-		return nil, fmt.Errorf("no topology info available")
+	nodeDisks := activeTopology.GetNodeDisks(targetServer)
+	if len(nodeDisks) == 0 {
+		return nil, fmt.Errorf("target server %s not found in topology", targetServer)
 	}
 
-	// Find the target node in the topology and get its disk info
-	for _, dc := range topologyInfo.DataCenterInfos {
-		for _, rack := range dc.RackInfos {
-			for _, node := range rack.DataNodeInfos {
-				if node.Id != targetServer {
-					continue
-				}
-				// Find an available disk matching the volume's disk type
-				for diskTypeName, diskInfo := range node.DiskInfos {
-					if diskTypeName != selectedVolume.DiskType {
-						continue
-					}
-					if diskInfo.MaxVolumeCount > 0 && diskInfo.VolumeCount >= diskInfo.MaxVolumeCount {
-						continue // disk is full
-					}
-					targetAddress, err := util.ResolveServerAddress(node.Id, activeTopology)
-					if err != nil {
-						return nil, fmt.Errorf("failed to resolve address for target server %s: %v", node.Id, err)
-					}
-					return &topology.DestinationPlan{
-						TargetNode:    node.Id,
-						TargetAddress: targetAddress,
-						TargetDisk:    diskInfo.DiskId,
-						TargetRack:    rack.Id,
-						TargetDC:      dc.Id,
-						ExpectedSize:  selectedVolume.Size,
-					}, nil
-				}
-				return nil, fmt.Errorf("target server %s has no available disk of type %s", targetServer, selectedVolume.DiskType)
+	var eligibleDisk *topology.DiskInfo
+	for _, disk := range nodeDisks {
+		if disk == nil || disk.DiskInfo == nil {
+			continue
+		}
+		if disk.DiskType != selectedVolume.DiskType {
+			continue
+		}
+		// Use effective capacity so that prior moves planned in this same
+		// detection cycle (already registered as pending tasks) reduce the
+		// available slots. A disk with VolumeCount << MaxVolumeCount in the
+		// topology snapshot can still be effectively full if many in-flight
+		// tasks already target it.
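+		// Disks that report MaxVolumeCount <= 0 skip the capacity check below,
+		// matching the previous behavior of only treating a disk as full when
+		// a positive limit is configured.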
+		if disk.DiskInfo.MaxVolumeCount > 0 {
+			available := activeTopology.GetEffectiveAvailableCapacity(disk.NodeID, disk.DiskID)
+			if available <= 0 {
+				continue
 			}
 		}
+		eligibleDisk = disk
+		break
 	}
-	return nil, fmt.Errorf("target server %s not found in topology", targetServer)
+
+	if eligibleDisk == nil {
+		return nil, fmt.Errorf("target server %s has no available disk of type %s", targetServer, selectedVolume.DiskType)
+	}
+
+	targetAddress, err := util.ResolveServerAddress(eligibleDisk.NodeID, activeTopology)
+	if err != nil {
+		return nil, fmt.Errorf("failed to resolve address for target server %s: %v", eligibleDisk.NodeID, err)
+	}
+	return &topology.DestinationPlan{
+		TargetNode:    eligibleDisk.NodeID,
+		TargetAddress: targetAddress,
+		TargetDisk:    eligibleDisk.DiskID,
+		TargetRack:    eligibleDisk.Rack,
+		TargetDC:      eligibleDisk.DataCenter,
+		ExpectedSize:  selectedVolume.Size,
+	}, nil
 }
 
 // planBalanceDestination plans the destination for a balance operation using
diff --git a/weed/worker/tasks/balance/detection_test.go b/weed/worker/tasks/balance/detection_test.go
index d62e6b9a5..6d1aa1b99 100644
--- a/weed/worker/tasks/balance/detection_test.go
+++ b/weed/worker/tasks/balance/detection_test.go
@@ -1110,3 +1110,245 @@ func TestDetection_NodeFilter(t *testing.T) {
 
 	t.Logf("Created %d tasks within node-a,node-b scope", len(tasks))
 }
+
+// TestDetection_HeterogeneousMax_NoOvershootNoOscillation is a regression test
+// for a volume.balance bug in the plugin worker: when servers have different
+// MaxVolumeCount values and the cluster is near (but above) the imbalance
+// threshold, the greedy max→min algorithm could schedule moves that FLIP
+// which server is the most-utilized, producing oscillation across a single
+// detection cycle and pushing destination servers above the cluster-ideal
+// utilization.
+//
+// Setup:
+//
+//	node-a: 11 volumes, max=20 → util=0.55
+//	node-b: 5 volumes,  max=10 → util=0.50
+//	ideal = 16/30 ≈ 0.533
+//
+// One naive move a→b leaves a=10/20=0.50 and b=6/10=0.60 — which flips the
+// imbalance and pushes b well above the cluster ideal. The next iteration
+// (without a per-move guard) would plan the reverse move, and so on.
+//
+// Invariants that must hold after detection:
+//  1. No destination's effective utilization exceeds the cluster-ideal ratio.
+//  2. Tasks flow in at most one direction between any server pair (no
+//     oscillation within a single detection cycle).
+//  3. The final imbalance (after applying all planned moves) is not strictly
+//     worse than the initial imbalance.
+func TestDetection_HeterogeneousMax_NoOvershootNoOscillation(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1", maxVolumes: 20},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1", maxVolumes: 10},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 11)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 5)...)
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	// Strict threshold makes detection eager to move, exposing the overshoot
+	// bug when naive greedy selection is used.
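+	// With the per-move guard in place, the likely outcome here is zero
+	// planned moves: the only candidate (a→b) would leave b at 6/10 = 0.60,
+	// above a's post-move 10/20 = 0.50, so the guard rejects it.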
+	conf := defaultConf()
+	conf.ImbalanceThreshold = 0.05
+
+	tasks, _, err := Detection(metrics, clusterInfo, conf, 50)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	maxCap := map[string]float64{"node-a": 20, "node-b": 10}
+	const idealUtil = 16.0 / 30.0 // 0.5333...
+
+	// (1) No destination server (recipient of at least one move) should end up
+	// above the cluster-ideal utilization. Source servers may remain slightly
+	// above ideal when no further beneficial move is possible — that is fine.
+	effective := computeEffectiveCounts(servers, metrics, tasks)
+	destinations := make(map[string]bool)
+	for _, task := range tasks {
+		if task.TypedParams != nil && len(task.TypedParams.Targets) > 0 {
+			addr := task.TypedParams.Targets[0].Node
+			// address → server id mapping matches buildTopology: addresses are id + ":8080"
+			for _, s := range servers {
+				if s.id+":8080" == addr || s.id == addr {
+					destinations[s.id] = true
+					break
+				}
+			}
+		}
+	}
+	for server := range destinations {
+		util := float64(effective[server]) / maxCap[server]
+		if util > idealUtil+1e-9 {
+			t.Errorf("destination %s effective util %.3f exceeds cluster ideal %.3f (count=%d, cap=%.0f)",
+				server, util, idealUtil, effective[server], maxCap[server])
+		}
+	}
+
+	// (2) Tasks should never flow both directions between node-a and node-b.
+	aAsSource, bAsSource := 0, 0
+	for _, task := range tasks {
+		switch task.Server {
+		case "node-a":
+			aAsSource++
+		case "node-b":
+			bAsSource++
+		}
+	}
+	if aAsSource > 0 && bAsSource > 0 {
+		t.Errorf("detection oscillated: %d tasks with node-a as source and %d with node-b as source",
+			aAsSource, bAsSource)
+	}
+
+	// (3) Final imbalance must not be worse than initial imbalance.
+	initUtilA := 11.0 / 20.0 // 0.55
+	initUtilB := 5.0 / 10.0  // 0.50
+	initDiff := initUtilA - initUtilB
+
+	finalUtilA := float64(effective["node-a"]) / maxCap["node-a"]
+	finalUtilB := float64(effective["node-b"]) / maxCap["node-b"]
+	finalDiff := finalUtilA - finalUtilB
+	if finalDiff < 0 {
+		finalDiff = -finalDiff
+	}
+	if finalDiff > initDiff+1e-9 {
+		t.Errorf("detection made imbalance worse: initial diff %.3f, final diff %.3f (tasks=%d, effective=%v)",
+			initDiff, finalDiff, len(tasks), effective)
+	}
+
+	t.Logf("tasks=%d, effective=%v, final diff=%.3f (initial=%.3f)", len(tasks), effective, finalDiff, initDiff)
+}
+
+// TestDetection_RespectsClusterIdealUtilization verifies that in a 3-server
+// cluster with heterogeneous MaxVolumeCount values, detection does not push
+// any destination above its proportional fair share of the cluster-ideal
+// utilization. Without a per-move guard, the greedy algorithm happily fills
+// the most-underutilized disk well past the cluster ideal, mirroring the
+// real-world "nodes get filled up to ~99% capacity" failure mode reported
+// after the 4.17 upgrade.
+//
+// Setup:
+//
+//	node-a: 20 volumes, max=40 (util 0.50)
+//	node-b: 10 volumes, max=20 (util 0.50)
+//	node-c: 1 volume,   max=10 (util 0.10) <- small, underloaded
+//	ideal = 31/70 ≈ 0.443
+//
+// A count-only greedy balancer would drain a and b into c until c.util reaches
+// ~0.5 (matching the heavier servers), pushing c's utilization above the
+// cluster ideal. The correct behavior is to stop moves to c once its
+// utilization reaches the cluster ideal.
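+//
+// With ideal ≈ 0.443, node-c (cap=10) should end up with at most 4 volumes
+// (util 0.40); a 5th volume would push it to 0.50, past the cluster ideal.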
+func TestDetection_RespectsClusterIdealUtilization(t *testing.T) {
+	servers := []serverSpec{
+		{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1", maxVolumes: 40},
+		{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1", maxVolumes: 20},
+		{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1", maxVolumes: 10},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("node-a", "hdd", "dc1", "rack1", "c1", 1, 20)...)
+	metrics = append(metrics, makeVolumes("node-b", "hdd", "dc1", "rack1", "c1", 100, 10)...)
+	metrics = append(metrics, makeVolumes("node-c", "hdd", "dc1", "rack1", "c1", 200, 1)...)
+
+	at := buildTopology(servers, metrics)
+	clusterInfo := &types.ClusterInfo{ActiveTopology: at}
+
+	tasks, _, err := Detection(metrics, clusterInfo, defaultConf(), 200)
+	if err != nil {
+		t.Fatalf("Detection failed: %v", err)
+	}
+
+	const idealUtil = 31.0 / 70.0 // 0.4428...
+	maxCap := map[string]float64{"node-a": 40, "node-b": 20, "node-c": 10}
+
+	effective := computeEffectiveCounts(servers, metrics, tasks)
+	for server, count := range effective {
+		util := float64(count) / maxCap[server]
+		// Allow one-volume slack to account for integer rounding on small caps:
+		// a server with cap=10 cannot land exactly on 0.443; the nearest counts
+		// are 4 (0.40) and 5 (0.50). The guard should stop at 4.
+		slack := 1.0 / maxCap[server]
+		if util > idealUtil+slack+1e-9 {
+			t.Errorf("server %s (cap=%.0f) effective util %.3f exceeds cluster ideal %.3f (+%.3f slack); count=%d",
+				server, maxCap[server], util, idealUtil, slack, count)
+		}
+	}
+
+	t.Logf("tasks=%d, effective=%v, ideal=%.3f", len(tasks), effective, idealUtil)
+}
+
+// TestResolveBalanceDestination_UsesEffectiveCapacity verifies that
+// resolveBalanceDestination respects ActiveTopology's effective available
+// capacity — i.e., the destination check factors in pending and assigned
+// tasks already registered against the disk. Without this, the destination
+// planner reads a stale VolumeCount from the topology snapshot and keeps
+// approving the same disk even after many moves have been planned against
+// it within a single detection cycle.
+func TestResolveBalanceDestination_UsesEffectiveCapacity(t *testing.T) {
+	servers := []serverSpec{
+		{id: "src-node", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1", maxVolumes: 100},
+		{id: "dst-node", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1", maxVolumes: 10},
+	}
+
+	var metrics []*types.VolumeHealthMetrics
+	metrics = append(metrics, makeVolumes("src-node", "hdd", "dc1", "rack1", "c1", 1, 50)...)
+	// dst-node starts with 8 volumes → 2 slots free
+	metrics = append(metrics, makeVolumes("dst-node", "hdd", "dc1", "rack1", "c1", 1000, 8)...)
+
+	at := buildTopology(servers, metrics)
+
+	// Simulate two prior balance moves already planned in this detection cycle
+	// that target dst-node:2. Effective capacity should drop to 0.
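+	// (MaxVolumeCount 10 - VolumeCount 8 - 2 pending destination tasks = 0 free slots.)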
+	for i := 0; i < 2; i++ {
+		err := at.AddPendingTask(topology.TaskSpec{
+			TaskID:     fmt.Sprintf("pending-%d", i),
+			TaskType:   topology.TaskTypeBalance,
+			VolumeID:   uint32(9000 + i),
+			VolumeSize: 1024,
+			Sources: []topology.TaskSourceSpec{
+				{ServerID: "src-node", DiskID: 1},
+			},
+			Destinations: []topology.TaskDestinationSpec{
+				{ServerID: "dst-node", DiskID: 2},
+			},
+		})
+		if err != nil {
+			t.Fatalf("seeding pending task %d failed: %v", i, err)
+		}
+	}
+
+	candidate := &types.VolumeHealthMetrics{
+		VolumeID:      500,
+		Server:        "src-node",
+		ServerAddress: "src-node:8080",
+		DiskType:      "hdd",
+		Collection:    "c1",
+		Size:          1024,
+		DataCenter:    "dc1",
+		Rack:          "rack1",
+	}
+
+	if _, err := resolveBalanceDestination(at, candidate, "dst-node"); err == nil {
+		t.Error("expected resolveBalanceDestination to fail because dst-node's disk is effectively full " +
+			"(VolumeCount=8, MaxVolumeCount=10, 2 pending destination tasks)")
+	}
+
+	// Sanity check: a server with no pending tasks targeting it should still
+	// resolve successfully, proving the helper itself is working.
+	servers2 := []serverSpec{
+		{id: "src-node", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1", maxVolumes: 100},
+		{id: "dst-node", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack1", maxVolumes: 10},
+	}
+	metrics2 := append([]*types.VolumeHealthMetrics(nil),
+		makeVolumes("src-node", "hdd", "dc1", "rack1", "c1", 1, 50)...)
+	metrics2 = append(metrics2, makeVolumes("dst-node", "hdd", "dc1", "rack1", "c1", 1000, 8)...)
+	at2 := buildTopology(servers2, metrics2)
+	plan, err := resolveBalanceDestination(at2, candidate, "dst-node")
+	if err != nil {
+		t.Fatalf("expected resolveBalanceDestination to succeed on fresh topology: %v", err)
+	}
+	if plan.TargetNode != "dst-node" {
+		t.Errorf("unexpected target node: got %q want %q", plan.TargetNode, "dst-node")
+	}
+}
diff --git a/weed/worker/tasks/ec_balance/detection.go b/weed/worker/tasks/ec_balance/detection.go
index b91dff701..3d2a2da9e 100644
--- a/weed/worker/tasks/ec_balance/detection.go
+++ b/weed/worker/tasks/ec_balance/detection.go
@@ -3,6 +3,7 @@ package ec_balance
 import (
 	"context"
 	"fmt"
+	"math"
 	"sort"
 	"time"
 
@@ -425,9 +426,12 @@ func detectCrossRackImbalance(vid uint32, collection string, nodes map[string]*e
 			})
 			movedFromRack++
 
-			// Reserve capacity on destination so it isn't picked again
+			// Reserve capacity on destination so it isn't picked again,
+			// and release one slot on the source so later volumes in this
+			// same detection run see its true available capacity.
 			rackShardCount[destNode.rack]++
 			rackShardCount[rackID]--
+			node.freeSlots++
 			destNode.freeSlots--
 		}
 	}
@@ -507,6 +511,7 @@ func detectWithinRackImbalance(vid uint32, collection string, nodes map[string]*
 			moved++
 			nodeShardCount[nodeID]--
 			nodeShardCount[destNode.nodeID]++
+			node.freeSlots++
 			destNode.freeSlots--
 		}
 	}
@@ -544,25 +549,51 @@ func detectGlobalImbalance(nodes map[string]*ecNodeInfo, racks map[string]*ecRac
 			continue
 		}
 
-		// Check if imbalance exceeds threshold
-		if !exceedsImbalanceThreshold(nodeShardCounts, totalShards, len(rack.nodes), config.ImbalanceThreshold) {
+		// Snapshot each node's total shard capacity (current shards from allowed
+		// volumes plus any remaining free slots). Capacity is fixed for the
+		// duration of this loop — moves conserve total shards across the rack,
+		// so the denominator does not change as nodeShardCounts shift.
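+		// (Each planned move bumps the source's freeSlots and drops the
+		// destination's by one, so count+freeSlots stays constant per node.)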
+		nodeCapacity := make(map[string]int, len(rack.nodes))
+		for nodeID, count := range nodeShardCounts {
+			nodeCapacity[nodeID] = count + rack.nodes[nodeID].freeSlots
+		}
+
+		// Check if imbalance exceeds threshold using utilization ratios
+		// (count/capacity), not raw shard counts. Raw counts would say a
+		// cluster is imbalanced whenever a large-capacity node holds more
+		// shards than a small-capacity node, even when both are at the
+		// same fractional fullness.
+		if !exceedsUtilImbalanceThreshold(nodeShardCounts, nodeCapacity, config.ImbalanceThreshold) {
 			continue
 		}
 
-		avgShards := ceilDivide(totalShards, len(rack.nodes))
-
-		// Iteratively move shards from most-loaded to least-loaded
+		// Iteratively move shards from most-utilized to least-utilized
 		for i := 0; i < 10; i++ { // cap iterations to avoid infinite loops
-			// Find min and max nodes, skipping full nodes for min
+			// Find min and max nodes by utilization ratio. Min must have free
+			// slots so it can receive a shard; max can be any node with shards
+			// (we move shards out of it). Utilization-based selection is
+			// critical on heterogeneous racks: a large-capacity node with many
+			// shards in absolute terms may still be the LEAST utilized, and
+			// moving shards into it from a small, nearly-full node is the
+			// correct direction even though raw counts would suggest otherwise.
 			var minNode, maxNode *ecNodeInfo
-			minCount, maxCount := totalShards+1, -1
+			minUtil := math.Inf(1)
+			maxUtil := -1.0
+			var minCount, maxCount int
 			for nodeID, count := range nodeShardCounts {
 				node := rack.nodes[nodeID]
-				if count < minCount && node.freeSlots > 0 {
+				cap := nodeCapacity[nodeID]
+				if cap <= 0 {
+					continue
+				}
+				util := float64(count) / float64(cap)
+				if util < minUtil && node.freeSlots > 0 {
+					minUtil = util
 					minCount = count
 					minNode = node
 				}
-				if count > maxCount {
+				if util > maxUtil {
+					maxUtil = util
 					maxCount = count
 					maxNode = rack.nodes[nodeID]
 				}
@@ -571,10 +602,21 @@ func detectGlobalImbalance(nodes map[string]*ecNodeInfo, racks map[string]*ecRac
 			if maxNode == nil || minNode == nil || maxNode.nodeID == minNode.nodeID {
 				break
 			}
-			if maxCount <= avgShards || minCount+1 > avgShards {
+
+			// Per-move convergence guard: reject any move where the
+			// destination's post-move utilization would strictly exceed the
+			// source's post-move utilization. This mirrors the guard in
+			// weed/worker/tasks/balance/detection.go and terminates the loop
+			// once no further beneficial move exists, preventing oscillation
+			// and overshoot on heterogeneous racks.
+			maxCap := nodeCapacity[maxNode.nodeID]
+			minCap := nodeCapacity[minNode.nodeID]
+			if maxCap <= 0 || minCap <= 0 {
 				break
 			}
-			if maxCount-minCount <= 1 {
+			newSrcUtil := float64(maxCount-1) / float64(maxCap)
+			newDstUtil := float64(minCount+1) / float64(minCap)
+			if newDstUtil > newSrcUtil {
 				break
 			}
 
@@ -608,8 +650,23 @@ func detectGlobalImbalance(nodes map[string]*ecNodeInfo, racks map[string]*ecRac
 					targetDisk: 0,
 					phase:      "global",
 				})
+				// Update in-memory shard placement so the next iteration
+				// of this loop picks a different shard. Without this, the
+				// inner loop always finds the lowest-set bit and emits
+				// duplicate move requests for the same physical shard.
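+				// shardBits is a per-volume bitmask with bit i set when this
+				// node holds shard i of the volume (see the 0x3FF / 0b111
+				// fixtures in detection_test.go).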
+				shardBit := uint32(1 << uint(shardID))
+				info.shardBits &^= shardBit
+				if minInfo == nil {
+					minInfo = &ecVolumeInfo{
+						collection: info.collection,
+						diskID:     info.diskID,
+					}
+					minNode.ecShards[vid] = minInfo
+				}
+				minInfo.shardBits |= shardBit
 				nodeShardCounts[maxNode.nodeID]--
 				nodeShardCounts[minNode.nodeID]++
+				maxNode.freeSlots++
 				minNode.freeSlots--
 				moved = true
 				break
@@ -713,6 +770,41 @@ func exceedsImbalanceThreshold(counts map[string]int, total int, numGroups int,
 	return imbalanceRatio > threshold
 }
 
+// exceedsUtilImbalanceThreshold checks whether the per-node utilization ratio
+// (shard count / shard slot capacity) is skewed beyond the given threshold.
+// Unlike exceedsImbalanceThreshold, it compares fractional fullness rather
+// than raw counts so that racks with heterogeneous MaxVolumeCount are
+// evaluated correctly — a large-capacity node holding more shards than a
+// small-capacity node is not considered imbalanced if both are at the same
+// fractional fullness. Nodes with zero capacity are skipped.
+func exceedsUtilImbalanceThreshold(counts map[string]int, capacities map[string]int, threshold float64) bool {
+	minUtil := math.Inf(1)
+	maxUtil := -1.0
+	seen := 0
+	for nodeID, count := range counts {
+		cap := capacities[nodeID]
+		if cap <= 0 {
+			continue
+		}
+		util := float64(count) / float64(cap)
+		if util < minUtil {
+			minUtil = util
+		}
+		if util > maxUtil {
+			maxUtil = util
+		}
+		seen++
+	}
+	if seen < 2 || maxUtil <= 0 {
+		return false
+	}
+	avg := (maxUtil + minUtil) / 2
+	if avg == 0 {
+		return false
+	}
+	return (maxUtil-minUtil)/avg > threshold
+}
+
 // applyMovesToTopology simulates planned moves on the in-memory topology
 // so subsequent detection phases see updated shard placement.
 func applyMovesToTopology(moves []*shardMove) {
diff --git a/weed/worker/tasks/ec_balance/detection_test.go b/weed/worker/tasks/ec_balance/detection_test.go
index 214137ab3..86b3d8b34 100644
--- a/weed/worker/tasks/ec_balance/detection_test.go
+++ b/weed/worker/tasks/ec_balance/detection_test.go
@@ -253,6 +253,79 @@ func TestDetectGlobalImbalance(t *testing.T) {
 	}
 }
 
+// TestDetectGlobalImbalance_HeterogeneousCapacity is a regression test for
+// the Phase 4 rebalancer on heterogeneous racks. node1 holds more shards in
+// absolute terms but has much higher capacity, so it is actually the LESS
+// utilized node; node2 holds fewer shards but is nearly full. The greedy
+// algorithm must pick the most-utilized node as the source and move shards
+// in the direction that reduces fractional fullness, NOT in the direction
+// that would equalize raw counts (which here would overfill node2).
+//
+// Scenario:
+//
+//	node1: 10 shards, freeSlots=90 → capacity 100, util 10%
+//	node2: 3 shards,  freeSlots=2  → capacity 5,   util 60%
+//
+// Correct behavior: move shards FROM node2 TO node1 (draining the
+// most-utilized node), until no further improvement is possible. Also
+// verifies that moves are de-duplicated — the inner loop must update
+// shardBits between iterations so each proposed move refers to a distinct
+// physical shard.
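+//
+// With the convergence guard, the first two moves should be admitted
+// (node1 would reach 11/100 = 0.11 and then 12/100 = 0.12, both below
+// node2's post-move 2/5 = 0.40 and 1/5 = 0.20), while a third would be
+// rejected because 13/100 = 0.13 exceeds node2's post-move 0/5 = 0.00.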
+func TestDetectGlobalImbalance_HeterogeneousCapacity(t *testing.T) {
+	nodes := map[string]*ecNodeInfo{
+		"node1": {
+			nodeID: "node1", address: "node1:8080", rack: "dc1:rack1", freeSlots: 90,
+			ecShards: map[uint32]*ecVolumeInfo{
+				100: {collection: "col1", shardBits: 0x3FF}, // 10 shards
+			},
+		},
+		"node2": {
+			nodeID: "node2", address: "node2:8080", rack: "dc1:rack1", freeSlots: 2,
+			ecShards: map[uint32]*ecVolumeInfo{
+				200: {collection: "col1", shardBits: 0b111}, // 3 shards
+			},
+		},
+	}
+	racks := map[string]*ecRackInfo{
+		"dc1:rack1": {
+			nodes:     map[string]*ecNodeInfo{"node1": nodes["node1"], "node2": nodes["node2"]},
+			freeSlots: 92,
+		},
+	}
+
+	config := NewDefaultConfig()
+	config.ImbalanceThreshold = 0.01
+	moves := detectGlobalImbalance(nodes, racks, config, nil)
+
+	if len(moves) == 0 {
+		t.Fatal("expected moves from high-util node2 to low-util node1, got 0")
+	}
+
+	// Every move must drain the higher-util node (node2) and target the
+	// lower-util node (node1). A raw-count-based greedy algorithm would
+	// pick the opposite direction — that is the bug this test guards against.
+	for _, move := range moves {
+		if move.source.nodeID != "node2" {
+			t.Errorf("expected source node2 (util 0.60), got %s", move.source.nodeID)
+		}
+		if move.target.nodeID != "node1" {
+			t.Errorf("expected target node1 (util 0.10), got %s", move.target.nodeID)
+		}
+	}
+
+	// Verify no duplicate (volumeID, shardID) pairs — the inner loop must
+	// update shardBits between iterations so each move refers to a distinct
+	// physical shard.
+	seen := make(map[[2]int]bool, len(moves))
+	for _, move := range moves {
+		key := [2]int{int(move.volumeID), move.shardID}
+		if seen[key] {
+			t.Errorf("duplicate move for volume %d shard %d", move.volumeID, move.shardID)
+		}
+		seen[key] = true
+	}
+}
+
 func TestDetectGlobalImbalanceSkipsFullNodes(t *testing.T) {
 	// node2 has 0 free slots — should not be chosen as destination
 	nodes := map[string]*ecNodeInfo{