Delete the EC placement package now that encode/repair use ecbalancer.Place (#9624)

Delete the EC placement package and the dead encode planner code

Now that encode (and repair) place via ecbalancer.Place, nothing uses the
erasure_coding/placement package or the EC-only planner machinery
(ecPlacementPlanner, diskInfosToCandidates, calculateECScoreCandidate,
distributeECShards) in detection.go. Removes them and the package, along with the
planner-direct unit tests.
This commit is contained in:
Chris Lu
2026-05-22 20:32:09 -07:00
committed by GitHub
parent 0566fbd552
commit d1665750e1
4 changed files with 0 additions and 1047 deletions

View File

@@ -1,461 +0,0 @@
// Package placement provides consolidated EC shard placement logic used by
// both shell commands and worker tasks.
//
// This package encapsulates the algorithms for:
// - Selecting destination nodes/disks for EC shards
// - Ensuring proper spread across racks, servers, and disks
// - Balancing shards across the cluster
package placement
import (
"fmt"
"sort"
"strings"
)
// DiskCandidate describes one disk that may receive EC shards, together with
// the topology, capacity and load figures the selection passes consult.
type DiskCandidate struct {
// NodeID and DiskID together identify the disk (see getDiskKey).
NodeID string
DiskID uint32
// DataCenter and Rack together identify the rack (see getRackKey).
DataCenter string
Rack string
DiskType string // disk type (hdd/ssd/...) — empty means HardDrive
// Capacity information. VolumeCount / MaxVolumeCount is the utilization
// that calculateDiskScore rewards when it is low.
VolumeCount int64
MaxVolumeCount int64
ShardCount int // Current number of EC shards on this disk
FreeSlots int // Available slots for new shards; disks with FreeSlots <= 0 are dropped by filterSuitableDisks
// Load information
LoadCount int // Number of active tasks on this disk; compared against PlacementRequest.MaxTaskLoad
}
// NodeCandidate represents a server node that can receive EC shards.
// It aggregates per-node totals over the node's disks.
// NOTE(review): nothing in the selection code visible in this file reads
// NodeCandidate; confirm whether external callers still need it.
type NodeCandidate struct {
NodeID string
DataCenter string
Rack string
FreeSlots int
ShardCount int // Total shards across all disks
Disks []*DiskCandidate // All disks on this node
}
// PlacementRequest configures EC shard placement behavior.
// It is passed by value to SelectDestinations and is not modified.
type PlacementRequest struct {
// ShardsNeeded is the total number of shards to place.
// SelectDestinations rejects values <= 0.
ShardsNeeded int
// MaxShardsPerServer limits how many shards can be placed on a single server
// 0 means no limit (but prefer spreading when possible).
// The limit is checked in the server-diversity and fill passes of
// selectFromTier; the rack-diversity pass does not consult it.
MaxShardsPerServer int
// MaxShardsPerRack limits how many shards can be placed in a single rack
// 0 means no limit.
// Like MaxShardsPerServer, it is checked in the second and third passes.
MaxShardsPerRack int
// MaxTaskLoad is the maximum task load count for a disk to be considered.
// Disks whose LoadCount is strictly greater are filtered out; 0 disables
// the load filter.
MaxTaskLoad int
// PreferDifferentServers when true, spreads shards across different servers
// before using multiple disks on the same server
PreferDifferentServers bool
// PreferDifferentRacks when true, spreads shards across different racks
// before using multiple servers in the same rack
PreferDifferentRacks bool
// PreferredDiskType, when non-empty, biases placement toward disks of
// this type. Disks of the preferred type are exhausted (subject to the
// other diversity preferences) before disks of any other type are
// considered. Empty means no disk-type bias — all suitable disks form a
// single pool, matching pre-#9423 behavior.
// Comparison is case-insensitive and treats "" on a disk as "hdd"
// (see normalizeDiskType).
PreferredDiskType string
}
// PlacementResult contains the selected destinations for EC shards.
// SelectDestinations returns it without an error even when fewer than
// ShardsNeeded disks could be selected, so callers should check
// len(SelectedDisks) themselves.
type PlacementResult struct {
// SelectedDisks lists the chosen disks in the order they were picked.
SelectedDisks []*DiskCandidate
// Statistics: number of distinct servers, racks and data centers used.
ServersUsed int
RacksUsed int
DCsUsed int
// Distribution maps: shard counts keyed by node ID, by "dc:rack" key and
// by data center name respectively.
ShardsPerServer map[string]int
ShardsPerRack map[string]int
ShardsPerDC map[string]int
// SpilledToOtherDiskType is set when PlacementRequest.PreferredDiskType
// was non-empty but the preferred-type pool could not satisfy
// ShardsNeeded, so placement had to spill onto disks of other types.
// Callers can log a warning when this is true.
SpilledToOtherDiskType bool
}
// SelectDestinations chooses the disks that should receive EC shards. It is
// the entry point of this package.
//
// Candidates are first filtered by filterSuitableDisks. When
// config.PreferredDiskType is set (#9423) the survivors are split into a
// matching tier and a fallback tier; the fallback tier is consulted only if the
// matching tier could not supply config.ShardsNeeded disks. With an empty
// PreferredDiskType every suitable disk belongs to a single tier.
//
// Each tier goes through the passes implemented by selectFromTier:
//  1. one disk per rack (rack diversity),
//  2. one disk per not-yet-used server (server diversity),
//  3. remaining disks on servers that are already used (disk diversity).
//
// An error is returned when no candidates are supplied, when ShardsNeeded is
// not positive, or when filtering leaves nothing. A result holding fewer than
// ShardsNeeded disks is NOT reported as an error.
func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) {
	if len(disks) == 0 {
		return nil, fmt.Errorf("no disk candidates provided")
	}
	if config.ShardsNeeded <= 0 {
		return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded)
	}

	eligible := filterSuitableDisks(disks, config)
	if len(eligible) == 0 {
		return nil, fmt.Errorf("no suitable disks found after filtering")
	}

	// Bookkeeping shared by both tiers.
	var (
		seenDisks   = map[string]bool{} // "nodeID:diskID"
		seenServers = map[string]bool{} // nodeID
		seenRacks   = map[string]bool{} // "dc:rack"
	)
	res := &PlacementResult{
		SelectedDisks:   make([]*DiskCandidate, 0, config.ShardsNeeded),
		ShardsPerServer: map[string]int{},
		ShardsPerRack:   map[string]int{},
		ShardsPerDC:     map[string]int{},
	}

	// Preferred disk type first; other types only to cover a shortfall.
	primary, secondary := partitionByDiskType(eligible, config.PreferredDiskType)
	selectFromTier(primary, res, seenDisks, seenServers, seenRacks, config)

	shortfall := len(res.SelectedDisks) < config.ShardsNeeded
	if config.PreferredDiskType != "" && shortfall && len(secondary) > 0 {
		countBefore := len(res.SelectedDisks)
		selectFromTier(secondary, res, seenDisks, seenServers, seenRacks, config)
		res.SpilledToOtherDiskType = len(res.SelectedDisks) > countBefore
	}

	// Summary statistics.
	dataCenters := make(map[string]struct{})
	for _, picked := range res.SelectedDisks {
		dataCenters[picked.DataCenter] = struct{}{}
	}
	res.ServersUsed = len(seenServers)
	res.RacksUsed = len(seenRacks)
	res.DCsUsed = len(dataCenters)
	return res, nil
}
// partitionByDiskType divides disks into those whose type equals the preferred
// type (matching) and all others (fallback), preserving input order within
// each group. An empty preferred type disables the split: the input slice is
// returned as matching and fallback is nil.
//
// Both sides of the comparison go through normalizeDiskType, so a preferred
// type of "hdd" also matches disks that report an empty DiskType. Without that
// folding, shards from an HDD source would be treated as spilling whenever the
// destination disks report their type as "".
func partitionByDiskType(disks []*DiskCandidate, preferred string) (matching, fallback []*DiskCandidate) {
	if preferred == "" {
		return disks, nil
	}
	want := normalizeDiskType(preferred)
	for _, candidate := range disks {
		if normalizeDiskType(candidate.DiskType) != want {
			fallback = append(fallback, candidate)
			continue
		}
		matching = append(matching, candidate)
	}
	return matching, fallback
}
// normalizeDiskType returns the lower-cased disk type, substituting "hdd" for
// the empty string so that "", "hdd" and "HDD" all compare equal.
func normalizeDiskType(t string) string {
	if lowered := strings.ToLower(t); lowered != "" {
		return lowered
	}
	return "hdd"
}
// selectFromTier runs the three diversity passes against `tier`, mutating
// `result` and the used* maps in place. Passes stop as soon as ShardsNeeded
// is reached. The function is a no-op when the tier is empty or the result
// already has enough shards, so it is safe to call once per tier.
//
// The used* maps carry state between calls: disks, servers and racks chosen
// from an earlier tier are seen here as already used.
func selectFromTier(tier []*DiskCandidate, result *PlacementResult,
usedDisks, usedServers, usedRacks map[string]bool,
config PlacementRequest) {
if len(tier) == 0 || len(result.SelectedDisks) >= config.ShardsNeeded {
return
}
rackToDisks := groupDisksByRack(tier)
// Pass 1: Select one disk from each rack (maximize rack diversity).
// When this is the fallback tier (preferred tier already populated
// usedRacks), skip those racks so the spillover still spreads onto
// new racks instead of doubling up on ones already picked.
// Note: this pass does not check MaxShardsPerServer / MaxShardsPerRack;
// those limits are only enforced in passes 2 and 3 below.
if config.PreferDifferentRacks {
// Sort racks by number of available servers (descending) to prioritize racks with more options
sortedRacks := sortRacksByServerCount(rackToDisks)
for _, rackKey := range sortedRacks {
if len(result.SelectedDisks) >= config.ShardsNeeded {
break
}
if usedRacks[rackKey] {
continue
}
rackDisks := rackToDisks[rackKey]
// Select best disk from this rack, preferring a new server
disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config)
if disk != nil {
addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
}
}
}
// Pass 2: Select disks from servers that have not been used yet. Every rack
// of the tier is visited in sorted key order, and within a rack the disks
// are tried best score first.
if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded {
for _, rackKey := range getSortedRackKeys(rackToDisks) {
if len(result.SelectedDisks) >= config.ShardsNeeded {
break
}
rackDisks := rackToDisks[rackKey]
for _, disk := range sortDisksByScore(rackDisks) {
if len(result.SelectedDisks) >= config.ShardsNeeded {
break
}
diskKey := getDiskKey(disk)
if usedDisks[diskKey] {
continue
}
// Skip if server already used (we want different servers in this pass)
if usedServers[disk.NodeID] {
continue
}
// Check server limit
if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer {
continue
}
// Check rack limit
if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
continue
}
addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
}
}
}
// Pass 3: Fill remaining slots with any disk of this tier that is still
// unused, regardless of whether its server was used before.
// Use round-robin across servers to balance shards evenly
if len(result.SelectedDisks) < config.ShardsNeeded {
// Group remaining disks by server (within this tier)
serverToRemainingDisks := make(map[string][]*DiskCandidate)
for _, disk := range tier {
if !usedDisks[getDiskKey(disk)] {
serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk)
}
}
// Sort each server's disks by score
for serverID := range serverToRemainingDisks {
serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID])
}
// Round-robin: repeatedly select from the server with the fewest shards
for len(result.SelectedDisks) < config.ShardsNeeded {
// Find server with fewest shards that still has available disks
var bestServer string
minShards := -1 // -1 means "no server chosen yet"
for serverID, disks := range serverToRemainingDisks {
if len(disks) == 0 {
continue
}
// Check server limit
if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer {
continue
}
shardCount := result.ShardsPerServer[serverID]
if minShards == -1 || shardCount < minShards {
minShards = shardCount
bestServer = serverID
} else if shardCount == minShards && serverID < bestServer {
// Tie-break by server name for determinism
bestServer = serverID
}
}
if bestServer == "" {
// No more servers with available disks
break
}
// Pop the best disk from this server. The pop happens before the rack
// check, so a disk rejected by the rack limit is discarded, which
// guarantees the loop makes progress.
disks := serverToRemainingDisks[bestServer]
disk := disks[0]
serverToRemainingDisks[bestServer] = disks[1:]
// Check rack limit
if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack {
continue
}
addDiskToResult(result, disk, usedDisks, usedServers, usedRacks)
}
}
}
// filterSuitableDisks keeps the disks that can take part in EC placement: a
// disk must have at least one free slot and, when config.MaxTaskLoad is
// positive, a LoadCount that does not exceed it. Input order is preserved and
// the result is nil when nothing qualifies.
func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate {
	var kept []*DiskCandidate
	for _, candidate := range disks {
		hasRoom := candidate.FreeSlots > 0
		overloaded := config.MaxTaskLoad > 0 && candidate.LoadCount > config.MaxTaskLoad
		if hasRoom && !overloaded {
			kept = append(kept, candidate)
		}
	}
	return kept
}
// groupDisksByRack buckets the disks by their "dc:rack" key (see getRackKey),
// keeping the input order inside every bucket.
func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate {
	byRack := map[string][]*DiskCandidate{}
	for i := range disks {
		rack := getRackKey(disks[i])
		byRack[rack] = append(byRack[rack], disks[i])
	}
	return byRack
}
// getRackKey builds the rack identifier "<DataCenter>:<Rack>" for a disk.
func getRackKey(disk *DiskCandidate) string {
	return disk.DataCenter + ":" + disk.Rack
}
// getDiskKey builds the disk identifier "<NodeID>:<DiskID>" with the disk ID
// rendered in decimal.
func getDiskKey(disk *DiskCandidate) string {
	return disk.NodeID + ":" + fmt.Sprint(disk.DiskID)
}
// sortRacksByServerCount returns the rack keys ordered by the number of
// distinct servers (NodeIDs) in each rack, largest first, so racks offering
// more choices are visited earlier.
//
// Racks with the same server count keep the alphabetical order produced by
// getSortedRackKeys. A stable sort is used to guarantee that tie order, which
// sort.Slice does not promise.
func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string {
	// Count unique servers per rack.
	serverCounts := make(map[string]int, len(rackToDisks))
	for rackKey, disks := range rackToDisks {
		servers := make(map[string]struct{}, len(disks))
		for _, disk := range disks {
			servers[disk.NodeID] = struct{}{}
		}
		serverCounts[rackKey] = len(servers)
	}

	keys := getSortedRackKeys(rackToDisks)
	sort.SliceStable(keys, func(i, j int) bool {
		// Descending by server count.
		return serverCounts[keys[i]] > serverCounts[keys[j]]
	})
	return keys
}
// getSortedRackKeys lists the map's rack keys in ascending lexical order so
// that iteration over racks is reproducible.
func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string {
	names := make([]string, len(rackToDisks))
	next := 0
	for rack := range rackToDisks {
		names[next] = rack
		next++
	}
	sort.Strings(names)
	return names
}
// selectBestDiskFromRack returns the most attractive unused disk among the
// given rack's disks, or nil when every disk is already in usedDisks.
//
// A disk on a server that has not been used yet always beats a disk on a used
// server. Within the same category the higher calculateDiskScore wins, and the
// earlier disk is kept on equal scores.
//
// The first eligible disk is accepted unconditionally. Scores can be negative
// (calculateDiskScore subtracts for high ShardCount or LoadCount), so comparing
// against a fixed sentinel score would wrongly reject a rack whose only
// remaining disks score at or below that sentinel.
//
// config is accepted for signature compatibility; it is not consulted here.
func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate {
	var (
		best            *DiskCandidate
		bestScore       float64
		bestOnNewServer bool
	)
	for _, disk := range disks {
		if usedDisks[getDiskKey(disk)] {
			continue
		}
		onNewServer := !usedServers[disk.NodeID]
		score := calculateDiskScore(disk)

		switch {
		case best == nil,
			// Prefer unused servers over used ones regardless of score.
			onNewServer && !bestOnNewServer,
			// Same category: the better score wins.
			onNewServer == bestOnNewServer && score > bestScore:
			best, bestScore, bestOnNewServer = disk, score, onNewServer
		}
	}
	return best
}
// sortDisksByScore returns a new slice holding the same disks ordered by
// calculateDiskScore, best first. The input slice is left untouched.
//
// The sort is stable, so disks with equal scores keep their input order. That
// keeps the selection deterministic, in line with the explicit tie-breaks used
// elsewhere in this file.
func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate {
	sorted := make([]*DiskCandidate, len(disks))
	copy(sorted, disks)
	sort.SliceStable(sorted, func(i, j int) bool {
		return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j])
	})
	return sorted
}
// calculateDiskScore rates a disk candidate; a higher value means a more
// attractive destination. The score is the sum of three components:
//   - capacity: (1 - VolumeCount/MaxVolumeCount) * 60, or a flat 30 when
//     MaxVolumeCount is not positive;
//   - shards: (10 - ShardCount) * 2;
//   - load: 10 - LoadCount.
//
// The shard and load components become negative once the respective count
// exceeds 10, so the total is not bounded below by zero.
func calculateDiskScore(disk *DiskCandidate) float64 {
	// Primary factor: free capacity.
	capacityPoints := 30.0
	if disk.MaxVolumeCount > 0 {
		used := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
		capacityPoints = (1.0 - used) * 60.0
	}
	// Secondary factor: few EC shards already present.
	shardPoints := float64(10-disk.ShardCount) * 2.0
	// Tertiary factor: few active tasks.
	loadPoints := float64(10 - disk.LoadCount)

	return capacityPoints + shardPoints + loadPoints
}
// addDiskToResult records disk as selected: it is appended to
// result.SelectedDisks, marked in the three used* sets, and counted in the
// per-server, per-rack and per-data-center tallies.
func addDiskToResult(result *PlacementResult, disk *DiskCandidate,
	usedDisks, usedServers, usedRacks map[string]bool) {
	node, rack := disk.NodeID, getRackKey(disk)

	result.SelectedDisks = append(result.SelectedDisks, disk)

	usedDisks[getDiskKey(disk)] = true
	usedServers[node] = true
	usedRacks[rack] = true

	result.ShardsPerServer[node]++
	result.ShardsPerRack[rack]++
	result.ShardsPerDC[disk.DataCenter]++
}

