Mirror of https://github.com/seaweedfs/seaweedfs.git, synced 2026-05-16 06:41:29 +00:00
* fix(ec): skip re-encode when EC shards already exist for the volume (#9448)

  When an earlier EC encoding succeeded but the post-encode source delete left a regular replica behind on one of the servers, the next detection cycle proposes the same volume again. The new encode tries to redistribute shards to targets that already have them mounted, the volume server returns `ec volume %d is mounted; refusing overwrite`, the task fails, and detection re-queues the volume. The cycle repeats forever (issue #9448).

  The existing `metric.IsECVolume` skip catches the case where the canonical metric is reported on the EC-shard side of the heartbeat, but when the master sees BOTH a regular replica AND its EC shards in the same volume list, the canonical metric we pick is the regular replica and IsECVolume is false. Add a second guard that checks the topology directly via `findExistingECShards` (already present and indexed) and skip the volume when any shards exist, logging a warning that points the admin at the stuck source. This breaks the loop.

  Auto-cleanup of the orphaned replica is left as follow-up work: deleting a source replica from inside the detector is only safe with a re-verification step right before the delete, plus a config opt-in, and is best done in its own change.

* fix(ec): #9448 guard only fires when EC shard set is complete

  The first version of the #9448 guard tripped on `len(existingShards) > 0`, which is broader than necessary. The existing recovery branch in the encode arm (around the `existingECShards` block, ~line 216) is designed to fold partial leftover shards from a previously failed encode into the new task as cleanup sources. Skipping unconditionally on any existing shards made that branch dead code, regressing the recovery behavior Gemini flagged in the review of af09e1ec7.

  Two corrections:

  1. New helper `countExistingEcShardsForVolume` walks each disk's `EcIndexBits` bitmap and ORs the results into a `ShardBits`, returning the distinct-shard popcount (see the sketch after this commit log). This is the right unit: a single `VolumeEcShardInformationMessage` can carry several shards, so `len(EcShardInfos)` is not the same as the number of present shards. Per Gemini's "use helper functions that walk the actual shard bitmap" note.
  2. The guard now fires only when `shardCount >= totalShards`. Partial shard sets fall through to the existing recovery branch, unchanged.

  Tests:
  - TestDetectionSkipsWhenECShardsAlreadyExist: complete shards → no proposal (the regression test for #9448 itself, unchanged intent, rewritten on top of the new helpers).
  - TestDetectionAllowsRegularReplicaWhenShardsPartial: partial shards → the guard does NOT swallow the volume; the encode arm still gets a chance.
  - TestCountExistingEcShardsForVolume: the helper walks the bitmap correctly even when one info entry packs multiple shards on one disk.

  The dangerous `volume.delete` hint in the warning is unchanged for now; it gets fixed in the next commit.

* fix(ec): drop dangerous shell-command hint from #9448 warning

  The previous warning told operators to run `volume.delete -volumeId=%d` in the SeaweedFS shell to clean up the orphaned source replica. That command is cluster-wide: it deletes every replica of the volume, including the EC shards, which share the same volume id. Running it in the state the message describes would cause the data loss the guard exists to prevent.
  Replace it with explicit guidance that the cleanup must be a targeted VolumeDelete RPC against the source server only, and that the shell command is the exact wrong thing to use here. The next two commits add the plumbing and the auto-execution of that targeted delete, so most operators never see this hint at all. Per Gemini's comment on af09e1ec7.

* feat(worker): plumb grpc dial option through ClusterInfo

  Add ClusterInfo.GrpcDialOption (optional) and set it in the erasure_coding plugin handler. This lets the detector make targeted gRPC calls during detection; the follow-up commit uses it to auto-clean orphan source replicas via VolumeDelete RPCs. Zero-value safe: existing detectors that don't need RPC access get a nil DialOption and ignore the field.

* feat(ec): auto-clean orphan source replica via targeted VolumeDelete

  Builds on the previous commits: the guard now identifies the #9448 stuck-source state, and a gRPC dial option is available on ClusterInfo. When both are true, detection auto-cleans the orphaned regular replica instead of just warning the operator.

  New helper `cleanupOrphanSourceReplicas` (sketched below):

  1. Re-verifies that the EC shard set is still complete via `countExistingEcShardsForVolume` against the live topology snapshot. If the count dropped between detection start and the cleanup decision (a volume server going down mid-cycle), it aborts: the source replica may be the only complete copy, and deleting it without a healthy shard set would be data loss.
  2. Issues targeted VolumeDelete RPCs to each regular-replica server via `operation.WithVolumeServerClient`. That RPC only touches the regular volume on the targeted server; EC shards live in a separate store path and are not affected. This is the safe alternative to the cluster-wide `volume.delete` shell command we previously warned against.

  If the cleanup partially fails (one replica delete errors, others succeed), detection logs the failure and continues to skip the volume. The next detection cycle will try again. We deliberately don't fall back to a re-encode, because that would just collide with the mounted shards on the targets again. When no dial option is available, the existing warning still points operators at the safe manual procedure.
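To make the moving parts above concrete, here is a minimal, self-contained sketch of the guard and cleanup ordering. The type and function names `ecShardInfo`, `shouldSkipReencode`, and `deleteReplica` are illustrative stand-ins, not the actual SeaweedFS code: the real helpers operate on `master_pb.VolumeEcShardInformationMessage` entries and the `erasure_coding.ShardBits` type, and the targeted delete goes through `operation.WithVolumeServerClient`. Only the `countExistingEcShardsForVolume` name and the overall ordering follow the commits described above.

```go
package main

import (
	"fmt"
	"math/bits"
)

// ecShardInfo is a stand-in for the per-disk EC shard entry in the topology
// snapshot: the real message carries the volume id and an EcIndexBits bitmap
// in which a set bit i means "shard i is present on this disk".
type ecShardInfo struct {
	VolumeID    uint32
	EcIndexBits uint32
}

// countExistingEcShardsForVolume ORs the per-disk bitmaps for one volume and
// returns the number of distinct shards present. A single entry can carry
// several shards, so counting entries instead of bits would be wrong.
func countExistingEcShardsForVolume(infos []ecShardInfo, volumeID uint32) int {
	var combined uint32
	for _, info := range infos {
		if info.VolumeID == volumeID {
			combined |= info.EcIndexBits
		}
	}
	return bits.OnesCount32(combined)
}

// shouldSkipReencode is the #9448 guard: only a complete shard set (data +
// parity, normally 10+4 = 14) blocks a new encode. Partial leftovers fall
// through to the existing recovery branch.
func shouldSkipReencode(infos []ecShardInfo, volumeID uint32, totalShards int) bool {
	return countExistingEcShardsForVolume(infos, volumeID) >= totalShards
}

// cleanupOrphanSourceReplicas sketches the safety ordering of the auto-cleanup:
// re-verify the complete shard set against the live topology right before
// deleting, then remove the regular replica on each listed server only.
// deleteReplica is a hypothetical stand-in for the targeted VolumeDelete RPC
// that the real code issues through operation.WithVolumeServerClient.
func cleanupOrphanSourceReplicas(
	infos []ecShardInfo,
	volumeID uint32,
	totalShards int,
	replicaServers []string,
	deleteReplica func(server string, volumeID uint32) error,
) error {
	if countExistingEcShardsForVolume(infos, volumeID) < totalShards {
		// The shard set shrank mid-cycle; the regular replica may be the only
		// complete copy, so keep it and let the next cycle re-evaluate.
		return fmt.Errorf("volume %d: EC shard set no longer complete, keeping source replica", volumeID)
	}
	var firstErr error
	for _, server := range replicaServers {
		if err := deleteReplica(server, volumeID); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("delete replica of volume %d on %s: %w", volumeID, server, err)
		}
	}
	return firstErr
}

func main() {
	// Volume 7 has all 14 shards spread over three disks; volume 8 has only 3.
	infos := []ecShardInfo{
		{VolumeID: 7, EcIndexBits: 0b00000000011111}, // shards 0-4
		{VolumeID: 7, EcIndexBits: 0b00001111100000}, // shards 5-9
		{VolumeID: 7, EcIndexBits: 0b11110000000000}, // shards 10-13
		{VolumeID: 8, EcIndexBits: 0b00000000000111}, // shards 0-2 only
	}
	fmt.Println(shouldSkipReencode(infos, 7, 14)) // true: skip re-encode, consider cleanup
	fmt.Println(shouldSkipReencode(infos, 8, 14)) // false: fall through to recovery branch
}
```

The design choice is the same in both pieces: decisions are made on the OR'd shard bitmap rather than on entry counts, and the delete is issued per server rather than cluster-wide.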
847 lines · 27 KiB · Go
package erasure_coding

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
	ecstorage "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
	workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/proto"
)

func init() {
	pluginworker.RegisterHandler(pluginworker.HandlerFactory{
		JobType:  "erasure_coding",
		Category: pluginworker.CategoryHeavy,
		Aliases:  []string{"erasure-coding", "erasure.coding", "ec"},
		Build: func(opts pluginworker.HandlerBuildOptions) (pluginworker.JobHandler, error) {
			return NewErasureCodingHandler(opts.GrpcDialOption, opts.WorkingDir), nil
		},
	})
}

type erasureCodingWorkerConfig struct {
	TaskConfig *Config
}

// ErasureCodingHandler is the plugin job handler for erasure coding.
type ErasureCodingHandler struct {
	grpcDialOption grpc.DialOption
	workingDir     string
}

func NewErasureCodingHandler(grpcDialOption grpc.DialOption, workingDir string) *ErasureCodingHandler {
	return &ErasureCodingHandler{grpcDialOption: grpcDialOption, workingDir: strings.TrimSpace(workingDir)}
}

func (h *ErasureCodingHandler) Capability() *plugin_pb.JobTypeCapability {
	return &plugin_pb.JobTypeCapability{
		JobType:                 "erasure_coding",
		CanDetect:               true,
		CanExecute:              true,
		MaxDetectionConcurrency: 1,
		MaxExecutionConcurrency: 1,
		DisplayName:             "EC Encoding",
		Description:             "Converts full and quiet volumes into EC shards",
		Weight:                  80,
	}
}

func (h *ErasureCodingHandler) Descriptor() *plugin_pb.JobTypeDescriptor {
	return &plugin_pb.JobTypeDescriptor{
		JobType:           "erasure_coding",
		DisplayName:       "EC Encoding",
		Description:       "Detect and execute erasure coding for suitable volumes",
		Icon:              "fas fa-shield-alt",
		DescriptorVersion: 1,
		AdminConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-admin",
			Title:       "Erasure Coding Admin Config",
			Description: "Admin-side controls for erasure coding detection scope.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "scope",
					Title:       "Scope",
					Description: "Optional filters applied before erasure coding detection.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "collection_filter",
							Label:       "Collection Filter",
							Description: "Only detect erasure coding opportunities in this collection when set.",
							Placeholder: "all collections",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"collection_filter": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		WorkerConfigForm: &plugin_pb.ConfigForm{
			FormId:      "erasure-coding-worker",
			Title:       "Erasure Coding Worker Config",
			Description: "Worker-side detection thresholds.",
			Sections: []*plugin_pb.ConfigSection{
				{
					SectionId:   "thresholds",
					Title:       "Detection Thresholds",
					Description: "Controls for when erasure coding jobs should be proposed.",
					Fields: []*plugin_pb.ConfigField{
						{
							Name:        "quiet_for_seconds",
							Label:       "Quiet Period (s)",
							Description: "Volume must remain unmodified for at least this duration.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
						},
						{
							Name:        "fullness_ratio",
							Label:       "Fullness Ratio",
							Description: "Minimum volume fullness ratio to trigger erasure coding.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_DOUBLE,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0}},
							MaxValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 1}},
						},
						{
							Name:        "min_size_mb",
							Label:       "Minimum Volume Size (MB)",
							Description: "Only volumes larger than this size are considered.",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
							Required:    true,
							MinValue:    &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
						},
						{
							Name:        "preferred_tags",
							Label:       "Preferred Tags",
							Description: "Comma-separated disk tags to prioritize for EC shard placement, ordered by preference.",
							Placeholder: "fast,ssd",
							FieldType:   plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
							Widget:      plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
						},
					},
				},
			},
			DefaultValues: map[string]*plugin_pb.ConfigValue{
				"quiet_for_seconds": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
				},
				"fullness_ratio": {
					Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
				},
				"min_size_mb": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
				},
				"preferred_tags": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
				},
			},
		},
		AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
			Enabled:                       true,
			DetectionIntervalMinutes:      17,
			DetectionTimeoutSeconds:       300,
			MaxJobsPerDetection:           500,
			GlobalExecutionConcurrency:    16,
			PerWorkerExecutionConcurrency: 4,
			RetryLimit:                    1,
			RetryBackoffSeconds:           30,
			JobTypeMaxRuntimeSeconds:      1800,
			ExecutionTimeoutSeconds:       1800,
		},
		WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
			"quiet_for_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 300},
			},
			"fullness_ratio": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: 0.8},
			},
			"min_size_mb": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 30},
			},
			"preferred_tags": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""},
			},
		},
	}
}

func (h *ErasureCodingHandler) Detect(
	ctx context.Context,
	request *plugin_pb.RunDetectionRequest,
	sender pluginworker.DetectionSender,
) error {
	if request == nil {
		return fmt.Errorf("run detection request is nil")
	}
	if sender == nil {
		return fmt.Errorf("detection sender is nil")
	}
	if request.JobType != "" && request.JobType != "erasure_coding" {
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.JobType)
	}

	workerConfig := deriveErasureCodingWorkerConfig(request.GetWorkerConfigValues())

	collectionFilter := strings.TrimSpace(pluginworker.ReadStringConfig(request.GetAdminConfigValues(), "collection_filter", ""))
	if collectionFilter != "" {
		workerConfig.TaskConfig.CollectionFilter = collectionFilter
	}

	masters := make([]string, 0)
	if request.ClusterContext != nil {
		masters = append(masters, request.ClusterContext.MasterGrpcAddresses...)
	}

	metrics, activeTopology, err := h.collectVolumeMetrics(ctx, masters, collectionFilter)
	if err != nil {
		return err
	}

	clusterInfo := &workertypes.ClusterInfo{ActiveTopology: activeTopology, GrpcDialOption: h.grpcDialOption}
	maxResults := int(request.MaxResults)
	if maxResults < 0 {
		maxResults = 0
	}
	results, hasMore, err := Detection(ctx, metrics, clusterInfo, workerConfig.TaskConfig, maxResults)
	if err != nil {
		return err
	}
	if traceErr := emitErasureCodingDetectionDecisionTrace(sender, metrics, workerConfig.TaskConfig, results, maxResults, hasMore); traceErr != nil {
		glog.Warningf("Plugin worker failed to emit erasure_coding detection trace: %v", traceErr)
	}

	proposals := make([]*plugin_pb.JobProposal, 0, len(results))
	for _, result := range results {
		proposal, proposalErr := buildErasureCodingProposal(result, h.workingDir)
		if proposalErr != nil {
			glog.Warningf("Plugin worker skip invalid erasure_coding proposal: %v", proposalErr)
			continue
		}
		proposals = append(proposals, proposal)
	}

	if err := sender.SendProposals(&plugin_pb.DetectionProposals{
		JobType:   "erasure_coding",
		Proposals: proposals,
		HasMore:   hasMore,
	}); err != nil {
		return err
	}

	return sender.SendComplete(&plugin_pb.DetectionComplete{
		JobType:        "erasure_coding",
		Success:        true,
		TotalProposals: int32(len(proposals)),
	})
}

func emitErasureCodingDetectionDecisionTrace(
	sender pluginworker.DetectionSender,
	metrics []*workertypes.VolumeHealthMetrics,
	taskConfig *Config,
	results []*workertypes.TaskDetectionResult,
	maxResults int,
	hasMore bool,
) error {
	if sender == nil || taskConfig == nil {
		return nil
	}

	quietThreshold := time.Duration(taskConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(taskConfig.MinSizeMB) * 1024 * 1024
	allowedCollections := wildcard.CompileWildcardMatchers(taskConfig.CollectionFilter)

	volumeGroups := make(map[uint32][]*workertypes.VolumeHealthMetrics)
	for _, metric := range metrics {
		if metric == nil {
			continue
		}
		volumeGroups[metric.VolumeID] = append(volumeGroups[metric.VolumeID], metric)
	}

	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

	for _, groupMetrics := range volumeGroups {
		if len(groupMetrics) == 0 {
			continue
		}
		metric := groupMetrics[0]
		for _, candidate := range groupMetrics {
			if candidate != nil && candidate.Server < metric.Server {
				metric = candidate
			}
		}
		if metric == nil {
			continue
		}

		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}
		if len(allowedCollections) > 0 && !wildcard.MatchesAnyWildcard(allowedCollections, metric.Collection) {
			skippedCollectionFilter++
			continue
		}
		if metric.Age < quietThreshold {
			skippedQuietTime++
			continue
		}
		if metric.FullnessRatio < taskConfig.FullnessRatio {
			skippedFullness++
			continue
		}
	}

	totalVolumes := len(metrics)
	summarySuffix := ""
	if hasMore {
		summarySuffix = fmt.Sprintf(" (max_results=%d reached; remaining volumes not evaluated)", maxResults)
	}
	summaryMessage := ""
	if len(results) == 0 {
		summaryMessage = fmt.Sprintf(
			"EC detection: No tasks created for %d volumes%s (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes,
			summarySuffix,
			skippedAlreadyEC,
			skippedTooSmall,
			skippedCollectionFilter,
			skippedQuietTime,
			skippedFullness,
		)
	} else {
		summaryMessage = fmt.Sprintf(
			"EC detection: Created %d task(s)%s from %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			len(results),
			summarySuffix,
			totalVolumes,
			skippedAlreadyEC,
			skippedTooSmall,
			skippedCollectionFilter,
			skippedQuietTime,
			skippedFullness,
		)
	}

	if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_summary", summaryMessage, map[string]*plugin_pb.ConfigValue{
		"total_volumes": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(totalVolumes)},
		},
		"selected_tasks": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(results))},
		},
		"max_results": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(maxResults)},
		},
		"has_more": {
			Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: hasMore},
		},
		"skipped_already_ec": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedAlreadyEC)},
		},
		"skipped_too_small": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedTooSmall)},
		},
		"skipped_filtered": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedCollectionFilter)},
		},
		"skipped_not_quiet": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedQuietTime)},
		},
		"skipped_not_full": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(skippedFullness)},
		},
		"quiet_for_seconds": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
		},
		"min_size_mb": {
			Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
		},
		"fullness_threshold_percent": {
			Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
		},
	})); err != nil {
		return err
	}

	detailsEmitted := 0
	for _, metric := range metrics {
		if metric == nil || metric.IsECVolume {
			continue
		}
		sizeMB := float64(metric.Size) / (1024 * 1024)
		message := fmt.Sprintf(
			"ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
			metric.VolumeID,
			sizeMB,
			taskConfig.MinSizeMB,
			metric.Age.Truncate(time.Minute),
			quietThreshold.Truncate(time.Minute),
			metric.FullnessRatio*100,
			taskConfig.FullnessRatio*100,
		)
		if err := sender.SendActivity(pluginworker.BuildDetectorActivity("decision_volume", message, map[string]*plugin_pb.ConfigValue{
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.VolumeID)},
			},
			"size_mb": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: sizeMB},
			},
			"required_min_size_mb": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.MinSizeMB)},
			},
			"age_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(metric.Age.Seconds())},
			},
			"required_quiet_for_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(taskConfig.QuietForSeconds)},
			},
			"fullness_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: metric.FullnessRatio * 100},
			},
			"required_fullness_percent": {
				Kind: &plugin_pb.ConfigValue_DoubleValue{DoubleValue: taskConfig.FullnessRatio * 100},
			},
		})); err != nil {
			return err
		}
		detailsEmitted++
		if detailsEmitted >= 3 {
			break
		}
	}

	return nil
}

func (h *ErasureCodingHandler) Execute(
	ctx context.Context,
	request *plugin_pb.ExecuteJobRequest,
	sender pluginworker.ExecutionSender,
) error {
	if request == nil || request.Job == nil {
		return fmt.Errorf("execute request/job is nil")
	}
	if sender == nil {
		return fmt.Errorf("execution sender is nil")
	}
	if request.Job.JobType != "" && request.Job.JobType != "erasure_coding" {
		return fmt.Errorf("job type %q is not handled by erasure_coding worker", request.Job.JobType)
	}

	params, err := decodeErasureCodingTaskParams(request.Job)
	if err != nil {
		return err
	}

	applyErasureCodingExecutionDefaults(params, request.GetClusterContext(), h.workingDir)

	if len(params.Sources) == 0 || strings.TrimSpace(params.Sources[0].Node) == "" {
		return fmt.Errorf("erasure coding source node is required")
	}
	if len(params.Targets) == 0 {
		return fmt.Errorf("erasure coding targets are required")
	}

	task := NewErasureCodingTask(
		request.Job.JobId,
		params.Sources[0].Node,
		params.VolumeId,
		params.Collection,
		h.grpcDialOption,
	)
	execCtx, execCancel := context.WithCancel(ctx)
	defer execCancel()
	task.SetProgressCallback(func(progress float64, stage string) {
		message := fmt.Sprintf("erasure coding progress %.0f%%", progress)
		if strings.TrimSpace(stage) != "" {
			message = stage
		}
		if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_RUNNING,
			ProgressPercent: progress,
			Stage:           stage,
			Message:         message,
			Activities: []*plugin_pb.ActivityEvent{
				pluginworker.BuildExecutorActivity(stage, message),
			},
		}); err != nil {
			execCancel()
		}
	})

	if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
		JobId:           request.Job.JobId,
		JobType:         request.Job.JobType,
		State:           plugin_pb.JobState_JOB_STATE_ASSIGNED,
		ProgressPercent: 0,
		Stage:           "assigned",
		Message:         "erasure coding job accepted",
		Activities: []*plugin_pb.ActivityEvent{
			pluginworker.BuildExecutorActivity("assigned", "erasure coding job accepted"),
		},
	}); err != nil {
		return err
	}

	if err := task.Execute(execCtx, params); err != nil {
		_ = sender.SendProgress(&plugin_pb.JobProgressUpdate{
			JobId:           request.Job.JobId,
			JobType:         request.Job.JobType,
			State:           plugin_pb.JobState_JOB_STATE_FAILED,
			ProgressPercent: 100,
			Stage:           "failed",
			Message:         err.Error(),
			Activities: []*plugin_pb.ActivityEvent{
				pluginworker.BuildExecutorActivity("failed", err.Error()),
			},
		})
		return err
	}

	sourceNode := params.Sources[0].Node
	resultSummary := fmt.Sprintf("erasure coding completed for volume %d across %d targets", params.VolumeId, len(params.Targets))

	return sender.SendCompleted(&plugin_pb.JobCompleted{
		JobId:   request.Job.JobId,
		JobType: request.Job.JobType,
		Success: true,
		Result: &plugin_pb.JobResult{
			Summary: resultSummary,
			OutputValues: map[string]*plugin_pb.ConfigValue{
				"volume_id": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(params.VolumeId)},
				},
				"source_server": {
					Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
				},
				"target_count": {
					Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
				},
			},
		},
		Activities: []*plugin_pb.ActivityEvent{
			pluginworker.BuildExecutorActivity("completed", resultSummary),
		},
	})
}

func (h *ErasureCodingHandler) collectVolumeMetrics(
	ctx context.Context,
	masterAddresses []string,
	collectionFilter string,
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, error) {
	metrics, activeTopology, _, err := pluginworker.CollectVolumeMetricsFromMasters(ctx, masterAddresses, collectionFilter, h.grpcDialOption)
	return metrics, activeTopology, err
}

func deriveErasureCodingWorkerConfig(values map[string]*plugin_pb.ConfigValue) *erasureCodingWorkerConfig {
	taskConfig := NewDefaultConfig()

	quietForSeconds := pluginworker.ReadIntConfig(values, "quiet_for_seconds", taskConfig.QuietForSeconds)
	if quietForSeconds < 0 {
		quietForSeconds = 0
	}
	taskConfig.QuietForSeconds = quietForSeconds

	fullnessRatio := pluginworker.ReadDoubleConfig(values, "fullness_ratio", taskConfig.FullnessRatio)
	if fullnessRatio < 0 {
		fullnessRatio = 0
	}
	if fullnessRatio > 1 {
		fullnessRatio = 1
	}
	taskConfig.FullnessRatio = fullnessRatio

	minSizeMB := pluginworker.ReadIntConfig(values, "min_size_mb", taskConfig.MinSizeMB)
	if minSizeMB < 1 {
		minSizeMB = 1
	}
	taskConfig.MinSizeMB = minSizeMB

	taskConfig.PreferredTags = util.NormalizeTagList(pluginworker.ReadStringListConfig(values, "preferred_tags"))

	return &erasureCodingWorkerConfig{
		TaskConfig: taskConfig,
	}
}

func buildErasureCodingProposal(
	result *workertypes.TaskDetectionResult,
	baseWorkingDir string,
) (*plugin_pb.JobProposal, error) {
	if result == nil {
		return nil, fmt.Errorf("task detection result is nil")
	}
	if result.TypedParams == nil {
		return nil, fmt.Errorf("missing typed params for volume %d", result.VolumeID)
	}
	params := proto.Clone(result.TypedParams).(*worker_pb.TaskParams)
	applyErasureCodingExecutionDefaults(params, nil, baseWorkingDir)

	paramsPayload, err := proto.Marshal(params)
	if err != nil {
		return nil, fmt.Errorf("marshal task params: %w", err)
	}

	proposalID := strings.TrimSpace(result.TaskID)
	if proposalID == "" {
		proposalID = fmt.Sprintf("erasure-coding-%d-%d", result.VolumeID, time.Now().UnixNano())
	}

	dedupeKey := fmt.Sprintf("erasure_coding:%d", result.VolumeID)
	if result.Collection != "" {
		dedupeKey += ":" + result.Collection
	}

	sourceNode := ""
	if len(params.Sources) > 0 {
		sourceNode = strings.TrimSpace(params.Sources[0].Node)
	}

	summary := fmt.Sprintf("Erasure code volume %d", result.VolumeID)
	if sourceNode != "" {
		summary = fmt.Sprintf("Erasure code volume %d from %s", result.VolumeID, sourceNode)
	}

	// EC encoding reads the full volume, computes shards, and writes 14
	// shards out to target nodes. Budget 10 min/GB (roughly 2x a plain copy)
	// so the scheduler grants a deadline scaled to volume size.
	volumeSizeGB := int64(result.TypedParams.VolumeSize/1024/1024/1024) + 1
	estimatedRuntimeSeconds := volumeSizeGB * 10 * 60

	return &plugin_pb.JobProposal{
		ProposalId: proposalID,
		DedupeKey:  dedupeKey,
		JobType:    "erasure_coding",
		Priority:   pluginworker.MapTaskPriority(result.Priority),
		Summary:    summary,
		Detail:     strings.TrimSpace(result.Reason),
		Parameters: map[string]*plugin_pb.ConfigValue{
			"task_params_pb": {
				Kind: &plugin_pb.ConfigValue_BytesValue{BytesValue: paramsPayload},
			},
			"volume_id": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(result.VolumeID)},
			},
			"source_server": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: sourceNode},
			},
			"collection": {
				Kind: &plugin_pb.ConfigValue_StringValue{StringValue: result.Collection},
			},
			"target_count": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(len(params.Targets))},
			},
			"estimated_runtime_seconds": {
				Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: estimatedRuntimeSeconds},
			},
		},
		Labels: map[string]string{
			"task_type":    "erasure_coding",
			"volume_id":    fmt.Sprintf("%d", result.VolumeID),
			"collection":   result.Collection,
			"source_node":  sourceNode,
			"target_count": fmt.Sprintf("%d", len(params.Targets)),
		},
	}, nil
}

func decodeErasureCodingTaskParams(job *plugin_pb.JobSpec) (*worker_pb.TaskParams, error) {
	if job == nil {
		return nil, fmt.Errorf("job spec is nil")
	}

	if payload := pluginworker.ReadBytesConfig(job.Parameters, "task_params_pb"); len(payload) > 0 {
		params := &worker_pb.TaskParams{}
		if err := proto.Unmarshal(payload, params); err != nil {
			return nil, fmt.Errorf("unmarshal task_params_pb: %w", err)
		}
		if params.TaskId == "" {
			params.TaskId = job.JobId
		}
		return params, nil
	}

	volumeID := pluginworker.ReadUint32Config(job.Parameters, "volume_id", 0)
	sourceNode := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_server", ""))
	if sourceNode == "" {
		sourceNode = strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "server", ""))
	}
	targetServers := pluginworker.ReadStringListConfig(job.Parameters, "target_servers")
	if len(targetServers) == 0 {
		targetServers = pluginworker.ReadStringListConfig(job.Parameters, "targets")
	}
	collection := pluginworker.ReadStringConfig(job.Parameters, "collection", "")

	dataShards := pluginworker.ReadInt32Config(job.Parameters, "data_shards", int32(ecstorage.DataShardsCount))
	if dataShards <= 0 {
		dataShards = int32(ecstorage.DataShardsCount)
	}
	parityShards := pluginworker.ReadInt32Config(job.Parameters, "parity_shards", int32(ecstorage.ParityShardsCount))
	if parityShards <= 0 {
		parityShards = int32(ecstorage.ParityShardsCount)
	}
	sourceDiskType := strings.TrimSpace(pluginworker.ReadStringConfig(job.Parameters, "source_disk_type", ""))
	totalShards := int(dataShards + parityShards)

	if volumeID == 0 {
		return nil, fmt.Errorf("missing volume_id in job parameters")
	}
	if sourceNode == "" {
		return nil, fmt.Errorf("missing source_server in job parameters")
	}
	if len(targetServers) == 0 {
		return nil, fmt.Errorf("missing target_servers in job parameters")
	}
	if len(targetServers) < totalShards {
		return nil, fmt.Errorf("insufficient target_servers: got %d, need at least %d", len(targetServers), totalShards)
	}

	shardAssignments := assignECShardIDs(totalShards, len(targetServers))
	targets := make([]*worker_pb.TaskTarget, 0, len(targetServers))
	for i := 0; i < len(targetServers); i++ {
		targetNode := strings.TrimSpace(targetServers[i])
		if targetNode == "" {
			continue
		}
		targets = append(targets, &worker_pb.TaskTarget{
			Node:     targetNode,
			VolumeId: volumeID,
			ShardIds: shardAssignments[i],
		})
	}
	if len(targets) < totalShards {
		return nil, fmt.Errorf("insufficient non-empty target_servers after normalization: got %d, need at least %d", len(targets), totalShards)
	}

	return &worker_pb.TaskParams{
		TaskId:     job.JobId,
		VolumeId:   volumeID,
		Collection: collection,
		Sources: []*worker_pb.TaskSource{
			{
				Node:     sourceNode,
				VolumeId: volumeID,
			},
		},
		Targets: targets,
		TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
			ErasureCodingParams: &worker_pb.ErasureCodingTaskParams{
				DataShards:     dataShards,
				ParityShards:   parityShards,
				SourceDiskType: sourceDiskType,
			},
		},
	}, nil
}

func applyErasureCodingExecutionDefaults(
	params *worker_pb.TaskParams,
	clusterContext *plugin_pb.ClusterContext,
	baseWorkingDir string,
) {
	if params == nil {
		return
	}

	ecParams := params.GetErasureCodingParams()
	if ecParams == nil {
		ecParams = &worker_pb.ErasureCodingTaskParams{
			DataShards:   ecstorage.DataShardsCount,
			ParityShards: ecstorage.ParityShardsCount,
		}
		params.TaskParams = &worker_pb.TaskParams_ErasureCodingParams{ErasureCodingParams: ecParams}
	}

	if ecParams.DataShards <= 0 {
		ecParams.DataShards = ecstorage.DataShardsCount
	}
	if ecParams.ParityShards <= 0 {
		ecParams.ParityShards = ecstorage.ParityShardsCount
	}
	ecParams.WorkingDir = defaultErasureCodingWorkingDir(baseWorkingDir)
	ecParams.CleanupSource = true
	if strings.TrimSpace(ecParams.MasterClient) == "" && clusterContext != nil && len(clusterContext.MasterGrpcAddresses) > 0 {
		ecParams.MasterClient = clusterContext.MasterGrpcAddresses[0]
	}

	totalShards := int(ecParams.DataShards + ecParams.ParityShards)
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}
	needsShardAssignment := false
	for _, target := range params.Targets {
		if target == nil || len(target.ShardIds) == 0 {
			needsShardAssignment = true
			break
		}
	}
	if needsShardAssignment && len(params.Targets) > 0 {
		assignments := assignECShardIDs(totalShards, len(params.Targets))
		for i := 0; i < len(params.Targets); i++ {
			if params.Targets[i] == nil {
				continue
			}
			if len(params.Targets[i].ShardIds) == 0 {
				params.Targets[i].ShardIds = assignments[i]
			}
		}
	}
}

func assignECShardIDs(totalShards int, targetCount int) [][]uint32 {
	if targetCount <= 0 {
		return nil
	}
	if totalShards <= 0 {
		totalShards = ecstorage.TotalShardsCount
	}

	assignments := make([][]uint32, targetCount)
	for i := 0; i < totalShards; i++ {
		targetIndex := i % targetCount
		assignments[targetIndex] = append(assignments[targetIndex], uint32(i))
	}
	return assignments
}

func defaultErasureCodingWorkingDir(baseWorkingDir string) string {
	dir := strings.TrimSpace(baseWorkingDir)
	if dir == "" {
		return filepath.Join(".", "erasure_coding")
	}
	return filepath.Join(dir, "erasure_coding")
}