diff --git a/test/plugin_workers/vacuum/execution_test.go b/test/plugin_workers/vacuum/execution_test.go index e1846b0a9..e687a2dec 100644 --- a/test/plugin_workers/vacuum/execution_test.go +++ b/test/plugin_workers/vacuum/execution_test.go @@ -59,5 +59,11 @@ func TestVacuumExecutionIntegration(t *testing.T) { require.GreaterOrEqual(t, checkCalls, 2) require.GreaterOrEqual(t, compactCalls, 1) require.GreaterOrEqual(t, commitCalls, 1) - require.GreaterOrEqual(t, cleanupCalls, 1) + // Cleanup is only invoked when Phase 1 (Compact) fails to roll back + // the .cpd/.cpx/.cpldb temp files; on the success path Commit + // consumes them (rename .cpd → .dat, .cpx → .idx, .cpldb → .ldb via + // the leveldb needle map) so no Cleanup call is needed. Matches + // topology.vacuumOneVolumeId which only calls batchVacuumVolumeCleanup + // on the Compact-failure branch. + require.Equal(t, 0, cleanupCalls) } diff --git a/weed/worker/tasks/vacuum/detection.go b/weed/worker/tasks/vacuum/detection.go index 1ba55dccd..a52cca18f 100644 --- a/weed/worker/tasks/vacuum/detection.go +++ b/weed/worker/tasks/vacuum/detection.go @@ -18,75 +18,89 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI } vacuumConfig := config.(*Config) - var results []*types.TaskDetectionResult minVolumeAge := time.Duration(vacuumConfig.MinVolumeAgeSeconds) * time.Second - debugCount := 0 + type volumeBucket struct { + replicas []*types.VolumeHealthMetrics + } + buckets := make(map[uint32]*volumeBucket) + order := make([]uint32, 0) + for _, m := range metrics { + if m == nil { + continue + } + b, ok := buckets[m.VolumeID] + if !ok { + b = &volumeBucket{} + buckets[m.VolumeID] = b + order = append(order, m.VolumeID) + } + b.replicas = append(b.replicas, m) + } + + var results []*types.TaskDetectionResult skippedDueToGarbage := 0 skippedDueToAge := 0 + debugCount := 0 - for _, metric := range metrics { - // Check if volume needs vacuum - if metric.GarbageRatio >= vacuumConfig.GarbageThreshold && metric.Age >= minVolumeAge { - priority := types.TaskPriorityNormal - if metric.GarbageRatio > 0.6 { - priority = types.TaskPriorityHigh - } + for _, vid := range order { + replicas := buckets[vid].replicas - // Generate task ID for future ActiveTopology integration - taskID := fmt.Sprintf("vacuum_vol_%d_%d", metric.VolumeID, time.Now().Unix()) + // Pick the replica with the highest garbage ratio that also satisfies + // the minimum-age gate as the "primary" used for priority + reason. + // Master built-in vacuum filters per-replica again inside + // batchVacuumVolumeCheck, so it's fine if only some replicas are + // above threshold here — the worker's checkVacuumEligibility will + // filter again at execute time. + primary := pickPrimaryReplica(replicas, vacuumConfig.GarbageThreshold, minVolumeAge) + if primary == nil { + recordSkipReasons(replicas, vacuumConfig.GarbageThreshold, minVolumeAge, &skippedDueToGarbage, &skippedDueToAge, &debugCount) + continue + } - result := &types.TaskDetectionResult{ - TaskID: taskID, // For future ActiveTopology integration - TaskType: types.TaskTypeVacuum, - VolumeID: metric.VolumeID, - Server: metric.Server, - Collection: metric.Collection, - Priority: priority, - Reason: "Volume has excessive garbage requiring vacuum", - ScheduleAt: time.Now(), - } + priority := types.TaskPriorityNormal + if primary.GarbageRatio > 0.6 { + priority = types.TaskPriorityHigh + } - // Check if ANY task already exists in ActiveTopology for this volume - if clusterInfo != nil && clusterInfo.ActiveTopology != nil { - if clusterInfo.ActiveTopology.HasAnyTask(metric.VolumeID) { - glog.V(2).Infof("VACUUM: Skipping volume %d, task already exists in ActiveTopology", metric.VolumeID) - continue - } - } + taskID := fmt.Sprintf("vacuum_vol_%d_%d", vid, time.Now().Unix()) - // Create typed parameters for vacuum task - result.TypedParams = createVacuumTaskParams(result, metric, vacuumConfig, clusterInfo) - if result.TypedParams != nil { - results = append(results, result) + result := &types.TaskDetectionResult{ + TaskID: taskID, + TaskType: types.TaskTypeVacuum, + VolumeID: vid, + Server: primary.Server, + Collection: primary.Collection, + Priority: priority, + Reason: "Volume has excessive garbage requiring vacuum", + ScheduleAt: time.Now(), + } + + // Check if ANY task already exists in ActiveTopology for this volume + if clusterInfo != nil && clusterInfo.ActiveTopology != nil { + if clusterInfo.ActiveTopology.HasAnyTask(vid) { + glog.V(2).Infof("VACUUM: Skipping volume %d, task already exists in ActiveTopology", vid) + continue } - } else { - // Debug why volume was not selected - if debugCount < 5 { // Limit debug output to first 5 volumes - if metric.GarbageRatio < vacuumConfig.GarbageThreshold { - skippedDueToGarbage++ - } - if metric.Age < minVolumeAge { - skippedDueToAge++ - } - } - debugCount++ + } + + result.TypedParams = createVacuumTaskParams(result, replicas, vacuumConfig, clusterInfo) + if result.TypedParams != nil { + results = append(results, result) } } - // Log debug summary if no tasks were created if len(results) == 0 && len(metrics) > 0 { totalVolumes := len(metrics) - glog.V(1).Infof("VACUUM: No tasks created for %d volumes. Threshold=%.2f%%, MinAge=%s. Skipped: %d (garbage= 3 { // Limit to first 3 volumes + if i >= 3 { break } - glog.V(1).Infof("VACUUM: Volume %d: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", - metric.VolumeID, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, + glog.V(1).Infof("VACUUM: Volume %d on %s: garbage=%.2f%% (need ≥%.2f%%), age=%s (need ≥%s)", + metric.VolumeID, metric.Server, metric.GarbageRatio*100, vacuumConfig.GarbageThreshold*100, metric.Age.Truncate(time.Minute), minVolumeAge.Truncate(time.Minute)) } } @@ -94,54 +108,100 @@ func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterI return results, nil } -// createVacuumTaskParams creates typed parameters for vacuum tasks -// This function is moved from MaintenanceIntegration.createVacuumTaskParams to the detection logic -func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams { - // Use configured values or defaults - garbageThreshold := 0.3 // Default 30% - verifyChecksum := true // Default to verify - batchSize := int32(1000) // Default batch size - workingDir := "" // Use worker-provided default if empty +// pickPrimaryReplica returns the eligible replica with the highest garbage +// ratio. Returns nil if no replica satisfies both gates. +func pickPrimaryReplica(replicas []*types.VolumeHealthMetrics, garbageThreshold float64, minAge time.Duration) *types.VolumeHealthMetrics { + var best *types.VolumeHealthMetrics + for _, m := range replicas { + if m.GarbageRatio < garbageThreshold { + continue + } + if m.Age < minAge { + continue + } + if best == nil || m.GarbageRatio > best.GarbageRatio { + best = m + } + } + return best +} + +// recordSkipReasons tallies skip counters for debug logging when no +// replica of a volume qualified for vacuum. +func recordSkipReasons(replicas []*types.VolumeHealthMetrics, garbageThreshold float64, minAge time.Duration, garbageSkip, ageSkip, debugCount *int) { + for _, m := range replicas { + if *debugCount >= 5 { + return + } + if m.GarbageRatio < garbageThreshold { + *garbageSkip++ + } + if m.Age < minAge { + *ageSkip++ + } + *debugCount++ + } +} + +// createVacuumTaskParams creates typed parameters for a vacuum task whose +// Sources list contains every replica of the volume. Worker-side +// performVacuum iterates over Sources and vacuums each replica. +func createVacuumTaskParams(task *types.TaskDetectionResult, replicas []*types.VolumeHealthMetrics, vacuumConfig *Config, clusterInfo *types.ClusterInfo) *worker_pb.TaskParams { + garbageThreshold := 0.3 + verifyChecksum := true + batchSize := int32(1000) + workingDir := "" if vacuumConfig != nil { garbageThreshold = vacuumConfig.GarbageThreshold - // Note: VacuumTaskConfig has GarbageThreshold, MinVolumeAgeHours - // Other fields like VerifyChecksum, BatchSize, WorkingDir would need to be added - // to the protobuf definition if they should be configurable } - // Use DC and rack information directly from VolumeHealthMetrics - sourceDC, sourceRack := metric.DataCenter, metric.Rack - - // Get server address from topology (required for vacuum tasks) if clusterInfo == nil || clusterInfo.ActiveTopology == nil { glog.Errorf("Topology not available for vacuum task on volume %d, skipping", task.VolumeID) return nil } - address, err := util.ResolveServerAddress(task.Server, clusterInfo.ActiveTopology) - if err != nil { - glog.Errorf("Failed to resolve address for server %s for vacuum task on volume %d, skipping task: %v", task.Server, task.VolumeID, err) + + sources := make([]*worker_pb.TaskSource, 0, len(replicas)) + seen := make(map[string]struct{}, len(replicas)) + for _, m := range replicas { + address, err := util.ResolveServerAddress(m.Server, clusterInfo.ActiveTopology) + if err != nil { + glog.Warningf("Failed to resolve address for server %s for vacuum task on volume %d, dropping replica: %v", m.Server, task.VolumeID, err) + continue + } + if _, ok := seen[address]; ok { + continue + } + seen[address] = struct{}{} + sources = append(sources, &worker_pb.TaskSource{ + Node: address, + VolumeId: task.VolumeID, + EstimatedSize: m.Size, + DataCenter: m.DataCenter, + Rack: m.Rack, + }) + } + + if len(sources) == 0 { + glog.Errorf("No resolvable replicas for vacuum task on volume %d, skipping", task.VolumeID) return nil } - // Create typed protobuf parameters with unified sources + // Use the primary replica (matches task.Server) for the canonical size + // recorded in TaskParams. Master built-in keeps a single volumeSizeLimit + // here so timeouts are stable across replicas. + canonical := primaryReplicaByServer(replicas, task.Server) + var canonicalSize uint64 + if canonical != nil { + canonicalSize = canonical.Size + } + return &worker_pb.TaskParams{ - TaskId: task.TaskID, // Link to ActiveTopology pending task (if integrated) + TaskId: task.TaskID, VolumeId: task.VolumeID, Collection: task.Collection, - VolumeSize: metric.Size, // Store original volume size for tracking changes - - // Unified sources array - Sources: []*worker_pb.TaskSource{ - { - Node: address, - VolumeId: task.VolumeID, - EstimatedSize: metric.Size, - DataCenter: sourceDC, - Rack: sourceRack, - }, - }, - + VolumeSize: canonicalSize, + Sources: sources, TaskParams: &worker_pb.TaskParams_VacuumParams{ VacuumParams: &worker_pb.VacuumTaskParams{ GarbageThreshold: garbageThreshold, @@ -153,3 +213,15 @@ func createVacuumTaskParams(task *types.TaskDetectionResult, metric *types.Volum }, } } + +func primaryReplicaByServer(replicas []*types.VolumeHealthMetrics, server string) *types.VolumeHealthMetrics { + for _, m := range replicas { + if m.Server == server { + return m + } + } + if len(replicas) > 0 { + return replicas[0] + } + return nil +} diff --git a/weed/worker/tasks/vacuum/plugin_handler.go b/weed/worker/tasks/vacuum/plugin_handler.go index e753e9f15..849100d63 100644 --- a/weed/worker/tasks/vacuum/plugin_handler.go +++ b/weed/worker/tasks/vacuum/plugin_handler.go @@ -395,9 +395,15 @@ func (h *VacuumHandler) Execute(ctx context.Context, request *plugin_pb.ExecuteJ } } + servers := make([]string, 0, len(params.Sources)) + for _, src := range params.Sources { + if src != nil && src.Node != "" { + servers = append(servers, src.Node) + } + } task := NewVacuumTask( request.Job.JobId, - params.Sources[0].Node, + servers, params.VolumeId, params.Collection, h.grpcDialOption, diff --git a/weed/worker/tasks/vacuum/register.go b/weed/worker/tasks/vacuum/register.go index d918ba469..34068f0cb 100644 --- a/weed/worker/tasks/vacuum/register.go +++ b/weed/worker/tasks/vacuum/register.go @@ -50,9 +50,15 @@ func RegisterVacuumTask() { if len(params.Sources) == 0 { return nil, fmt.Errorf("at least one source is required for vacuum task") } + servers := make([]string, 0, len(params.Sources)) + for _, src := range params.Sources { + if src != nil && src.Node != "" { + servers = append(servers, src.Node) + } + } return NewVacuumTask( fmt.Sprintf("vacuum-%d", params.VolumeId), - params.Sources[0].Node, // Use first source node + servers, params.VolumeId, params.Collection, dialOpt, diff --git a/weed/worker/tasks/vacuum/vacuum_task.go b/weed/worker/tasks/vacuum/vacuum_task.go index 3b226334b..c56a8d5c9 100644 --- a/weed/worker/tasks/vacuum/vacuum_task.go +++ b/weed/worker/tasks/vacuum/vacuum_task.go @@ -16,23 +16,33 @@ import ( "google.golang.org/grpc" ) -// VacuumTask implements the Task interface +// VacuumTask implements the Task interface. +// +// One task covers all replicas of a volume so behavior matches the master +// built-in vacuum (see topology.Topology.vacuumOneVolumeId): Check across +// every replica → filter to those whose garbage ratio meets the threshold +// → Compact/Commit/Cleanup that subset. Treating one replica per task (the +// prior behavior) drops the other N-1 replicas because the dispatcher +// gates duplicate tasks per volume via ActiveTopology.HasAnyTask. type VacuumTask struct { *base.BaseTask - server string + servers []string volumeID uint32 collection string garbageThreshold float64 progress float64 grpcDialOption grpc.DialOption volumeSize uint64 + vacuumTargets []string // populated by checkVacuumEligibility — subset of servers that pass the per-replica garbage re-check and proceed to Compact/Commit/Cleanup } -// NewVacuumTask creates a new unified vacuum task instance -func NewVacuumTask(id string, server string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *VacuumTask { +// NewVacuumTask creates a new unified vacuum task instance covering every +// replica server reported by the dispatcher. +func NewVacuumTask(id string, servers []string, volumeID uint32, collection string, grpcDialOption grpc.DialOption) *VacuumTask { + deduped := dedupePreserveOrder(servers) return &VacuumTask{ BaseTask: base.NewBaseTask(id, types.TaskTypeVacuum), - server: server, + servers: deduped, volumeID: volumeID, collection: collection, garbageThreshold: 0.3, // Default 30% threshold @@ -56,40 +66,49 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) t.GetLogger().WithFields(map[string]interface{}{ "volume_id": t.volumeID, - "server": t.server, + "servers": t.servers, "collection": t.collection, "garbage_threshold": t.garbageThreshold, }).Info("Starting vacuum task") - // Step 1: Check volume status and garbage ratio + if len(t.servers) == 0 { + return fmt.Errorf("no source servers configured for vacuum task") + } + + // Step 1: Check vacuum eligibility for each replica. Mirrors + // topology.batchVacuumVolumeCheck — only replicas whose garbage is at + // or above the threshold proceed to Compact/Commit/Cleanup. t.ReportProgress(10.0) t.GetLogger().Info("Checking volume status") - eligible, currentGarbageRatio, err := t.checkVacuumEligibility(ctx) + targets, currentGarbageRatios, err := t.checkVacuumEligibility(ctx) if err != nil { return fmt.Errorf("failed to check vacuum eligibility: %v", err) } - if !eligible { + if len(targets) == 0 { t.GetLogger().WithFields(map[string]interface{}{ - "current_garbage_ratio": currentGarbageRatio, - "required_threshold": t.garbageThreshold, - }).Info("Volume does not meet vacuum criteria, skipping") + "garbage_ratios": currentGarbageRatios, + "required_threshold": t.garbageThreshold, + }).Info("No replica meets vacuum criteria, skipping") t.ReportProgress(100.0) return nil } + t.vacuumTargets = targets - // Step 2: Perform vacuum operation + // Step 2: Perform vacuum (compact + commit + cleanup) across every + // target replica. t.ReportProgress(50.0) t.GetLogger().WithFields(map[string]interface{}{ - "garbage_ratio": currentGarbageRatio, - "threshold": t.garbageThreshold, + "vacuum_targets": targets, + "garbage_ratios": currentGarbageRatios, + "threshold": t.garbageThreshold, }).Info("Performing vacuum operation") if err := t.performVacuum(ctx); err != nil { return fmt.Errorf("failed to perform vacuum: %v", err) } - // Step 3: Verify vacuum results + // Step 3: Verify vacuum results on each target replica. t.ReportProgress(90.0) t.GetLogger().Info("Verifying vacuum results") if err := t.verifyVacuumResults(ctx); err != nil { @@ -98,8 +117,8 @@ func (t *VacuumTask) Execute(ctx context.Context, params *worker_pb.TaskParams) } t.ReportProgress(100.0) - glog.Infof("Vacuum task completed successfully: volume %d from %s (garbage ratio was %.2f%%)", - t.volumeID, t.server, currentGarbageRatio*100) + glog.Infof("Vacuum task completed successfully: volume %d on %v (garbage ratios %v)", + t.volumeID, targets, currentGarbageRatios) return nil } @@ -118,16 +137,20 @@ func (t *VacuumTask) Validate(params *worker_pb.TaskParams) error { return fmt.Errorf("volume ID mismatch: expected %d, got %d", t.volumeID, params.VolumeId) } - // Validate that at least one source matches our server - found := false + // Every server the task was created with must appear in the params' + // Sources list. The dispatcher fills Sources from the detection-time + // replica set, so a mismatch means the worker received stale routing. + sourceSet := make(map[string]struct{}, len(params.Sources)) for _, source := range params.Sources { - if source.Node == t.server { - found = true - break + if source == nil { + continue } + sourceSet[source.Node] = struct{}{} } - if !found { - return fmt.Errorf("no source matches expected server %s", t.server) + for _, server := range t.servers { + if _, ok := sourceSet[server]; !ok { + return fmt.Errorf("task server %s not present in params.Sources", server) + } } if vacuumParams.GarbageThreshold < 0 || vacuumParams.GarbageThreshold > 1.0 { @@ -161,11 +184,40 @@ func (t *VacuumTask) vacuumTimeout(base time.Duration) time.Duration { // Helper methods for real vacuum operations -// checkVacuumEligibility checks if the volume meets vacuum criteria -func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, error) { - var garbageRatio float64 +// checkVacuumEligibility queries every replica's current garbage ratio. +// Mirrors topology.batchVacuumVolumeCheck's all-or-nothing contract: if +// any replica's check fails (unreachable, RPC error, etc.) the entire +// task aborts. Vacuuming only the replicas that responded while +// silently skipping unreachable ones would compact the responders' +// garbage but leave the unreachable replica still carrying it, +// producing divergence the moment that replica comes back. +// +// Returns the subset of servers whose garbage is at or above the +// configured threshold, alongside a per-server ratio map for logging. +func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) ([]string, map[string]float64, error) { + ratios := make(map[string]float64, len(t.servers)) + for _, server := range t.servers { + ratio, err := t.checkOneVacuumEligibility(ctx, server) + if err != nil { + return nil, ratios, fmt.Errorf("vacuum check on %s for volume %d: %w (aborting; refusing to vacuum subset of replicas)", server, t.volumeID, err) + } + ratios[server] = ratio + glog.V(1).Infof("Volume %d on %s garbage ratio: %.2f%%, threshold: %.2f%%", + t.volumeID, server, ratio*100, t.garbageThreshold*100) + } - err := operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + eligible := make([]string, 0, len(ratios)) + for _, server := range t.servers { + if ratios[server] >= t.garbageThreshold { + eligible = append(eligible, server) + } + } + return eligible, ratios, nil +} + +func (t *VacuumTask) checkOneVacuumEligibility(ctx context.Context, server string) (float64, error) { + var garbageRatio float64 + err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { checkCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) defer cancel() @@ -175,95 +227,163 @@ func (t *VacuumTask) checkVacuumEligibility(ctx context.Context) (bool, float64, if err != nil { return fmt.Errorf("failed to check volume vacuum status: %v", err) } - garbageRatio = resp.GarbageRatio - return nil }) - - if err != nil { - return false, 0, err - } - - eligible := garbageRatio >= t.garbageThreshold - glog.V(1).Infof("Volume %d garbage ratio: %.2f%%, threshold: %.2f%%, eligible: %v", - t.volumeID, garbageRatio*100, t.garbageThreshold*100, eligible) - - return eligible, garbageRatio, nil + return garbageRatio, err } -// performVacuum executes the actual vacuum operation +// performVacuum runs the two-phase vacuum protocol that master built-in +// vacuum uses (topology.vacuumOneVolumeId): +// +// Phase 1 (Compact): build the new .cpd/.cpx files on every target. +// If any replica fails, roll back by Cleanup'ing the .cp* temp files +// on every target and abort — no replica has yet swapped its active +// files, so no replica is committed. +// +// Phase 2 (Commit): swap each target's active files with its .cp* +// files. Best-effort, matching batchVacuumVolumeCommit: per-replica +// errors are logged and surfaced together, but once any replica has +// swapped there is no clean rollback for the others, so we do not +// retry or undo. An operator must reconcile a partial commit +// failure. +// +// Interleaving Compact→Commit→Cleanup per replica (the prior behavior) +// could leave a committed first replica beside an uncompacted second +// replica when Compact on the second failed — replica divergence with +// no automatic recovery. func (t *VacuumTask) performVacuum(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, + // Phase 1: Compact all targets. + for _, server := range t.vacuumTargets { + if err := t.compactOne(ctx, server); err != nil { + t.cleanupAll(ctx) + return fmt.Errorf("vacuum compact on %s: %w", server, err) + } + } + + // Phase 2: Commit all targets. + var commitErrors []error + for _, server := range t.vacuumTargets { + if err := t.commitOne(ctx, server); err != nil { + glog.Errorf("vacuum commit on %s for volume %d: %v", server, t.volumeID, err) + commitErrors = append(commitErrors, fmt.Errorf("%s: %w", server, err)) + } + } + if len(commitErrors) > 0 { + return fmt.Errorf("vacuum commit failed on %d/%d replicas: %v", + len(commitErrors), len(t.vacuumTargets), commitErrors) + } + return nil +} + +func (t *VacuumTask) compactOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - // Step 1: Compact the volume (3 min per GB, matching topology vacuum) - t.GetLogger().Info("Compacting volume") - compactCtx, compactCancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) - defer compactCancel() + t.GetLogger().Info("Compacting volume on %s", server) + compactCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(3*time.Minute)) + defer cancel() stream, err := client.VacuumVolumeCompact(compactCtx, &volume_server_pb.VacuumVolumeCompactRequest{ VolumeId: t.volumeID, }) if err != nil { - return fmt.Errorf("vacuum compact failed: %v", err) + return fmt.Errorf("vacuum compact start: %v", err) } - - // Read compact progress for { resp, recvErr := stream.Recv() if recvErr != nil { if recvErr == io.EOF { break } - return fmt.Errorf("vacuum compact stream error: %v", recvErr) + return fmt.Errorf("vacuum compact stream: %v", recvErr) } - glog.V(2).Infof("Volume %d compact progress: %d bytes processed", t.volumeID, resp.ProcessedBytes) + glog.V(2).Infof("Volume %d on %s compact progress: %d bytes", t.volumeID, server, resp.ProcessedBytes) } - - // Step 2: Commit the vacuum (1 min per GB) - t.GetLogger().Info("Committing vacuum operation") - commitCtx, commitCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) - defer commitCancel() - _, err = client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum commit failed: %v", err) - } - - // Step 3: Cleanup old files (1 min per GB) - t.GetLogger().Info("Cleaning up vacuum files") - cleanupCtx, cleanupCancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) - defer cleanupCancel() - _, err = client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ - VolumeId: t.volumeID, - }) - if err != nil { - return fmt.Errorf("vacuum cleanup failed: %v", err) - } - - glog.V(1).Infof("Volume %d vacuum operation completed successfully", t.volumeID) return nil }) } -// verifyVacuumResults checks the volume status after vacuum -func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { - return operation.WithVolumeServerClient(false, pb.ServerAddress(t.server), t.grpcDialOption, +func (t *VacuumTask) commitOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + t.GetLogger().Info("Committing vacuum on %s", server) + commitCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) defer cancel() - resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ + _, err := client.VacuumVolumeCommit(commitCtx, &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: t.volumeID, }) if err != nil { - return fmt.Errorf("failed to verify vacuum results: %v", err) + return fmt.Errorf("vacuum commit: %v", err) } - - postVacuumGarbageRatio := resp.GarbageRatio - - glog.V(1).Infof("Volume %d post-vacuum garbage ratio: %.2f%%", - t.volumeID, postVacuumGarbageRatio*100) - return nil }) } + +func (t *VacuumTask) cleanupOne(ctx context.Context, server string) error { + return operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + cleanupCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + _, err := client.VacuumVolumeCleanup(cleanupCtx, &volume_server_pb.VacuumVolumeCleanupRequest{ + VolumeId: t.volumeID, + }) + return err + }) +} + +// cleanupAll removes the .cpd/.cpx/.cpldb temp files on every target. +// Used to roll back when Compact fails on one replica after others +// have already created their temp files. Per-target failures are +// logged but never bubble up — the rollback is best-effort. +func (t *VacuumTask) cleanupAll(ctx context.Context) { + for _, server := range t.vacuumTargets { + if err := t.cleanupOne(ctx, server); err != nil { + glog.Warningf("rollback cleanup on %s for volume %d: %v", server, t.volumeID, err) + } + } +} + +// verifyVacuumResults checks each target replica's post-vacuum garbage +// ratio. Failures are logged at WARN — the task does not fail because the +// vacuum itself already succeeded. +func (t *VacuumTask) verifyVacuumResults(ctx context.Context) error { + for _, server := range t.vacuumTargets { + err := operation.WithVolumeServerClient(false, pb.ServerAddress(server), t.grpcDialOption, + func(client volume_server_pb.VolumeServerClient) error { + verifyCtx, cancel := context.WithTimeout(ctx, t.vacuumTimeout(time.Minute)) + defer cancel() + resp, err := client.VacuumVolumeCheck(verifyCtx, &volume_server_pb.VacuumVolumeCheckRequest{ + VolumeId: t.volumeID, + }) + if err != nil { + return fmt.Errorf("failed to verify vacuum results: %v", err) + } + glog.V(1).Infof("Volume %d on %s post-vacuum garbage ratio: %.2f%%", + t.volumeID, server, resp.GarbageRatio*100) + return nil + }) + if err != nil { + glog.Warningf("post-vacuum verify on %s: %v", server, err) + } + } + return nil +} + +// dedupePreserveOrder returns servers with duplicates removed, keeping the +// first occurrence's position. Detection sometimes hands the same node +// address in multiple Sources (e.g. EC variants); we coalesce them so each +// physical replica is vacuumed exactly once. +func dedupePreserveOrder(servers []string) []string { + seen := make(map[string]struct{}, len(servers)) + out := make([]string, 0, len(servers)) + for _, s := range servers { + if s == "" { + continue + } + if _, ok := seen[s]; ok { + continue + } + seen[s] = struct{}{} + out = append(out, s) + } + return out +}