View File

@@ -1,143 +0,0 @@
package placement
import (
"strconv"
"testing"
)
// makeDisk returns a DiskCandidate in data center "dc1" with an empty disk
// (0 of 100 volumes used, 100 free slots); callers supply only the identity,
// rack and disk type.
func makeDisk(node, rack, diskType string, diskID uint32) *DiskCandidate {
	d := new(DiskCandidate)
	d.NodeID, d.DiskID = node, diskID
	d.DataCenter, d.Rack = "dc1", rack
	d.DiskType = diskType
	d.VolumeCount = 0
	d.MaxVolumeCount = 100
	d.FreeSlots = 100
	return d
}
// disksByType counts the given disks per raw DiskType value (no
// normalization, so "" and "hdd" are separate keys).
func disksByType(disks []*DiskCandidate) map[string]int {
	counts := make(map[string]int)
	for i := range disks {
		counts[disks[i].DiskType]++
	}
	return counts
}
// newRequest builds a PlacementRequest for the given shard count and preferred
// disk type with both rack and server diversity enabled and no limits.
func newRequest(shards int, preferred string) PlacementRequest {
	var req PlacementRequest
	req.ShardsNeeded = shards
	req.PreferredDiskType = preferred
	req.PreferDifferentRacks = true
	req.PreferDifferentServers = true
	return req
}
// With six SSD and six HDD disks spread over three racks and
// PreferredDiskType="ssd", all four shards must land on SSD disks and the
// spillover flag must stay clear.
func TestSelectDestinations_PrefersMatchingDiskType(t *testing.T) {
	const perType = 6
	disks := make([]*DiskCandidate, 0, 2*perType)
	for i := 0; i < perType; i++ {
		rack := "r" + strconv.Itoa(i%3)
		disks = append(disks, makeDisk("ssd-"+strconv.Itoa(i), rack, "ssd", uint32(i)))
	}
	for i := 0; i < perType; i++ {
		rack := "r" + strconv.Itoa(i%3)
		disks = append(disks, makeDisk("hdd-"+strconv.Itoa(i), rack, "", uint32(i)))
	}

	result, err := SelectDestinations(disks, newRequest(4, "ssd"))
	if err != nil {
		t.Fatalf("SelectDestinations: %v", err)
	}

	got := len(result.SelectedDisks)
	if got != 4 {
		t.Fatalf("selected %d disks, want 4", got)
	}
	counts := disksByType(result.SelectedDisks)
	if counts["ssd"] != 4 {
		t.Fatalf("disk-type counts = %v, want ssd=4", counts)
	}
	if result.SpilledToOtherDiskType {
		t.Fatalf("SpilledToOtherDiskType should be false when preferred pool was sufficient")
	}
}
// With a single SSD disk and four shards requested, the SSD must be used
// first, the remaining three shards must go to HDD disks, and the result must
// report the spillover.
func TestSelectDestinations_SpillsWhenPreferredScarce(t *testing.T) {
	var disks []*DiskCandidate
	disks = append(disks, makeDisk("ssd-0", "r0", "ssd", 0))
	disks = append(disks, makeDisk("hdd-0", "r1", "", 0))
	disks = append(disks, makeDisk("hdd-1", "r2", "", 0))
	disks = append(disks, makeDisk("hdd-2", "r3", "", 0))

	result, err := SelectDestinations(disks, newRequest(4, "ssd"))
	if err != nil {
		t.Fatalf("SelectDestinations: %v", err)
	}

	got := len(result.SelectedDisks)
	if got != 4 {
		t.Fatalf("selected %d disks, want 4", got)
	}
	if counts := disksByType(result.SelectedDisks); counts["ssd"] != 1 || counts[""] != 3 {
		t.Fatalf("disk-type counts = %v, want ssd=1 hdd=3", counts)
	}
	if spilled := result.SpilledToOtherDiskType; !spilled {
		t.Fatalf("SpilledToOtherDiskType should be true after falling back to HDD")
	}
}
// A preferred type of "hdd" has to match disks that report DiskType ""
// (the HardDriveType sentinel). If it did not, encoding an HDD volume would be
// flagged as spilling even though matching capacity is available.
func TestSelectDestinations_PreferredHddMatchesEmptyDiskType(t *testing.T) {
	var disks []*DiskCandidate
	disks = append(disks, makeDisk("hdd-0", "r0", "", 0)) // HardDriveType sentinel
	disks = append(disks, makeDisk("hdd-1", "r1", "", 0)) // HardDriveType sentinel
	disks = append(disks, makeDisk("ssd-0", "r2", "ssd", 0))

	result, err := SelectDestinations(disks, newRequest(2, "hdd"))
	if err != nil {
		t.Fatalf("SelectDestinations: %v", err)
	}

	got := len(result.SelectedDisks)
	if got != 2 {
		t.Fatalf("selected %d disks, want 2", got)
	}
	// Every pick must be one of the ""-typed (HDD) disks, without spillover.
	for i := range result.SelectedDisks {
		picked := result.SelectedDisks[i]
		if picked.DiskType == "" {
			continue
		}
		t.Errorf("selected disk %s has DiskType=%q, want \"\" (HardDriveType)", picked.NodeID, picked.DiskType)
	}
	if result.SpilledToOtherDiskType {
		t.Fatalf("SpilledToOtherDiskType should be false when HDD pool matches preferred=hdd")
	}
}
// Without a preferred disk type all disks form one pool (the pre-#9423
// behavior) and the spillover flag must never be raised, whatever the mix of
// disk types.
func TestSelectDestinations_EmptyPreferredDiskTypeKeepsPriorBehavior(t *testing.T) {
	var disks []*DiskCandidate
	disks = append(disks, makeDisk("ssd-0", "r0", "ssd", 0))
	disks = append(disks, makeDisk("hdd-0", "r1", "", 0))
	disks = append(disks, makeDisk("hdd-1", "r2", "", 0))
	disks = append(disks, makeDisk("ssd-1", "r3", "ssd", 0))

	result, err := SelectDestinations(disks, newRequest(3, ""))
	if err != nil {
		t.Fatalf("SelectDestinations: %v", err)
	}

	got := len(result.SelectedDisks)
	if got != 3 {
		t.Fatalf("selected %d disks, want 3", got)
	}
	if spilled := result.SpilledToOtherDiskType; spilled {
		t.Fatalf("SpilledToOtherDiskType should never be set when PreferredDiskType is empty")
	}
}

