Files
seaweedfs/test/plugin_workers/vacuum/execution_test.go
Jaehoon Kim d00acded8a fix(vacuum): batch all replicas in a single plugin worker task (#9702)
* fix(vacuum): batch all replicas in a single plugin worker task

The plugin worker vacuum path emitted one TaskDetectionResult per
(volume, server) replica, but the dispatcher gates duplicate tasks per
volume via ActiveTopology.HasAnyTask. The first replica's task was
created and the remaining N-1 replicas were silently dropped, so only
one replica per volume was ever vacuumed — leaving the others with all
their garbage intact.

Mirror the master built-in flow (topology.vacuumOneVolumeId →
batchVacuumVolumeCheck/Compact/Commit/Cleanup) by:

- aggregating detection metrics by VolumeID so a single task carries
  every replica in TaskParams.Sources
- having VacuumTask accept []string servers (instead of a single
  string), re-check each replica's garbage ratio at execute time to
  derive a vacuumTargets subset, and run Compact/Commit/Cleanup against
  only that subset
- updating the dispatcher (plugin_handler.Execute, register.CreateTask)
  to forward every Sources node to NewVacuumTask

* fix(vacuum): run all-replica vacuum in two phases to keep failure atomic

The prior implementation iterated Compact → Commit → Cleanup against
each replica in sequence. A Compact failure on the second replica left
the first one already committed (its active files swapped with the
.cp* files), producing replica divergence with no automatic recovery.

Split performVacuum into two phases, matching topology.vacuumOneVolumeId:

  Phase 1 — Compact all targets. If any fails, run VacuumVolumeCleanup
  on every target to drop the .cpd/.cpx/.cpldb temp files, then abort.
  No replica has swapped yet, so every replica returns to its original
  state.

  Phase 2 — Commit all targets. Best-effort, matching
  batchVacuumVolumeCommit: per-replica errors are collected and
  surfaced together. Once any replica has swapped there is no clean
  rollback, so a partial Phase 2 failure requires operator
  reconciliation.

Adds compactOne / commitOne / cleanupOne / cleanupAll helpers and
removes the old performVacuumOne.

* fix(vacuum): abort when any replica's garbage check fails

The prior check tolerated per-replica RPC errors and only failed the
task if every replica errored — partial failures were silently treated
as "ineligible" so the responding replicas would still be vacuumed.
That produces divergence the moment the unreachable replica comes
back: it still carries the original garbage while the others have
been compacted.

Match topology.batchVacuumVolumeCheck's contract instead — its return
value (errCount == 0 && len(vacuumLocationList.list) > 0) gates the
whole vacuum on every replica's check succeeding. If any replica is
unreachable or its VacuumVolumeCheck RPC errors, abort the task; the
volume will be retried on the next detection cycle once the replica
is healthy.

* fix(vacuum): guard against nil metrics and TaskSource entries

Detection's bucket-building loop dereferenced m.VolumeID without
checking m for nil. VacuumTask.Validate built sourceSet from
params.Sources without checking each entry for nil. Both paths would
panic on a malformed protobuf payload that managed to deliver a nil
slot. Skip nil entries in both loops — neutral with the existing
nil/empty filtering already done in register.CreateTask and
plugin_handler.Execute.

* test(vacuum): success path no longer calls VacuumVolumeCleanup

The plugin worker vacuum is now two-phase (Compact-all → Commit-all,
with Cleanup only invoked on Compact failure to roll back .cp* temp
files). This matches topology.vacuumOneVolumeId, where
batchVacuumVolumeCleanup runs only on the Compact-failure branch.

On a successful Commit the temp files do not linger:
  - CommitCompactVolume renames .cpd → .dat and .cpx → .idx
  - leveldb needle map renames .cpldb → .ldb (needle_map_leveldb.go)

so calling VacuumVolumeCleanup afterwards is a redundant no-op. The
prior worker code called it unconditionally and the integration test
asserted that — switch the expectation to cleanupCalls == 0 to
document the new (and master-aligned) contract.
2026-05-27 11:15:25 -07:00

70 lines
2.2 KiB
Go

package vacuum_test
import (
"context"
"fmt"
"testing"
"time"
pluginworkers "github.com/seaweedfs/seaweedfs/test/plugin_workers"
"github.com/seaweedfs/seaweedfs/weed/pb/plugin_pb"
pluginworker "github.com/seaweedfs/seaweedfs/weed/plugin/worker"
"github.com/seaweedfs/seaweedfs/weed/worker/tasks/vacuum"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestVacuumExecutionIntegration(t *testing.T) {
volumeID := uint32(202)
dialOption := grpc.WithTransportCredentials(insecure.NewCredentials())
handler := vacuum.NewVacuumHandler(dialOption, 1)
harness := pluginworkers.NewHarness(t, pluginworkers.HarnessConfig{
WorkerOptions: pluginworker.WorkerOptions{
GrpcDialOption: dialOption,
},
Handlers: []pluginworker.JobHandler{handler},
})
harness.WaitForJobType("vacuum")
source := pluginworkers.NewVolumeServer(t, "")
source.SetVacuumGarbageRatio(0.6)
job := &plugin_pb.JobSpec{
JobId: fmt.Sprintf("vacuum-job-%d", volumeID),
JobType: "vacuum",
Parameters: map[string]*plugin_pb.ConfigValue{
"volume_id": {
Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: int64(volumeID)},
},
"server": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: source.Address()},
},
"collection": {
Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "vac-test"},
},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result, err := harness.Plugin().ExecuteJob(ctx, job, nil, 1)
require.NoError(t, err)
require.NotNil(t, result)
require.True(t, result.Success)
checkCalls, compactCalls, commitCalls, cleanupCalls := source.VacuumStats()
require.GreaterOrEqual(t, checkCalls, 2)
require.GreaterOrEqual(t, compactCalls, 1)
require.GreaterOrEqual(t, commitCalls, 1)
// Cleanup is only invoked when Phase 1 (Compact) fails to roll back
// the .cpd/.cpx/.cpldb temp files; on the success path Commit
// consumes them (rename .cpd → .dat, .cpx → .idx, .cpldb → .ldb via
// the leveldb needle map) so no Cleanup call is needed. Matches
// topology.vacuumOneVolumeId which only calls batchVacuumVolumeCleanup
// on the Compact-failure branch.
require.Equal(t, 0, cleanupCalls)
}