EC detection: build placement snapshot once per cycle (fix large-topology timeout) (#9625)

* EC detection: build placement snapshot once per cycle, not per volume

planECDestinations rebuilt the full ecbalancer snapshot (FromActiveTopology) for
every eligible volume, and resolved each shard destination's address via
ResolveServerAddress, which rebuilds the whole node map on every call. Both are
O(volumes x topology) and made detection time out on large clusters
(TestErasureCodingDetectionLargeTopology: 300k volumes hit the 2-minute
deadline).

Build the snapshot and the node-address map once per detection cycle and pass
them in. planECDestinations now reserves the shards it assigns directly into the
shared snapshot, so volumes planned later in the same cycle still see the reduced
capacity (previously this was observed by rebuilding from ActiveTopology's
pending tasks). Large-topology detection drops from a 120s timeout to ~3.5s.

* Update weed/worker/tasks/erasure_coding/detection.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Chris Lu
2026-05-22 22:20:39 -07:00
committed by GitHub
parent d1665750e1
commit 21f2699624
3 changed files with 62 additions and 14 deletions

View File

@@ -92,6 +92,20 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
}
sort.Slice(groupKeys, func(i, j int) bool { return groupKeys[i] < groupKeys[j] })
// Build the EC placement snapshot once per detection cycle. planECDestinations
// reserves the shards it assigns directly into it, so volumes planned later in
// this cycle see the reduced capacity. Rebuilding it per volume re-walked the
// whole topology (and ActiveTopology's growing pending-task set) every time,
// which is O(volumes × topology) and times out on large clusters. The node
// address map is precomputed for the same reason (ResolveServerAddress rebuilds
// the full node map on every call).
var ecSnapshot *ecbalancer.Topology
var nodeAddresses map[string]string
if clusterInfo != nil && clusterInfo.ActiveTopology != nil {
ecSnapshot = ecbalancer.FromActiveTopology(clusterInfo.ActiveTopology, erasure_coding.DataShardsCount)
nodeAddresses = buildNodeAddressMap(clusterInfo.ActiveTopology)
}
// Iterate over groups to check criteria and creation tasks
for idx, volumeID := range groupKeys {
if ctx != nil {
@@ -231,7 +245,7 @@ func Detection(ctx context.Context, metrics []*types.VolumeHealthMetrics, cluste
glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
dataShards := erasure_coding.DataShardsCount
parityShards := erasure_coding.ParityShardsCount
multiPlan, shardsPerPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig, replicaPlacement, dataShards, parityShards)
multiPlan, shardsPerPlan, err := planECDestinations(ecSnapshot, nodeAddresses, metric, ecConfig, replicaPlacement, dataShards, parityShards)
if err != nil {
glog.V(2).Infof("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
consecutivePlanningFailures++
@@ -438,17 +452,38 @@ func countTopologyNodes(at *topology.ActiveTopology) int {
return n
}
// buildNodeAddressMap resolves every node's server address once per detection
// cycle. ResolveServerAddress rebuilds the full node map on each call, so
// resolving per shard destination is O(destinations × topology); callers build
// this map once and look up addresses from it.
func buildNodeAddressMap(at *topology.ActiveTopology) map[string]string {
if at == nil {
return nil
}
allNodes := at.GetAllNodes()
m := make(map[string]string, len(allNodes))
for id, n := range allNodes {
m[id] = string(pb.NewServerAddressFromDataNode(n))
}
return m
}
// planECDestinations places all shards of the volume via the shared ecbalancer
// policy and returns the per-disk destination plans plus, parallel to them, the
// shard ids ecbalancer.Place assigned to each disk (so createECTargets and the
// capacity reservations use the real assignment, not a round-robin guess).
//
// snap is the cycle-wide placement snapshot, built once by the caller and reused
// across volumes: Place reserves the shards it assigns into it, so later volumes
// in the same detection cycle see the reduced capacity. Rebuilding it per volume
// is O(volumes × topology) and times out on large clusters.
//
// Encode is lenient (PlaceDurabilityFirst): it relaxes caps/anti-affinity/RP as
// needed rather than fail, and prefers the source disk type but spills if that
// type can't hold every shard. rp is the resolved replica placement (may be nil).
func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) {
if at == nil {
return nil, nil, fmt.Errorf("active topology not available for EC placement")
func planECDestinations(snap *ecbalancer.Topology, nodeAddresses map[string]string, metric *types.VolumeHealthMetrics, ecConfig *Config, rp *super_block.ReplicaPlacement, dataShards, parityShards int) (*topology.MultiDestinationPlan, [][]uint32, error) {
if snap == nil {
return nil, nil, fmt.Errorf("EC placement snapshot not available")
}
if dataShards <= 0 || parityShards <= 0 {
return nil, nil, fmt.Errorf("invalid EC ratio: dataShards=%d parityShards=%d", dataShards, parityShards)
@@ -459,7 +494,6 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM
minTotalDisks := (totalShards + parityShards - 1) / parityShards
expectedShardSize := uint64(metric.Size) / uint64(dataShards)
snap := ecbalancer.FromActiveTopology(at, dataShards)
// Encode is greenfield: any EC shards already present for this volume are stale
// leftovers from a prior failed attempt, which the task deletes
// (cleanupStaleEcShards) before distributing the new shards. Release them so they
@@ -530,9 +564,9 @@ func planECDestinations(at *topology.ActiveTopology, metric *types.VolumeHealthM
dcCount := make(map[string]int)
for _, key := range order {
g := groups[key]
targetAddress, err := workerutil.ResolveServerAddress(g.node, at)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve address for target server %s: %v", g.node, err)
targetAddress, ok := nodeAddresses[g.node]
if !ok {
return nil, nil, fmt.Errorf("failed to resolve address for target server %s", g.node)
}
plans = append(plans, &topology.DestinationPlan{
TargetNode: g.node,

View File

@@ -4,6 +4,7 @@ import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -27,7 +28,9 @@ func TestPlanECDestinationsPrefersSourceDiskType_FullCluster(t *testing.T) {
DiskType: "ssd", // the property being plumbed end-to-end
}
plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.Len(t, plan.Plans, erasure_coding.TotalShardsCount)
@@ -72,7 +75,9 @@ func TestPlanECDestinationsSpillsToOtherDiskType_WhenPreferredScarce(t *testing.
DiskType: "ssd",
}
plan, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
plan, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.Len(t, plan.Plans, erasure_coding.TotalShardsCount)

View File

@@ -9,6 +9,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding/ecbalancer"
"github.com/seaweedfs/seaweedfs/weed/worker/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -24,7 +25,9 @@ func TestPlanECDestinationsUsesPlanner(t *testing.T) {
Collection: "",
}
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
requireAllShardsPlaced(t, plan, shardsPerPlan)
@@ -273,7 +276,9 @@ func TestPlanECDestinationsSpreadsAcrossPhysicalDisks(t *testing.T) {
Collection: "",
}
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
requireAllShardsPlaced(t, plan, shardsPerPlan)
@@ -289,7 +294,9 @@ func TestPlanECDestinationsFailsWithInsufficientCapacity(t *testing.T) {
Collection: "",
}
_, _, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
_, _, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.Error(t, err)
}
@@ -338,7 +345,9 @@ func TestPlanECDestinationsPacksWhenFewerDisksThanShards(t *testing.T) {
Collection: "",
}
plan, shardsPerPlan, err := planECDestinations(activeTopology, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
snap := ecbalancer.FromActiveTopology(activeTopology, erasure_coding.DataShardsCount)
nodeAddresses := buildNodeAddressMap(activeTopology)
plan, shardsPerPlan, err := planECDestinations(snap, nodeAddresses, metric, NewDefaultConfig(), nil, erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
require.NoError(t, err)
require.NotNil(t, plan)
// Packed onto the available disks: more than one shard per disk but never more