Files
seaweedfs/weed/worker/tasks/erasure_coding/plugin_handler.go
Chris Lu 0566fbd552 EC encode: place shards via ecbalancer.Place + configurable replica placement (#9623)
* Add shared super_block.ResolveReplicaPlacement; use it in ec_balance

* Add ecbalancer.FromActiveTopology snapshot constructor for EC encode/repair

* Add ecbalancer.Place greenfield/repair placement core (strict + durability-first)

* topology: add GetEffectiveAvailableEcShardSlots; FromActiveTopology uses shard-granular free slots

GetDisksWithEffectiveCapacity flattens reserved shard slots into volume slots via
integer truncation, so an in-flight EC task reserving a non-multiple-of-
DataShardsCount number of shards was lost from the snapshot and freeSlots was
over-reported. GetEffectiveAvailableEcShardSlots subtracts the full reservation
impact at shard granularity.

* ecbalancer.Place: reject nodes without a free disk of the requested type

FromActiveTopology keeps all disk types in the snapshot, so an SSD-only request
could be routed to a node with only HDD capacity (pickBestDiskOnNode then returns
disk 0 on the wrong tier). Filter rack/node selection to those with a free disk
of the requested type.

* ecbalancer.Place: enforce ReplicaPlacement DiffDataCenterCount (per-DC shard cap)

* ecbalancer: enforce DiffDataCenterCount in balance (cross-DC phase + cross-rack DC cap)

Adds a cross-DC corrective phase that drains data centers holding more than
DiffDataCenterCount shards of a volume, and a per-DC cap on cross-rack move
targets. Both are no-ops when DiffDataCenterCount is unset, so balance output is
unchanged for non-DC placements.

* topology: ratio-aware EC shard slots and provisional empty-disk slot

GetEffectiveAvailableEcShardSlots now takes the target collection's data-shard
count, so a 4+2 volume's larger shards are not over-counted at 10 per volume slot;
and it keeps the one provisional slot for freshly started empty servers that
report max=0, matching getEffectiveAvailableCapacityUnsafe. FromActiveTopology
threads the ratio through.

* ecbalancer.Place: explicit disk-type filter signal (fix HDD vs any ambiguity)

HardDriveType normalizes to "", which collided with "" meaning any disk. Add
Constraints.FilterDiskType and normalize both sides so a hdd request matches disks
reported as "" and never leaks to SSD, while filter=false still means any.

* ecbalancer: add clearShardAccounting for repair snapshot reconciliation

Clears one disk's copy of a shard from per-domain accounting and recomputes the
node-level union (preserving a kept copy on another disk of the same node), without
crediting capacity. Repair uses it to drop to-be-deleted copies before placing
missing shards.

* ecbalancer: don't cap cross-DC target racks when DiffRackCount is unset

len(racks)+1 wrongly limited each target rack (3 in a 2-rack cluster), so draining
a DC could stop short of the DiffDataCenterCount cap. Use MaxShardCount+1 as the
effectively-unlimited default.

* topology/ecbalancer: ratio-correct EC capacity accounting

Reservation shard slots (default ShardsPerVolumeSlot units) are now converted to
the target ratio before subtracting, and existing EC shards are charged by size
(targetDataShards/shardDataShards) so a 2+1 shard isn't counted as one 10+4 slot.
Per-shard ratio lookup is behind shardDataShards (OSS uses the standard ratio).

* ecbalancer.Place: candidate tiering and eligible-rack caps

Adds a per-disk eligibility/preference abstraction so Place supports:
- preferred-tag whole-plan retry (try disks carrying the earliest tags first,
  widen to all only if a tier cannot place every shard; reports
  SpilledOutsidePreferredTags),
- soft disk-type spill via DiskTypePolicy (Any/Prefer/Require): Prefer fills the
  preferred type then spills, reporting SpilledToOtherDiskType; Require filters,
- even per-rack caps that divide by racks holding an eligible disk, so a tiered
  cluster (e.g. SSDs in 2 of 4 racks) isn't capped impossibly low.
Disk tags carried via Node.AddDiskTags + FromActiveTopology.

* ecbalancer: export ClearShardAccounting for repair snapshot reconciliation

* ecbalancer: address review feedback (ratio rounding, bitmap walk, same-DC moves)

- topology/ecbalancer: round shard-reservation and existing-shard footprint up
  when converting to target-ratio shard slots, so a sub-slot reservation is not
  truncated to zero and free capacity is not overstated for low-data-shard
  layouts (targetDataShards < ds).
- erasure_coding: add ShardBits.All iterator and use it across the balancer,
  cross-DC phase, and placement scoring instead of scanning 0..MaxShardCount and
  probing Has on every id.
- ecbalancer: allow same-DC cross-rack moves when a DC already sits at its
  DiffDataCenterCount cap; a same-DC move leaves the DC total unchanged. Add a
  regression test that fails without the guard.
- ecbalancer cross-DC phase: pick targets via the eligible-aware
  pickNodeInRackEligible/pickBestDiskEligible helpers so the disk-type filter is
  honored and a 0 disk id is not mistaken for a valid selection.

* ecbalancer: test ecShardSlotsOnDisk fractional round-up

Cover the mixed-ratio path (targetDataShards < existing data shards) so a
shard's fractional footprint is never floored to zero and free capacity is not
overstated. Exercises the round-up via the targetDataShards parameter; OSS uses
the standard ratio at runtime while the enterprise build hits it with real
per-volume ratios.

* ecbalancer: assert node B rack in TestFromActiveTopology

* ecbalancer: split Destination into separate DataCenter and bare Rack

Replace the composite "dc:rack" Rack field on Destination with separate
DataCenter and bare Rack values, matching topology.DiskInfo and the worker-task
convention. Callers (and tests) read the data center directly instead of parsing
the composite with strings.SplitN.

* shell ec.balance: use utilization-based global balancing (parity with worker)

The shell's global rebalance phase balanced by raw shard count; switch it to
fractional fullness (shards/capacity), as the worker already does. On uniform
capacity the two agree; on heterogeneous capacity it fills nodes proportionally
instead of driving small-capacity nodes toward full.

Updates the heterogeneous-capacity regression test to assert even fullness
(~equal shards/capacity per node) rather than even shard count.

* ecbalancer: bounded-proportional per-DC shard spread

DiffDataCenterCount was enforced only as a ceiling (drain-to-cap), which could
leave a within-cap-but-lopsided DC distribution under a loose cap (e.g. 10/4 of 14
with cap=10). Now the cross-DC phase, the cross-rack DC guard, and Place all target
boundedMaxPerDC = min(DiffDataCenterCount, max(ceil(total/numDCs), parityShards)):
shards spread proportionally across DCs, but no tighter than the durability floor
(once each DC holds <= parityShards a DC loss is recoverable, so further spreading
only adds cross-DC/WAN traffic). No-op when DiffDataCenterCount is 0; identical to
before when the cap is the binding constraint.

* ecbalancer: drop DiffDataCenterCount enforcement for EC placement

The 1-byte volume ReplicaPlacement packs xyz into x*100+y*10+z<=255, so the DC
digit can only be 0-2 -- far too small to be a meaningful per-DC EC shard cap (a
cap of 1-2 would demand 7-14 DCs for a 10+4 volume). It's volume replica-placement,
not an EC spec. Removes the cross-DC balance phase, the DC guard in the cross-rack
phase, and the per-DC cap in Place (and the just-added bounded-proportional logic);
EC relies on the RP-independent rack/node even spread instead. Rack/node caps
(DiffRackCount/SameRackCount) are unchanged. Per-domain EC caps are left for a real
EC placement spec.

* ecbalancer: enforce per-disk durability cap; symmetric reserve/release

Place now refuses to put more than parityShards shards of a volume on a single
disk (pickBestDiskEligible skips a disk once it holds parityShards of the volume,
a hard cap not relaxed even in durability-first). Previously Place assigned by
free capacity, so a skewed near-full cluster could pile >parityShards onto one
disk -> losing it loses the volume; only distinct-disk count was checked. This
covers encode and repair (both route through Place); the caller skips/leaves the
volume rather than minting an unrecoverable layout.

Also makes reserveShard decrement freeSlots unconditionally, symmetric with
releaseShard's unconditional increment (the old guarded decrement could credit a
phantom slot on release if a shard were ever reserved onto a full disk).

