From 21f2699624ae7e474135800549eaaa144747ef9b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 22 May 2026 22:20:39 -0700 Subject: [PATCH] EC detection: build placement snapshot once per cycle (fix large-topology timeout) (#9625) * EC detection: build placement snapshot once per cycle, not per volume planECDestinations rebuilt the full ecbalancer snapshot (FromActiveTopology) for every eligible volume, and resolved each shard destination's address via ResolveServerAddress, which rebuilds the whole node map on every call. Both are O(volumes x topology) and made detection time out on large clusters (TestErasureCodingDetectionLargeTopology: 300k volumes hit the 2-minute deadline). Build the snapshot and the node-address map once per detection cycle and pass them in. planECDestinations now reserves the shards it assigns directly into the shared snapshot, so volumes planned later in the same cycle still see the reduced capacity (previously this was observed by rebuilding from ActiveTopology's pending tasks). Large-topology detection drops from a 120s timeout to ~3.5s. * Update weed/worker/tasks/erasure_coding/detection.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- weed/worker/tasks/erasure_coding/detection.go | 50 ++++++++++++++++--- .../detection_disk_type_test.go | 9 +++- .../tasks/erasure_coding/detection_test.go | 17 +++++-- 3 files changed, 62 insertions(+), 14 deletions(-) diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index aa3636ce6..666a2d7ac 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -92,6 +92,20 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste } sort.Slice(groupKeys, func(i, j int) bool { return groupKeys[i] < groupKeys[j] }) + // Build the EC placement snapshot once per detection cycle. planECDestinations + // reserves the shards it assigns directly into it, so volumes planned later in + // this cycle see the reduced capacity. Rebuilding it per volume re-walked the + // whole topology (and ActiveTopology's growing pending-task set) every time, + // which is O(volumes × topology) and times out on large clusters. The node + // address map is precomputed for the same reason (ResolveServerAddress rebuilds + // the full node map on every call). + var ecSnapshot *ecbalancer.Topology + var nodeAddresses map[string]string + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + ecSnapshot = ecbalancer.FromActiveTopology(clusterInfo.ActiveTopology, erasure_coding.DataShardsCount) + nodeAddresses = buildNodeAddressMap(clusterInfo.ActiveTopology) + } + // Iterate over groups to check criteria and creation tasks for idx, volumeID := range groupKeys { if ctx != nil { @@ -231,7 +245,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID) dataShards := erasure_coding.DataShardsCount parityShards := erasure_coding.ParityShardsCount - multiPlan, shardsPerPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig, replicaPlacement, dataShards, parityShards) + multiPlan, shardsPerPlan, err := planECDestinations(ecSnapshot, nodeAddresses, metric, ecConfig, replicaPlacement, dataShards, parityShards) if err != nil { glog.V(2).Infof("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) consecutivePlanningFailures++ @@ -438,17 +452,38 @@ func countTopologyNodes(at *topology.ActiveTopology) int { return n } +// buildNodeAddressMap resolves every node's server address once per detection +// cycle. ResolveServerAddress rebuilds the full node map on each call, so +// resolving per shard destination is O(destinations × topology); callers build +// this map once and look up addresses from it. +func buildNodeAddressMap(at *topology.ActiveTopology) map[string]string { + if at == nil { + return nil + } + allNodes := at.GetAllNodes() + m := make(map[string]string, len(allNodes)) + for id, n := range allNodes { + m[id] = string(pb.NewServerAddressFromDataNode(n)) + } + return m +} + // planECDestinations places all shards of the volume via the shared ecbalancer // policy and returns the per-disk destination plans plus, parallel to them, the // shard ids ecbalancer.Place assigned to each disk (so createECTargets and the // capacity reservations use the real assignment, not a round-robin guess). // +// snap is the cycle-wide placement snapshot, built once by the caller and reused +// across volumes: Place reserves the shards it assigns into it, so later volumes +// in the same detection cycle see the reduced capacity. Rebuilding it per volume +// is O(volumes × topology) and times out on large clusters. +// // Encode is lenient (PlaceDurabilityFirst): it relaxes caps/anti-affinity/RP as // needed rather than fail, and prefers the source disk type but spills if that // type can't hold every shard. rp is the resolved replica placement (may be nil). -func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) { - if at == nil { - return nil, nil, fmt.Errorf("active topology not available for EC placement") +func planECDestinations(snap *ecbalancer.Topology, nodeAddresses map[string]string, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) { + if snap == nil { + return nil, nil, fmt.Errorf("EC placement snapshot not available") } if dataShards <= 0 || parityShards <= 0 { return nil, nil, fmt.Errorf("invalid EC ratio: dataShards=%d parityShards=%d", dataShards, parityShards) @@ -459,7 +494,6 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM minTotalDisks := (totalShards + parityShards - 1) / parityShards expectedShardSize := uint64(metric.Size) / uint64(dataShards) - snap := ecbalancer.FromActiveTopology(at, dataShards) // Encode is greenfield: any EC shards already present for this volume are stale // leftovers from a prior failed attempt, which the task deletes // (cleanupStaleEcShards) before distributing the new shards. Release them so they @@ -530,9 +564,9 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM dcCount := make(map[string]int) for _, key := range order { g := groups[key] - targetAddress, err := workerutil.ResolveServerAddress(g.node, at) - if err != nil { - return nil, nil, fmt.Errorf("failed to resolve address for target server %s: %v", g.node, err) + targetAddress, ok := nodeAddresses[g.node] + if !ok { + return nil, nil, fmt.Errorf("failed to resolve address for target server %s", g.node) } plans = append(plans, &topology.DestinationPlan{ TargetNode: g.node, diff --git a/weed/worker/tasks/erasure_coding/detection_disk_type_test.go b/weed/worker/tasks/erasure_coding/detection_disk_type_test.go index 45b79bae6..e1fa35da9 100644 --- a/weed/worker/tasks/erasure_coding/detection_disk_type_test.go +++ b/weed/worker/tasks/erasure_coding/detection_disk_type_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,7 +28,9 @@ func TestPlanECDestinationsPrefersSourceDiskType_FullCluster(t *testing.T) { DiskType: "ssd", // the property being plumbed end-to-end } - plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.Len(t, plan.Plans, erasure_coding.TotalShardsCount) @@ -72,7 +75,9 @@ func TestPlanECDestinationsSpillsToOtherDiskType_WhenPreferredScarce(t *testing. DiskType: "ssd", } - plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.Len(t, plan.Plans, erasure_coding.TotalShardsCount) diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 5ae7809b2..18de939df 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,7 +25,9 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) requireAllShardsPlaced(t, plan, shardsPerPlan) @@ -273,7 +276,9 @@ func TestPlanECDestinationsSpreadsAcrossPhysicalDisks(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) requireAllShardsPlaced(t, plan, shardsPerPlan) @@ -289,7 +294,9 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) { Collection: "", } - _, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + _, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.Error(t, err) } @@ -338,7 +345,9 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) // Packed onto the available disks: more than one shard per disk but never more