diff --git a/weed/worker/tasks/erasure_coding/detection.go b/weed/worker/tasks/erasure_coding/detection.go index aa3636ce6..666a2d7ac 100644 --- a/weed/worker/tasks/erasure_coding/detection.go +++ b/weed/worker/tasks/erasure_coding/detection.go @@ -92,6 +92,20 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste } sort.Slice(groupKeys, func(i, j int) bool { return groupKeys[i] < groupKeys[j] }) + // Build the EC placement snapshot once per detection cycle. planECDestinations + // reserves the shards it assigns directly into it, so volumes planned later in + // this cycle see the reduced capacity. Rebuilding it per volume re-walked the + // whole topology (and ActiveTopology's growing pending-task set) every time, + // which is O(volumes × topology) and times out on large clusters. The node + // address map is precomputed for the same reason (ResolveServerAddress rebuilds + // the full node map on every call). + var ecSnapshot *ecbalancer.Topology + var nodeAddresses map[string]string + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + ecSnapshot = ecbalancer.FromActiveTopology(clusterInfo.ActiveTopology, erasure_coding.DataShardsCount) + nodeAddresses = buildNodeAddressMap(clusterInfo.ActiveTopology) + } + // Iterate over groups to check criteria and creation tasks for idx, volumeID := range groupKeys { if ctx != nil { @@ -231,7 +245,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID) dataShards := erasure_coding.DataShardsCount parityShards := erasure_coding.ParityShardsCount - multiPlan, shardsPerPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig, replicaPlacement, dataShards, parityShards) + multiPlan, shardsPerPlan, err := planECDestinations(ecSnapshot, nodeAddresses, metric, ecConfig, replicaPlacement, dataShards, parityShards) if err != nil { glog.V(2).Infof("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err) consecutivePlanningFailures++ @@ -438,17 +452,38 @@ func countTopologyNodes(at *topology.ActiveTopology) int { return n } +// buildNodeAddressMap resolves every node's server address once per detection +// cycle. ResolveServerAddress rebuilds the full node map on each call, so +// resolving per shard destination is O(destinations × topology); callers build +// this map once and look up addresses from it. +func buildNodeAddressMap(at *topology.ActiveTopology) map[string]string { + if at == nil { + return nil + } + allNodes := at.GetAllNodes() + m := make(map[string]string, len(allNodes)) + for id, n := range allNodes { + m[id] = string(pb.NewServerAddressFromDataNode(n)) + } + return m +} + // planECDestinations places all shards of the volume via the shared ecbalancer // policy and returns the per-disk destination plans plus, parallel to them, the // shard ids ecbalancer.Place assigned to each disk (so createECTargets and the // capacity reservations use the real assignment, not a round-robin guess). // +// snap is the cycle-wide placement snapshot, built once by the caller and reused +// across volumes: Place reserves the shards it assigns into it, so later volumes +// in the same detection cycle see the reduced capacity. Rebuilding it per volume +// is O(volumes × topology) and times out on large clusters. +// // Encode is lenient (PlaceDurabilityFirst): it relaxes caps/anti-affinity/RP as // needed rather than fail, and prefers the source disk type but spills if that // type can't hold every shard. rp is the resolved replica placement (may be nil). -func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) { - if at == nil { - return nil, nil, fmt.Errorf("active topology not available for EC placement") +func planECDestinations(snap *ecbalancer.Topology, nodeAddresses map[string]string, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) { + if snap == nil { + return nil, nil, fmt.Errorf("EC placement snapshot not available") } if dataShards <= 0 || parityShards <= 0 { return nil, nil, fmt.Errorf("invalid EC ratio: dataShards=%d parityShards=%d", dataShards, parityShards) @@ -459,7 +494,6 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM minTotalDisks := (totalShards + parityShards - 1) / parityShards expectedShardSize := uint64(metric.Size) / uint64(dataShards) - snap := ecbalancer.FromActiveTopology(at, dataShards) // Encode is greenfield: any EC shards already present for this volume are stale // leftovers from a prior failed attempt, which the task deletes // (cleanupStaleEcShards) before distributing the new shards. Release them so they @@ -530,9 +564,9 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM dcCount := make(map[string]int) for _, key := range order { g := groups[key] - targetAddress, err := workerutil.ResolveServerAddress(g.node, at) - if err != nil { - return nil, nil, fmt.Errorf("failed to resolve address for target server %s: %v", g.node, err) + targetAddress, ok := nodeAddresses[g.node] + if !ok { + return nil, nil, fmt.Errorf("failed to resolve address for target server %s", g.node) } plans = append(plans, &topology.DestinationPlan{ TargetNode: g.node, diff --git a/weed/worker/tasks/erasure_coding/detection_disk_type_test.go b/weed/worker/tasks/erasure_coding/detection_disk_type_test.go index 45b79bae6..e1fa35da9 100644 --- a/weed/worker/tasks/erasure_coding/detection_disk_type_test.go +++ b/weed/worker/tasks/erasure_coding/detection_disk_type_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -27,7 +28,9 @@ func TestPlanECDestinationsPrefersSourceDiskType_FullCluster(t *testing.T) { DiskType: "ssd", // the property being plumbed end-to-end } - plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.Len(t, plan.Plans, erasure_coding.TotalShardsCount) @@ -72,7 +75,9 @@ func TestPlanECDestinationsSpillsToOtherDiskType_WhenPreferredScarce(t *testing. DiskType: "ssd", } - plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.Len(t, plan.Plans, erasure_coding.TotalShardsCount) diff --git a/weed/worker/tasks/erasure_coding/detection_test.go b/weed/worker/tasks/erasure_coding/detection_test.go index 5ae7809b2..18de939df 100644 --- a/weed/worker/tasks/erasure_coding/detection_test.go +++ b/weed/worker/tasks/erasure_coding/detection_test.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/admin/topology" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding" + "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer" "github.com/seaweedfs/seaweedfs/weed/worker/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,7 +25,9 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) requireAllShardsPlaced(t, plan, shardsPerPlan) @@ -273,7 +276,9 @@ func TestPlanECDestinationsSpreadsAcrossPhysicalDisks(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) requireAllShardsPlaced(t, plan, shardsPerPlan) @@ -289,7 +294,9 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) { Collection: "", } - _, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + _, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.Error(t, err) } @@ -338,7 +345,9 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) { Collection: "", } - plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) + snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount) + nodeAddresses := buildNodeAddressMap(activeTopology) + plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) require.NoError(t, err) require.NotNil(t, plan) // Packed onto the available disks: more than one shard per disk but never more