test(vacuum): fix flaky TestVacuumIntegration across multiple volumes (#9061)

* test(vacuum): fix flaky TestVacuumIntegration across multiple volumes

The test assumed all uploaded files landed in a single volume and
tracked only the last file's volume id. With -volumeSizeLimitMB 10
and 16x500KB files, the master can spread uploads across volumes,
so the tracked id could point to a volume with no deletes and thus
0% garbage — causing verify_garbage_before_vacuum to fail even
though vacuum ran correctly on the other volume.

Track the set of volumes where deletes actually occurred and
verify garbage/cleanup against all of them. Also add a short
retry loop on the pre-vacuum check to absorb heartbeat jitter.

* test(vacuum): require all dirty volumes ready; retry cleanup check

Address review feedback: the pre-vacuum check now waits until every
volume in dirtyVolumes reports garbage > threshold (not just the
first), and the post-vacuum cleanup check retries per-volume with a
deadline instead of relying on a fixed sleep, since vacuum + heartbeat
reporting is asynchronous.

* test(vacuum): deterministic dirty volumes order, aggregate cleanup failures

- Sort dirtyVolumes after building from the set so logs and iteration
  are stable across runs.
- In verify_cleanup_after_vacuum, track per-volume failure reasons in a
  map and report all still-failing volumes on timeout instead of only
  the last one that happened to be written to lastErr.
This commit is contained in:
Chris Lu
2026-04-13 17:56:32 -07:00
committed by GitHub
parent e0c361ec77
commit f00cbe4a6d

View File

@@ -10,6 +10,8 @@ import (
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
"testing"
"time"
@@ -226,16 +228,30 @@ func TestVacuumIntegration(t *testing.T) {
var fids []string
var payloads [][]byte
var volumeId needle.VolumeId
var fileVolumes []needle.VolumeId
for i := 0; i < totalFiles; i++ {
data := bytes.Repeat([]byte{byte('A' + i%26)}, fileSize)
fid, vid, err := uploadData(masterAddr, collection, data)
require.NoError(t, err, "upload %d", i)
fids = append(fids, fid)
payloads = append(payloads, data)
volumeId = vid
fileVolumes = append(fileVolumes, vid)
}
t.Logf("Uploaded %d files (%d KB each) to volume %d", totalFiles, fileSize/1024, volumeId)
// Collect the set of volumes that will contain garbage after the deletes below.
// The master may spread uploads across multiple volumes, so we cannot assume
// a single volume id holds all the garbage.
dirtyVolumesSet := map[needle.VolumeId]struct{}{}
for i := 0; i < filesToDelete; i++ {
dirtyVolumesSet[fileVolumes[i]] = struct{}{}
}
var dirtyVolumes []needle.VolumeId
for v := range dirtyVolumesSet {
dirtyVolumes = append(dirtyVolumes, v)
}
// Sort for deterministic log output and stable iteration order across runs.
sort.Slice(dirtyVolumes, func(i, j int) bool { return dirtyVolumes[i] < dirtyVolumes[j] })
t.Logf("Uploaded %d files (%d KB each) across volumes %v; will delete from volumes %v",
totalFiles, fileSize/1024, fileVolumes, dirtyVolumes)
// Wait for heartbeat to report sizes
time.Sleep(6 * time.Second)
@@ -250,19 +266,44 @@ func TestVacuumIntegration(t *testing.T) {
// Wait for heartbeat to report deletions
time.Sleep(6 * time.Second)
// Verify garbage exists
// Verify garbage exists on every volume we deleted from.
// Retry briefly in case heartbeats / deletions have not fully settled.
// We require all dirty volumes to report garbage > threshold so that
// the subsequent vacuum + cleanup check has a well-defined expectation
// for every volume, not just the first one that happens to be ready.
t.Run("verify_garbage_before_vacuum", func(t *testing.T) {
for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} {
ratio, err := getGarbageRatio(addr, uint32(volumeId))
if err != nil {
continue
deadline := time.Now().Add(20 * time.Second)
var lastMissing needle.VolumeId
for {
ready := true
for _, vid := range dirtyVolumes {
volumeReady := false
for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} {
ratio, err := getGarbageRatio(addr, uint32(vid))
if err != nil {
continue
}
t.Logf("Garbage ratio for volume %d on %s: %.2f%%", vid, addr, ratio*100)
if ratio > 0.1 {
volumeReady = true
break
}
}
if !volumeReady {
ready = false
lastMissing = vid
break
}
}
t.Logf("Garbage ratio on %s: %.2f%%", addr, ratio*100)
if ratio > 0.1 {
return // sufficient garbage found
if ready {
return
}
if time.Now().After(deadline) {
break
}
time.Sleep(1 * time.Second)
}
t.Fatal("No server reported garbage > 10% — test data setup failed")
t.Fatalf("volume %d did not report garbage > 10%% — test data setup failed", lastMissing)
})
// Execute vacuum via shell command
@@ -305,29 +346,60 @@ func TestVacuumIntegration(t *testing.T) {
t.Log("Vacuum completed successfully")
})
// Wait for vacuum effects to settle
time.Sleep(6 * time.Second)
// Verify garbage was cleaned
// Verify garbage was cleaned on every volume we deleted from.
// Vacuum + heartbeat reporting is asynchronous, so retry until each
// volume reports a cleaned ratio or the deadline expires.
t.Run("verify_cleanup_after_vacuum", func(t *testing.T) {
var volumeFound, cleanupVerified bool
for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} {
ratio, err := getGarbageRatio(addr, uint32(volumeId))
if err != nil {
continue
deadline := time.Now().Add(30 * time.Second)
remaining := map[needle.VolumeId]struct{}{}
for _, vid := range dirtyVolumes {
remaining[vid] = struct{}{}
}
failureReasons := map[needle.VolumeId]string{}
for {
for vid := range remaining {
var volumeFound, cleanupVerified bool
for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} {
ratio, err := getGarbageRatio(addr, uint32(vid))
if err != nil {
continue
}
volumeFound = true
t.Logf("Garbage ratio for volume %d after vacuum on %s: %.2f%%", vid, addr, ratio*100)
if ratio < 0.05 {
cleanupVerified = true
break
}
}
switch {
case !volumeFound:
failureReasons[vid] = fmt.Sprintf("no server reported volume %d after vacuum", vid)
case !cleanupVerified:
failureReasons[vid] = fmt.Sprintf("garbage on volume %d was not cleaned up after vacuum", vid)
default:
delete(remaining, vid)
delete(failureReasons, vid)
}
}
volumeFound = true
t.Logf("Garbage ratio after vacuum on %s: %.2f%%", addr, ratio*100)
if ratio < 0.05 {
cleanupVerified = true
if len(remaining) == 0 {
return
}
if time.Now().After(deadline) {
break
}
time.Sleep(1 * time.Second)
}
if !volumeFound {
t.Fatal("No server reported volume after vacuum")
stillFailing := make([]needle.VolumeId, 0, len(remaining))
for vid := range remaining {
stillFailing = append(stillFailing, vid)
}
if !cleanupVerified {
t.Fatal("Garbage was not cleaned up after vacuum")
sort.Slice(stillFailing, func(i, j int) bool { return stillFailing[i] < stillFailing[j] })
msgs := make([]string, 0, len(stillFailing))
for _, vid := range stillFailing {
msgs = append(msgs, failureReasons[vid])
}
t.Fatalf("cleanup verification failed for %d volume(s): %s",
len(stillFailing), strings.Join(msgs, "; "))
})
// Verify remaining files are still readable with correct contents