View File

@@ -15,9 +15,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
workerutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util"
@@ -421,273 +419,6 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
return results, hasMore, nil
}
// ecDiskState tracks, for one disk, the capacity seen when the planner was
// built and the reservations accumulated since then by applyImpact.
type ecDiskState struct {
baseAvailable int64 // free slots of the candidate at planner construction
reservedVolumes int32 // running sum of StorageSlotChange.VolumeSlots applied to this disk
reservedShardSlots int32 // running sum of StorageSlotChange.ShardSlots applied to this disk
}
// ecPlacementPlanner holds the disk candidates derived from the active
// topology plus the per-disk reservation state, so that successive placement
// decisions can take earlier planned tasks into account.
type ecPlacementPlanner struct {
activeTopology *topology.ActiveTopology
candidates []*placement.DiskCandidate // all candidates, in the order produced by diskInfosToCandidates
candidateByKey map[string]*placement.DiskCandidate // same candidates indexed by ecDiskKey
diskStates map[string]*ecDiskState // reservation state indexed by ecDiskKey
diskTags map[string][]string // normalized tags indexed by ecDiskKey; disks without tags are absent
preferredTags []string // normalized preferred tags, in priority order
}
// newECPlacementPlanner builds a planner from the disks the active topology
// reports for erasure-coding tasks. It returns nil when activeTopology is nil.
// Every candidate is indexed by its ecDiskKey and gets a reservation state
// whose baseline is the candidate's current FreeSlots. With no candidates the
// index maps are simply empty.
func newECPlacementPlanner(activeTopology *topology.ActiveTopology, preferredTags []string) *ecPlacementPlanner {
	if activeTopology == nil {
		return nil
	}

	disks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 0)
	planner := &ecPlacementPlanner{
		activeTopology: activeTopology,
		candidates:     diskInfosToCandidates(disks),
		diskTags:       collectDiskTags(disks),
		preferredTags:  util.NormalizeTagList(preferredTags),
	}

	count := len(planner.candidates)
	planner.candidateByKey = make(map[string]*placement.DiskCandidate, count)
	planner.diskStates = make(map[string]*ecDiskState, count)
	for _, c := range planner.candidates {
		key := ecDiskKey(c.NodeID, c.DiskID)
		planner.candidateByKey[key] = c
		planner.diskStates[key] = &ecDiskState{baseAvailable: int64(c.FreeSlots)}
	}
	return planner
}
// selectDestinations picks shardsNeeded destination disks. The candidate sets
// from buildCandidateSets are tried in order and the first one for which
// placement.SelectDestinations succeeds wins. Placement prefers disks of
// sourceDiskType (#9423); a warning is logged when it had to spill onto other
// disk types. sourceRack and sourceDC are only used in that log message.
//
// It fails when the planner is not initialized, when shardsNeeded is not
// positive, or when no candidate set yields a placement; in the last case the
// most recent placement error is returned.
func (p *ecPlacementPlanner) selectDestinations(sourceRack, sourceDC, sourceDiskType string, shardsNeeded int) ([]*placement.DiskCandidate, error) {
	switch {
	case p == nil || p.activeTopology == nil:
		return nil, fmt.Errorf("ec placement planner is not initialized")
	case shardsNeeded <= 0:
		return nil, fmt.Errorf("invalid shardsNeeded %d", shardsNeeded)
	}

	// No per-server or per-rack caps; spread across racks and servers and bias
	// toward the source volume's disk type.
	request := placement.PlacementRequest{
		ShardsNeeded:           shardsNeeded,
		MaxTaskLoad:            topology.MaxTaskLoadForECPlacement,
		PreferDifferentServers: true,
		PreferDifferentRacks:   true,
		PreferredDiskType:      sourceDiskType,
	}

	var failure error
	for _, set := range p.buildCandidateSets(shardsNeeded) {
		if len(set) == 0 {
			continue
		}
		picked, err := placement.SelectDestinations(set, request)
		if err != nil {
			failure = err
			continue
		}
		if picked.SpilledToOtherDiskType {
			glog.Warningf("EC placement spilled to disks outside preferred disk type %q to reach %d shards (source rack=%s dc=%s)",
				sourceDiskType, shardsNeeded, sourceRack, sourceDC)
		}
		return picked.SelectedDisks, nil
	}

	if failure != nil {
		return nil, failure
	}
	return nil, fmt.Errorf("no EC placement candidates available")
}
// applyTaskReservations folds a planned task into the planner's view of the
// cluster. For every source and destination it applies the storage impact,
// adjusts the shard count by the impact's shard slots, and raises the load of
// the disk. The load is raised only once per disk no matter how many
// sources/destinations reference it. A nil planner is a no-op.
func (p *ecPlacementPlanner) applyTaskReservations(volumeSize int64, sources []topology.TaskSourceSpec, destinations []topology.TaskDestinationSpec) {
	if p == nil {
		return
	}

	loadBumped := make(map[string]bool)
	reserve := func(serverID string, diskID uint32, impact topology.StorageSlotChange) {
		p.applyImpact(serverID, diskID, impact)
		p.bumpShardCount(serverID, diskID, impact.ShardSlots)
		if key := ecDiskKey(serverID, diskID); !loadBumped[key] {
			p.bumpLoad(serverID, diskID)
			loadBumped[key] = true
		}
	}

	for _, src := range sources {
		reserve(src.ServerID, src.DiskID, p.sourceImpact(src, volumeSize))
	}
	for _, dst := range destinations {
		reserve(dst.ServerID, dst.DiskID, p.destinationImpact(dst, volumeSize))
	}
}
// sourceImpact resolves the storage impact of a task source: an explicit
// StorageImpact on the spec wins, an EC-shard cleanup uses the cleanup impact,
// and anything else uses the source half of the erasure-coding task impact.
func (p *ecPlacementPlanner) sourceImpact(source topology.TaskSourceSpec, volumeSize int64) topology.StorageSlotChange {
	switch {
	case source.StorageImpact != nil:
		return *source.StorageImpact
	case source.CleanupType == topology.CleanupECShards:
		return topology.CalculateECShardCleanupImpact(volumeSize)
	default:
		sourceSide, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize)
		return sourceSide
	}
}
// destinationImpact resolves the storage impact of a task destination: an
// explicit StorageImpact on the spec wins, otherwise the destination half of
// the erasure-coding task impact is used.
func (p *ecPlacementPlanner) destinationImpact(dest topology.TaskDestinationSpec, volumeSize int64) topology.StorageSlotChange {
	if override := dest.StorageImpact; override != nil {
		return *override
	}
	_, destSide := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize)
	return destSide
}
// applyImpact adds a storage impact to the reservation state of the given disk
// and refreshes the matching candidate's FreeSlots and VolumeCount. Zero
// impacts and disks unknown to the planner are ignored.
//
// Remaining capacity is the baseline minus reserved volume slots minus the
// reserved shard slots converted to whole volume slots (integer division by
// topology.ShardsPerVolumeSlot), floored at zero.
func (p *ecPlacementPlanner) applyImpact(nodeID string, diskID uint32, impact topology.StorageSlotChange) {
	if impact.IsZero() {
		return
	}
	key := ecDiskKey(nodeID, diskID)
	state, known := p.diskStates[key]
	if !known {
		return
	}

	state.reservedVolumes += impact.VolumeSlots
	state.reservedShardSlots += impact.ShardSlots

	shardVolumes := int64(state.reservedShardSlots) / int64(topology.ShardsPerVolumeSlot)
	remaining := state.baseAvailable - int64(state.reservedVolumes) - shardVolumes
	if remaining < 0 {
		remaining = 0
	}

	candidate, found := p.candidateByKey[key]
	if !found {
		return
	}
	candidate.FreeSlots = int(remaining)
	candidate.VolumeCount = candidate.MaxVolumeCount - remaining
}
// bumpLoad increments the LoadCount of the candidate for the given disk; disks
// unknown to the planner are ignored.
func (p *ecPlacementPlanner) bumpLoad(nodeID string, diskID uint32) {
	candidate, found := p.candidateByKey[ecDiskKey(nodeID, diskID)]
	if !found {
		return
	}
	candidate.LoadCount++
}
// bumpShardCount adds delta (which may be negative) to the ShardCount of the
// candidate for the given disk, never letting it drop below zero. A zero delta
// and disks unknown to the planner are ignored.
func (p *ecPlacementPlanner) bumpShardCount(nodeID string, diskID uint32, delta int32) {
	if delta == 0 {
		return
	}
	candidate, found := p.candidateByKey[ecDiskKey(nodeID, diskID)]
	if !found {
		return
	}
	updated := candidate.ShardCount + int(delta)
	if updated < 0 {
		updated = 0
	}
	candidate.ShardCount = updated
}
// ecDiskKey returns the planner's map key for a disk: "<nodeID>:<diskID>" with
// the disk ID rendered in decimal.
func ecDiskKey(nodeID string, diskID uint32) string {
	return nodeID + ":" + fmt.Sprint(diskID)
}
// collectDiskTags maps each disk's ecDiskKey to its normalized tag list. Nil
// entries, disks without DiskInfo, and disks whose normalized tag list is
// empty are left out of the map.
func collectDiskTags(disks []*topology.DiskInfo) map[string][]string {
	byDisk := make(map[string][]string, len(disks))
	for _, d := range disks {
		if d == nil || d.DiskInfo == nil {
			continue
		}
		if normalized := util.NormalizeTagList(d.DiskInfo.Tags); len(normalized) > 0 {
			byDisk[ecDiskKey(d.NodeID, d.DiskID)] = normalized
		}
	}
	return byDisk
}
// diskHasTag reports whether tags contains tag using exact string comparison.
// An empty tag never matches.
func diskHasTag(tags []string, tag string) bool {
	if tag == "" {
		return false
	}
	for i := range tags {
		if tags[i] == tag {
			return true
		}
	}
	return false
}
// buildCandidateSets returns the candidate sets to try, in priority order, for
// a placement of shardsNeeded shards.
//
// Without preferred tags the only set is p.candidates. Otherwise the preferred
// tags are walked in order, accumulating every not-yet-included disk that
// carries the tag. After each tag, a snapshot of the accumulated disks is
// emitted if it holds at least shardsNeeded disks. The full p.candidates set is
// appended last as a fallback whenever the accumulated disks do not already
// cover all candidates, or when no set was emitted at all.
//
// A nil planner yields nil.
func (p *ecPlacementPlanner) buildCandidateSets(shardsNeeded int) [][]*placement.DiskCandidate {
	if p == nil {
		return nil
	}
	if len(p.preferredTags) == 0 {
		return [][]*placement.DiskCandidate{p.candidates}
	}

	// snapshot copies the tier so later appends cannot alter an emitted set.
	snapshot := func(src []*placement.DiskCandidate) []*placement.DiskCandidate {
		return append([]*placement.DiskCandidate(nil), src...)
	}

	var (
		taken       = make(map[string]bool, len(p.candidates))
		accumulated []*placement.DiskCandidate
		sets        [][]*placement.DiskCandidate
	)
	for _, tag := range p.preferredTags {
		for _, c := range p.candidates {
			key := ecDiskKey(c.NodeID, c.DiskID)
			if taken[key] || !diskHasTag(p.diskTags[key], tag) {
				continue
			}
			taken[key] = true
			accumulated = append(accumulated, c)
		}
		if shardsNeeded > 0 && len(accumulated) >= shardsNeeded {
			sets = append(sets, snapshot(accumulated))
		}
	}

	// selectDestinations never passes a non-positive shardsNeeded; this branch
	// serves direct callers, for whom any tagged disks form a single set.
	if shardsNeeded <= 0 && len(accumulated) > 0 {
		sets = append(sets, snapshot(accumulated))
	}

	if len(accumulated) < len(p.candidates) || len(sets) == 0 {
		sets = append(sets, p.candidates)
	}
	return sets
}
// planECDestinations plans the destinations for erasure coding operation.
// dataShards/parityShards are parameters so callers can drive non-10+4 ratios.
// countTopologyNodes counts volume-server nodes in the active topology, used by
// the min-node safety gate.
func countTopologyNodes(at *topology.ActiveTopology) int {
@@ -827,23 +558,6 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM
}, shardsPerPlan, nil
}
// distributeECShards assigns shard ids 0..totalShards-1 to numTargets targets
// round-robin: shard i goes to target i % numTargets, so every target receives
// either floor or ceil of totalShards/numTargets shards.
//
// The result has one (possibly empty, never nil) slice per target. When
// numTargets is not positive there is nothing to distribute onto and an empty
// result is returned. Without that guard the modulo would panic with a division
// by zero.
//
// NOTE(review): the original comment states that planECDestinations guarantees
// numTargets >= ceil(totalShards/parityShards), so no target exceeds
// parityShards shards. That guarantee lives in the caller and is not enforced
// here.
func distributeECShards(totalShards, numTargets int) [][]uint32 {
	if numTargets <= 0 {
		return [][]uint32{}
	}

	// Upper bound on shards per target, used to pre-size each slice.
	perTarget := 0
	if totalShards > 0 {
		perTarget = (totalShards + numTargets - 1) / numTargets
	}

	targetShards := make([][]uint32, numTargets)
	for i := range targetShards {
		targetShards[i] = make([]uint32, 0, perTarget)
	}
	for shardID := 0; shardID < totalShards; shardID++ {
		target := shardID % numTargets
		targetShards[target] = append(targetShards[target], uint32(shardID))
	}
	return targetShards
}
// createECTargets builds TaskTargets from the per-disk plans and the shard ids
// ecbalancer.Place assigned to each (shardsPerPlan is parallel to multiPlan.Plans).
func createECTargets(multiPlan *topology.MultiDestinationPlan, shardsPerPlan [][]uint32) []*worker_pb.TaskTarget {
@@ -913,68 +627,6 @@ func createECTaskParams(dataShards, parityShards int, sourceDiskType string) *wo
}
}
// diskInfosToCandidates converts topology disks into placement candidates.
// Nil entries and disks without DiskInfo are skipped, matching the guard in
// collectDiskTags, which walks the same slice. The result is nil when no disk
// qualifies.
//
// For every candidate:
//   - FreeSlots is MaxVolumeCount - VolumeCount, floored at zero;
//   - ShardCount sums erasure_coding.GetShardCount over the EcShardInfos
//     entries whose DiskId equals this disk's ID (the list may hold entries
//     for other disks, hence the filter).
func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate {
	var candidates []*placement.DiskCandidate
	for _, disk := range disks {
		if disk == nil || disk.DiskInfo == nil {
			continue
		}
		info := disk.DiskInfo

		freeSlots := int(info.MaxVolumeCount - info.VolumeCount)
		if freeSlots < 0 {
			freeSlots = 0
		}

		// Ranging over a nil slice is a no-op, so no explicit nil check.
		ecShardCount := 0
		for _, shardInfo := range info.EcShardInfos {
			if shardInfo.DiskId == disk.DiskID {
				ecShardCount += erasure_coding.GetShardCount(shardInfo)
			}
		}

		candidates = append(candidates, &placement.DiskCandidate{
			NodeID:         disk.NodeID,
			DiskID:         disk.DiskID,
			DataCenter:     disk.DataCenter,
			Rack:           disk.Rack,
			DiskType:       disk.DiskType,
			VolumeCount:    info.VolumeCount,
			MaxVolumeCount: info.MaxVolumeCount,
			ShardCount:     ecShardCount,
			FreeSlots:      freeSlots,
			LoadCount:      disk.LoadCount,
		})
	}
	return candidates
}
// calculateECScoreCandidate computes the EC placement score of a disk, used
// for logging and plan metadata. The score is (1 - VolumeCount/MaxVolumeCount)
// * 60 when MaxVolumeCount is positive, plus 10 - LoadCount. A nil disk scores
// 0. sourceRack and sourceDC do not influence the result.
func calculateECScoreCandidate(disk *placement.DiskCandidate, sourceRack, sourceDC string) float64 {
	if disk == nil {
		return 0.0
	}

	total := 0.0
	// Primary factor: free capacity, worth up to 60 points.
	if disk.MaxVolumeCount > 0 {
		used := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount)
		total += (1.0 - used) * 60.0
	}
	// Secondary factor: low task load, worth up to 10 points.
	total += 10.0 - float64(disk.LoadCount)
	return total
}
// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {

