Files
seaweedfs/weed/worker/tasks/erasure_coding/plugin_handler.go
Chris Lu d221a64262 fix(ec): skip re-encode when EC shards already exist for the volume (#9448) (#9458)
* fix(ec): skip re-encode when EC shards already exist for the volume (#9448)

When an earlier EC encoding succeeded but the post-encode source-delete
left a regular replica behind on one of the servers, the next detection
cycle proposes the same volume again. The new encode tries to redistribute
shards to targets that already have them mounted, the volume server
returns `ec volume %d is mounted; refusing overwrite`, the task fails,
and detection re-queues the volume. The cycle repeats forever — issue
#9448.

The existing `metric.IsECVolume` skip catches the case where the canonical
metric is reported on the EC-shard side of the heartbeat, but when the
master sees BOTH a regular replica AND its EC shards in the same volume
list, the canonical metric we pick is the regular replica and
IsECVolume is false. Add a second guard that checks the topology
directly via `findExistingECShards` (already present and indexed) and
skip the volume when any shards exist, logging a warning that points
the admin at the stuck source.

This breaks the loop. Auto-cleanup of the orphaned replica is left as
follow-up work — deleting a source replica from inside the detector is
only safe with a re-verification step right before the delete, plus a
config opt-in, and is best done in its own change.

* fix(ec): #9448 guard only fires when EC shard set is complete

The first version of the #9448 guard tripped on `len(existingShards) > 0`,
which is broader than necessary. The existing recovery branch in the
encode arm (around the `existingECShards` block, ~line 216) is designed
to fold partial leftover shards from a previously failed encode into
the new task as cleanup sources. Skipping unconditionally on any
existing shards made that branch dead code, regressing the recovery
behavior Gemini flagged in the review of af09e1ec7.

Two corrections:

  1. New helper `countExistingEcShardsForVolume` walks each disk's
     `EcIndexBits` bitmap and ORs the results into a `ShardBits`,
     returning the distinct-shard popcount. This is the right unit:
     a single `VolumeEcShardInformationMessage` can carry several
     shards, so `len(EcShardInfos)` is not the same as the number
     of present shards. Per Gemini's "use helper functions that walk
     the actual shard bitmap" note.
  2. The guard now fires only when `shardCount >= totalShards`.
     Partial shard sets fall through to the existing recovery branch,
     unchanged.
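The helper-plus-guard pair can be sketched as follows. This is a minimal, self-contained sketch: `ecShardInfo` is a hypothetical stand-in for the real `VolumeEcShardInformationMessage`, and the shard count is hard-coded; only the OR-then-popcount logic and the `>= totalShards` guard condition come from the description above.

```go
package main

import (
	"fmt"
	"math/bits"
)

// ecShardInfo is a hypothetical mirror of the master's per-disk shard message:
// one entry can carry several shards for a volume via a bitmap.
type ecShardInfo struct {
	VolumeID    uint32
	EcIndexBits uint32 // bit i set => shard i present on this disk
}

// countExistingEcShardsForVolume ORs every disk's bitmap for the volume into
// one combined ShardBits-style value and returns the distinct-shard popcount,
// so len(infos) never masquerades as the number of present shards.
func countExistingEcShardsForVolume(infos []ecShardInfo, volumeID uint32) int {
	var combined uint32
	for _, info := range infos {
		if info.VolumeID == volumeID {
			combined |= info.EcIndexBits
		}
	}
	return bits.OnesCount32(combined)
}

func main() {
	const totalShards = 14 // 10 data + 4 parity
	infos := []ecShardInfo{
		{VolumeID: 7, EcIndexBits: 0b0000_0011_1111}, // shards 0-5 on one disk
		{VolumeID: 7, EcIndexBits: 0b1111_1100_0000}, // shards 6-11 on another
		{VolumeID: 9, EcIndexBits: 0b1},              // different volume, ignored
	}
	shardCount := countExistingEcShardsForVolume(infos, 7)
	fmt.Println(shardCount)                // 12 distinct shards
	fmt.Println(shardCount >= totalShards) // false: partial set, guard does not fire
}
```

With 12 of 14 shards present the guard stays quiet and the volume falls through to the recovery branch; only a complete set triggers the skip.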

Tests:
  - TestDetectionSkipsWhenECShardsAlreadyExist: complete shards →
    no proposal (the regression test for #9448 itself, unchanged
    intent, rewritten on top of new helpers).
  - TestDetectionAllowsRegularReplicaWhenShardsPartial: partial
    shards → guard does NOT swallow the volume; the encode arm
    still gets a chance.
  - TestCountExistingEcShardsForVolume: the helper walks the
    bitmap correctly even when one info entry packs multiple
    shards on one disk.

The dangerous `volume.delete` hint in the warning is unchanged for
now — it gets fixed in the next commit.

* fix(ec): drop dangerous shell-command hint from #9448 warning

The previous warning told operators to run `volume.delete -volumeId=%d`
in the SeaweedFS shell to clean up the orphaned source replica. That
command is cluster-wide — it deletes every replica of the volume,
including the EC shards, which share the same volume id. Running it
in the state the message describes would cause the data loss the
guard exists to prevent.

Replace it with explicit guidance that the cleanup must be a targeted
VolumeDelete RPC against the source server only, and that the
shell command is the exact wrong thing to use here. The next two
commits add the plumbing and the auto-execution of that targeted
delete so most operators never see this hint at all.

Per Gemini comment on af09e1ec7.

* feat(worker): plumb grpc dial option through ClusterInfo

Add ClusterInfo.GrpcDialOption (optional) and set it in the
erasure_coding plugin handler. Lets the detector make targeted
gRPC calls during detection — used by the follow-up commit to
auto-clean orphan source replicas via VolumeDelete RPCs.

Zero-value safe: existing detectors that don't need RPC access
get a nil DialOption and ignore the field.
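The zero-value contract can be sketched like this; `DialOption` here is a hypothetical stand-in for `grpc.DialOption` so the snippet is self-contained, and `detect` is an illustrative caller, not the real detector:

```go
package main

import "fmt"

// DialOption stands in for grpc.DialOption in this sketch.
type DialOption interface{}

// ClusterInfo carries an optional dial option; detectors that never make
// RPCs simply leave it nil.
type ClusterInfo struct {
	GrpcDialOption DialOption
}

// detect shows the zero-value-safe branch: nil means warn-only behavior,
// non-nil unlocks targeted RPC calls during detection.
func detect(info *ClusterInfo) string {
	if info.GrpcDialOption == nil {
		return "warn-only"
	}
	return "can auto-clean via RPC"
}

func main() {
	fmt.Println(detect(&ClusterInfo{}))                           // warn-only
	fmt.Println(detect(&ClusterInfo{GrpcDialOption: struct{}{}})) // can auto-clean via RPC
}
```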

* feat(ec): auto-clean orphan source replica via targeted VolumeDelete

Builds on the previous commits: the guard now identifies the
#9448 stuck-source state and a gRPC dial option is available on
ClusterInfo. When both are true, detection auto-cleans the
orphaned regular replica instead of just warning the operator.

New helper `cleanupOrphanSourceReplicas`:

  1. Re-verifies the EC shard set is still complete via
     `countExistingEcShardsForVolume` against the live topology
     snapshot. If the count dropped between detection start and
     the cleanup decision (a volume server going down mid-cycle),
     it aborts — the source replica is the only complete copy and
     deleting it without a healthy shard set would be data loss.
  2. Issues targeted VolumeDelete RPCs to each regular-replica
     server via `operation.WithVolumeServerClient`. That RPC only
     touches the regular volume on the targeted server; EC shards
     live in a separate store path and are not affected. This is
     the safe alternative to the cluster-wide `volume.delete`
     shell command we previously warned against.

If the cleanup partially fails (one replica delete errors, others
succeed), detection logs the failure and continues to skip the
volume. The next detection cycle will try again. We deliberately
don't fall back to a re-encode because that would just collide
with the mounted shards on the targets again.

When no dial option is available the existing warning still
points operators at the safe manual procedure.
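The re-verify-then-delete flow can be sketched as below. Everything except the two-step logic is a hypothetical stand-in: `topo`, `countShards`, and the `deleteFunc` callback replace the real topology snapshot and the `operation.WithVolumeServerClient` VolumeDelete plumbing.

```go
package main

import (
	"errors"
	"fmt"
)

// topo stands in for the live topology snapshot.
type topo struct{ shardCount int }

func countShards(t *topo, volumeID uint32) int { return t.shardCount }

// deleteFunc stands in for the targeted VolumeDelete RPC against one server.
type deleteFunc func(server string, volumeID uint32) error

// cleanupOrphanSourceReplicas re-verifies the shard set is still complete,
// then issues per-server deletes; a partial failure is reported but the
// remaining replicas are still attempted, and the caller keeps skipping the
// volume until the next cycle.
func cleanupOrphanSourceReplicas(t *topo, volumeID uint32, totalShards int, servers []string, del deleteFunc) error {
	if countShards(t, volumeID) < totalShards {
		// A shard vanished mid-cycle: the source replica may now be the only
		// complete copy, so deleting it would risk data loss.
		return errors.New("shard set no longer complete; aborting cleanup")
	}
	var firstErr error
	for _, server := range servers {
		if err := del(server, volumeID); err != nil && firstErr == nil {
			firstErr = err // remember the failure but keep deleting the rest
		}
	}
	return firstErr
}

func main() {
	deleted := []string{}
	err := cleanupOrphanSourceReplicas(&topo{shardCount: 14}, 7, 14,
		[]string{"vs1:8080", "vs2:8080"},
		func(server string, volumeID uint32) error {
			deleted = append(deleted, server)
			return nil
		})
	fmt.Println(err, deleted) // <nil> [vs1:8080 vs2:8080]
}
```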
2026-05-11 23:12:57 -07:00


package erasure_coding

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
	ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)
func init() {
	pluginworker.RegisterHandler(pluginworker.HandlerFactory{
		JobType:  "erasure_coding",
		Category: pluginworker.CategoryHeavy,
		Aliases:  []string{"erasure-coding", "erasure.coding", "ec"},
		Build: func(opts pluginworker.HandlerBuildOptions) (pluginworker.JobHandler, error) {
			return NewErasureCodingHandler(opts.GrpcDialOption, opts.WorkingDir), nil
		},
	})
}

type erasureCodingWorkerConfig struct {
	TaskConfig *Config
}

// ErasureCodingHandler is the plugin job handler for erasure coding.
type ErasureCodingHandler struct {
	grpcDialOption grpc.DialOption
	workingDir     string
}

func NewErasureCodingHandler(grpcDialOption grpc.DialOption, workingDir string) *ErasureCodingHandler {
	return &ErasureCodingHandler{grpcDialOption: grpcDialOption, workingDir: strings.TrimSpace(workingDir)}
}

func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability {
	return &plugin_pb.JobTypeCapability{
		JobType:                 "erasure_coding",
		CanDetect:               true,
		CanExecute:              true,
		MaxDetectionConcurrency: 1,
		MaxExecutionConcurrency: 1,
		DisplayName:             "EC Encoding",
		Description:             "Converts full and quiet volumes into EC shards",
		Weight:                  80,
	}
}
func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	return &plugin_pb.JobTypeDescriptor{
		JobType:           "erasure_coding",
		DisplayName:       "EC Encoding",
		Description:       "Detect and execute erasure coding for suitable volumes",
		Icon:              "fas fa-shield-alt",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-admin",
			Title:       "Erasure Coding Admin Config",
			Description: "Admin-side controls for erasure coding detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "scope",
					Title:       "Scope",
					Description: "Optional filters applied before erasure coding detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "collection_filter",
							Label:       "Collection Filter",
							Description: "Only detect erasure coding opportunities in this collection when set.",
							Placeholder: "all collections",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-worker",
			Title:       "Erasure Coding Worker Config",
			Description: "Worker-side detection thresholds.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "thresholds",
					Title:       "Detection Thresholds",
					Description: "Controls for when erasure coding jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "quiet_for_seconds",
							Label:       "Quiet Period (s)",
							Description: "Volume must remain unmodified for at least this duration.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
						{
							Name:        "fullness_ratio",
							Label:       "Fullness Ratio",
							Description: "Minimum volume fullness ratio to trigger erasure coding.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
						},
						{
							Name:        "min_size_mb",
							Label:       "Minimum Volume Size (MB)",
							Description: "Only volumes larger than this size are considered.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
						},
						{
							Name:        "preferred_tags",
							Label:       "Preferred Tags",
							Description: "Comma-separated disk tags to prioritize for EC shard placement, ordered by preference.",
							Placeholder: "fast,ssd",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"quiet_for_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
				},
				"fullness_ratio": {
					Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
				},
				"min_size_mb": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
				},
				"preferred_tags": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled:                       true,
			DetectionIntervalMinutes:      17,
			DetectionTimeoutSeconds:       300,
			MaxJobsPerDetection:           500,
			GlobalExecutionConcurrency:    16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit:                    1,
			RetryBackoffSeconds:           30,
			JobTypeMaxRuntimeSeconds:      1800,
			ExecutionTimeoutSeconds:       1800,
		},
		WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
			"quiet_for_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
			},
			"fullness_ratio": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
			},
			"min_size_mb": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
			},
			"preferred_tags": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
			},
		},
	}
}
func (h *ErasureCodingHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender pluginworker.DetectionSender,
) error {
	if request == nil {
		return fmt.Errorf("run detection request is nil")
	}
	if sender == nil {
		return fmt.Errorf("detection sender is nil")
	}
	if request.JobType != "" && request.JobType != "erasure_coding" {
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.JobType)
	}
	workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())
	collectionFilter := strings.TrimSpace(pluginworker.ReadStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
	if collectionFilter != "" {
		workerConfig.TaskConfig.CollectionFilter = collectionFilter
	}
	masters := make([]string, 0)
	if request.ClusterContext != nil {
		masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
	}
	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}
	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology, GrpcDialOption: h.grpcDialOption}
	maxResults := int(request.MaxResults)
	if maxResults < 0 {
		maxResults = 0
	}
	results, hasMore, err := Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
	if err != nil {
		return err
	}
	if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil {
		glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr)
	}
	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
	for _, result := range results {
		proposal, proposalErr := buildErasureCodingProposal(result, h.workingDir)
		if proposalErr != nil {
			glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", proposalErr)
			continue
		}
		proposals = append(proposals, proposal)
	}
	if err := sender.SendProposals(&plugin_pb.DetectionProposals{
		JobType:   "erasure_coding",
		Proposals: proposals,
		HasMore:   hasMore,
	}); err != nil {
		return err
	}
	return sender.SendComplete(&plugin_pb.DetectionComplete{
		JobType:        "erasure_coding",
		Success:        true,
		TotalProposals: int32(len(proposals)),
	})
}
func emitErasureCodingDetectionDecisionTrace(
	sender pluginworker.DetectionSender,
	metrics []*workertypes.VolumeHealthMetrics,
	taskConfig *Config,
	results []*workertypes.TaskDetectionResult,
	maxResults int,
	hasMore bool,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}
	quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024
	allowedCollections := wildcard.CompileWildcardMatchers(taskConfig.CollectionFilter)
	volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics)
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
	}
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0
	for _, groupMetrics := range volumeGroups {
		if len(groupMetrics) == 0 {
			continue
		}
		metric := groupMetrics[0]
		for _, candidate := range groupMetrics {
			if candidate != nil && candidate.Server < metric.Server {
				metric = candidate
			}
		}
		if metric == nil {
			continue
		}
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}
		if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, metric.Collection) {
			skippedCollectionFilter++
			continue
		}
		if metric.Age < quietThreshold {
			skippedQuietTime++
			continue
		}
		if metric.FullnessRatio < taskConfig.FullnessRatio {
			skippedFullness++
			continue
		}
	}
	totalVolumes := len(metrics)
	summarySuffix := ""
	if hasMore {
		summarySuffix = fmt.Sprintf(" (max_results=%d reached; remaining volumes not evaluated)", maxResults)
	}
	summaryMessage := ""
	if len(results) == 0 {
		summaryMessage = fmt.Sprintf(
			"EC detection: No tasks created for %d volumes%s (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes,
			summarySuffix,
			skippedAlreadyEC,
			skippedTooSmall,
			skippedCollectionFilter,
			skippedQuietTime,
			skippedFullness,
		)
	} else {
		summaryMessage = fmt.Sprintf(
			"EC detection: Created %d task(s)%s from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			len(results),
			summarySuffix,
			totalVolumes,
			skippedAlreadyEC,
			skippedTooSmall,
			skippedCollectionFilter,
			skippedQuietTime,
			skippedFullness,
		)
	}
	if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
		"total_volumes": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
		},
		"selected_tasks": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
		},
		"max_results": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxResults)},
		},
		"has_more": {
			Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore},
		},
		"skipped_already_ec": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedAlreadyEC)},
		},
		"skipped_too_small": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedTooSmall)},
		},
		"skipped_filtered": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedCollectionFilter)},
		},
		"skipped_not_quiet": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedQuietTime)},
		},
		"skipped_not_full": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedFullness)},
		},
		"quiet_for_seconds": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
		},
		"min_size_mb": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
		},
		"fullness_threshold_percent": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
		},
	})); err != nil {
		return err
	}
	detailsEmitted := 0
	for _, metric := range metrics {
		if metric == nil || metric.IsECVolume {
			continue
		}
		sizeMB := float64(metric.Size) / (1024 * 1024)
		message := fmt.Sprintf(
			"ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
			metric.VolumeID,
			sizeMB,
			taskConfig.MinSizeMB,
			metric.Age.Truncate(time.Minute),
			quietThreshold.Truncate(time.Minute),
			metric.FullnessRatio*100,
			taskConfig.FullnessRatio*100,
		)
		if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)},
			},
			"size_mb": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: sizeMB},
			},
			"required_min_size_mb": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
			},
			"age_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.Age.Seconds())},
			},
			"required_quiet_for_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
			},
			"fullness_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: metric.FullnessRatio * 100},
			},
			"required_fullness_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
			},
		})); err != nil {
			return err
		}
		detailsEmitted++
		if detailsEmitted >= 3 {
			break
		}
	}
	return nil
}
func (h *ErasureCodingHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender pluginworker.ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	if request.Job.JobType != "" && request.Job.JobType != "erasure_coding" {
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.Job.JobType)
	}
	params, err := decodeErasureCodingTaskParams(request.Job)
	if err != nil {
		return err
	}
	applyErasureCodingExecutionDefaults(params, request.GetClusterContext(), h.workingDir)
	if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
		return fmt.Errorf("erasure coding source node is required")
	}
	if len(params.Targets) == 0 {
		return fmt.Errorf("erasure coding targets are required")
	}
	task := NewErasureCodingTask(
		request.Job.JobId,
		params.Sources[0].Node,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)
	execCtx, execCancel := context.WithCancel(ctx)
	defer execCancel()
	task.SetProgressCallback(func(progress float64, stage string) {
		message := fmt.Sprintf("erasure coding progress %.0f%%", progress)
		if strings.TrimSpace(stage) != "" {
			message = stage
		}
		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: progress,
			Stage:           stage,
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				pluginworker.BuildExecutorActivity(stage, message),
			},
		}); err != nil {
			execCancel()
		}
	})
	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         "erasure coding job accepted",
		Activities: []*plugin_pb.ActivityEvent{
			pluginworker.BuildExecutorActivity("assigned", "erasure coding job accepted"),
		},
	}); err != nil {
		return err
	}
	if err := task.Execute(execCtx, params); err != nil {
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_FAILED,
			ProgressPercent: 100,
			Stage:           "failed",
			Message:         err.Error(),
			Activities: []*plugin_pb.ActivityEvent{
				pluginworker.BuildExecutorActivity("failed", err.Error()),
			},
		})
		return err
	}
	sourceNode := params.Sources[0].Node
	resultSummary := fmt.Sprintf("erasure coding completed for volume %d across %d targets", params.VolumeId, len(params.Targets))
	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:   request.Job.JobId,
		JobType: request.Job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary: resultSummary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"volume_id": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
				},
				"source_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
				},
				"target_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			pluginworker.BuildExecutorActivity("completed", resultSummary),
		},
	})
}
func (h *ErasureCodingHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	metrics, activeTopology, _, err := pluginworker.CollectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption)
	return metrics, activeTopology, err
}

func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig {
	taskConfig := NewDefaultConfig()
	quietForSeconds := pluginworker.ReadIntConfig(values, "quiet_for_seconds", taskConfig.QuietForSeconds)
	if quietForSeconds < 0 {
		quietForSeconds = 0
	}
	taskConfig.QuietForSeconds = quietForSeconds
	fullnessRatio := pluginworker.ReadDoubleConfig(values, "fullness_ratio", taskConfig.FullnessRatio)
	if fullnessRatio < 0 {
		fullnessRatio = 0
	}
	if fullnessRatio > 1 {
		fullnessRatio = 1
	}
	taskConfig.FullnessRatio = fullnessRatio
	minSizeMB := pluginworker.ReadIntConfig(values, "min_size_mb", taskConfig.MinSizeMB)
	if minSizeMB < 1 {
		minSizeMB = 1
	}
	taskConfig.MinSizeMB = minSizeMB
	taskConfig.PreferredTags = util.NormalizeTagList(pluginworker.ReadStringListConfig(values, "preferred_tags"))
	return &erasureCodingWorkerConfig{
		TaskConfig: taskConfig,
	}
}
func buildErasureCodingProposal(
	result *workertypes.TaskDetectionResult,
	baseWorkingDir string,
) (*plugin_pb.JobProposal, error) {
	if result == nil {
		return nil, fmt.Errorf("task detection result is nil")
	}
	if result.TypedParams == nil {
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}
	params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
	applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir)
	paramsPayload, err := proto.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}
	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("erasure-coding-%d-%d", result.VolumeID, time.Now().UnixNano())
	}
	dedupeKey := fmt.Sprintf("erasure_coding:%d", result.VolumeID)
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}
	sourceNode := ""
	if len(params.Sources) > 0 {
		sourceNode = strings.TrimSpace(params.Sources[0].Node)
	}
	summary := fmt.Sprintf("Erasure code volume %d", result.VolumeID)
	if sourceNode != "" {
		summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
	}
	// EC encoding reads the full volume, computes shards, and writes 14
	// shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy)
	// so the scheduler grants a deadline scaled to volume size.
	volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
	estimatedRuntimeSeconds := volumeSizeGB * 10 * 60
	return &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey:  dedupeKey,
		JobType:    "erasure_coding",
		Priority:   pluginworker.MapTaskPriority(result.Priority),
		Summary:    summary,
		Detail:     strings.TrimSpace(result.Reason),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {
				Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
			},
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
			},
			"target_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
			},
			"estimated_runtime_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
			},
		},
		Labels: map[string]string{
			"task_type":    "erasure_coding",
			"volume_id":    fmt.Sprintf("%d", result.VolumeID),
			"collection":   result.Collection,
			"source_node":  sourceNode,
			"target_count": fmt.Sprintf("%d", len(params.Targets)),
		},
	}, nil
}
func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}
	if payload := pluginworker.ReadBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		return params, nil
	}
	volumeID := pluginworker.ReadUint32Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_server", ""))
	if sourceNode == "" {
		sourceNode = strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "server", ""))
	}
	targetServers := pluginworker.ReadStringListConfig(job.Parameters, "target_servers")
	if len(targetServers) == 0 {
		targetServers = pluginworker.ReadStringListConfig(job.Parameters, "targets")
	}
	collection := pluginworker.ReadStringConfig(job.Parameters, "collection", "")
	dataShards := pluginworker.ReadInt32Config(job.Parameters, "data_shards", int32(ecstorage.DataShardsCount))
	if dataShards <= 0 {
		dataShards = int32(ecstorage.DataShardsCount)
	}
	parityShards := pluginworker.ReadInt32Config(job.Parameters, "parity_shards", int32(ecstorage.ParityShardsCount))
	if parityShards <= 0 {
		parityShards = int32(ecstorage.ParityShardsCount)
	}
	sourceDiskType := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_disk_type", ""))
	totalShards := int(dataShards + parityShards)
	if volumeID == 0 {
		return nil, fmt.Errorf("missing volume_id in job parameters")
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("missing source_server in job parameters")
	}
	if len(targetServers) == 0 {
		return nil, fmt.Errorf("missing target_servers in job parameters")
	}
	if len(targetServers) < totalShards {
		return nil, fmt.Errorf("insufficient target_servers: got %d, need at least %d", len(targetServers), totalShards)
	}
	shardAssignments := assignECShardIDs(totalShards, len(targetServers))
	targets := make([]*worker_pb.TaskTarget, 0, len(targetServers))
	for i := 0; i < len(targetServers); i++ {
		targetNode := strings.TrimSpace(targetServers[i])
		if targetNode == "" {
			continue
		}
		targets = append(targets, &worker_pb.TaskTarget{
			Node:     targetNode,
			VolumeId: volumeID,
			ShardIds: shardAssignments[i],
		})
	}
	if len(targets) < totalShards {
		return nil, fmt.Errorf("insufficient non-empty target_servers after normalization: got %d, need at least %d", len(targets), totalShards)
	}
	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   volumeID,
		Collection: collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node:     sourceNode,
				VolumeId: volumeID,
			},
		},
		Targets: targets,
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:     dataShards,
				ParityShards:   parityShards,
				SourceDiskType: sourceDiskType,
			},
		},
	}, nil
}
func applyErasureCodingExecutionDefaults(
	params *worker_pb.TaskParams,
	clusterContext *plugin_pb.ClusterContext,
	baseWorkingDir string,
) {
	if params == nil {
		return
	}
	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		ecParams = &worker_pb.ErasureCodingTaskParams{
			DataShards:   ecstorage.DataShardsCount,
			ParityShards: ecstorage.ParityShardsCount,
		}
		params.TaskParams = &worker_pb.TaskParams_ErasureCodingParams{ErasureCodingParams: ecParams}
	}
	if ecParams.DataShards <= 0 {
		ecParams.DataShards = ecstorage.DataShardsCount
	}
	if ecParams.ParityShards <= 0 {
		ecParams.ParityShards = ecstorage.ParityShardsCount
	}
	ecParams.WorkingDir = defaultErasureCodingWorkingDir(baseWorkingDir)
	ecParams.CleanupSource = true
	if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 {
		ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0]
	}
	totalShards := int(ecParams.DataShards + ecParams.ParityShards)
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}
	needsShardAssignment := false
	for _, target := range params.Targets {
		if target == nil || len(target.ShardIds) == 0 {
			needsShardAssignment = true
			break
		}
	}
	if needsShardAssignment && len(params.Targets) > 0 {
		assignments := assignECShardIDs(totalShards, len(params.Targets))
		for i := 0; i < len(params.Targets); i++ {
			if params.Targets[i] == nil {
				continue
			}
			if len(params.Targets[i].ShardIds) == 0 {
				params.Targets[i].ShardIds = assignments[i]
			}
		}
	}
}

func assignECShardIDs(totalShards int, targetCount int) [][]uint32 {
	if targetCount <= 0 {
		return nil
	}
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}
	assignments := make([][]uint32, targetCount)
	for i := 0; i < totalShards; i++ {
		targetIndex := i % targetCount
		assignments[targetIndex] = append(assignments[targetIndex], uint32(i))
	}
	return assignments
}

func defaultErasureCodingWorkingDir(baseWorkingDir string) string {
	dir := strings.TrimSpace(baseWorkingDir)
	if dir == "" {
		return filepath.Join(".", "erasure_coding")
	}
	return filepath.Join(dir, "erasure_coding")
}