diff --git a/weed/storage/erasure_coding/placement/placement.go b/weed/storage/erasure_coding/placement/placement.go deleted file mode 100644 index efa0afa26..000000000 --- a/weed/storage/erasure_coding/placement/placement.go +++ /dev/null @@ -1,461 +0,0 @@ -// Package placement provides consolidated EC shard placement logic used by -// both shell commands and worker tasks. -// -// This package encapsulates the algorithms for: -// - Selecting destination nodes/disks for EC shards -// - Ensuring proper spread across racks, servers, and disks -// - Balancing shards across the cluster -package placement - -import ( - "fmt" - "sort" - "strings" -) - -// DiskCandidate represents a disk that can receive EC shards -type DiskCandidate struct { - NodeID string - DiskID uint32 - DataCenter string - Rack string - DiskType string // disk type (hdd/ssd/...) — empty means HardDrive - - // Capacity information - VolumeCount int64 - MaxVolumeCount int64 - ShardCount int // Current number of EC shards on this disk - FreeSlots int // Available slots for new shards - - // Load information - LoadCount int // Number of active tasks on this disk -} - -// NodeCandidate represents a server node that can receive EC shards -type NodeCandidate struct { - NodeID string - DataCenter string - Rack string - FreeSlots int - ShardCount int // Total shards across all disks - Disks []*DiskCandidate // All disks on this node -} - -// PlacementRequest configures EC shard placement behavior -type PlacementRequest struct { - // ShardsNeeded is the total number of shards to place - ShardsNeeded int - - // MaxShardsPerServer limits how many shards can be placed on a single server - // 0 means no limit (but prefer spreading when possible) - MaxShardsPerServer int - - // MaxShardsPerRack limits how many shards can be placed in a single rack - // 0 means no limit - MaxShardsPerRack int - - // MaxTaskLoad is the maximum task load count for a disk to be considered - MaxTaskLoad int - - // PreferDifferentServers when true, spreads shards across different servers - // before using multiple disks on the same server - PreferDifferentServers bool - - // PreferDifferentRacks when true, spreads shards across different racks - // before using multiple servers in the same rack - PreferDifferentRacks bool - - // PreferredDiskType, when non-empty, biases placement toward disks of - // this type. Disks of the preferred type are exhausted (subject to the - // other diversity preferences) before disks of any other type are - // considered. Empty means no disk-type bias — all suitable disks form a - // single pool, matching pre-#9423 behavior. - PreferredDiskType string -} - -// PlacementResult contains the selected destinations for EC shards -type PlacementResult struct { - SelectedDisks []*DiskCandidate - - // Statistics - ServersUsed int - RacksUsed int - DCsUsed int - - // Distribution maps - ShardsPerServer map[string]int - ShardsPerRack map[string]int - ShardsPerDC map[string]int - - // SpilledToOtherDiskType is set when PlacementRequest.PreferredDiskType - // was non-empty but the preferred-type pool could not satisfy - // ShardsNeeded, so placement had to spill onto disks of other types. - // Callers can log a warning when this is true. - SpilledToOtherDiskType bool -} - -// SelectDestinations selects the best disks for EC shard placement. -// This is the main entry point for EC placement logic. -// -// Disk-type preference (#9423): when config.PreferredDiskType is non-empty, -// suitable disks are partitioned into a matching-type tier and a -// fallback tier. Each tier is run through the diversity passes below; -// the fallback tier is only consulted if the matching tier runs out of -// candidates before ShardsNeeded is satisfied. Empty PreferredDiskType -// processes all suitable disks as one tier, preserving prior behavior. -// -// Within each tier, the algorithm works in multiple passes: -// 1. First pass: Select one disk from each rack (maximize rack diversity) -// 2. Second pass: Select one disk from each unused server in used racks (maximize server diversity) -// 3. Third pass: Select additional disks from servers already used (maximize disk diversity) -func SelectDestinations(disks []*DiskCandidate, config PlacementRequest) (*PlacementResult, error) { - if len(disks) == 0 { - return nil, fmt.Errorf("no disk candidates provided") - } - if config.ShardsNeeded <= 0 { - return nil, fmt.Errorf("shardsNeeded must be positive, got %d", config.ShardsNeeded) - } - - // Filter suitable disks - suitable := filterSuitableDisks(disks, config) - if len(suitable) == 0 { - return nil, fmt.Errorf("no suitable disks found after filtering") - } - - result := &PlacementResult{ - SelectedDisks: make([]*DiskCandidate, 0, config.ShardsNeeded), - ShardsPerServer: make(map[string]int), - ShardsPerRack: make(map[string]int), - ShardsPerDC: make(map[string]int), - } - - usedDisks := make(map[string]bool) // "nodeID:diskID" -> bool - usedServers := make(map[string]bool) // nodeID -> bool - usedRacks := make(map[string]bool) // "dc:rack" -> bool - - // Partition suitable into preferred-disk-type / fallback tiers. - // Process the preferred tier first; only spill to fallback when the - // preferred pool can't satisfy ShardsNeeded. - preferredTier, fallbackTier := partitionByDiskType(suitable, config.PreferredDiskType) - selectFromTier(preferredTier, result, usedDisks, usedServers, usedRacks, config) - if config.PreferredDiskType != "" && len(result.SelectedDisks) < config.ShardsNeeded && len(fallbackTier) > 0 { - before := len(result.SelectedDisks) - selectFromTier(fallbackTier, result, usedDisks, usedServers, usedRacks, config) - if len(result.SelectedDisks) > before { - result.SpilledToOtherDiskType = true - } - } - - // Calculate final statistics - result.ServersUsed = len(usedServers) - result.RacksUsed = len(usedRacks) - dcSet := make(map[string]bool) - for _, disk := range result.SelectedDisks { - dcSet[disk.DataCenter] = true - } - result.DCsUsed = len(dcSet) - - return result, nil -} - -// partitionByDiskType splits disks into (matching, fallback) based on the -// preferred disk type. If preferred is empty, everything goes into the -// matching tier and fallback is empty — i.e. existing single-pool behavior. -// -// Empty DiskCandidate.DiskType is treated as HardDriveType ("hdd") to -// mirror weed/storage/types.ToDiskType's normalization, so a -// PreferredDiskType of "hdd" matches disks reporting "" — otherwise EC -// shards from an HDD source would always spill onto disks that happen to -// report their type as "" (HardDriveType). -func partitionByDiskType(disks []*DiskCandidate, preferred string) (matching, fallback []*DiskCandidate) { - if preferred == "" { - return disks, nil - } - pref := normalizeDiskType(preferred) - for _, d := range disks { - if normalizeDiskType(d.DiskType) == pref { - matching = append(matching, d) - } else { - fallback = append(fallback, d) - } - } - return matching, fallback -} - -// normalizeDiskType lower-cases the input and folds "" to "hdd" so the -// HardDriveType sentinel ("") and explicit "hdd"/"HDD" all compare equal. -func normalizeDiskType(t string) string { - t = strings.ToLower(t) - if t == "" { - return "hdd" - } - return t -} - -// selectFromTier runs the three diversity passes against `tier`, mutating -// `result` and the used* maps in place. Passes stop as soon as ShardsNeeded -// is reached. The function is a no-op when the tier is empty or the result -// already has enough shards, so it is safe to call once per tier. -func selectFromTier(tier []*DiskCandidate, result *PlacementResult, - usedDisks, usedServers, usedRacks map[string]bool, - config PlacementRequest) { - - if len(tier) == 0 || len(result.SelectedDisks) >= config.ShardsNeeded { - return - } - - rackToDisks := groupDisksByRack(tier) - - // Pass 1: Select one disk from each rack (maximize rack diversity). - // When this is the fallback tier (preferred tier already populated - // usedRacks), skip those racks so the spillover still spreads onto - // new racks instead of doubling up on ones already picked. - if config.PreferDifferentRacks { - // Sort racks by number of available servers (descending) to prioritize racks with more options - sortedRacks := sortRacksByServerCount(rackToDisks) - for _, rackKey := range sortedRacks { - if len(result.SelectedDisks) >= config.ShardsNeeded { - break - } - if usedRacks[rackKey] { - continue - } - rackDisks := rackToDisks[rackKey] - // Select best disk from this rack, preferring a new server - disk := selectBestDiskFromRack(rackDisks, usedServers, usedDisks, config) - if disk != nil { - addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) - } - } - } - - // Pass 2: Select disks from unused servers in already-used racks - if config.PreferDifferentServers && len(result.SelectedDisks) < config.ShardsNeeded { - for _, rackKey := range getSortedRackKeys(rackToDisks) { - if len(result.SelectedDisks) >= config.ShardsNeeded { - break - } - rackDisks := rackToDisks[rackKey] - for _, disk := range sortDisksByScore(rackDisks) { - if len(result.SelectedDisks) >= config.ShardsNeeded { - break - } - diskKey := getDiskKey(disk) - if usedDisks[diskKey] { - continue - } - // Skip if server already used (we want different servers in this pass) - if usedServers[disk.NodeID] { - continue - } - // Check server limit - if config.MaxShardsPerServer > 0 && result.ShardsPerServer[disk.NodeID] >= config.MaxShardsPerServer { - continue - } - // Check rack limit - if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { - continue - } - addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) - } - } - } - - // Pass 3: Fill remaining slots from already-used servers (different disks) - // Use round-robin across servers to balance shards evenly - if len(result.SelectedDisks) < config.ShardsNeeded { - // Group remaining disks by server (within this tier) - serverToRemainingDisks := make(map[string][]*DiskCandidate) - for _, disk := range tier { - if !usedDisks[getDiskKey(disk)] { - serverToRemainingDisks[disk.NodeID] = append(serverToRemainingDisks[disk.NodeID], disk) - } - } - - // Sort each server's disks by score - for serverID := range serverToRemainingDisks { - serverToRemainingDisks[serverID] = sortDisksByScore(serverToRemainingDisks[serverID]) - } - - // Round-robin: repeatedly select from the server with the fewest shards - for len(result.SelectedDisks) < config.ShardsNeeded { - // Find server with fewest shards that still has available disks - var bestServer string - minShards := -1 - for serverID, disks := range serverToRemainingDisks { - if len(disks) == 0 { - continue - } - // Check server limit - if config.MaxShardsPerServer > 0 && result.ShardsPerServer[serverID] >= config.MaxShardsPerServer { - continue - } - shardCount := result.ShardsPerServer[serverID] - if minShards == -1 || shardCount < minShards { - minShards = shardCount - bestServer = serverID - } else if shardCount == minShards && serverID < bestServer { - // Tie-break by server name for determinism - bestServer = serverID - } - } - - if bestServer == "" { - // No more servers with available disks - break - } - - // Pop the best disk from this server - disks := serverToRemainingDisks[bestServer] - disk := disks[0] - serverToRemainingDisks[bestServer] = disks[1:] - - // Check rack limit - if config.MaxShardsPerRack > 0 && result.ShardsPerRack[getRackKey(disk)] >= config.MaxShardsPerRack { - continue - } - - addDiskToResult(result, disk, usedDisks, usedServers, usedRacks) - } - } -} - -// filterSuitableDisks filters disks that are suitable for EC placement -func filterSuitableDisks(disks []*DiskCandidate, config PlacementRequest) []*DiskCandidate { - var suitable []*DiskCandidate - for _, disk := range disks { - if disk.FreeSlots <= 0 { - continue - } - if config.MaxTaskLoad > 0 && disk.LoadCount > config.MaxTaskLoad { - continue - } - suitable = append(suitable, disk) - } - return suitable -} - -// groupDisksByRack groups disks by their rack (dc:rack key) -func groupDisksByRack(disks []*DiskCandidate) map[string][]*DiskCandidate { - result := make(map[string][]*DiskCandidate) - for _, disk := range disks { - key := getRackKey(disk) - result[key] = append(result[key], disk) - } - return result -} - -// getRackKey returns the unique key for a rack (dc:rack) -func getRackKey(disk *DiskCandidate) string { - return fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack) -} - -// getDiskKey returns the unique key for a disk (nodeID:diskID) -func getDiskKey(disk *DiskCandidate) string { - return fmt.Sprintf("%s:%d", disk.NodeID, disk.DiskID) -} - -// sortRacksByServerCount returns rack keys sorted by number of servers (ascending) -func sortRacksByServerCount(rackToDisks map[string][]*DiskCandidate) []string { - // Count unique servers per rack - rackServerCount := make(map[string]int) - for rackKey, disks := range rackToDisks { - servers := make(map[string]bool) - for _, disk := range disks { - servers[disk.NodeID] = true - } - rackServerCount[rackKey] = len(servers) - } - - keys := getSortedRackKeys(rackToDisks) - sort.Slice(keys, func(i, j int) bool { - // Sort by server count (descending) to pick from racks with more options first - return rackServerCount[keys[i]] > rackServerCount[keys[j]] - }) - return keys -} - -// getSortedRackKeys returns rack keys in a deterministic order -func getSortedRackKeys(rackToDisks map[string][]*DiskCandidate) []string { - keys := make([]string, 0, len(rackToDisks)) - for k := range rackToDisks { - keys = append(keys, k) - } - sort.Strings(keys) - return keys -} - -// selectBestDiskFromRack selects the best disk from a rack for EC placement -// It prefers servers that haven't been used yet -func selectBestDiskFromRack(disks []*DiskCandidate, usedServers, usedDisks map[string]bool, config PlacementRequest) *DiskCandidate { - var bestDisk *DiskCandidate - bestScore := -1.0 - bestIsFromUnusedServer := false - - for _, disk := range disks { - if usedDisks[getDiskKey(disk)] { - continue - } - isFromUnusedServer := !usedServers[disk.NodeID] - score := calculateDiskScore(disk) - - // Prefer unused servers - if isFromUnusedServer && !bestIsFromUnusedServer { - bestDisk = disk - bestScore = score - bestIsFromUnusedServer = true - } else if isFromUnusedServer == bestIsFromUnusedServer && score > bestScore { - bestDisk = disk - bestScore = score - } - } - - return bestDisk -} - -// sortDisksByScore returns disks sorted by score (best first) -func sortDisksByScore(disks []*DiskCandidate) []*DiskCandidate { - sorted := make([]*DiskCandidate, len(disks)) - copy(sorted, disks) - sort.Slice(sorted, func(i, j int) bool { - return calculateDiskScore(sorted[i]) > calculateDiskScore(sorted[j]) - }) - return sorted -} - -// calculateDiskScore calculates a score for a disk candidate -// Higher score is better -func calculateDiskScore(disk *DiskCandidate) float64 { - score := 0.0 - - // Primary factor: available capacity (lower utilization is better) - if disk.MaxVolumeCount > 0 { - utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount) - score += (1.0 - utilization) * 60.0 // Up to 60 points - } else { - score += 30.0 // Default if no max count - } - - // Secondary factor: fewer shards already on this disk is better - score += float64(10-disk.ShardCount) * 2.0 // Up to 20 points - - // Tertiary factor: lower load is better - score += float64(10 - disk.LoadCount) // Up to 10 points - - return score -} - -// addDiskToResult adds a disk to the result and updates tracking maps -func addDiskToResult(result *PlacementResult, disk *DiskCandidate, - usedDisks, usedServers, usedRacks map[string]bool) { - diskKey := getDiskKey(disk) - rackKey := getRackKey(disk) - - result.SelectedDisks = append(result.SelectedDisks, disk) - usedDisks[diskKey] = true - usedServers[disk.NodeID] = true - usedRacks[rackKey] = true - result.ShardsPerServer[disk.NodeID]++ - result.ShardsPerRack[rackKey]++ - result.ShardsPerDC[disk.DataCenter]++ -} diff --git a/weed/storage/erasure_coding/placement/placement_test.go b/weed/storage/erasure_coding/placement/placement_test.go deleted file mode 100644 index 6062d0ef3..000000000 --- a/weed/storage/erasure_coding/placement/placement_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package placement - -import ( - "strconv" - "testing" -) - -// makeDisk builds a DiskCandidate with sensible defaults; tests override -// only the fields they care about. -func makeDisk(node, rack, diskType string, diskID uint32) *DiskCandidate { - return &DiskCandidate{ - NodeID: node, - DiskID: diskID, - DataCenter: "dc1", - Rack: rack, - DiskType: diskType, - VolumeCount: 0, - MaxVolumeCount: 100, - FreeSlots: 100, - } -} - -func disksByType(disks []*DiskCandidate) map[string]int { - out := map[string]int{} - for _, d := range disks { - out[d.DiskType]++ - } - return out -} - -func newRequest(shards int, preferred string) PlacementRequest { - return PlacementRequest{ - ShardsNeeded: shards, - PreferDifferentServers: true, - PreferDifferentRacks: true, - PreferredDiskType: preferred, - } -} - -// Plenty of SSD disks available: placement should fill entirely from SSD -// when PreferredDiskType="ssd", leaving HDDs untouched and not flagging -// spillover. -func TestSelectDestinations_PrefersMatchingDiskType(t *testing.T) { - var disks []*DiskCandidate - for i := 0; i < 6; i++ { - disks = append(disks, makeDisk("ssd-"+strconv.Itoa(i), "r"+strconv.Itoa(i%3), "ssd", uint32(i))) - } - for i := 0; i < 6; i++ { - disks = append(disks, makeDisk("hdd-"+strconv.Itoa(i), "r"+strconv.Itoa(i%3), "", uint32(i))) - } - - result, err := SelectDestinations(disks, newRequest(4, "ssd")) - if err != nil { - t.Fatalf("SelectDestinations: %v", err) - } - if got := len(result.SelectedDisks); got != 4 { - t.Fatalf("selected %d disks, want 4", got) - } - if counts := disksByType(result.SelectedDisks); counts["ssd"] != 4 { - t.Fatalf("disk-type counts = %v, want ssd=4", counts) - } - if result.SpilledToOtherDiskType { - t.Fatalf("SpilledToOtherDiskType should be false when preferred pool was sufficient") - } -} - -// Only one SSD disk available but 4 shards needed: placement must consume -// the SSD first, then spill to HDD for the remainder, and report spillover. -func TestSelectDestinations_SpillsWhenPreferredScarce(t *testing.T) { - disks := []*DiskCandidate{ - makeDisk("ssd-0", "r0", "ssd", 0), - makeDisk("hdd-0", "r1", "", 0), - makeDisk("hdd-1", "r2", "", 0), - makeDisk("hdd-2", "r3", "", 0), - } - - result, err := SelectDestinations(disks, newRequest(4, "ssd")) - if err != nil { - t.Fatalf("SelectDestinations: %v", err) - } - if got := len(result.SelectedDisks); got != 4 { - t.Fatalf("selected %d disks, want 4", got) - } - counts := disksByType(result.SelectedDisks) - if counts["ssd"] != 1 || counts[""] != 3 { - t.Fatalf("disk-type counts = %v, want ssd=1 hdd=3", counts) - } - if !result.SpilledToOtherDiskType { - t.Fatalf("SpilledToOtherDiskType should be true after falling back to HDD") - } -} - -// PreferredDiskType="hdd" must match disks whose DiskType is "" (the -// HardDriveType sentinel) — otherwise EC encoding of an HDD source would -// always spill onto HDDs that happen to report disk_type="" even though -// the cluster has plenty of matching capacity. -func TestSelectDestinations_PreferredHddMatchesEmptyDiskType(t *testing.T) { - disks := []*DiskCandidate{ - makeDisk("hdd-0", "r0", "", 0), // HardDriveType sentinel - makeDisk("hdd-1", "r1", "", 0), // HardDriveType sentinel - makeDisk("ssd-0", "r2", "ssd", 0), - } - - result, err := SelectDestinations(disks, newRequest(2, "hdd")) - if err != nil { - t.Fatalf("SelectDestinations: %v", err) - } - if got := len(result.SelectedDisks); got != 2 { - t.Fatalf("selected %d disks, want 2", got) - } - // Both selected disks must be HDD-reporting (i.e. DiskType == ""), - // and no spillover should have been required. - for _, d := range result.SelectedDisks { - if d.DiskType != "" { - t.Errorf("selected disk %s has DiskType=%q, want \"\" (HardDriveType)", d.NodeID, d.DiskType) - } - } - if result.SpilledToOtherDiskType { - t.Fatalf("SpilledToOtherDiskType should be false when HDD pool matches preferred=hdd") - } -} - -// Empty PreferredDiskType: pre-#9423 behavior, single pool, no spillover -// flag regardless of disk-type mix. -func TestSelectDestinations_EmptyPreferredDiskTypeKeepsPriorBehavior(t *testing.T) { - disks := []*DiskCandidate{ - makeDisk("ssd-0", "r0", "ssd", 0), - makeDisk("hdd-0", "r1", "", 0), - makeDisk("hdd-1", "r2", "", 0), - makeDisk("ssd-1", "r3", "ssd", 0), - } - - result, err := SelectDestinations(disks, newRequest(3, "")) - if err != nil { - t.Fatalf("SelectDestinations: %v", err) - } - if got := len(result.SelectedDisks); got != 3 { - t.Fatalf("selected %d disks, want 3", got) - } - if result.SpilledToOtherDiskType { - t.Fatalf("SpilledToOtherDiskType should never be set when PreferredDiskType is empty") - } -} diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index dd25129c4..aa3636ce6 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -15,9 +15,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer" - "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/placement" "github.com/seaweedfs/seaweedfs/weed/storage/super_block" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/wildcard" "github.com/seaweedfs/seaweedfs/weed/worker/tasks/base" workerutil "github.com/seaweedfs/seaweedfs/weed/worker/tasks/util" @@ -421,273 +419,6 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste return results, hasMore, nil } -type ecDiskState struct { - baseAvailable int64 - reservedVolumes int32 - reservedShardSlots int32 -} - -type ecPlacementPlanner struct { - activeTopology *topology.ActiveTopology - candidates []*placement.DiskCandidate - candidateByKey map[string]*placement.DiskCandidate - diskStates map[string]*ecDiskState - diskTags map[string][]string - preferredTags []string -} - -func newECPlacementPlanner(activeTopology *topology.ActiveTopology, preferredTags []string) *ecPlacementPlanner { - if activeTopology == nil { - return nil - } - - disks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 0) - candidates := diskInfosToCandidates(disks) - tagsByKey := collectDiskTags(disks) - normalizedPreferredTags := util.NormalizeTagList(preferredTags) - if len(candidates) == 0 { - return &ecPlacementPlanner{ - activeTopology: activeTopology, - candidates: candidates, - candidateByKey: map[string]*placement.DiskCandidate{}, - diskStates: map[string]*ecDiskState{}, - diskTags: tagsByKey, - preferredTags: normalizedPreferredTags, - } - } - - candidateByKey := make(map[string]*placement.DiskCandidate, len(candidates)) - diskStates := make(map[string]*ecDiskState, len(candidates)) - for _, candidate := range candidates { - key := ecDiskKey(candidate.NodeID, candidate.DiskID) - candidateByKey[key] = candidate - diskStates[key] = &ecDiskState{ - baseAvailable: int64(candidate.FreeSlots), - } - } - - return &ecPlacementPlanner{ - activeTopology: activeTopology, - candidates: candidates, - candidateByKey: candidateByKey, - diskStates: diskStates, - diskTags: tagsByKey, - preferredTags: normalizedPreferredTags, - } -} - -func (p *ecPlacementPlanner) selectDestinations(sourceRack, sourceDC, sourceDiskType string, shardsNeeded int) ([]*placement.DiskCandidate, error) { - if p == nil || p.activeTopology == nil { - return nil, fmt.Errorf("ec placement planner is not initialized") - } - if shardsNeeded <= 0 { - return nil, fmt.Errorf("invalid shardsNeeded %d", shardsNeeded) - } - - config := placement.PlacementRequest{ - ShardsNeeded: shardsNeeded, - MaxShardsPerServer: 0, - MaxShardsPerRack: 0, - MaxTaskLoad: topology.MaxTaskLoadForECPlacement, - PreferDifferentServers: true, - PreferDifferentRacks: true, - // Bias placement toward disks matching the source volume's disk - // type; placement spills to other types only if the preferred - // pool can't satisfy ShardsNeeded (#9423). - PreferredDiskType: sourceDiskType, - } - - var lastErr error - for _, candidates := range p.buildCandidateSets(shardsNeeded) { - if len(candidates) == 0 { - continue - } - result, err := placement.SelectDestinations(candidates, config) - if err == nil { - if result.SpilledToOtherDiskType { - glog.Warningf("EC placement spilled to disks outside preferred disk type %q to reach %d shards (source rack=%s dc=%s)", - sourceDiskType, shardsNeeded, sourceRack, sourceDC) - } - return result.SelectedDisks, nil - } - lastErr = err - } - if lastErr == nil { - lastErr = fmt.Errorf("no EC placement candidates available") - } - return nil, lastErr -} - -func (p *ecPlacementPlanner) applyTaskReservations(volumeSize int64, sources []topology.TaskSourceSpec, destinations []topology.TaskDestinationSpec) { - if p == nil { - return - } - - touched := make(map[string]bool) - - for _, source := range sources { - impact := p.sourceImpact(source, volumeSize) - p.applyImpact(source.ServerID, source.DiskID, impact) - p.bumpShardCount(source.ServerID, source.DiskID, impact.ShardSlots) - key := ecDiskKey(source.ServerID, source.DiskID) - if !touched[key] { - p.bumpLoad(source.ServerID, source.DiskID) - touched[key] = true - } - } - - for _, dest := range destinations { - impact := p.destinationImpact(dest, volumeSize) - p.applyImpact(dest.ServerID, dest.DiskID, impact) - p.bumpShardCount(dest.ServerID, dest.DiskID, impact.ShardSlots) - key := ecDiskKey(dest.ServerID, dest.DiskID) - if !touched[key] { - p.bumpLoad(dest.ServerID, dest.DiskID) - touched[key] = true - } - } -} - -func (p *ecPlacementPlanner) sourceImpact(source topology.TaskSourceSpec, volumeSize int64) topology.StorageSlotChange { - if source.StorageImpact != nil { - return *source.StorageImpact - } - if source.CleanupType == topology.CleanupECShards { - return topology.CalculateECShardCleanupImpact(volumeSize) - } - impact, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize) - return impact -} - -func (p *ecPlacementPlanner) destinationImpact(dest topology.TaskDestinationSpec, volumeSize int64) topology.StorageSlotChange { - if dest.StorageImpact != nil { - return *dest.StorageImpact - } - _, impact := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, volumeSize) - return impact -} - -func (p *ecPlacementPlanner) applyImpact(nodeID string, diskID uint32, impact topology.StorageSlotChange) { - if impact.IsZero() { - return - } - key := ecDiskKey(nodeID, diskID) - state, ok := p.diskStates[key] - if !ok { - return - } - - state.reservedVolumes += impact.VolumeSlots - state.reservedShardSlots += impact.ShardSlots - - available := state.baseAvailable - int64(state.reservedVolumes) - int64(state.reservedShardSlots)/int64(topology.ShardsPerVolumeSlot) - if available < 0 { - available = 0 - } - - if candidate, ok := p.candidateByKey[key]; ok { - candidate.FreeSlots = int(available) - candidate.VolumeCount = candidate.MaxVolumeCount - available - } -} - -func (p *ecPlacementPlanner) bumpLoad(nodeID string, diskID uint32) { - key := ecDiskKey(nodeID, diskID) - if candidate, ok := p.candidateByKey[key]; ok { - candidate.LoadCount++ - } -} - -func (p *ecPlacementPlanner) bumpShardCount(nodeID string, diskID uint32, delta int32) { - if delta == 0 { - return - } - key := ecDiskKey(nodeID, diskID) - if candidate, ok := p.candidateByKey[key]; ok { - candidate.ShardCount += int(delta) - if candidate.ShardCount < 0 { - candidate.ShardCount = 0 - } - } -} - -func ecDiskKey(nodeID string, diskID uint32) string { - return fmt.Sprintf("%s:%d", nodeID, diskID) -} - -func collectDiskTags(disks []*topology.DiskInfo) map[string][]string { - tagMap := make(map[string][]string, len(disks)) - for _, disk := range disks { - if disk == nil || disk.DiskInfo == nil { - continue - } - key := ecDiskKey(disk.NodeID, disk.DiskID) - tags := util.NormalizeTagList(disk.DiskInfo.Tags) - if len(tags) > 0 { - tagMap[key] = tags - } - } - return tagMap -} - -func diskHasTag(tags []string, tag string) bool { - if tag == "" || len(tags) == 0 { - return false - } - for _, candidate := range tags { - if candidate == tag { - return true - } - } - return false -} - -// buildCandidateSets builds tiered candidate sets for preferred-tag prioritized placement. -// For a planner with preferredTags, it accumulates disks matching each tag in order into -// progressively larger tiers. It emits a candidate set once a tier reaches shardsNeeded, -// then continues accumulating for subsequent tags. Finally, it falls back to the full -// p.candidates set if preferred-tag tiers are insufficient. This ensures tagged disks -// are selected first before falling back to all available candidates. -func (p *ecPlacementPlanner) buildCandidateSets(shardsNeeded int) [][]*placement.DiskCandidate { - if p == nil { - return nil - } - if len(p.preferredTags) == 0 { - return [][]*placement.DiskCandidate{p.candidates} - } - selected := make(map[string]bool, len(p.candidates)) - var tier []*placement.DiskCandidate - var candidateSets [][]*placement.DiskCandidate - for _, tag := range p.preferredTags { - for _, candidate := range p.candidates { - key := ecDiskKey(candidate.NodeID, candidate.DiskID) - if selected[key] { - continue - } - if diskHasTag(p.diskTags[key], tag) { - selected[key] = true - tier = append(tier, candidate) - } - } - if shardsNeeded > 0 && len(tier) >= shardsNeeded { - candidateSets = append(candidateSets, append([]*placement.DiskCandidate(nil), tier...)) - } - } - // Defensive check: selectDestinations always ensures shardsNeeded > 0 before calling - // buildCandidateSets, but this branch handles direct callers and edge cases. - if shardsNeeded <= 0 && len(tier) > 0 { - candidateSets = append(candidateSets, append([]*placement.DiskCandidate(nil), tier...)) - } - if len(tier) < len(p.candidates) { - candidateSets = append(candidateSets, p.candidates) - } else if len(candidateSets) == 0 { - candidateSets = append(candidateSets, p.candidates) - } - return candidateSets -} - -// planECDestinations plans the destinations for erasure coding operation. -// dataShards/parityShards are parameters so callers can drive non-10+4 ratios. // countTopologyNodes counts volume-server nodes in the active topology, used by // the min-node safety gate. func countTopologyNodes(at *topology.ActiveTopology) int { @@ -827,23 +558,6 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM }, shardsPerPlan, nil } -// distributeECShards assigns shard ids 0..totalShards-1 across numTargets -// targets round-robin, so each target holds either floor or ceil of -// totalShards/numTargets shards. When numTargets < totalShards this packs -// several shards onto a target; planECDestinations guarantees numTargets is at -// least ceil(totalShards/parityShards), so no target exceeds parityShards shards. -func distributeECShards(totalShards, numTargets int) [][]uint32 { - targetShards := make([][]uint32, numTargets) - for i := range targetShards { - targetShards[i] = make([]uint32, 0) - } - for shardId := 0; shardId < totalShards; shardId++ { - targetIndex := shardId % numTargets - targetShards[targetIndex] = append(targetShards[targetIndex], uint32(shardId)) - } - return targetShards -} - // createECTargets builds TaskTargets from the per-disk plans and the shard ids // ecbalancer.Place assigned to each (shardsPerPlan is parallel to multiPlan.Plans). func createECTargets(multiPlan *topology.MultiDestinationPlan, shardsPerPlan [][]uint32) []*worker_pb.TaskTarget { @@ -913,68 +627,6 @@ func createECTaskParams(dataShards, parityShards int, sourceDiskType string) *wo } } -// diskInfosToCandidates converts topology.DiskInfo slice to placement.DiskCandidate slice -func diskInfosToCandidates(disks []*topology.DiskInfo) []*placement.DiskCandidate { - var candidates []*placement.DiskCandidate - for _, disk := range disks { - if disk.DiskInfo == nil { - continue - } - - // Calculate free slots (using default max if not set) - freeSlots := int(disk.DiskInfo.MaxVolumeCount - disk.DiskInfo.VolumeCount) - if freeSlots < 0 { - freeSlots = 0 - } - - // Calculate EC shard count for this specific disk - // EcShardInfos contains all shards, so we need to filter by DiskId and sum actual shard counts - ecShardCount := 0 - if disk.DiskInfo.EcShardInfos != nil { - for _, shardInfo := range disk.DiskInfo.EcShardInfos { - if shardInfo.DiskId == disk.DiskID { - ecShardCount += erasure_coding.GetShardCount(shardInfo) - } - } - } - - candidates = append(candidates, &placement.DiskCandidate{ - NodeID: disk.NodeID, - DiskID: disk.DiskID, - DataCenter: disk.DataCenter, - Rack: disk.Rack, - DiskType: disk.DiskType, - VolumeCount: disk.DiskInfo.VolumeCount, - MaxVolumeCount: disk.DiskInfo.MaxVolumeCount, - ShardCount: ecShardCount, - FreeSlots: freeSlots, - LoadCount: disk.LoadCount, - }) - } - return candidates -} - -// calculateECScoreCandidate calculates placement score for EC operations. -// Used for logging and plan metadata. -func calculateECScoreCandidate(disk *placement.DiskCandidate, sourceRack, sourceDC string) float64 { - if disk == nil { - return 0.0 - } - - score := 0.0 - - // Prefer disks with available capacity (primary factor) - if disk.MaxVolumeCount > 0 { - utilization := float64(disk.VolumeCount) / float64(disk.MaxVolumeCount) - score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity - } - - // Consider current load (secondary factor) - score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load - - return score -} - // findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume // Uses O(1) indexed lookup for optimal performance on large clusters. func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica { diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 118f498a7..5ae7809b2 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -14,37 +14,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestECPlacementPlannerApplyReservations(t *testing.T) { - activeTopology := buildActiveTopology(t, 1, []string{"hdd"}, 10, 0) - - planner := newECPlacementPlanner(activeTopology, nil) - require.NotNil(t, planner) - - key := ecDiskKey("10.0.0.1:8080", 0) - candidate, ok := planner.candidateByKey[key] - require.True(t, ok) - assert.Equal(t, 10, candidate.FreeSlots) - assert.Equal(t, 0, candidate.ShardCount) - assert.Equal(t, 0, candidate.LoadCount) - - shardImpact := topology.CalculateECShardStorageImpact(1, 1) - destinations := make([]topology.TaskDestinationSpec, 10) - for i := 0; i < 10; i++ { - destinations[i] = topology.TaskDestinationSpec{ - ServerID: "10.0.0.1:8080", - DiskID: 0, - StorageImpact: &shardImpact, - } - } - - planner.applyTaskReservations(1024, nil, destinations) - - candidate = planner.candidateByKey[key] - assert.Equal(t, 9, candidate.FreeSlots, "10 shard slots should reduce available volume slots by 1") - assert.Equal(t, 10, candidate.ShardCount) - assert.Equal(t, 1, candidate.LoadCount, "load should only be incremented once per disk") -} - func TestPlanECDestinationsUsesPlanner(t *testing.T) { activeTopology := buildActiveTopology(t, 7, []string{"hdd", "ssd"}, 100, 0) @@ -61,70 +30,6 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) { requireAllShardsPlaced(t, plan, shardsPerPlan) } -func TestECPlacementPlannerPrefersTaggedDisks(t *testing.T) { - activeTopology := buildActiveTopology(t, 3, []string{"hdd"}, 10, 0) - topo := activeTopology.GetTopologyInfo() - for _, dc := range topo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for k, node := range rack.DataNodeInfos { - for diskType := range node.DiskInfos { - if k < 2 { - node.DiskInfos[diskType].Tags = []string{"fast"} - } else { - node.DiskInfos[diskType].Tags = []string{"slow"} - } - } - } - } - } - require.NoError(t, activeTopology.UpdateTopology(topo)) - - planner := newECPlacementPlanner(activeTopology, []string{"fast"}) - require.NotNil(t, planner) - - selected, err := planner.selectDestinations("", "", "", 2) - require.NoError(t, err) - require.Len(t, selected, 2) - - for _, candidate := range selected { - key := ecDiskKey(candidate.NodeID, candidate.DiskID) - assert.True(t, diskHasTag(planner.diskTags[key], "fast")) - } -} - -func TestECPlacementPlannerFallsBackWhenTagsInsufficient(t *testing.T) { - activeTopology := buildActiveTopology(t, 3, []string{"hdd"}, 10, 0) - topo := activeTopology.GetTopologyInfo() - for _, dc := range topo.DataCenterInfos { - for _, rack := range dc.RackInfos { - for i, node := range rack.DataNodeInfos { - for diskType := range node.DiskInfos { - if i == 0 { - node.DiskInfos[diskType].Tags = []string{"fast"} - } - } - } - } - } - require.NoError(t, activeTopology.UpdateTopology(topo)) - - planner := newECPlacementPlanner(activeTopology, []string{"fast"}) - require.NotNil(t, planner) - - selected, err := planner.selectDestinations("", "", "", 3) - require.NoError(t, err) - require.Len(t, selected, 3) - - taggedCount := 0 - for _, candidate := range selected { - key := ecDiskKey(candidate.NodeID, candidate.DiskID) - if diskHasTag(planner.diskTags[key], "fast") { - taggedCount++ - } - } - assert.Less(t, taggedCount, len(selected)) -} - // TestDetectionSkipsWhenECShardsAlreadyExist guards against issue #9448: a // regular replica that survived a previous successful EC encode (source // delete didn't clean it up for some reason) gets re-proposed for encoding,