View File

@@ -14,37 +14,6 @@ import (
"github.com/stretchr/testify/require"
)
// TestECPlacementPlannerApplyReservations reserves ten single-shard
// destinations on the same disk and checks that the planner's candidate loses
// one volume slot, gains ten shards, and has its load raised exactly once.
func TestECPlacementPlannerApplyReservations(t *testing.T) {
	const (
		server       = "10.0.0.1:8080"
		reservations = 10
	)
	activeTopology := buildActiveTopology(t, 1, []string{"hdd"}, 10, 0)
	planner := newECPlacementPlanner(activeTopology, nil)
	require.NotNil(t, planner)

	diskKey := ecDiskKey(server, 0)
	before, found := planner.candidateByKey[diskKey]
	require.True(t, found)
	assert.Equal(t, 10, before.FreeSlots)
	assert.Equal(t, 0, before.ShardCount)
	assert.Equal(t, 0, before.LoadCount)

	impact := topology.CalculateECShardStorageImpact(1, 1)
	var destinations []topology.TaskDestinationSpec
	for i := 0; i < reservations; i++ {
		destinations = append(destinations, topology.TaskDestinationSpec{
			ServerID:      server,
			DiskID:        0,
			StorageImpact: &impact,
		})
	}
	planner.applyTaskReservations(1024, nil, destinations)

	after := planner.candidateByKey[diskKey]
	assert.Equal(t, 9, after.FreeSlots, "10 shard slots should reduce available volume slots by 1")
	assert.Equal(t, 10, after.ShardCount)
	assert.Equal(t, 1, after.LoadCount, "load should only be incremented once per disk")
}
func TestPlanECDestinationsUsesPlanner(t *testing.T) {
activeTopology := buildActiveTopology(t, 7, []string{"hdd", "ssd"}, 100, 0)
@@ -61,70 +30,6 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) {
requireAllShardsPlaced(t, plan, shardsPerPlan)
}
// TestECPlacementPlannerPrefersTaggedDisks tags the first two nodes of every
// rack "fast" and the rest "slow", then checks that a two-shard placement with
// preferred tag "fast" lands only on "fast" disks.
func TestECPlacementPlannerPrefersTaggedDisks(t *testing.T) {
	activeTopology := buildActiveTopology(t, 3, []string{"hdd"}, 10, 0)
	topo := activeTopology.GetTopologyInfo()
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for k, node := range rack.DataNodeInfos {
				tag := "slow"
				if k < 2 {
					tag = "fast"
				}
				for diskType := range node.DiskInfos {
					node.DiskInfos[diskType].Tags = []string{tag}
				}
			}
		}
	}
	require.NoError(t, activeTopology.UpdateTopology(topo))

	planner := newECPlacementPlanner(activeTopology, []string{"fast"})
	require.NotNil(t, planner)

	selected, err := planner.selectDestinations("", "", "", 2)
	require.NoError(t, err)
	require.Len(t, selected, 2)
	for i := range selected {
		diskKey := ecDiskKey(selected[i].NodeID, selected[i].DiskID)
		assert.True(t, diskHasTag(planner.diskTags[diskKey], "fast"))
	}
}
// TestECPlacementPlannerFallsBackWhenTagsInsufficient tags only the first node
// of every rack "fast" and requests three shards, so the planner has to fall
// back to untagged disks: fewer than all selected disks may carry the tag.
func TestECPlacementPlannerFallsBackWhenTagsInsufficient(t *testing.T) {
	activeTopology := buildActiveTopology(t, 3, []string{"hdd"}, 10, 0)
	topo := activeTopology.GetTopologyInfo()
	for _, dc := range topo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for i, node := range rack.DataNodeInfos {
				if i != 0 {
					continue
				}
				for diskType := range node.DiskInfos {
					node.DiskInfos[diskType].Tags = []string{"fast"}
				}
			}
		}
	}
	require.NoError(t, activeTopology.UpdateTopology(topo))

	planner := newECPlacementPlanner(activeTopology, []string{"fast"})
	require.NotNil(t, planner)

	selected, err := planner.selectDestinations("", "", "", 3)
	require.NoError(t, err)
	require.Len(t, selected, 3)

	fastCount := 0
	for i := range selected {
		diskKey := ecDiskKey(selected[i].NodeID, selected[i].DiskID)
		if diskHasTag(planner.diskTags[diskKey], "fast") {
			fastCount++
		}
	}
	assert.Less(t, fastCount, len(selected))
}
// TestDetectionSkipsWhenECShardsAlreadyExist guards against issue #9448: a
// regular replica that survived a previous successful EC encode (source
// delete didn't clean it up for some reason) gets re-proposed for encoding,