seaweedfs/weed/pb/worker.proto
Chris Lu 0566fbd552 EC encode: place shards via ecbalancer.Place + configurable replica placement (#9623)
* Add shared super_block.ResolveReplicaPlacement; use it in ec_balance

* Add ecbalancer.FromActiveTopology snapshot constructor for EC encode/repair

* Add ecbalancer.Place greenfield/repair placement core (strict + durability-first)

* topology: add GetEffectiveAvailableEcShardSlots; FromActiveTopology uses shard-granular free slots

GetDisksWithEffectiveCapacity flattens reserved shard slots into volume slots via
integer truncation. When an in-flight EC task reserved a number of shards that is
not a multiple of DataShardsCount, the remainder was dropped from the snapshot and
freeSlots was over-reported. GetEffectiveAvailableEcShardSlots subtracts the full
reservation impact at shard granularity.
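A small sketch of the over-reporting. It assumes 10 EC shards per volume slot (the default 10+4 layout) and uses two hypothetical helpers. It illustrates the arithmetic only and is not the GetDisksWithEffectiveCapacity or GetEffectiveAvailableEcShardSlots code.

```go
package main

import "fmt"

// shardsPerVolumeSlot is assumed to be 10 (a 10+4 layout) for this sketch.
const shardsPerVolumeSlot = 10

// truncatedShardSlots flattens the reservation into whole volume slots with
// integer division before converting back to shards, so any remainder below
// shardsPerVolumeSlot is silently dropped.
func truncatedShardSlots(freeVolumeSlots, reservedShards int) int {
	return (freeVolumeSlots - reservedShards/shardsPerVolumeSlot) * shardsPerVolumeSlot
}

// shardGranularSlots subtracts the full reservation at shard granularity.
func shardGranularSlots(freeVolumeSlots, reservedShards int) int {
	return freeVolumeSlots*shardsPerVolumeSlot - reservedShards
}

func main() {
	// 5 free volume slots; an in-flight EC task has reserved 17 shards.
	fmt.Println(truncatedShardSlots(5, 17)) // 40: 7 of the reserved shards vanish
	fmt.Println(shardGranularSlots(5, 17))  // 33
}
```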

* ecbalancer.Place: reject nodes without a free disk of the requested type

FromActiveTopology keeps all disk types in the snapshot, so an SSD-only request
could be routed to a node with only HDD capacity (pickBestDiskOnNode then returns
disk 0 on the wrong tier). Filter rack/node selection to those with a free disk
of the requested type.

* ecbalancer.Place: enforce ReplicaPlacement DiffDataCenterCount (per-DC shard cap)

* ecbalancer: enforce DiffDataCenterCount in balance (cross-DC phase + cross-rack DC cap)

Adds a cross-DC corrective phase that drains data centers holding more than
DiffDataCenterCount shards of a volume, and a per-DC cap on cross-rack move
targets. Both are no-ops when DiffDataCenterCount is unset, so balance output is
unchanged for non-DC placements.

* topology: ratio-aware EC shard slots and provisional empty-disk slot

GetEffectiveAvailableEcShardSlots now takes the target collection's data-shard
count, so a 4+2 volume's larger shards are not over-counted at 10 per volume slot;
and it keeps the one provisional slot for freshly started empty servers that
report max=0, matching getEffectiveAvailableCapacityUnsafe. FromActiveTopology
threads the ratio through.

* ecbalancer.Place: explicit disk-type filter signal (fix HDD vs any ambiguity)

HardDriveType normalizes to "", which collided with "" meaning any disk. Add
Constraints.FilterDiskType and normalize both sides so a hdd request matches disks
reported as "" and never leaks to SSD, while filter=false still means any.

* ecbalancer: add clearShardAccounting for repair snapshot reconciliation

Clears one disk's copy of a shard from per-domain accounting and recomputes the
node-level union (preserving a kept copy on another disk of the same node), without
crediting capacity. Repair uses it to drop to-be-deleted copies before placing
missing shards.
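A sketch of clearing one disk's copy of a shard while keeping a copy on another disk of the same node. It uses a hypothetical per-disk bitmap and a node-level union. The real clearShardAccounting signature and data layout are not shown in this message.

```go
package main

import "fmt"

// shardBits is a hypothetical bitmap of shard ids (bit i set = shard i present).
type shardBits uint32

// node tracks per-disk shard copies for one volume plus their node-level union.
type node struct {
	diskShards map[uint32]shardBits
	union      shardBits
}

// recomputeUnion ORs every disk's bits, so a shard stays present at node level
// while any disk on the node still holds it.
func (n *node) recomputeUnion() {
	n.union = 0
	for _, b := range n.diskShards {
		n.union |= b
	}
}

// clearShard drops one disk's copy of a shard and recomputes the union.
// Like the described clearShardAccounting, it does not credit capacity.
func (n *node) clearShard(diskID uint32, shardID int) {
	n.diskShards[diskID] &^= 1 << shardID
	n.recomputeUnion()
}

func main() {
	// Shard 3 has copies on disk 0 and disk 1 of the same node.
	n := &node{diskShards: map[uint32]shardBits{0: 1 << 3, 1: 1<<3 | 1<<5}}
	n.recomputeUnion()
	n.clearShard(0, 3)               // drop the to-be-deleted copy on disk 0
	fmt.Println(n.union&(1<<3) != 0) // true: disk 1 still holds shard 3
	n.clearShard(1, 3)
	fmt.Println(n.union&(1<<3) != 0) // false
}
```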

* ecbalancer: don't cap cross-DC target racks when DiffRackCount is unset

len(racks)+1 wrongly limited each target rack (3 in a 2-rack cluster), so draining
a DC could stop short of the DiffDataCenterCount cap. Use MaxShardCount+1 as the
effectively-unlimited default.

* topology/ecbalancer: ratio-correct EC capacity accounting

Reservation shard slots (default ShardsPerVolumeSlot units) are now converted to
the target ratio before subtracting, and existing EC shards are charged by size
(targetDataShards/shardDataShards) so a 2+1 shard isn't counted as one 10+4 slot.
Per-shard ratio lookup is behind shardDataShards (OSS uses the standard ratio).

* ecbalancer.Place: candidate tiering and eligible-rack caps

Adds a per-disk eligibility/preference abstraction so Place supports:
- preferred-tag whole-plan retry (try disks carrying the earliest tags first,
  widen to all only if a tier cannot place every shard; reports
  SpilledOutsidePreferredTags),
- soft disk-type spill via DiskTypePolicy (Any/Prefer/Require): Prefer fills the
  preferred type then spills, reporting SpilledToOtherDiskType; Require filters,
- even per-rack caps that divide by racks holding an eligible disk, so a tiered
  cluster (e.g. SSDs in 2 of 4 racks) isn't capped impossibly low.
Disk tags are carried via Node.AddDiskTags and FromActiveTopology.
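The eligible-rack cap comes down to dividing by the racks that actually hold an eligible disk. The sketch below uses the text's example of SSDs in 2 of 4 racks with a 10+4 volume. The even cap of ceil(shards / racks) and the rack type are assumptions for illustration.

```go
package main

import "fmt"

// evenRackCap returns ceil(totalShards / eligibleRacks). Hypothetical helper.
func evenRackCap(totalShards, eligibleRacks int) int {
	if eligibleRacks <= 0 {
		return 0
	}
	return (totalShards + eligibleRacks - 1) / eligibleRacks
}

// rack is a hypothetical summary: does it hold a disk that passes the filter?
type rack struct {
	Name        string
	HasEligible bool
}

func countEligible(racks []rack) int {
	n := 0
	for _, r := range racks {
		if r.HasEligible {
			n++
		}
	}
	return n
}

func main() {
	// 10+4 volume, SSDs only in 2 of 4 racks.
	racks := []rack{{"r1", true}, {"r2", true}, {"r3", false}, {"r4", false}}
	total := 14
	naive := evenRackCap(total, len(racks))              // divides by all racks
	eligible := evenRackCap(total, countEligible(racks)) // divides by SSD racks
	fmt.Println(naive, naive*countEligible(racks))       // 4 8: only 8 shards fit
	fmt.Println(eligible, eligible*countEligible(racks)) // 7 14: all shards fit
}
```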

* ecbalancer: export ClearShardAccounting for repair snapshot reconciliation

* ecbalancer: address review feedback (ratio rounding, bitmap walk, same-DC moves)

- topology/ecbalancer: round shard-reservation and existing-shard footprint up
  when converting to target-ratio shard slots, so a sub-slot reservation is not
  truncated to zero and free capacity is not overstated for low-data-shard
  layouts (targetDataShards < ds).
- erasure_coding: add ShardBits.All iterator and use it across the balancer,
  cross-DC phase, and placement scoring instead of scanning 0..MaxShardCount and
  probing Has on every id.
- ecbalancer: allow same-DC cross-rack moves when a DC already sits at its
  DiffDataCenterCount cap; a same-DC move leaves the DC total unchanged. Add a
  regression test that fails without the guard.
- ecbalancer cross-DC phase: pick targets via the eligible-aware
  pickNodeInRackEligible/pickBestDiskEligible helpers so the disk-type filter is
  honored and a 0 disk id is not mistaken for a valid selection.
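A sketch of the bitmap walk. It assumes a 32-bit shard bitmap and a callback-style All; the real ShardBits.All may be a Go iterator instead. The walk visits only set bits via math/bits.TrailingZeros32 and clears the lowest set bit on each step, instead of probing Has for every id.

```go
package main

import (
	"fmt"
	"math/bits"
)

// ShardBits is a hypothetical shard-id bitmap; bit i set means shard i is present.
type ShardBits uint32

// Has reports whether shard id is set.
func (b ShardBits) Has(id int) bool { return b&(1<<id) != 0 }

// All calls fn for each set shard id in ascending order.
func (b ShardBits) All(fn func(id int)) {
	for b != 0 {
		id := bits.TrailingZeros32(uint32(b))
		fn(id)
		b &= b - 1 // clear the lowest set bit
	}
}

func main() {
	b := ShardBits(1<<0 | 1<<4 | 1<<13)
	var ids []int
	b.All(func(id int) { ids = append(ids, id) })
	fmt.Println(ids) // [0 4 13]
}
```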

* ecbalancer: test ecShardSlotsOnDisk fractional round-up

Cover the mixed-ratio path (targetDataShards < existing data shards) so a
shard's fractional footprint is never floored to zero and free capacity is not
overstated. Exercises the round-up via the targetDataShards parameter; OSS uses
the standard ratio at runtime while the enterprise build hits it with real
per-volume ratios.
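The round-up can be shown with a hypothetical shardSlotsOnDisk helper (a sketch, not ecShardSlotsOnDisk itself). It charges each existing shard targetDataShards/shardDataShards target-ratio slots, as described above, and rounds the total up. Converting a reservation from the default per-slot units follows the same formula, with the default shards-per-slot value in place of shardDataShards.

```go
package main

import "fmt"

// ceilDiv returns ceil(a/b) for non-negative a and positive b.
func ceilDiv(a, b int) int { return (a + b - 1) / b }

// shardSlotsOnDisk charges shardCount shards from a layout with
// shardDataShards data shards in target-ratio slots, rounding up so a
// fractional footprint is never floored to zero.
func shardSlotsOnDisk(shardCount, shardDataShards, targetDataShards int) int {
	return ceilDiv(shardCount*targetDataShards, shardDataShards)
}

func main() {
	// Target 4+2, existing 10+4 shards: each is 0.4 of a target slot.
	fmt.Println(shardSlotsOnDisk(1, 10, 4)) // 1 (floor would give 0)
	fmt.Println(shardSlotsOnDisk(3, 10, 4)) // 2 (1.2 rounded up)
	// Target 10+4, existing 2+1 shards: each costs 5 target slots.
	fmt.Println(shardSlotsOnDisk(1, 2, 10)) // 5
}
```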

* ecbalancer: assert node B rack in TestFromActiveTopology

* ecbalancer: split Destination into separate DataCenter and bare Rack

Replace the composite "dc:rack" Rack field on Destination with separate
DataCenter and bare Rack values, matching topology.DiskInfo and the worker-task
convention. Callers (and tests) read the data center directly instead of parsing
the composite with strings.SplitN.

* shell ec.balance: use utilization-based global balancing (parity with worker)

The shell's global rebalance phase balanced by raw shard count; switch it to
fractional fullness (shards/capacity), as the worker already does. On uniform
capacity the two agree; on heterogeneous capacity it fills nodes proportionally
instead of driving small-capacity nodes toward full.

Updates the heterogeneous-capacity regression test to assert even fullness
(~equal shards/capacity per node) rather than even shard count.
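A sketch contrasting count-based and utilization-based greedy placement on two nodes of unequal capacity. The node type and the one-shard-at-a-time greedy loop are assumptions for illustration, not the shell's ec.balance code. Capacity limits are not enforced here.

```go
package main

import "fmt"

type node struct {
	Name     string
	Shards   int
	Capacity int // shard capacity (hypothetical unit)
}

// fullness is the fraction of capacity in use; zero-capacity nodes count as full.
func fullness(n node) float64 {
	if n.Capacity == 0 {
		return 1
	}
	return float64(n.Shards) / float64(n.Capacity)
}

// pickByCount returns the index of the node with the fewest shards.
func pickByCount(nodes []node) int {
	best := 0
	for i := range nodes {
		if nodes[i].Shards < nodes[best].Shards {
			best = i
		}
	}
	return best
}

// pickByFullness returns the index of the least-full node.
func pickByFullness(nodes []node) int {
	best := 0
	for i := range nodes {
		if fullness(nodes[i]) < fullness(nodes[best]) {
			best = i
		}
	}
	return best
}

// distribute places shards one at a time on the node chosen by pick,
// working on a copy so the input is left unchanged.
func distribute(nodes []node, shards int, pick func([]node) int) []node {
	out := append([]node(nil), nodes...)
	for i := 0; i < shards; i++ {
		out[pick(out)].Shards++
	}
	return out
}

func main() {
	nodes := []node{{"small", 0, 10}, {"large", 0, 40}}
	byCount := distribute(nodes, 10, pickByCount)
	byFull := distribute(nodes, 10, pickByFullness)
	fmt.Println(byCount[0].Shards, byCount[1].Shards) // 5 5: small node 50% full
	fmt.Println(byFull[0].Shards, byFull[1].Shards)   // 2 8: both 20% full
}
```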

* ecbalancer: bounded-proportional per-DC shard spread

DiffDataCenterCount was enforced only as a ceiling (drain-to-cap), which could
leave a within-cap-but-lopsided DC distribution under a loose cap (e.g. 10/4 of 14
with cap=10). Now the cross-DC phase, the cross-rack DC guard, and Place all target
boundedMaxPerDC = min(DiffDataCenterCount, max(ceil(total/numDCs), parityShards)):
shards spread proportionally across DCs, but no tighter than the durability floor
(once each DC holds <= parityShards a DC loss is recoverable, so further spreading
only adds cross-DC/WAN traffic). No-op when DiffDataCenterCount is 0; identical to
before when the cap is the binding constraint.

* ecbalancer: drop DiffDataCenterCount enforcement for EC placement

The 1-byte volume ReplicaPlacement packs xyz into x*100+y*10+z<=255, so the DC
digit can only be 0-2 -- far too small to be a meaningful per-DC EC shard cap (a
cap of 1-2 would demand 7-14 DCs for a 10+4 volume). It's volume replica-placement,
not an EC spec. Removes the cross-DC balance phase, the DC guard in the cross-rack
phase, and the per-DC cap in Place (and the just-added bounded-proportional logic);
EC relies on the RP-independent rack/node even spread instead. Rack/node caps
(DiffRackCount/SameRackCount) are unchanged. Per-domain EC caps are left for a real
EC placement spec.
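A sketch of why the DC digit is limited to 0-2: packing "xyz" as x*100+y*10+z into one byte caps the value at 255. parsePlacement is a hypothetical helper, not super_block.ResolveReplicaPlacement or the SeaweedFS parser.

```go
package main

import "fmt"

// placement holds the three digits of an "xyz" replica placement string.
type placement struct {
	DiffDataCenter, DiffRack, SameRack int
}

// parsePlacement checks that the digits pack into one byte as x*100 + y*10 + z.
func parsePlacement(s string) (placement, byte, error) {
	if len(s) != 3 {
		return placement{}, 0, fmt.Errorf("want 3 digits, got %q", s)
	}
	n := 0
	for _, c := range s {
		if c < '0' || c > '9' {
			return placement{}, 0, fmt.Errorf("non-digit in %q", s)
		}
		n = n*10 + int(c-'0')
	}
	if n > 255 {
		return placement{}, 0, fmt.Errorf("%q does not fit in one byte", s)
	}
	return placement{n / 100, n / 10 % 10, n % 10}, byte(n), nil
}

func main() {
	for _, s := range []string{"020", "255", "300"} {
		p, b, err := parsePlacement(s)
		if err != nil {
			fmt.Println(err) // "300" does not fit in one byte
			continue
		}
		fmt.Println(s, b, p.DiffDataCenter, p.DiffRack, p.SameRack)
	}
	// With the DC digit at most 2, a per-DC cap of 1 or 2 shards would need
	// ceil(14/cap) data centers for a 10+4 volume.
	for dcCap := 1; dcCap <= 2; dcCap++ {
		fmt.Println(dcCap, (14+dcCap-1)/dcCap) // 1 14, then 2 7
	}
}
```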

* ecbalancer: enforce per-disk durability cap; symmetric reserve/release

Place now refuses to put more than parityShards shards of a volume on a single
disk (pickBestDiskEligible skips a disk once it holds parityShards of the volume,
a hard cap not relaxed even in durability-first). Previously Place assigned by
free capacity, so a skewed near-full cluster could pile more than parityShards
shards onto one disk, and losing that disk would lose the volume; only the
distinct-disk count was checked. This
covers encode and repair (both route through Place); the caller skips/leaves the
volume rather than minting an unrecoverable layout.

Also makes reserveShard decrement freeSlots unconditionally, symmetric with
releaseShard's unconditional increment (the old guarded decrement could credit a
phantom slot on release if a shard were ever reserved onto a full disk).
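A sketch of the hard per-disk cap and the symmetric reserve/release. It uses hypothetical types and prefers the disk with the most free slots; it is not pickBestDiskEligible. With only two disks and parityShards = 4, at most 8 of the 14 shards can be placed, so the caller would skip the volume.

```go
package main

import "fmt"

// disk is a hypothetical view of one disk while placing a single volume.
type disk struct {
	ID           uint32
	FreeSlots    int
	VolumeShards int // shards of the volume being placed already on this disk
}

// pickDisk returns the index of the disk with the most free slots that holds
// fewer than parityShards shards of the volume, or -1 if none qualifies. The
// cap is hard: losing any one disk must cost at most parityShards shards.
func pickDisk(disks []disk, parityShards int) int {
	best := -1
	for i, d := range disks {
		if d.FreeSlots <= 0 || d.VolumeShards >= parityShards {
			continue
		}
		if best == -1 || d.FreeSlots > disks[best].FreeSlots {
			best = i
		}
	}
	return best
}

// reserve and release change FreeSlots unconditionally, so a release always
// undoes exactly one reserve and can never credit a phantom slot.
func reserve(d *disk) {
	d.FreeSlots--
	d.VolumeShards++
}

func release(d *disk) {
	d.FreeSlots++
	d.VolumeShards--
}

func main() {
	// Place a 10+4 volume (14 shards, 4 parity) on a node with two disks.
	disks := []disk{{ID: 0, FreeSlots: 100}, {ID: 1, FreeSlots: 20}}
	placed := 0
	for placed < 14 {
		i := pickDisk(disks, 4)
		if i < 0 {
			break // caller would skip the volume rather than exceed the cap
		}
		reserve(&disks[i])
		placed++
	}
	fmt.Println(placed, disks[0].VolumeShards, disks[1].VolumeShards) // 8 4 4
}
```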

* ecbalancer: add Topology.ReleaseVolumeShards (clear + credit) for greenfield encode

Releases all of a volume's shards from the snapshot and credits the freed disk
capacity, so a greenfield encode can plan as if stale EC shards from a prior failed
attempt are gone. Safe to credit because the encode task deletes stale shards
(cleanupStaleEcShards) before distributing the new ones. Distinct from
ClearShardAccounting (repair), which does not credit.

* ecbalancer: ReleaseVolumeShards credits node freeSlots, not just disks

releaseShard only increments per-disk freeSlots, but rack capacity is summed from
node freeSlots (buildRacks) and node freeSlots gates node eligibility. Crediting
only disks left a node/rack looking full after releasing stale shards, so a
greenfield encode still couldn't use the freed capacity. Now credits the node by
the total disk-slots freed.
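A sketch of crediting both disk and node free slots when a volume's stale shards are released. The types are hypothetical, and free slots are counted in shards for simplicity.

```go
package main

import "fmt"

// disk and node are hypothetical snapshot types; FreeSlots counts shard slots.
type disk struct {
	FreeSlots int
	Shards    map[uint32]int // volume id -> shards of that volume on this disk
}

type node struct {
	FreeSlots int // summed into rack capacity and used for node eligibility
	Disks     []*disk
}

// releaseVolumeShards drops every shard of vid from the node's disks and
// credits the freed slots to each disk and to the node.
func releaseVolumeShards(n *node, vid uint32) int {
	freed := 0
	for _, d := range n.Disks {
		c := d.Shards[vid]
		if c == 0 {
			continue
		}
		delete(d.Shards, vid)
		d.FreeSlots += c
		freed += c
	}
	n.FreeSlots += freed // crediting only disks would leave the node looking full
	return freed
}

// rackFreeSlots sums node-level free slots, as rack capacity is described to be.
func rackFreeSlots(nodes []*node) int {
	total := 0
	for _, n := range nodes {
		total += n.FreeSlots
	}
	return total
}

func main() {
	// A full node holding 3 stale shards of volume 7 from a failed encode attempt.
	d0 := &disk{Shards: map[uint32]int{7: 2}}
	d1 := &disk{Shards: map[uint32]int{7: 1, 9: 4}}
	n := &node{Disks: []*disk{d0, d1}}
	fmt.Println(releaseVolumeShards(n, 7))               // 3
	fmt.Println(n.FreeSlots, rackFreeSlots([]*node{n}))  // 3 3
	fmt.Println(d0.FreeSlots, d1.FreeSlots, d1.Shards[9]) // 2 1 4
}
```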

* ecbalancer: correct PlacementMode docs (encode uses durability-first)

PlaceStrict was labeled '(encode)' but encode uses PlaceDurabilityFirst. Clarify
that durability-first is used by both encode and repair, reports relaxations in
PlaceResult.Relaxed, and never relaxes the per-disk durability cap.

* ecbalancer: treat SameRackCount as a direct per-node shard cap

The 3rd ReplicaPlacement digit now caps shards per node at exactly the digit
value, matching how DiffRackCount (2nd digit) caps per rack, instead of allowing
digit+1 per node. This makes the per-rack and per-node caps consistent and
matches the documented "digits cap EC shards per rack and per node" semantics;
e.g. 011 now means at most one shard per rack and one per node.
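A sketch of reading the 2nd and 3rd digits as exact per-rack and per-node caps. Treating a zero digit as unset and mapping it to maxShardCount+1 mirrors the "effectively-unlimited default" described earlier for DiffRackCount. Applying that rule to SameRackCount, and the value 32 for maxShardCount, are assumptions.

```go
package main

import "fmt"

// maxShardCount is an assumed upper bound on shards per EC volume for this sketch.
const maxShardCount = 32

type shardCaps struct {
	PerRack int // from DiffRackCount, the 2nd digit
	PerNode int // from SameRackCount, the 3rd digit
}

// capsFor takes each digit as an exact cap. A zero digit is treated as unset
// and becomes effectively unlimited (maxShardCount+1), an assumption here.
func capsFor(diffRackCount, sameRackCount int) shardCaps {
	c := shardCaps{PerRack: maxShardCount + 1, PerNode: maxShardCount + 1}
	if diffRackCount > 0 {
		c.PerRack = diffRackCount
	}
	if sameRackCount > 0 {
		c.PerNode = sameRackCount
	}
	return c
}

func main() {
	fmt.Println(capsFor(1, 1)) // "011": {1 1}, one shard per rack and per node
	fmt.Println(capsFor(2, 0)) // "020": {2 33}, per-node cap unset
}
```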

* EC encode: place shards via ecbalancer.Place + configurable replica placement

Encode now plans destinations through the shared ecbalancer.Place policy
(durability-first: prefers the source disk type and honors replica placement /
caps / anti-affinity, relaxing rather than failing when capacity is tight) instead
of the EC-only placement planner. Targets and capacity reservations use Place's
actual per-disk shard assignment, not a round-robin guess; cross-volume in-cycle
capacity is tracked by ActiveTopology's pending task, so the cached planner is no
longer consulted. Adds a configurable replica_placement (proto field 6 + worker
form + reader) that overrides the master default replication.

The placement-package planner code is left in place (now unused) and removed in a
follow-up that drops the package.

* EC encode: drop unused dataShards param from createECTargets

Addresses review feedback: after switching to Place's per-disk shardsPerPlan
assignment, createECTargets no longer needs the data-shard count.

* EC encode: fix packed-target validation, greenfield stale-shard accounting, RP docs

- Validate counts distinct shard ids across targets, not target rows, so packed
  plans (fewer (node,disk) targets than shards) aren't rejected.
- planECDestinations releases the volume's stale EC shards from the snapshot before
  Place (ReleaseVolumeShards), crediting their capacity. The encode task deletes
  stale shards before distributing, so a retry on tight capacity no longer fails
  planning by counting shards that are about to be removed.
- replica_placement config/form help no longer claims a data-center limit (the DC
  digit is ignored for EC); detection logs a warning when a DC digit is set.
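A sketch of validating a packed plan by distinct shard ids rather than by target rows. The target type loosely mirrors TaskTarget's node, disk_id and shard_ids fields; validatePlan is hypothetical.

```go
package main

import "fmt"

type target struct {
	Node     string
	DiskID   uint32
	ShardIDs []uint32
}

// distinctShards counts unique shard ids across all targets, so a packed plan
// with several shards per (node, disk) row is judged by shards, not rows.
func distinctShards(targets []target) int {
	seen := make(map[uint32]struct{})
	for _, t := range targets {
		for _, id := range t.ShardIDs {
			seen[id] = struct{}{}
		}
	}
	return len(seen)
}

// validatePlan checks that every one of totalShards shards has a target.
func validatePlan(targets []target, totalShards int) error {
	if got := distinctShards(targets); got != totalShards {
		return fmt.Errorf("plan covers %d of %d shards", got, totalShards)
	}
	return nil
}

func main() {
	// 14 shards packed onto 3 (node, disk) targets.
	plan := []target{
		{"a:8080", 0, []uint32{0, 1, 2, 3, 4}},
		{"b:8080", 1, []uint32{5, 6, 7, 8, 9}},
		{"c:8080", 0, []uint32{10, 11, 12, 13}},
	}
	fmt.Println(len(plan), validatePlan(plan, 14)) // 3 <nil>
}
```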

* EC encode: surface relaxed placement; mark replica_placement best-effort

Encode places with PlaceDurabilityFirst (the chosen lenient behavior), which can
relax caps/anti-affinity/replica-placement to avoid deferring. That was silent
(only disk-type/tag spills were logged). Now logs PlaceResult.Relaxed so a tight
replica placement isn't weakened unnoticed, and the config/form help states the
rack/node caps are best-effort during encode (enforced by rebalancing).

* EC encode: key per-disk shard grouping by struct, not formatted string

planECDestinations grouped destinations using a fmt.Sprintf("%s:%d") map key
per shard; use a {node,diskID} struct key and pre-size the map/slice to the
shard count to drop the per-shard string allocation.
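A sketch of the struct-keyed grouping. diskKey and destination are hypothetical names, and the map and slice are pre-sized to the shard count as described.

```go
package main

import "fmt"

// diskKey identifies a destination disk. It is comparable, so it works as a
// map key without formatting a "node:disk" string per shard.
type diskKey struct {
	node   string
	diskID uint32
}

// destination is a hypothetical per-shard placement result.
type destination struct {
	Node    string
	DiskID  uint32
	ShardID uint32
}

// groupByDisk collects shard ids per disk, preserving first-seen disk order.
func groupByDisk(dests []destination) ([]diskKey, map[diskKey][]uint32) {
	groups := make(map[diskKey][]uint32, len(dests))
	order := make([]diskKey, 0, len(dests))
	for _, d := range dests {
		k := diskKey{d.Node, d.DiskID}
		if _, ok := groups[k]; !ok {
			order = append(order, k)
		}
		groups[k] = append(groups[k], d.ShardID)
	}
	return order, groups
}

func main() {
	dests := []destination{{"a:8080", 0, 0}, {"b:8080", 1, 1}, {"a:8080", 0, 2}}
	order, groups := groupByDisk(dests)
	for _, k := range order {
		fmt.Println(k.node, k.diskID, groups[k])
	}
	// a:8080 0 [0 2]
	// b:8080 1 [1]
}
```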
2026-05-22 20:22:30 -07:00


syntax = "proto3";

package worker_pb;

option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb";

// WorkerService provides bidirectional communication between admin and worker
service WorkerService {
  // WorkerStream maintains a bidirectional stream for worker communication
  rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage);
}

// WorkerMessage represents messages from worker to admin
message WorkerMessage {
  string worker_id = 1;
  int64 timestamp = 2;
  oneof message {
    WorkerRegistration registration = 3;
    WorkerHeartbeat heartbeat = 4;
    TaskRequest task_request = 5;
    TaskUpdate task_update = 6;
    TaskComplete task_complete = 7;
    WorkerShutdown shutdown = 8;
    TaskLogResponse task_log_response = 9;
  }
}

// AdminMessage represents messages from admin to worker
message AdminMessage {
  string admin_id = 1;
  int64 timestamp = 2;
  oneof message {
    RegistrationResponse registration_response = 3;
    HeartbeatResponse heartbeat_response = 4;
    TaskAssignment task_assignment = 5;
    TaskCancellation task_cancellation = 6;
    AdminShutdown admin_shutdown = 7;
    TaskLogRequest task_log_request = 8;
  }
}

// WorkerRegistration message when worker connects
message WorkerRegistration {
  string worker_id = 1;
  string address = 2;
  repeated string capabilities = 3;
  int32 max_concurrent = 4;
  map<string, string> metadata = 5;
}

// RegistrationResponse confirms worker registration
message RegistrationResponse {
  bool success = 1;
  string message = 2;
  string assigned_worker_id = 3;
}

// WorkerHeartbeat sent periodically by worker
message WorkerHeartbeat {
  string worker_id = 1;
  string status = 2;
  int32 current_load = 3;
  int32 max_concurrent = 4;
  repeated string current_task_ids = 5;
  int32 tasks_completed = 6;
  int32 tasks_failed = 7;
  int64 uptime_seconds = 8;
}

// HeartbeatResponse acknowledges heartbeat
message HeartbeatResponse {
  bool success = 1;
  string message = 2;
}

// TaskRequest from worker asking for new tasks
message TaskRequest {
  string worker_id = 1;
  repeated string capabilities = 2;
  int32 available_slots = 3;
}

// TaskAssignment from admin to worker
message TaskAssignment {
  string task_id = 1;
  string task_type = 2;
  TaskParams params = 3;
  int32 priority = 4;
  int64 created_time = 5;
  map<string, string> metadata = 6;
}

// TaskParams contains task-specific parameters with typed variants
message TaskParams {
  string task_id = 1; // ActiveTopology task ID for lifecycle management
  uint32 volume_id = 2; // Primary volume ID for the task
  string collection = 3; // Collection name
  string data_center = 4; // Primary data center
  string rack = 5; // Primary rack
  uint64 volume_size = 6; // Original volume size in bytes for tracking size changes
  // Unified source and target arrays for all task types
  repeated TaskSource sources = 7; // Source locations (volume replicas, EC shards, etc.)
  repeated TaskTarget targets = 8; // Target locations (destinations, new replicas, etc.)
  // Typed task parameters
  oneof task_params {
    VacuumTaskParams vacuum_params = 9;
    ErasureCodingTaskParams erasure_coding_params = 10;
    BalanceTaskParams balance_params = 11;
    ReplicationTaskParams replication_params = 12;
    EcBalanceTaskParams ec_balance_params = 13;
    S3LifecycleParams s3_lifecycle_params = 14;
  }
}

// S3LifecycleParams routes a worker task to one of the three lifecycle
// subroutines. READ is the per-shard meta-log reader (one task per shard_id
// at a time); BOOTSTRAP walks a single bucket; DRAIN drains pending
// exceptions for a single rule.
message S3LifecycleParams {
  // Zero is an UNSPECIFIED sentinel: a TaskParams payload whose subtype is
  // unset must not silently route into a READ task. Callers always populate
  // one of READ / BOOTSTRAP / DRAIN.
  enum Subtype {
    SUBTYPE_UNSPECIFIED = 0;
    READ = 1;
    BOOTSTRAP = 2;
    DRAIN = 3;
  }
  Subtype subtype = 1;
  // Required for BOOTSTRAP and DRAIN; ignored for READ. rule_hash is
  // optional for BOOTSTRAP (omitting walks all rules for the bucket).
  string bucket = 2;
  bytes rule_hash = 3; // 8 bytes when present
  bool force = 4; // operator override; bypasses scheduling guards
  int64 batch_time_budget_ns = 5; // 0 = use default
  int32 batch_event_budget = 6; // 0 = use default
  ContinuationHint continuation = 7; // resume hint for kill-resume tasks
  // READ only: which (bucket, key-prefix-hash) shard this task processes.
  // Range 0..S3LifecycleShardCount-1 (16 shards). Required for READ; ignored
  // for BOOTSTRAP and DRAIN. Workers receive one READ task per owned shard;
  // shards distribute across workers via the existing scheduler.
  int32 shard_id = 8;
}

// ContinuationHint lets a long-running BOOTSTRAP or READ task hand its
// resume point to the next scheduled invocation without going through
// durable state. The durable state files are still authoritative; this
// is just a scheduling hint to skip an unnecessary fresh start.
message ContinuationHint {
  string last_scanned_path = 1; // BOOTSTRAP only
  int64 last_position_ns = 2; // READ only (advisory)
}

// VacuumTaskParams for vacuum operations
message VacuumTaskParams {
  double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum
  bool force_vacuum = 2; // Force vacuum even if below threshold
  int32 batch_size = 3; // Number of files to process per batch
  string working_dir = 4; // Working directory for temporary files
  bool verify_checksum = 5; // Verify file checksums during vacuum
}

// ErasureCodingTaskParams for EC encoding operations
message ErasureCodingTaskParams {
  reserved 7;
  uint64 estimated_shard_size = 1; // Estimated size per shard
  int32 data_shards = 2; // Number of data shards (default: 10)
  int32 parity_shards = 3; // Number of parity shards (default: 4)
  string working_dir = 4; // Working directory for EC processing
  string master_client = 5; // Master server address
  bool cleanup_source = 6; // Whether to cleanup source volume after EC
  string source_disk_type = 8; // Source volume's disk type, passed to VolumeEcShardsMount so shards report under it (#9423)
}

// TaskSource represents a unified source location for any task type
message TaskSource {
  string node = 1; // Source server address
  uint32 disk_id = 2; // Source disk ID
  string rack = 3; // Source rack for tracking
  string data_center = 4; // Source data center for tracking
  uint32 volume_id = 5; // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6; // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7; // Estimated size to be processed
}

// TaskTarget represents a unified target location for any task type
message TaskTarget {
  string node = 1; // Target server address
  uint32 disk_id = 2; // Target disk ID
  string rack = 3; // Target rack for tracking
  string data_center = 4; // Target data center for tracking
  uint32 volume_id = 5; // Volume ID (for volume operations)
  repeated uint32 shard_ids = 6; // Shard IDs (for EC shard operations)
  uint64 estimated_size = 7; // Estimated size to be created
}

// BalanceMoveSpec describes a single volume move within a batch balance job
message BalanceMoveSpec {
  uint32 volume_id = 1; // Volume to move
  string source_node = 2; // Source server address (host:port)
  string target_node = 3; // Destination server address (host:port)
  string collection = 4; // Collection name
  uint64 volume_size = 5; // Volume size in bytes (informational)
}

// BalanceTaskParams for volume balancing operations
message BalanceTaskParams {
  bool force_move = 1; // Force move even with conflicts
  int32 timeout_seconds = 2; // Operation timeout
  int32 max_concurrent_moves = 3; // Max concurrent moves in a batch job (0 = default 5)
  repeated BalanceMoveSpec moves = 4; // Batch: multiple volume moves in one job
}

// ReplicationTaskParams for adding replicas
message ReplicationTaskParams {
  int32 replica_count = 1; // Target replica count
  bool verify_consistency = 2; // Verify replica consistency after creation
}

// TaskUpdate reports task progress
message TaskUpdate {
  string task_id = 1;
  string worker_id = 2;
  string status = 3;
  float progress = 4;
  string message = 5;
  map<string, string> metadata = 6;
}

// TaskComplete reports task completion
message TaskComplete {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  int64 completion_time = 5;
  map<string, string> result_metadata = 6;
}

// TaskCancellation from admin to cancel a task
message TaskCancellation {
  string task_id = 1;
  string reason = 2;
  bool force = 3;
}

// WorkerShutdown notifies admin that worker is shutting down
message WorkerShutdown {
  string worker_id = 1;
  string reason = 2;
  repeated string pending_task_ids = 3;
}

// AdminShutdown notifies worker that admin is shutting down
message AdminShutdown {
  string reason = 1;
  int32 graceful_shutdown_seconds = 2;
}

// ========== Task Log Messages ==========

// TaskLogRequest requests logs for a specific task
message TaskLogRequest {
  string task_id = 1;
  string worker_id = 2;
  bool include_metadata = 3; // Include task metadata
  int32 max_entries = 4; // Maximum number of log entries (0 = all)
  string log_level = 5; // Filter by log level (INFO, WARNING, ERROR, DEBUG)
  int64 start_time = 6; // Unix timestamp for start time filter
  int64 end_time = 7; // Unix timestamp for end time filter
}

// TaskLogResponse returns task logs and metadata
message TaskLogResponse {
  string task_id = 1;
  string worker_id = 2;
  bool success = 3;
  string error_message = 4;
  TaskLogMetadata metadata = 5;
  repeated TaskLogEntry log_entries = 6;
}

// TaskLogMetadata contains metadata about task execution
message TaskLogMetadata {
  string task_id = 1;
  string task_type = 2;
  string worker_id = 3;
  int64 start_time = 4;
  int64 end_time = 5;
  int64 duration_ms = 6;
  string status = 7;
  float progress = 8;
  uint32 volume_id = 9;
  string server = 10;
  string collection = 11;
  string log_file_path = 12;
  int64 created_at = 13;
  map<string, string> custom_data = 14;
}

// TaskLogEntry represents a single log entry
message TaskLogEntry {
  int64 timestamp = 1;
  string level = 2;
  string message = 3;
  map<string, string> fields = 4;
  float progress = 5;
  string status = 6;
}

// ========== Maintenance Configuration Messages ==========

// MaintenanceConfig holds configuration for the maintenance system
message MaintenanceConfig {
  bool enabled = 1;
  int32 scan_interval_seconds = 2; // How often to scan for maintenance needs
  int32 worker_timeout_seconds = 3; // Worker heartbeat timeout
  int32 task_timeout_seconds = 4; // Individual task timeout
  int32 retry_delay_seconds = 5; // Delay between retries
  int32 max_retries = 6; // Default max retries for tasks
  int32 cleanup_interval_seconds = 7; // How often to clean up old tasks
  int32 task_retention_seconds = 8; // How long to keep completed/failed tasks
  MaintenancePolicy policy = 9;
}

// MaintenancePolicy defines policies for maintenance operations
message MaintenancePolicy {
  map<string, TaskPolicy> task_policies = 1; // Task type -> policy mapping
  int32 global_max_concurrent = 2; // Overall limit across all task types
  int32 default_repeat_interval_seconds = 3; // Default seconds if task doesn't specify
  int32 default_check_interval_seconds = 4; // Default seconds for periodic checks
}

// TaskPolicy represents configuration for a specific task type
message TaskPolicy {
  bool enabled = 1;
  int32 max_concurrent = 2;
  int32 repeat_interval_seconds = 3; // Seconds to wait before repeating
  int32 check_interval_seconds = 4; // Seconds between checks
  // Typed task-specific configuration (replaces generic map)
  oneof task_config {
    VacuumTaskConfig vacuum_config = 5;
    ErasureCodingTaskConfig erasure_coding_config = 6;
    BalanceTaskConfig balance_config = 7;
    ReplicationTaskConfig replication_config = 8;
    EcBalanceTaskConfig ec_balance_config = 9;
  }
}

// Task-specific configuration messages

// VacuumTaskConfig contains vacuum-specific configuration
message VacuumTaskConfig {
  double garbage_threshold = 1; // Minimum garbage ratio to trigger vacuum (0.0-1.0)
  int32 min_volume_age_hours = 2; // Minimum age before vacuum is considered
  int32 min_interval_seconds = 3; // Minimum time between vacuum operations on the same volume
}

// ErasureCodingTaskConfig contains EC-specific configuration
message ErasureCodingTaskConfig {
  double fullness_ratio = 1; // Minimum fullness ratio to trigger EC (0.0-1.0)
  int32 quiet_for_seconds = 2; // Minimum quiet time before EC
  int32 min_volume_size_mb = 3; // Minimum volume size for EC
  string collection_filter = 4; // Only process volumes from specific collections
  repeated string preferred_tags = 5; // Disk tags to prioritize for EC shard placement
  string replica_placement = 6; // EC shard replica placement (e.g. "020"); empty falls back to master default replication
}

// BalanceTaskConfig contains balance-specific configuration
message BalanceTaskConfig {
  double imbalance_threshold = 1; // Threshold for triggering rebalancing (0.0-1.0)
  int32 min_server_count = 2; // Minimum number of servers required for balancing
}

// ReplicationTaskConfig contains replication-specific configuration
message ReplicationTaskConfig {
  int32 target_replica_count = 1; // Target number of replicas
}

// EcBalanceTaskParams for EC shard balancing operations
message EcBalanceTaskParams {
  string disk_type = 1; // Disk type filter (hdd, ssd, "")
  int32 max_parallelization = 2; // Max parallel shard moves within a batch
  int32 timeout_seconds = 3; // Operation timeout per move
  repeated EcShardMoveSpec moves = 4; // Batch: multiple shard moves in one job
}

// EcShardMoveSpec describes a single EC shard move within a batch
message EcShardMoveSpec {
  uint32 volume_id = 1; // EC volume ID
  uint32 shard_id = 2; // Shard ID (0-13)
  string collection = 3; // Collection name
  string source_node = 4; // Source server address
  uint32 source_disk_id = 5; // Source disk ID
  string target_node = 6; // Target server address
  uint32 target_disk_id = 7; // Target disk ID
}

// EcBalanceTaskConfig contains EC balance-specific configuration
message EcBalanceTaskConfig {
  double imbalance_threshold = 1; // Threshold for triggering EC shard rebalancing
  int32 min_server_count = 2; // Minimum number of servers required
  string collection_filter = 3; // Collection filter
  string disk_type = 4; // Disk type filter
  repeated string preferred_tags = 5; // Preferred disk tags for placement
  string replica_placement = 6; // EC shard replica placement (e.g. "020"); empty falls back to master default replication
}

// ========== Task Persistence Messages ==========

// MaintenanceTaskData represents complete task state for persistence
message MaintenanceTaskData {
  string id = 1;
  string type = 2;
  string priority = 3;
  string status = 4;
  uint32 volume_id = 5;
  string server = 6;
  string collection = 7;
  TaskParams typed_params = 8;
  string reason = 9;
  int64 created_at = 10;
  int64 scheduled_at = 11;
  int64 started_at = 12;
  int64 completed_at = 13;
  string worker_id = 14;
  string error = 15;
  double progress = 16;
  int32 retry_count = 17;
  int32 max_retries = 18;
  // Enhanced fields for detailed task tracking
  string created_by = 19;
  string creation_context = 20;
  repeated TaskAssignmentRecord assignment_history = 21;
  string detailed_reason = 22;
  map<string, string> tags = 23;
  TaskCreationMetrics creation_metrics = 24;
}

// TaskAssignmentRecord tracks worker assignments for a task
message TaskAssignmentRecord {
  string worker_id = 1;
  string worker_address = 2;
  int64 assigned_at = 3;
  int64 unassigned_at = 4; // Optional: when worker was unassigned
  string reason = 5; // Reason for assignment/unassignment
}

// TaskCreationMetrics tracks why and how a task was created
message TaskCreationMetrics {
  string trigger_metric = 1; // Name of metric that triggered creation
  double metric_value = 2; // Value that triggered creation
  double threshold = 3; // Threshold that was exceeded
  VolumeHealthMetrics volume_metrics = 4; // Volume health at creation time
  map<string, string> additional_data = 5; // Additional context data
}

// VolumeHealthMetrics captures volume state at task creation
message VolumeHealthMetrics {
  uint64 total_size = 1;
  uint64 used_size = 2;
  uint64 garbage_size = 3;
  double garbage_ratio = 4;
  int32 file_count = 5;
  int32 deleted_file_count = 6;
  int64 last_modified = 7;
  int32 replica_count = 8;
  bool is_ec_volume = 9;
  string collection = 10;
}

// TaskStateFile wraps task data with metadata for persistence
message TaskStateFile {
  MaintenanceTaskData task = 1;
  int64 last_updated = 2;
  string admin_version = 3;
}