* ecbalancer: add Topology.ReleaseVolumeShards (clear + credit) for greenfield encode

Releases all of a volume's shards from the snapshot and credits the freed disk
capacity, so a greenfield encode can plan as if stale EC shards from a prior failed
attempt are gone. Safe to credit because the encode task deletes stale shards
(cleanupStaleEcShards) before distributing the new ones. Distinct from
ClearShardAccounting (repair), which does not credit.

* ecbalancer: ReleaseVolumeShards credits node freeSlots, not just disks

releaseShard only increments per-disk freeSlots, but rack capacity is summed from
node freeSlots (buildRacks) and node freeSlots gates node eligibility. Crediting
only disks left a node/rack looking full after releasing stale shards, so a
greenfield encode still couldn't use the freed capacity. Now credits the node by
the total disk-slots freed.

* ecbalancer: correct PlacementMode docs (encode uses durability-first)

PlaceStrict was labeled '(encode)' but encode uses PlaceDurabilityFirst. Clarify
that durability-first is used by both encode and repair, reports relaxations in
PlaceResult.Relaxed, and never relaxes the per-disk durability cap.

* ecbalancer: treat SameRackCount as a direct per-node shard cap

The 3rd ReplicaPlacement digit now caps shards per node at exactly the digit
value, matching how DiffRackCount (2nd digit) caps per rack, instead of allowing
digit+1 per node. This makes the per-rack and per-node caps consistent and
matches the documented "digits cap EC shards per rack and per node" semantics;
e.g. 011 now means at most one shard per rack and one per node.

