diff --git a/test/vacuum/vacuum_integration_test.go b/test/vacuum/vacuum_integration_test.go index 166946106..6d7d14585 100644 --- a/test/vacuum/vacuum_integration_test.go +++ b/test/vacuum/vacuum_integration_test.go @@ -10,6 +10,8 @@ import ( "os" "os/exec" "path/filepath" + "sort" + "strings" "testing" "time" @@ -226,16 +228,30 @@ func TestVacuumIntegration(t *testing.T) { var fids []string var payloads [][]byte - var volumeId needle.VolumeId + var fileVolumes []needle.VolumeId for i := 0; i < totalFiles; i++ { data := bytes.Repeat([]byte{byte('A' + i%26)}, fileSize) fid, vid, err := uploadData(masterAddr, collection, data) require.NoError(t, err, "upload %d", i) fids = append(fids, fid) payloads = append(payloads, data) - volumeId = vid + fileVolumes = append(fileVolumes, vid) } - t.Logf("Uploaded %d files (%d KB each) to volume %d", totalFiles, fileSize/1024, volumeId) + // Collect the set of volumes that will contain garbage after the deletes below. + // The master may spread uploads across multiple volumes, so we cannot assume + // a single volume id holds all the garbage. + dirtyVolumesSet := map[needle.VolumeId]struct{}{} + for i := 0; i < filesToDelete; i++ { + dirtyVolumesSet[fileVolumes[i]] = struct{}{} + } + var dirtyVolumes []needle.VolumeId + for v := range dirtyVolumesSet { + dirtyVolumes = append(dirtyVolumes, v) + } + // Sort for deterministic log output and stable iteration order across runs. + sort.Slice(dirtyVolumes, func(i, j int) bool { return dirtyVolumes[i] < dirtyVolumes[j] }) + t.Logf("Uploaded %d files (%d KB each) across volumes %v; will delete from volumes %v", + totalFiles, fileSize/1024, fileVolumes, dirtyVolumes) // Wait for heartbeat to report sizes time.Sleep(6 * time.Second) @@ -250,19 +266,44 @@ func TestVacuumIntegration(t *testing.T) { // Wait for heartbeat to report deletions time.Sleep(6 * time.Second) - // Verify garbage exists + // Verify garbage exists on every volume we deleted from. + // Retry briefly in case heartbeats / deletions have not fully settled. + // We require all dirty volumes to report garbage > threshold so that + // the subsequent vacuum + cleanup check has a well-defined expectation + // for every volume, not just the first one that happens to be ready. t.Run("verify_garbage_before_vacuum", func(t *testing.T) { - for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} { - ratio, err := getGarbageRatio(addr, uint32(volumeId)) - if err != nil { - continue + deadline := time.Now().Add(20 * time.Second) + var lastMissing needle.VolumeId + for { + ready := true + for _, vid := range dirtyVolumes { + volumeReady := false + for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} { + ratio, err := getGarbageRatio(addr, uint32(vid)) + if err != nil { + continue + } + t.Logf("Garbage ratio for volume %d on %s: %.2f%%", vid, addr, ratio*100) + if ratio > 0.1 { + volumeReady = true + break + } + } + if !volumeReady { + ready = false + lastMissing = vid + break + } } - t.Logf("Garbage ratio on %s: %.2f%%", addr, ratio*100) - if ratio > 0.1 { - return // sufficient garbage found + if ready { + return } + if time.Now().After(deadline) { + break + } + time.Sleep(1 * time.Second) } - t.Fatal("No server reported garbage > 10% — test data setup failed") + t.Fatalf("volume %d did not report garbage > 10%% — test data setup failed", lastMissing) }) // Execute vacuum via shell command @@ -305,29 +346,60 @@ func TestVacuumIntegration(t *testing.T) { t.Log("Vacuum completed successfully") }) - // Wait for vacuum effects to settle - time.Sleep(6 * time.Second) - - // Verify garbage was cleaned + // Verify garbage was cleaned on every volume we deleted from. + // Vacuum + heartbeat reporting is asynchronous, so retry until each + // volume reports a cleaned ratio or the deadline expires. t.Run("verify_cleanup_after_vacuum", func(t *testing.T) { - var volumeFound, cleanupVerified bool - for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} { - ratio, err := getGarbageRatio(addr, uint32(volumeId)) - if err != nil { - continue + deadline := time.Now().Add(30 * time.Second) + remaining := map[needle.VolumeId]struct{}{} + for _, vid := range dirtyVolumes { + remaining[vid] = struct{}{} + } + failureReasons := map[needle.VolumeId]string{} + for { + for vid := range remaining { + var volumeFound, cleanupVerified bool + for _, addr := range []string{"127.0.0.1:8080", "127.0.0.1:8081"} { + ratio, err := getGarbageRatio(addr, uint32(vid)) + if err != nil { + continue + } + volumeFound = true + t.Logf("Garbage ratio for volume %d after vacuum on %s: %.2f%%", vid, addr, ratio*100) + if ratio < 0.05 { + cleanupVerified = true + break + } + } + switch { + case !volumeFound: + failureReasons[vid] = fmt.Sprintf("no server reported volume %d after vacuum", vid) + case !cleanupVerified: + failureReasons[vid] = fmt.Sprintf("garbage on volume %d was not cleaned up after vacuum", vid) + default: + delete(remaining, vid) + delete(failureReasons, vid) + } } - volumeFound = true - t.Logf("Garbage ratio after vacuum on %s: %.2f%%", addr, ratio*100) - if ratio < 0.05 { - cleanupVerified = true + if len(remaining) == 0 { + return } + if time.Now().After(deadline) { + break + } + time.Sleep(1 * time.Second) } - if !volumeFound { - t.Fatal("No server reported volume after vacuum") + stillFailing := make([]needle.VolumeId, 0, len(remaining)) + for vid := range remaining { + stillFailing = append(stillFailing, vid) } - if !cleanupVerified { - t.Fatal("Garbage was not cleaned up after vacuum") + sort.Slice(stillFailing, func(i, j int) bool { return stillFailing[i] < stillFailing[j] }) + msgs := make([]string, 0, len(stillFailing)) + for _, vid := range stillFailing { + msgs = append(msgs, failureReasons[vid]) } + t.Fatalf("cleanup verification failed for %d volume(s): %s", + len(stillFailing), strings.Join(msgs, "; ")) }) // Verify remaining files are still readable with correct contents