From d02ee6d5dfbd166235be9fbc42e44f1536a01307 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 30 Jun 2026 20:02:23 -0700 Subject: [PATCH] balance: share replica-placement logic between shell and worker (#10169) The replica-placement rule (data-center/rack/same-node limits plus host anti-affinity) existed three times: the shell's satisfyReplicaPlacement/isGoodMove used by volume.balance, fix.replication, and tier.move, and a line-for-line port in the maintenance balance worker. Move the canonical logic into weed/topology/balancer on a shared Location type; the shell and worker keep thin adapters that convert their own location representation and call it. Behavior is unchanged (the shared IsGoodMove keeps the shell's reject-move-to-self guard, and all four replica test suites pass). --- weed/shell/command_volume_balance.go | 54 ++---- weed/shell/command_volume_fix_replication.go | 96 +--------- weed/topology/balancer/replica_placement.go | 176 ++++++++++++++++++ .../balancer/replica_placement_test.go | 36 ++++ .../worker/tasks/balance/replica_placement.go | 170 ++--------------- 5 files changed, 261 insertions(+), 271 deletions(-) create mode 100644 weed/topology/balancer/replica_placement.go create mode 100644 weed/topology/balancer/replica_placement_test.go diff --git a/weed/shell/command_volume_balance.go b/weed/shell/command_volume_balance.go index ba4e7d269..dc3b2b37e 100644 --- a/weed/shell/command_volume_balance.go +++ b/weed/shell/command_volume_balance.go @@ -659,43 +659,29 @@ func moveVolume(commandEnv *CommandEnv, v *master_pb.VolumeInformationMessage, f return nil } +// toBalancerLocation converts a shell replica location to the shared placement +// abstraction, resolving the physical host for machine anti-affinity. 
+func toBalancerLocation(loc *location) balancer.Location { + return balancer.Location{ + DataCenter: loc.dc, + Rack: loc.rack, + NodeID: loc.dataNode.Id, + Host: pb.NewServerAddressFromDataNode(loc.dataNode).ToHost(), + } +} + func isGoodMove(placement *super_block.ReplicaPlacement, existingReplicas []*VolumeReplica, sourceNode, targetNode *Node) bool { - for _, replica := range existingReplicas { - if replica.location.dataNode.Id == targetNode.info.Id && - replica.location.rack == targetNode.rack && - replica.location.dc == targetNode.dc { - // never move to existing nodes - return false - } + locs := make([]balancer.Location, len(existingReplicas)) + for i, replica := range existingReplicas { + locs[i] = toBalancerLocation(replica.location) } - - // existing replicas except the one on sourceNode - existingReplicasExceptSourceNode := make([]*VolumeReplica, 0) - for _, replica := range existingReplicas { - if replica.location.dataNode.Id != sourceNode.info.Id { - existingReplicasExceptSourceNode = append(existingReplicasExceptSourceNode, replica) - } + target := balancer.Location{ + DataCenter: targetNode.dc, + Rack: targetNode.rack, + NodeID: targetNode.info.Id, + Host: pb.NewServerAddressFromDataNode(targetNode.info).ToHost(), } - - // Don't move a replica onto a machine (host) that already holds one of this - // volume's replicas: servers sharing a host are one fault domain, so both would - // die together. Best-effort -- skip and let balancing try the next target. 
- targetHost := pb.NewServerAddressFromDataNode(targetNode.info).ToHost() - for _, replica := range existingReplicasExceptSourceNode { - if pb.NewServerAddressFromDataNode(replica.location.dataNode).ToHost() == targetHost { - return false - } - } - - // target location - targetLocation := location{ - dc: targetNode.dc, - rack: targetNode.rack, - dataNode: targetNode.info, - } - - // check if this satisfies replication requirements - return satisfyReplicaPlacement(placement, existingReplicasExceptSourceNode, targetLocation) + return balancer.IsGoodMove(placement, locs, sourceNode.info.Id, target) } // addDiskFreeBytes adjusts a disk's reported free bytes by delta (negative when a diff --git a/weed/shell/command_volume_fix_replication.go b/weed/shell/command_volume_fix_replication.go index 6476b41d5..e8e54be37 100644 --- a/weed/shell/command_volume_fix_replication.go +++ b/weed/shell/command_volume_fix_replication.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle_map" "github.com/seaweedfs/seaweedfs/weed/storage/types" + "github.com/seaweedfs/seaweedfs/weed/topology/balancer" "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -474,95 +475,16 @@ func satisfyReplicaCurrentLocation(replicaPlacement *super_block.ReplicaPlacemen return false } */ +// satisfyReplicaPlacement reports whether placing a replica at possibleLocation +// is consistent with the replication policy given the existing replicas. Thin +// adapter over weed/topology/balancer so the shell and the maintenance worker +// share one placement implementation. 
func satisfyReplicaPlacement(replicaPlacement *super_block.ReplicaPlacement, replicas []*VolumeReplica, possibleLocation location) bool { - - existingDataCenters, _, existingDataNodes := countReplicas(replicas) - - if _, found := existingDataNodes[possibleLocation.String()]; found { - // avoid duplicated volume on the same data node - return false + locs := make([]balancer.Location, len(replicas)) + for i, r := range replicas { + locs[i] = toBalancerLocation(r.location) } - - primaryDataCenters, _ := findTopKeys(existingDataCenters) - - // ensure data center count is within limit - if _, found := existingDataCenters[possibleLocation.DataCenter()]; !found { - // different from existing dcs - if len(existingDataCenters) < replicaPlacement.DiffDataCenterCount+1 { - // lack on different dcs - return true - } else { - // adding this would go over the different dcs limit - return false - } - } - // now this is same as one of the existing data center - if !isAmong(possibleLocation.DataCenter(), primaryDataCenters) { - // not on one of the primary dcs - return false - } - - // now this is one of the primary dcs - primaryDcRacks := make(map[string]int) - for _, replica := range replicas { - if replica.location.DataCenter() != possibleLocation.DataCenter() { - continue - } - primaryDcRacks[replica.location.Rack()] += 1 - } - primaryRacks, _ := findTopKeys(primaryDcRacks) - sameRackCount := primaryDcRacks[possibleLocation.Rack()] - - // ensure rack count is within limit - if _, found := primaryDcRacks[possibleLocation.Rack()]; !found { - // different from existing racks - if len(primaryDcRacks) < replicaPlacement.DiffRackCount+1 { - // lack on different racks - return true - } else { - // adding this would go over the different racks limit - return false - } - } - // now this is same as one of the existing racks - if !isAmong(possibleLocation.Rack(), primaryRacks) { - // not on the primary rack - return false - } - - // now this is on the primary rack - - // different from 
existing data nodes - if sameRackCount < replicaPlacement.SameRackCount+1 { - // lack on same rack - return true - } else { - // adding this would go over the same data node limit - return false - } - -} - -func findTopKeys(m map[string]int) (topKeys []string, max int) { - for k, c := range m { - if max < c { - topKeys = topKeys[:0] - topKeys = append(topKeys, k) - max = c - } else if max == c { - topKeys = append(topKeys, k) - } - } - return -} - -func isAmong(key string, keys []string) bool { - for _, k := range keys { - if k == key { - return true - } - } - return false + return balancer.SatisfyReplicaPlacement(replicaPlacement, locs, toBalancerLocation(&possibleLocation)) } type VolumeReplica struct { diff --git a/weed/topology/balancer/replica_placement.go b/weed/topology/balancer/replica_placement.go new file mode 100644 index 000000000..3af6ef002 --- /dev/null +++ b/weed/topology/balancer/replica_placement.go @@ -0,0 +1,176 @@ +package balancer + +import ( + "slices" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +// Location identifies where a volume replica lives, at the granularity replica +// placement reasons about: data center, rack, data node, and physical host +// (machine). Servers sharing a host are one fault domain. It is the shared shape +// the shell (from master_pb) and the worker (from ActiveTopology) both adapt to. +type Location struct { + DataCenter string + Rack string + NodeID string + Host string +} + +// HostFromAddress returns the physical machine (host/IP) of a server address, +// falling back to the node id (== ip:port in the common case) when no address is +// known. 
+func HostFromAddress(address, nodeID string) string { + if address == "" { + address = nodeID + } + return pb.ServerAddress(address).ToHost() +} + +type rackKey struct { + DataCenter string + Rack string +} + +type nodeKey struct { + DataCenter string + Rack string + NodeID string +} + +// IsGoodMove reports whether moving a volume from sourceNodeID onto target keeps +// the volume's replica placement policy satisfied. A nil or non-replicated policy +// always passes; otherwise it refuses the source node itself, a source missing from +// existingReplicas, and a host that already holds another replica (best-effort +// machine anti-affinity), then defers to SatisfyReplicaPlacement. +func IsGoodMove(rp *super_block.ReplicaPlacement, existingReplicas []Location, sourceNodeID string, target Location) bool { + if rp == nil || !rp.HasReplication() { + return true // no replication constraint + } + + // Never move onto the source node itself. + if target.NodeID == sourceNodeID { + return false + } + + // Build the replica set after the move: remove the one replica being moved off + // the source, keeping any others (an over-replicated node may hold more than + // one replica of this volume, and only one is moving). + afterMove := make([]Location, 0, len(existingReplicas)) + sourceFound := false + for _, r := range existingReplicas { + if r.NodeID == sourceNodeID && !sourceFound { + sourceFound = true + } else { + afterMove = append(afterMove, r) + } + } + if !sourceFound { + // Source not in the replica list — cluster state may be inconsistent. + // Treat as unsafe to avoid an incorrect placement decision. + return false + } + + // Best-effort machine anti-affinity: don't move a replica onto a host that + // already holds another replica of this volume, so a single machine failure + // can't take out two replicas.
+ if target.Host != "" { + for _, r := range afterMove { + if r.Host == target.Host { + return false + } + } + } + + return SatisfyReplicaPlacement(rp, afterMove, target) +} + +// SatisfyReplicaPlacement reports whether placing a replica at target is +// consistent with the replication policy given the existing replicas. It is the +// shared implementation for the shell (volume.balance / fix.replication / +// tier.move) and the maintenance balance worker. +func SatisfyReplicaPlacement(rp *super_block.ReplicaPlacement, replicas []Location, target Location) bool { + existingDCs, _, existingNodes := countReplicas(replicas) + + targetNK := nodeKey{DataCenter: target.DataCenter, Rack: target.Rack, NodeID: target.NodeID} + if _, found := existingNodes[targetNK]; found { + // avoid a duplicated volume on the same data node + return false + } + + primaryDCs, _ := findTopDCKeys(existingDCs) + + // ensure the data center count is within limit + if _, found := existingDCs[target.DataCenter]; !found { + // different from existing dcs + return len(existingDCs) < rp.DiffDataCenterCount+1 + } + // same as one of the existing data centers + if !slices.Contains(primaryDCs, target.DataCenter) { + return false + } + + // on a primary dc — check racks within this DC + primaryDcRacks := make(map[rackKey]int) + for _, r := range replicas { + if r.DataCenter != target.DataCenter { + continue + } + primaryDcRacks[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ + } + + targetRK := rackKey{DataCenter: target.DataCenter, Rack: target.Rack} + primaryRacks, _ := findTopRackKeys(primaryDcRacks) + sameRackCount := primaryDcRacks[targetRK] + + if _, found := primaryDcRacks[targetRK]; !found { + // different from existing racks + return len(primaryDcRacks) < rp.DiffRackCount+1 + } + // same as one of the existing racks + if !slices.Contains(primaryRacks, targetRK) { + return false + } + + // on the primary rack — check the same-rack count + return sameRackCount < rp.SameRackCount+1 +} + +func 
countReplicas(replicas []Location) (dcCounts map[string]int, rackCounts map[rackKey]int, nodeCounts map[nodeKey]int) { + dcCounts = make(map[string]int) + rackCounts = make(map[rackKey]int) + nodeCounts = make(map[nodeKey]int) + for _, r := range replicas { + dcCounts[r.DataCenter]++ + rackCounts[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ + nodeCounts[nodeKey{DataCenter: r.DataCenter, Rack: r.Rack, NodeID: r.NodeID}]++ + } + return +} + +func findTopDCKeys(m map[string]int) (topKeys []string, max int) { + for k, c := range m { + if max < c { + topKeys = topKeys[:0] + topKeys = append(topKeys, k) + max = c + } else if max == c { + topKeys = append(topKeys, k) + } + } + return +} + +func findTopRackKeys(m map[rackKey]int) (topKeys []rackKey, max int) { + for k, c := range m { + if max < c { + topKeys = topKeys[:0] + topKeys = append(topKeys, k) + max = c + } else if max == c { + topKeys = append(topKeys, k) + } + } + return +} diff --git a/weed/topology/balancer/replica_placement_test.go b/weed/topology/balancer/replica_placement_test.go new file mode 100644 index 000000000..ef2c342ea --- /dev/null +++ b/weed/topology/balancer/replica_placement_test.go @@ -0,0 +1,36 @@ +package balancer + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/storage/super_block" +) + +func loc(dc, rack, node string) Location { + return Location{DataCenter: dc, Rack: rack, NodeID: node} +} + +// When a node abnormally holds more than one replica of a volume, moving one of +// them off must keep the other in the after-move set, so the placement check +// still sees the rack's true occupancy. +func TestIsGoodMove_OverReplicatedSourceKeepsOtherReplica(t *testing.T) { + rp, err := super_block.NewReplicaPlacementFromString("001") // up to 2 copies in a rack + if err != nil { + t.Fatal(err) + } + // n1 abnormally holds two replicas; n2 holds a third — all in rack r1. 
+ existing := []Location{loc("dc1", "r1", "n1"), loc("dc1", "r1", "n1"), loc("dc1", "r1", "n2")} + + // Moving ONE replica off n1 onto a new node n3 in r1 must be rejected: r1 would + // still hold two replicas (the retained n1 and n2), at the SameRackCount limit. + if IsGoodMove(rp, existing, "n1", loc("dc1", "r1", "n3")) { + t.Error("expected reject: moving one replica off an over-replicated node must keep its other replica in the rack count") + } + + // The normal case is unaffected: two replicas correctly in r1, move one to a + // third node in r1 — still within the SameRackCount=1 (two-per-rack) limit. + normal := []Location{loc("dc1", "r1", "n1"), loc("dc1", "r1", "n2")} + if !IsGoodMove(rp, normal, "n1", loc("dc1", "r1", "n3")) { + t.Error("expected allow: a normal move within the rack limit") + } +} diff --git a/weed/worker/tasks/balance/replica_placement.go b/weed/worker/tasks/balance/replica_placement.go index d84333ec3..23a88755e 100644 --- a/weed/worker/tasks/balance/replica_placement.go +++ b/weed/worker/tasks/balance/replica_placement.go @@ -1,167 +1,37 @@ package balance import ( - "slices" - - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" + "github.com/seaweedfs/seaweedfs/weed/topology/balancer" "github.com/seaweedfs/seaweedfs/weed/worker/types" ) // hostFromAddress returns the physical machine (host/IP) of a server address, -// falling back to the node id (== ip:port in the common case) when no address is -// known. Servers sharing a host are one fault domain. +// falling back to the node id when no address is known. func hostFromAddress(address, nodeID string) string { - if address == "" { - address = nodeID - } - return pb.ServerAddress(address).ToHost() + return balancer.HostFromAddress(address, nodeID) } -// rackKey uniquely identifies a rack within a data center. -type rackKey struct { - DataCenter string - Rack string -} - -// nodeKey uniquely identifies a node within a rack. 
-type nodeKey struct { - DataCenter string - Rack string - NodeID string -} - -// IsGoodMove checks whether moving a volume from sourceNodeID to target -// would satisfy the volume's replica placement policy, given the current -// set of replica locations. +// IsGoodMove checks whether moving a volume from sourceNodeID to target would +// satisfy the volume's replica placement policy. It is a thin adapter over the +// shared placement logic in weed/topology/balancer so the shell command and this +// worker cannot drift. func IsGoodMove(rp *super_block.ReplicaPlacement, existingReplicas []types.ReplicaLocation, sourceNodeID string, target types.ReplicaLocation) bool { - if rp == nil || !rp.HasReplication() { - return true // no replication constraint + locs := make([]balancer.Location, len(existingReplicas)) + for i, r := range existingReplicas { + locs[i] = toBalancerLocation(r) } - - // Build the replica set after the move: remove source, add target - afterMove := make([]types.ReplicaLocation, 0, len(existingReplicas)) - sourceFound := false - for _, r := range existingReplicas { - if r.NodeID == sourceNodeID { - sourceFound = true - } else { - afterMove = append(afterMove, r) - } - } - if !sourceFound { - // Source not in replica list — cluster state may be inconsistent. - // Treat as unsafe to avoid incorrect placement decisions. - return false - } - - // Best-effort machine anti-affinity: don't move a replica onto a host that - // already holds another replica of this volume, so a single machine failure - // can't take out two replicas. - if target.Host != "" { - for _, r := range afterMove { - if r.Host == target.Host { - return false - } - } - } - - return satisfyReplicaPlacement(rp, afterMove, target) + return balancer.IsGoodMove(rp, locs, sourceNodeID, toBalancerLocation(target)) } -// satisfyReplicaPlacement checks whether placing a replica at target -// is consistent with the replication policy, given the existing replicas. 
-// Ported from weed/shell/command_volume_fix_replication.go -func satisfyReplicaPlacement(rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, target types.ReplicaLocation) bool { - existingDCs, _, existingNodes := countReplicas(replicas) - - targetNK := nodeKey{DataCenter: target.DataCenter, Rack: target.Rack, NodeID: target.NodeID} - if _, found := existingNodes[targetNK]; found { - // avoid duplicated volume on the same data node - return false +// toBalancerLocation copies a worker replica location to the shared placement +// type field-by-field, so renaming or removing a field on either struct fails to +// compile here; a newly added field is not copied until it is listed below. +func toBalancerLocation(r types.ReplicaLocation) balancer.Location { + return balancer.Location{ + DataCenter: r.DataCenter, + Rack: r.Rack, + NodeID: r.NodeID, + Host: r.Host, } - - primaryDCs, _ := findTopDCKeys(existingDCs) - - // ensure data center count is within limit - if _, found := existingDCs[target.DataCenter]; !found { - // different from existing dcs - if len(existingDCs) < rp.DiffDataCenterCount+1 { - return true - } - return false - } - // now same as one of existing data centers - if !slices.Contains(primaryDCs, target.DataCenter) { - return false - } - - // now on a primary dc - check racks within this DC - primaryDcRacks := make(map[rackKey]int) - for _, r := range replicas { - if r.DataCenter != target.DataCenter { - continue - } - primaryDcRacks[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ - } - - targetRK := rackKey{DataCenter: target.DataCenter, Rack: target.Rack} - primaryRacks, _ := findTopRackKeys(primaryDcRacks) - sameRackCount := primaryDcRacks[targetRK] - - if _, found := primaryDcRacks[targetRK]; !found { - // different from existing racks - if len(primaryDcRacks) < rp.DiffRackCount+1 { - return true - } - return false - } - // same as one of existing racks - if !slices.Contains(primaryRacks, targetRK) { - return false - } - - // on primary rack - check same-rack count - if
sameRackCount < rp.SameRackCount+1 { - return true - } - return false -} - -func countReplicas(replicas []types.ReplicaLocation) (dcCounts map[string]int, rackCounts map[rackKey]int, nodeCounts map[nodeKey]int) { - dcCounts = make(map[string]int) - rackCounts = make(map[rackKey]int) - nodeCounts = make(map[nodeKey]int) - for _, r := range replicas { - dcCounts[r.DataCenter]++ - rackCounts[rackKey{DataCenter: r.DataCenter, Rack: r.Rack}]++ - nodeCounts[nodeKey{DataCenter: r.DataCenter, Rack: r.Rack, NodeID: r.NodeID}]++ - } - return -} - -func findTopDCKeys(m map[string]int) (topKeys []string, max int) { - for k, c := range m { - if max < c { - topKeys = topKeys[:0] - topKeys = append(topKeys, k) - max = c - } else if max == c { - topKeys = append(topKeys, k) - } - } - return -} - -func findTopRackKeys(m map[rackKey]int) (topKeys []rackKey, max int) { - for k, c := range m { - if max < c { - topKeys = topKeys[:0] - topKeys = append(topKeys, k) - max = c - } else if max == c { - topKeys = append(topKeys, k) - } - } - return }