* EC encode: place shards via ecbalancer.Place + configurable replica placement

Encode now plans destinations through the shared ecbalancer.Place policy
(durability-first: prefers the source disk type and honors replica placement /
caps / anti-affinity, relaxing rather than failing when capacity is tight) instead
of the EC-only placement planner. Targets and capacity reservations use Place's
actual per-disk shard assignment, not a round-robin guess; cross-volume in-cycle
capacity is tracked by ActiveTopology's pending task, so the cached planner is no
longer consulted. Adds a configurable replica_placement (proto field 6 + worker
form + reader) that overrides the master default replication.

The placement-package planner code is left in place (now unused) and removed in a
follow-up that drops the package.

* EC encode: drop unused dataShards param from createECTargets

Addresses review feedback: after switching to Place's per-disk shardsPerPlan
assignment, createECTargets no longer needs the data-shard count.

* EC encode: fix packed-target validation, greenfield stale-shard accounting, RP docs

- Validate counts distinct shard ids across targets, not target rows, so packed
  plans (fewer (node,disk) targets than shards) aren't rejected.
- planECDestinations releases the volume's stale EC shards from the snapshot before
  Place (ReleaseVolumeShards), crediting their capacity. The encode task deletes
  stale shards before distributing, so a retry on tight capacity no longer fails
  planning by counting shards that are about to be removed.
- replica_placement config/form help no longer claims a data-center limit (the DC
  digit is ignored for EC); detection logs a warning when a DC digit is set.

* EC encode: surface relaxed placement; mark replica_placement best-effort

Encode places with PlaceDurabilityFirst (the chosen lenient behavior), which can
relax caps/anti-affinity/replica-placement to avoid deferring. That was silent
(only disk-type/tag spills were logged). Now logs PlaceResult.Relaxed so a tight
replica placement isn't weakened unnoticed, and the config/form help states the
rack/node caps are best-effort during encode (enforced by rebalancing).

* EC encode: key per-disk shard grouping by struct, not formatted string

planECDestinations grouped destinations using a fmt.Sprintf("%s:%d") map key
per shard; use a {node,diskID} struct key and pre-size the map/slice to the
shard count to drop the per-shard string allocation.
2026-05-22 20:22:30 -07:00

864 lines
28 KiB
Go

