From d00acded8a2cef8a786c376006487198a50ef82d Mon Sep 17 00:00:00 2001 From: Jaehoon Kim Date: Thu, 28 May 2026 03:15:25 +0900 Subject: [PATCH] fix(vacuum): batch all replicas in a single plugin worker task (#9702) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(vacuum): batch all replicas in a single plugin worker task The plugin worker vacuum path emitted one TaskDetectionResult per (volume, server) replica, but the dispatcher gates duplicate tasks per volume via ActiveTopology.HasAnyTask. The first replica's task was created and the remaining N-1 replicas were silently dropped, so only one replica per volume was ever vacuumed — leaving the others with all their garbage intact. Mirror the master built-in flow (topology.vacuumOneVolumeId → batchVacuumVolumeCheck/Compact/Commit/Cleanup) by: - aggregating detection metrics by VolumeID so a single task carries every replica in TaskParams.Sources - having VacuumTask accept []string servers (instead of a single string), re-check each replica's garbage ratio at execute time to derive a vacuumTargets subset, and run Compact/Commit/Cleanup against only that subset - updating the dispatcher (plugin_handler.Execute, register.CreateTask) to forward every Sources node to NewVacuumTask * fix(vacuum): run all-replica vacuum in two phases to keep failure atomic The prior implementation iterated Compact → Commit → Cleanup against each replica in sequence. A Compact failure on the second replica left the first one already committed (its active files swapped with the .cp* files), producing replica divergence with no automatic recovery. Split performVacuum into two phases, matching topology.vacuumOneVolumeId: Phase 1 — Compact all targets. If any fails, run VacuumVolumeCleanup on every target to drop the .cpd/.cpx/.cpldb temp files, then abort. No replica has swapped yet, so every replica returns to its original state. Phase 2 — Commit all targets. Best-effort, matching batchVacuumVolumeCommit: per-replica errors are collected and surfaced together. Once any replica has swapped there is no clean rollback, so a partial Phase 2 failure requires operator reconciliation. Adds compactOne / commitOne / cleanupOne / cleanupAll helpers and removes the old performVacuumOne. * fix(vacuum): abort when any replica's garbage check fails The prior check tolerated per-replica RPC errors and only failed the task if every replica errored — partial failures were silently treated as "ineligible" so the responding replicas would still be vacuumed. That produces divergence the moment the unreachable replica comes back: it still carries the original garbage while the others have been compacted. Match topology.batchVacuumVolumeCheck's contract instead — its return value (errCount == 0 && len(vacuumLocationList.list) > 0) gates the whole vacuum on every replica's check succeeding. If any replica is unreachable or its VacuumVolumeCheck RPC errors, abort the task; the volume will be retried on the next detection cycle once the replica is healthy. * fix(vacuum): guard against nil metrics and TaskSource entries Detection's bucket-building loop dereferenced m.VolumeID without checking m for nil. VacuumTask.Validate built sourceSet from params.Sources without checking each entry for nil. Both paths would panic on a malformed protobuf payload that managed to deliver a nil slot. Skip nil entries in both loops — neutral with the existing nil/empty filtering already done in register.CreateTask and plugin_handler.Execute. * test(vacuum): success path no longer calls VacuumVolumeCleanup The plugin worker vacuum is now two-phase (Compact-all → Commit-all, with Cleanup only invoked on Compact failure to roll back .cp* temp files). This matches topology.vacuumOneVolumeId, where batchVacuumVolumeCleanup runs only on the Compact-failure branch. On a successful Commit the temp files do not linger: - CommitCompactVolume renames .cpd → .dat and .cpx → .idx - leveldb needle map renames .cpldb → .ldb (needle_map_leveldb.go) so calling VacuumVolumeCleanup afterwards is a redundant no-op. The prior worker code called it unconditionally and the integration test asserted that — switch the expectation to cleanupCalls == 0 to document the new (and master-aligned) contract. --- test/plugin_workers/vacuum/execution_test.go | 8 +- weed/worker/tasks/vacuum/detection.go | 238 +++++++++------ weed/worker/tasks/vacuum/plugin_handler.go | 8 +- weed/worker/tasks/vacuum/register.go | 8 +- weed/worker/tasks/vacuum/vacuum_task.go | 296 +++++++++++++------ 5 files changed, 384 insertions(+), 174 deletions(-) diff --git a/test/plugin_workers/vacuum/execution_test.go b/test/plugin_workers/vacuum/execution_test.go index e1846b0a9..e687a2dec 100644 --- a/test/plugin_workers/vacuum/execution_test.go +++ b/test/plugin_workers/vacuum/execution_test.go @@ -59,5 +59,11 @@ func TestVacuumExecutionIntegration(t *testing.T) { require.GreaterOrEqual(t, checkCalls, 2) require.GreaterOrEqual(t, compactCalls, 1) require.GreaterOrEqual(t, commitCalls, 1) - require.GreaterOrEqual(t, cleanupCalls, 1) + // Cleanup is only invoked when Phase 1 (Compact) fails to roll back + // the .cpd/.cpx/.cpldb temp files; on the success path Commit + // consumes them (rename .cpd → .dat, .cpx → .idx, .cpldb → .ldb via + // the leveldb needle map) so no Cleanup call is needed. Matches + // topology.vacuumOneVolumeId which only calls batchVacuumVolumeCleanup + // on the Compact-failure branch. + require.Equal(t, 0, cleanupCalls) } diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index 1ba55dccd..a52cca18f 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -18,75 +18,89 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } vacuumConfig := config.(*Config) - var results []*types.TaskDetectionResult minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - debugCount := 0 + type volumeBucket struct { + replicas []*types.VolumeHealthMetrics + } + buckets := make(map[uint32]*volumeBucket) + order := make([]uint32, 0) + for _, m := range metrics { + if m == nil { + continue + } + b, ok := buckets[m.VolumeID] + if !ok { + b = &volumeBucket{} + buckets[m.VolumeID] = b + order = append(order, m.VolumeID) + } + b.replicas = append(b.replicas, m) + } + + var results []*types.TaskDetectionResult skippedDueToGarbage := 0 skippedDueToAge := 0 + debugCount := 0 - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } + for _, vid := range order { + replicas := buckets[vid].replicas - // Generate task ID for future ActiveTopology integration - taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix()) + // Pick the replica with the highest garbage ratio that also satisfies + // the minimum-age gate as the "primary" used for priority + reason. + // Master built-in vacuum filters per-replica again inside + // batchVacuumVolumeCheck, so it's fine if only some replicas are + // above threshold here — the worker's checkVacuumEligibility will + // filter again at execute time. + primary := pickPrimaryReplica(replicas, vacuumConfig.GarbageThreshold, minVolumeAge) + if primary == nil { + recordSkipReasons(replicas, vacuumConfig.GarbageThreshold, minVolumeAge, &skippedDueToGarbage, &skippedDueToAge, &debugCount) + continue + } - result := &types.TaskDetectionResult{ - TaskID: taskID, // For future ActiveTopology integration - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - ScheduleAt: time.Now(), - } + priority := types.TaskPriorityNormal + if primary.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } - // Check if ANY task already exists in ActiveTopology for this volume - if clusterInfo != nil && clusterInfo.ActiveTopology != nil { - if clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { - glog.V(2).Infof("VACUUM: Skipping volume %d, task already exists in ActiveTopology", metric.VolumeID) - continue - } - } + taskID := fmt.Sprintf("vacuum_vol_%d_%d", vid, time.Now().Unix()) - // Create typed parameters for vacuum task - result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo) - if result.TypedParams != nil { - results = append(results, result) + result := &types.TaskDetectionResult{ + TaskID: taskID, + TaskType: types.TaskTypeVacuum, + VolumeID: vid, + Server: primary.Server, + Collection: primary.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + ScheduleAt: time.Now(), + } + + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + if clusterInfo.ActiveTopology.HasAnyTask(vid) { + glog.V(2).Infof("VACUUM: Skipping volume %d, task already exists in ActiveTopology", vid) + continue } - } else { - // Debug why volume was not selected - if debugCount < 5 { // Limit debug output to first 5 volumes - if metric.GarbageRatio < vacuumConfig.GarbageThreshold { - skippedDueToGarbage++ - } - if metric.Age < minVolumeAge { - skippedDueToAge++ - } - } - debugCount++ + } + + result.TypedParams = createVacuumTaskParams(result, replicas, vacuumConfig, clusterInfo) + if result.TypedParams != nil { + results = append(results, result) } } - // Log debug summary if no tasks were created if len(results) == 0 && len(metrics) > 0 { totalVolumes := len(metrics) - glog.V(1).Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage= 3 { // Limit to first 3 volumes + if i >= 3 { break } - glog.V(1).Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", - metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, + glog.V(1).Infof("VACUUM: Volume %d on %s: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", + metric.VolumeID, metric.Server, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute)) } } @@ -94,54 +108,100 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI return results, nil } -// createVacuumTaskParams creates typed parameters for vacuum tasks -// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic -func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams { - // Use configured values or defaults - garbageThreshold := 0.3 // Default 30% - verifyChecksum := true // Default to verify - batchSize := int32(1000) // Default batch size - workingDir := "" // Use worker-provided default if empty +// pickPrimaryReplica returns the eligible replica with the highest garbage +// ratio. Returns nil if no replica satisfies both gates. +func pickPrimaryReplica(replicas []*types.VolumeHealthMetrics, garbageThreshold float64, minAge time.Duration) *types.VolumeHealthMetrics { + var best *types.VolumeHealthMetrics + for _, m := range replicas { + if m.GarbageRatio < garbageThreshold { + continue + } + if m.Age < minAge { + continue + } + if best == nil || m.GarbageRatio > best.GarbageRatio { + best = m + } + } + return best +} + +// recordSkipReasons tallies skip counters for debug logging when no +// replica of a volume qualified for vacuum. +func recordSkipReasons(replicas []*types.VolumeHealthMetrics, garbageThreshold float64, minAge time.Duration, garbageSkip, ageSkip, debugCount *int) { + for _, m := range replicas { + if *debugCount >= 5 { + return + } + if m.GarbageRatio < garbageThreshold { + *garbageSkip++ + } + if m.Age < minAge { + *ageSkip++ + } + *debugCount++ + } +} + +// createVacuumTaskParams creates typed parameters for a vacuum task whose +// Sources list contains every replica of the volume. Worker-side +// performVacuum iterates over Sources and vacuums each replica. +func createVacuumTaskParams(task *types.TaskDetectionResult, replicas []*types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams { + garbageThreshold := 0.3 + verifyChecksum := true + batchSize := int32(1000) + workingDir := "" if vacuumConfig != nil { garbageThreshold = vacuumConfig.GarbageThreshold - // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours - // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added - // to the protobuf definition if they should be configurable } - // Use DC and rack information directly from VolumeHealthMetrics - sourceDC, sourceRack := metric.DataCenter, metric.Rack - - // Get server address from topology (required for vacuum tasks) if clusterInfo == nil || clusterInfo.ActiveTopology == nil { glog.Errorf("Topology not available for vacuum task on volume %d, skipping", task.VolumeID) return nil } - address, err := util.ResolveServerAddress(task.Server, clusterInfo.ActiveTopology) - if err != nil { - glog.Errorf("Failed to resolve address for server %s for vacuum task on volume %d, skipping task: %v", task.Server, task.VolumeID, err) + + sources := make([]*worker_pb.TaskSource, 0, len(replicas)) + seen := make(map[string]struct{}, len(replicas)) + for _, m := range replicas { + address, err := util.ResolveServerAddress(m.Server, clusterInfo.ActiveTopology) + if err != nil { + glog.Warningf("Failed to resolve address for server %s for vacuum task on volume %d, dropping replica: %v", m.Server, task.VolumeID, err) + continue + } + if _, ok := seen[address]; ok { + continue + } + seen[address] = struct{}{} + sources = append(sources, &worker_pb.TaskSource{ + Node: address, + VolumeId: task.VolumeID, + EstimatedSize: m.Size, + DataCenter: m.DataCenter, + Rack: m.Rack, + }) + } + + if len(sources) == 0 { + glog.Errorf("No resolvable replicas for vacuum task on volume %d, skipping", task.VolumeID) return nil } - // Create typed protobuf parameters with unified sources + // Use the primary replica (matches task.Server) for the canonical size + // recorded in TaskParams. Master built-in keeps a single volumeSizeLimit + // here so timeouts are stable across replicas. + canonical := primaryReplicaByServer(replicas, task.Server) + var canonicalSize uint64 + if canonical != nil { + canonicalSize = canonical.Size + } + return &worker_pb.TaskParams{ - TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated) + TaskId: task.TaskID, VolumeId: task.VolumeID, Collection: task.Collection, - VolumeSize: metric.Size, // Store original volume size for tracking changes - - // Unified sources array - Sources: []*worker_pb.TaskSource{ - { - Node: address, - VolumeId: task.VolumeID, - EstimatedSize: metric.Size, - DataCenter: sourceDC, - Rack: sourceRack, - }, - }, - + VolumeSize: canonicalSize, + Sources: sources, TaskParams: &worker_pb.TaskParams_VacuumParams{ VacuumParams: &worker_pb.VacuumTaskParams{ GarbageThreshold: garbageThreshold, @@ -153,3 +213,15 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum }, } } + +func primaryReplicaByServer(replicas []*types.VolumeHealthMetrics, server string) *types.VolumeHealthMetrics { + for _, m := range replicas { + if m.Server == server { + return m + } + } + if len(replicas) > 0 { + return replicas[0] + } + return nil +} diff --git a/weed/worker/tasks/vacuum/plugin_handler.go b/weed/worker/tasks/vacuum/plugin_handler.go index e753e9f15..849100d63 100644 --- a/weed/worker/tasks/vacuum/plugin_handler.go +++ b/weed/worker/tasks/vacuum/plugin_handler.go @@ -395,9 +395,15 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ } } + servers := make([]string, 0, len(params.Sources)) + for _, src := range params.Sources { + if src != nil && src.Node != "" { + servers = append(servers, src.Node) + } + } task := NewVacuumTask( request.Job.JobId, - params.Sources[0].Node, + servers, params.VolumeId, params.Collection, h.grpcDialOption, diff --git a/weed/worker/tasks/vacuum/register.go b/weed/worker/tasks/vacuum/register.go index d918ba469..34068f0cb 100644 --- a/weed/worker/tasks/vacuum/register.go +++ b/weed/worker/tasks/vacuum/register.go @@ -50,9 +50,15 @@ func RegisterVacuumTask() { if len(params.Sources) == 0 { return nil, fmt.Errorf("at least one source is required for vacuum task") } + servers := make([]string, 0, len(params.Sources)) + for _, src := range params.Sources { + if src != nil && src.Node != "" { + servers = append(servers, src.Node) + } + } return NewVacuumTask( fmt.Sprintf("vacuum-%d", params.VolumeId), - params.Sources[0].Node, // Use first source node + servers, params.VolumeId, params.Collection, dialOpt, diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 3b226334b..c56a8d5c9 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -16,23 +16,33 @@ import ( "google.golang.org/grpc" ) -// VacuumTask implements the Task interface +// VacuumTask implements the Task interface. +// +// One task covers all replicas of a volume so behavior matches the master +// built-in vacuum (see topology.Topology.vacuumOneVolumeId): Check across +// every replica → filter to those whose garbage ratio meets the threshold +// → Compact/Commit/Cleanup that subset. Treating one replica per task (the +// prior behavior) drops the other N-1 replicas because the dispatcher +// gates duplicate tasks per volume via ActiveTopology.HasAnyTask. type VacuumTask struct { *base.BaseTask - server string + servers []string volumeID uint32 collection string garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption volumeSize uint64 + vacuumTargets []string // populated by checkVacuumEligibility — subset of servers that pass the per-replica garbage re-check and proceed to Compact/Commit/Cleanup } -// NewVacuumTask creates a new unified vacuum task instance -func NewVacuumTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *VacuumTask { +// NewVacuumTask creates a new unified vacuum task instance covering every +// replica server reported by the dispatcher. +func NewVacuumTask(id string, servers []string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *VacuumTask { + deduped := dedupePreserveOrder(servers) return &VacuumTask{ BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum), - server: server, + servers: deduped, volumeID: volumeID, collection: collection, garbageThreshold: 0.3, // Default 30% threshold @@ -56,40 +66,49 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, - "server": t.server, + "servers": t.servers, "collection": t.collection, "garbage_threshold": t.garbageThreshold, }).Info("Starting vacuum task") - // Step 1: Check volume status and garbage ratio + if len(t.servers) == 0 { + return fmt.Errorf("no source servers configured for vacuum task") + } + + // Step 1: Check vacuum eligibility for each replica. Mirrors + // topology.batchVacuumVolumeCheck — only replicas whose garbage is at + // or above the threshold proceed to Compact/Commit/Cleanup. t.ReportProgress(10.0) t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx) + targets, currentGarbageRatios, err := t.checkVacuumEligibility(ctx) if err != nil { return fmt.Errorf("failed to check vacuum eligibility: %v", err) } - if !eligible { + if len(targets) == 0 { t.GetLogger().WithFields(map[string]interface{}{ - "current_garbage_ratio": currentGarbageRatio, - "required_threshold": t.garbageThreshold, - }).Info("Volume does not meet vacuum criteria, skipping") + "garbage_ratios": currentGarbageRatios, + "required_threshold": t.garbageThreshold, + }).Info("No replica meets vacuum criteria, skipping") t.ReportProgress(100.0) return nil } + t.vacuumTargets = targets - // Step 2: Perform vacuum operation + // Step 2: Perform vacuum (compact + commit + cleanup) across every + // target replica. t.ReportProgress(50.0) t.GetLogger().WithFields(map[string]interface{}{ - "garbage_ratio": currentGarbageRatio, - "threshold": t.garbageThreshold, + "vacuum_targets": targets, + "garbage_ratios": currentGarbageRatios, + "threshold": t.garbageThreshold, }).Info("Performing vacuum operation") if err := t.performVacuum(ctx); err != nil { return fmt.Errorf("failed to perform vacuum: %v", err) } - // Step 3: Verify vacuum results + // Step 3: Verify vacuum results on each target replica. t.ReportProgress(90.0) t.GetLogger().Info("Verifying vacuum results") if err := t.verifyVacuumResults(ctx); err != nil { @@ -98,8 +117,8 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.ReportProgress(100.0) - glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)", - t.volumeID, t.server, currentGarbageRatio*100) + glog.Infof("Vacuum task completed successfully: volume %d on %v (garbage ratios %v)", + t.volumeID, targets, currentGarbageRatios) return nil } @@ -118,16 +137,20 @@ func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error { return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) } - // Validate that at least one source matches our server - found := false + // Every server the task was created with must appear in the params' + // Sources list. The dispatcher fills Sources from the detection-time + // replica set, so a mismatch means the worker received stale routing. + sourceSet := make(map[string]struct{}, len(params.Sources)) for _, source := range params.Sources { - if source.Node == t.server { - found = true - break + if source == nil { + continue } + sourceSet[source.Node] = struct{}{} } - if !found { - return fmt.Errorf("no source matches expected server %s", t.server) + for _, server := range t.servers { + if _, ok := sourceSet[server]; !ok { + return fmt.Errorf("task server %s not present in params.Sources", server) + } } if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 { @@ -161,11 +184,40 @@ func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { // Helper methods for real vacuum operations -// checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) { - var garbageRatio float64 +// checkVacuumEligibility queries every replica's current garbage ratio. +// Mirrors topology.batchVacuumVolumeCheck's all-or-nothing contract: if +// any replica's check fails (unreachable, RPC error, etc.) the entire +// task aborts. Vacuuming only the replicas that responded while +// silently skipping unreachable ones would compact the responders' +// garbage but leave the unreachable replica still carrying it, +// producing divergence the moment that replica comes back. +// +// Returns the subset of servers whose garbage is at or above the +// configured threshold, alongside a per-server ratio map for logging. +func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) ([]string, map[string]float64, error) { + ratios := make(map[string]float64, len(t.servers)) + for _, server := range t.servers { + ratio, err := t.checkOneVacuumEligibility(ctx, server) + if err != nil { + return nil, ratios, fmt.Errorf("vacuum check on %s for volume %d: %w (aborting; refusing to vacuum subset of replicas)", server, t.volumeID, err) + } + ratios[server] = ratio + glog.V(1).Infof("Volume %d on %s garbage ratio: %.2f%%, threshold: %.2f%%", + t.volumeID, server, ratio*100, t.garbageThreshold*100) + } - err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + eligible := make([]string, 0, len(ratios)) + for _, server := range t.servers { + if ratios[server] >= t.garbageThreshold { + eligible = append(eligible, server) + } + } + return eligible, ratios, nil +} + +func (t *VacuumTask) checkOneVacuumEligibility(ctx context.Context, server string) (float64, error) { + var garbageRatio float64 + err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) defer cancel() @@ -175,95 +227,163 @@ func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, if err != nil { return fmt.Errorf("failed to check volume vacuum status: %v", err) } - garbageRatio = resp.GarbageRatio - return nil }) - - if err != nil { - return false, 0, err - } - - eligible := garbageRatio >= t.garbageThreshold - glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v", - t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible) - - return eligible, garbageRatio, nil + return garbageRatio, err } -// performVacuum executes the actual vacuum operation +// performVacuum runs the two-phase vacuum protocol that master built-in +// vacuum uses (topology.vacuumOneVolumeId): +// +// Phase 1 (Compact): build the new .cpd/.cpx files on every target. +// If any replica fails, roll back by Cleanup'ing the .cp* temp files +// on every target and abort — no replica has yet swapped its active +// files, so no replica is committed. +// +// Phase 2 (Commit): swap each target's active files with its .cp* +// files. Best-effort, matching batchVacuumVolumeCommit: per-replica +// errors are logged and surfaced together, but once any replica has +// swapped there is no clean rollback for the others, so we do not +// retry or undo. An operator must reconcile a partial commit +// failure. +// +// Interleaving Compact→Commit→Cleanup per replica (the prior behavior) +// could leave a committed first replica beside an uncompacted second +// replica when Compact on the second failed — replica divergence with +// no automatic recovery. func (t *VacuumTask) performVacuum(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + // Phase 1: Compact all targets. + for _, server := range t.vacuumTargets { + if err := t.compactOne(ctx, server); err != nil { + t.cleanupAll(ctx) + return fmt.Errorf("vacuum compact on %s: %w", server, err) + } + } + + // Phase 2: Commit all targets. + var commitErrors []error + for _, server := range t.vacuumTargets { + if err := t.commitOne(ctx, server); err != nil { + glog.Errorf("vacuum commit on %s for volume %d: %v", server, t.volumeID, err) + commitErrors = append(commitErrors, fmt.Errorf("%s: %w", server, err)) + } + } + if len(commitErrors) > 0 { + return fmt.Errorf("vacuum commit failed on %d/%d replicas: %v", + len(commitErrors), len(t.vacuumTargets), commitErrors) + } + return nil +} + +func (t *VacuumTask) compactOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume (3 min per GB, matching topology vacuum) - t.GetLogger().Info("Compacting volume") - compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) - defer compactCancel() + t.GetLogger().Info("Compacting volume on %s", server) + compactCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) + defer cancel() stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { - return fmt.Errorf("vacuum compact failed: %v", err) + return fmt.Errorf("vacuum compact start: %v", err) } - - // Read compact progress for { resp, recvErr := stream.Recv() if recvErr != nil { if recvErr == io.EOF { break } - return fmt.Errorf("vacuum compact stream error: %v", recvErr) + return fmt.Errorf("vacuum compact stream: %v", recvErr) } - glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) + glog.V(2).Infof("Volume %d on %s compact progress: %d bytes", t.volumeID, server, resp.ProcessedBytes) } - - // Step 2: Commit the vacuum (1 min per GB) - t.GetLogger().Info("Committing vacuum operation") - commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) - defer commitCancel() - _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum commit failed: %v", err) - } - - // Step 3: Cleanup old files (1 min per GB) - t.GetLogger().Info("Cleaning up vacuum files") - cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) - defer cleanupCancel() - _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum cleanup failed: %v", err) - } - - glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID) return nil }) } -// verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, +func (t *VacuumTask) commitOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + t.GetLogger().Info("Committing vacuum on %s", server) + commitCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) defer cancel() - resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ + _, err := client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { - return fmt.Errorf("failed to verify vacuum results: %v", err) + return fmt.Errorf("vacuum commit: %v", err) } - - postVacuumGarbageRatio := resp.GarbageRatio - - glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%", - t.volumeID, postVacuumGarbageRatio*100) - return nil }) } + +func (t *VacuumTask) cleanupOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + cleanupCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + _, err := client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ + VolumeId: t.volumeID, + }) + return err + }) +} + +// cleanupAll removes the .cpd/.cpx/.cpldb temp files on every target. +// Used to roll back when Compact fails on one replica after others +// have already created their temp files. Per-target failures are +// logged but never bubble up — the rollback is best-effort. +func (t *VacuumTask) cleanupAll(ctx context.Context) { + for _, server := range t.vacuumTargets { + if err := t.cleanupOne(ctx, server); err != nil { + glog.Warningf("rollback cleanup on %s for volume %d: %v", server, t.volumeID, err) + } + } +} + +// verifyVacuumResults checks each target replica's post-vacuum garbage +// ratio. Failures are logged at WARN — the task does not fail because the +// vacuum itself already succeeded. +func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { + for _, server := range t.vacuumTargets { + err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + return fmt.Errorf("failed to verify vacuum results: %v", err) + } + glog.V(1).Infof("Volume %d on %s post-vacuum garbage ratio: %.2f%%", + t.volumeID, server, resp.GarbageRatio*100) + return nil + }) + if err != nil { + glog.Warningf("post-vacuum verify on %s: %v", server, err) + } + } + return nil +} + +// dedupePreserveOrder returns servers with duplicates removed, keeping the +// first occurrence's position. Detection sometimes hands the same node +// address in multiple Sources (e.g. EC variants); we coalesce them so each +// physical replica is vacuumed exactly once. +func dedupePreserveOrder(servers []string) []string { + seen := make(map[string]struct{}, len(servers)) + out := make([]string, 0, len(servers)) + for _, s := range servers { + if s == "" { + continue + } + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + return out +}