package erasure_coding
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
func init() {
pluginworker.RegisterHandler(pluginworker.HandlerFactory{
JobType: "erasure_coding",
Category: pluginworker.CategoryHeavy,
Aliases: []string{"erasure-coding", "erasure.coding", "ec"},
Build: func(opts pluginworker.HandlerBuildOptions) (pluginworker.JobHandler, error) {
return NewErasureCodingHandler(opts.GrpcDialOption, opts.WorkingDir), nil
},
})
}
type erasureCodingWorkerConfig struct {
TaskConfig *Config
}
// ErasureCodingHandler is the plugin job handler for erasure coding.
type ErasureCodingHandler struct {
grpcDialOption grpc.DialOption
workingDir string
}
func NewErasureCodingHandler(grpcDialOption grpc.DialOption, workingDir string) *ErasureCodingHandler {
return &ErasureCodingHandler{grpcDialOption: grpcDialOption, workingDir: strings.TrimSpace(workingDir)}
}
func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability {
return &plugin_pb.JobTypeCapability{
JobType: "erasure_coding",
CanDetect: true,
CanExecute: true,
MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 1,
DisplayName: "EC Encoding",
Description: "Converts full and quiet volumes into EC shards",
Weight: 80,
}
}
func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
return &plugin_pb.JobTypeDescriptor{
JobType: "erasure_coding",
DisplayName: "EC Encoding",
Description: "Detect and execute erasure coding for suitable volumes",
Icon: "fas fa-shield-alt",
DescriptorVersion: 1,
AdminConfigForm: &plugin_pb.ConfigForm{
FormId: "erasure-coding-admin",
Title: "Erasure Coding Admin Config",
Description: "Admin-side controls for erasure coding detection scope.",
Sections: []*plugin_pb.ConfigSection{
{
SectionId: "scope",
Title: "Scope",
Description: "Optional filters applied before erasure coding detection.",
Fields: []*plugin_pb.ConfigField{
{
Name: "collection_filter",
Label: "Collection Filter",
Description: "Only detect erasure coding opportunities in this collection when set.",
Placeholder: "all collections",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"collection_filter": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
WorkerConfigForm: &plugin_pb.ConfigForm{
FormId: "erasure-coding-worker",
Title: "Erasure Coding Worker Config",
Description: "Worker-side detection thresholds.",
Sections: []*plugin_pb.ConfigSection{
{
SectionId: "thresholds",
Title: "Detection Thresholds",
Description: "Controls for when erasure coding jobs should be proposed.",
Fields: []*plugin_pb.ConfigField{
{
Name: "quiet_for_seconds",
Label: "Quiet Period (s)",
Description: "Volume must remain unmodified for at least this duration.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
{
Name: "fullness_ratio",
Label: "Fullness Ratio",
Description: "Minimum volume fullness ratio to trigger erasure coding.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
MaxValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
},
{
Name: "min_size_mb",
Label: "Minimum Volume Size (MB)",
Description: "Only volumes larger than this size are considered.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
Required: true,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
{
Name: "preferred_tags",
Label: "Preferred Tags",
Description: "Comma-separated disk tags to prioritize for EC shard placement, ordered by preference.",
Placeholder: "fast,ssd",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "replica_placement",
Label: "Replica Placement",
Description: "EC shard placement (e.g. 020): 2nd/3rd digits cap shards per rack/node (best-effort during encode, enforced by rebalancing); the data-center digit is ignored. Empty uses the master default.",
Placeholder: "020",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"preferred_tags": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
"replica_placement": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
Enabled: true,
DetectionIntervalMinutes: 17,
DetectionTimeoutSeconds: 300,
MaxJobsPerDetection: 500,
GlobalExecutionConcurrency: 16,
PerWorkerExecutionConcurrency: 4,
RetryLimit: 1,
RetryBackoffSeconds: 30,
JobTypeMaxRuntimeSeconds: 1800,
ExecutionTimeoutSeconds: 1800,
},
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 3600},
},
"fullness_ratio": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.95},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
},
"preferred_tags": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
},
},
}
}
func (h *ErasureCodingHandler) Detect(
ctx context.Context,
request *plugin_pb.RunDetectionRequest,
sender pluginworker.DetectionSender,
) error {
if request == nil {
return fmt.Errorf("run detection request is nil")
}
if sender == nil {
return fmt.Errorf("detection sender is nil")
}
if request.JobType != "" && request.JobType != "erasure_coding" {
return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.JobType)
}
workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())
collectionFilter := strings.TrimSpace(pluginworker.ReadStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
if collectionFilter != "" {
workerConfig.TaskConfig.CollectionFilter = collectionFilter
}
masters := make([]string, 0)
if request.ClusterContext != nil {
masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
}
metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
if err != nil {
return err
}
clusterInfo := &workertypes.ClusterInfo{
ActiveTopology: activeTopology,
GrpcDialOption: h.grpcDialOption,
DefaultReplicaPlacement: pluginworker.FetchDefaultReplicaPlacement(ctx, masters, h.grpcDialOption),
}
maxResults := int(request.MaxResults)
if maxResults < 0 {
maxResults = 0
}
results, hasMore, err := Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
if err != nil {
return err
}
if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil {
glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr)
}
proposals := make([]*plugin_pb.JobProposal, 0, len(results))
for _, result := range results {
proposal, proposalErr := buildErasureCodingProposal(result, h.workingDir)
if proposalErr != nil {
glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", proposalErr)
continue
}
proposals = append(proposals, proposal)
}
if err := sender.SendProposals(&plugin_pb.DetectionProposals{
JobType: "erasure_coding",
Proposals: proposals,
HasMore: hasMore,
}); err != nil {
return err
}
return sender.SendComplete(&plugin_pb.DetectionComplete{
JobType: "erasure_coding",
Success: true,
TotalProposals: int32(len(proposals)),
})
}
func emitErasureCodingDetectionDecisionTrace(
sender pluginworker.DetectionSender,
metrics []*workertypes.VolumeHealthMetrics,
taskConfig *Config,
results []*workertypes.TaskDetectionResult,
maxResults int,
hasMore bool,
) error {
if sender == nil || taskConfig == nil {
return nil
}
quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second
minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024
allowedCollections := wildcard.CompileWildcardMatchers(taskConfig.CollectionFilter)
volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics)
for _, metric := range metrics {
if metric == nil {
continue
}
volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
}
skippedAlreadyEC := 0
skippedTooSmall := 0
skippedCollectionFilter := 0
skippedQuietTime := 0
skippedFullness := 0
for _, groupMetrics := range volumeGroups {
if len(groupMetrics) == 0 {
continue
}
metric := groupMetrics[0]
for _, candidate := range groupMetrics {
if candidate != nil && candidate.Server < metric.Server {
metric = candidate
}
}
if metric == nil {
continue
}
if metric.IsECVolume {
skippedAlreadyEC++
continue
}
if metric.Size < minSizeBytes {
skippedTooSmall++
continue
}
if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, metric.Collection) {
skippedCollectionFilter++
continue
}
if metric.Age < quietThreshold {
skippedQuietTime++
continue
}
if metric.FullnessRatio < taskConfig.FullnessRatio {
skippedFullness++
continue
}
}
totalVolumes := len(metrics)
summarySuffix := ""
if hasMore {
summarySuffix = fmt.Sprintf(" (max_results=%d reached; remaining volumes not evaluated)", maxResults)
}
summaryMessage := ""
if len(results) == 0 {
summaryMessage = fmt.Sprintf(
"EC detection: No tasks created for %d volumes%s (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
totalVolumes,
summarySuffix,
skippedAlreadyEC,
skippedTooSmall,
skippedCollectionFilter,
skippedQuietTime,
skippedFullness,
)
} else {
summaryMessage = fmt.Sprintf(
"EC detection: Created %d task(s)%s from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
len(results),
summarySuffix,
totalVolumes,
skippedAlreadyEC,
skippedTooSmall,
skippedCollectionFilter,
skippedQuietTime,
skippedFullness,
)
}
if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
"total_volumes": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
},
"selected_tasks": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
},
"max_results": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxResults)},
},
"has_more": {
Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore},
},
"skipped_already_ec": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedAlreadyEC)},
},
"skipped_too_small": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedTooSmall)},
},
"skipped_filtered": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedCollectionFilter)},
},
"skipped_not_quiet": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedQuietTime)},
},
"skipped_not_full": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedFullness)},
},
"quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
},
"min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
},
"fullness_threshold_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
},
})); err != nil {
return err
}
detailsEmitted := 0
for _, metric := range metrics {
if metric == nil || metric.IsECVolume {
continue
}
sizeMB := float64(metric.Size) / (1024 * 1024)
message := fmt.Sprintf(
"ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
metric.VolumeID,
sizeMB,
taskConfig.MinSizeMB,
metric.Age.Truncate(time.Minute),
quietThreshold.Truncate(time.Minute),
metric.FullnessRatio*100,
taskConfig.FullnessRatio*100,
)
if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)},
},
"size_mb": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: sizeMB},
},
"required_min_size_mb": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
},
"age_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.Age.Seconds())},
},
"required_quiet_for_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
},
"fullness_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: metric.FullnessRatio * 100},
},
"required_fullness_percent": {
Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
},
})); err != nil {
return err
}
detailsEmitted++
if detailsEmitted >= 3 {
break
}
}
return nil
}
func (h *ErasureCodingHandler) Execute(
ctx context.Context,
request *plugin_pb.ExecuteJobRequest,
sender pluginworker.ExecutionSender,
) error {
if request == nil || request.Job == nil {
return fmt.Errorf("execute request/job is nil")
}
if sender == nil {
return fmt.Errorf("execution sender is nil")
}
if request.Job.JobType != "" && request.Job.JobType != "erasure_coding" {
return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.Job.JobType)
}
params, err := decodeErasureCodingTaskParams(request.Job)
if err != nil {
return err
}
applyErasureCodingExecutionDefaults(params, request.GetClusterContext(), h.workingDir)
if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
return fmt.Errorf("erasure coding source node is required")
}
if len(params.Targets) == 0 {
return fmt.Errorf("erasure coding targets are required")
}
task := NewErasureCodingTask(
request.Job.JobId,
params.Sources[0].Node,
params.VolumeId,
params.Collection,
h.grpcDialOption,
)
execCtx, execCancel := context.WithCancel(ctx)
defer execCancel()
task.SetProgressCallback(func(progress float64, stage string) {
message := fmt.Sprintf("erasure coding progress %.0f%%", progress)
if strings.TrimSpace(stage) != "" {
message = stage
}
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_RUNNING,
ProgressPercent: progress,
Stage: stage,
Message: message,
Activities: []*plugin_pb.ActivityEvent{
pluginworker.BuildExecutorActivity(stage, message),
},
}); err != nil {
execCancel()
}
})
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_ASSIGNED,
ProgressPercent: 0,
Stage: "assigned",
Message: "erasure coding job accepted",
Activities: []*plugin_pb.ActivityEvent{
pluginworker.BuildExecutorActivity("assigned", "erasure coding job accepted"),
},
}); err != nil {
return err
}
if err := task.Execute(execCtx, params); err != nil {
_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
State: plugin_pb.JobState_JOB_STATE_FAILED,
ProgressPercent: 100,
Stage: "failed",
Message: err.Error(),
Activities: []*plugin_pb.ActivityEvent{
pluginworker.BuildExecutorActivity("failed", err.Error()),
},
})
return err
}
sourceNode := params.Sources[0].Node
resultSummary := fmt.Sprintf("erasure coding completed for volume %d across %d targets", params.VolumeId, len(params.Targets))
return sender.SendCompleted(&plugin_pb.JobCompleted{
JobId: request.Job.JobId,
JobType: request.Job.JobType,
Success: true,
Result: &plugin_pb.JobResult{
Summary: resultSummary,
OutputValues: map[string]*plugin_pb.ConfigValue{
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
},
"source_server": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
},
"target_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
},
},
},
Activities: []*plugin_pb.ActivityEvent{
pluginworker.BuildExecutorActivity("completed", resultSummary),
},
})
}
func (h *ErasureCodingHandler) collectVolumeMetrics(
ctx context.Context,
masterAddresses []string,
collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
metrics, activeTopology, _, err := pluginworker.CollectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption)
return metrics, activeTopology, err
}
func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig {
taskConfig := NewDefaultConfig()
quietForSeconds := pluginworker.ReadIntConfig(values, "quiet_for_seconds", taskConfig.QuietForSeconds)
if quietForSeconds < 0 {
quietForSeconds = 0
}
taskConfig.QuietForSeconds = quietForSeconds
fullnessRatio := pluginworker.ReadDoubleConfig(values, "fullness_ratio", taskConfig.FullnessRatio)
if fullnessRatio < 0 {
fullnessRatio = 0
}
if fullnessRatio > 1 {
fullnessRatio = 1
}
taskConfig.FullnessRatio = fullnessRatio
minSizeMB := pluginworker.ReadIntConfig(values, "min_size_mb", taskConfig.MinSizeMB)
if minSizeMB < 1 {
minSizeMB = 1
}
taskConfig.MinSizeMB = minSizeMB
taskConfig.PreferredTags = util.NormalizeTagList(pluginworker.ReadStringListConfig(values, "preferred_tags"))
taskConfig.ReplicaPlacement = strings.TrimSpace(pluginworker.ReadStringConfig(values, "replica_placement", taskConfig.ReplicaPlacement))
return &erasureCodingWorkerConfig{
TaskConfig: taskConfig,
}
}
func buildErasureCodingProposal(
result *workertypes.TaskDetectionResult,
baseWorkingDir string,
) (*plugin_pb.JobProposal, error) {
if result == nil {
return nil, fmt.Errorf("task detection result is nil")
}
if result.TypedParams == nil {
return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
}
params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir)
paramsPayload, err := proto.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshal task params: %w", err)
}
proposalID := strings.TrimSpace(result.TaskID)
if proposalID == "" {
proposalID = fmt.Sprintf("erasure-coding-%d-%d", result.VolumeID, time.Now().UnixNano())
}
dedupeKey := fmt.Sprintf("erasure_coding:%d", result.VolumeID)
if result.Collection != "" {
dedupeKey += ":" + result.Collection
}
sourceNode := ""
if len(params.Sources) > 0 {
sourceNode = strings.TrimSpace(params.Sources[0].Node)
}
summary := fmt.Sprintf("Erasure code volume %d", result.VolumeID)
if sourceNode != "" {
summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
}
// EC encoding reads the full volume, computes shards, and writes 14
// shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy)
// so the scheduler grants a deadline scaled to volume size.
volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
estimatedRuntimeSeconds := volumeSizeGB * 10 * 60
return &plugin_pb.JobProposal{
ProposalId: proposalID,
DedupeKey: dedupeKey,
JobType: "erasure_coding",
Priority: pluginworker.MapTaskPriority(result.Priority),
Summary: summary,
Detail: strings.TrimSpace(result.Reason),
Parameters: map[string]*plugin_pb.ConfigValue{
"task_params_pb": {
Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
},
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
},
"source_server": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
},
"collection": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
},
"target_count": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
},
"estimated_runtime_seconds": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
},
},
Labels: map[string]string{
"task_type": "erasure_coding",
"volume_id": fmt.Sprintf("%d", result.VolumeID),
"collection": result.Collection,
"source_node": sourceNode,
"target_count": fmt.Sprintf("%d", len(params.Targets)),
},
}, nil
}
func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
if job == nil {
return nil, fmt.Errorf("job spec is nil")
}
if payload := pluginworker.ReadBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
params := &worker_pb.TaskParams{}
if err := proto.Unmarshal(payload, params); err != nil {
return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
}
if params.TaskId == "" {
params.TaskId = job.JobId
}
return params, nil
}
volumeID := pluginworker.ReadUint32Config(job.Parameters, "volume_id", 0)
sourceNode := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_server", ""))
if sourceNode == "" {
sourceNode = strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "server", ""))
}
targetServers := pluginworker.ReadStringListConfig(job.Parameters, "target_servers")
if len(targetServers) == 0 {
targetServers = pluginworker.ReadStringListConfig(job.Parameters, "targets")
}
collection := pluginworker.ReadStringConfig(job.Parameters, "collection", "")
dataShards := pluginworker.ReadInt32Config(job.Parameters, "data_shards", int32(ecstorage.DataShardsCount))
if dataShards <= 0 {
dataShards = int32(ecstorage.DataShardsCount)
}
parityShards := pluginworker.ReadInt32Config(job.Parameters, "parity_shards", int32(ecstorage.ParityShardsCount))
if parityShards <= 0 {
parityShards = int32(ecstorage.ParityShardsCount)
}
sourceDiskType := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_disk_type", ""))
totalShards := int(dataShards + parityShards)
if volumeID == 0 {
return nil, fmt.Errorf("missing volume_id in job parameters")
}
if sourceNode == "" {
return nil, fmt.Errorf("missing source_server in job parameters")
}
if len(targetServers) == 0 {
return nil, fmt.Errorf("missing target_servers in job parameters")
}
if len(targetServers) < totalShards {
return nil, fmt.Errorf("insufficient target_servers: got %d, need at least %d", len(targetServers), totalShards)
}
shardAssignments := assignECShardIDs(totalShards, len(targetServers))
targets := make([]*worker_pb.TaskTarget, 0, len(targetServers))
for i := 0; i < len(targetServers); i++ {
targetNode := strings.TrimSpace(targetServers[i])
if targetNode == "" {
continue
}
targets = append(targets, &worker_pb.TaskTarget{
Node: targetNode,
VolumeId: volumeID,
ShardIds: shardAssignments[i],
})
}
if len(targets) < totalShards {
return nil, fmt.Errorf("insufficient non-empty target_servers after normalization: got %d, need at least %d", len(targets), totalShards)
}
return &worker_pb.TaskParams{
TaskId: job.JobId,
VolumeId: volumeID,
Collection: collection,
Sources: []*worker_pb.TaskSource{
{
Node: sourceNode,
VolumeId: volumeID,
},
},
Targets: targets,
TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
DataShards: dataShards,
ParityShards: parityShards,
SourceDiskType: sourceDiskType,
},
},
}, nil
}
func applyErasureCodingExecutionDefaults(
params *worker_pb.TaskParams,
clusterContext *plugin_pb.ClusterContext,
baseWorkingDir string,
) {
if params == nil {
return
}
ecParams := params.GetErasureCodingParams()
if ecParams == nil {
ecParams = &worker_pb.ErasureCodingTaskParams{
DataShards: ecstorage.DataShardsCount,
ParityShards: ecstorage.ParityShardsCount,
}
params.TaskParams = &worker_pb.TaskParams_ErasureCodingParams{ErasureCodingParams: ecParams}
}
if ecParams.DataShards <= 0 {
ecParams.DataShards = ecstorage.DataShardsCount
}
if ecParams.ParityShards <= 0 {
ecParams.ParityShards = ecstorage.ParityShardsCount
}
ecParams.WorkingDir = defaultErasureCodingWorkingDir(baseWorkingDir)
ecParams.CleanupSource = true
if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 {
ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0]
}
totalShards := int(ecParams.DataShards + ecParams.ParityShards)
if totalShards <= 0 {
totalShards = ecstorage.TotalShardsCount
}
needsShardAssignment := false
for _, target := range params.Targets {
if target == nil || len(target.ShardIds) == 0 {
needsShardAssignment = true
break
}
}
if needsShardAssignment && len(params.Targets) > 0 {
assignments := assignECShardIDs(totalShards, len(params.Targets))
for i := 0; i < len(params.Targets); i++ {
if params.Targets[i] == nil {
continue
}
if len(params.Targets[i].ShardIds) == 0 {
params.Targets[i].ShardIds = assignments[i]
}
}
}
}
func assignECShardIDs(totalShards int, targetCount int) [][]uint32 {
if targetCount <= 0 {
return nil
}
if totalShards <= 0 {
totalShards = ecstorage.TotalShardsCount
}
assignments := make([][]uint32, targetCount)
for i := 0; i < totalShards; i++ {
targetIndex := i % targetCount
assignments[targetIndex] = append(assignments[targetIndex], uint32(i))
}
return assignments
}
func defaultErasureCodingWorkingDir(baseWorkingDir string) string {
dir := strings.TrimSpace(baseWorkingDir)
if dir == "" {
return filepath.Join(".", "erasure_coding")
}
return filepath.Join(dir, "erasure_coding")
}