fix(scrub): walk the on-disk .idx in Volume::scrub (source-of-truth parity) (#10143)

* refactor(scrub): extract open_index_for_scrub shared by scrub_index

Mirrors Go's openIndex, shared by ScrubIndex and the upcoming Scrub rewrite.

Claude-Session: https://claude.ai/code/session_015EE9Sc9EvNp8BCVva4RKdo

* fix(scrub): walk the on-disk .idx in Volume::scrub

scrub walked the deduped in-memory map, so total_read undercounted the
physical .dat on any volume with overwrites or deletes and the size
reconcile falsely flagged healthy volumes broken. Walk every .idx row
instead (matching Go's scrubVolumeData): count all rows, CRC-verify live
needles, skip deleted, and reconcile against the .dat. Holds one data-file
read lock and reads via the unlocked path, like Go's Scrub.

Claude-Session: https://claude.ai/code/session_015EE9Sc9EvNp8BCVva4RKdo
This commit is contained in:
Chris Lu
2026-06-30 01:17:28 -07:00
committed by GitHub
parent 1df7a0e653
commit dccc015a1f
+115 -77
View File
@@ -1966,46 +1966,47 @@ impl Volume {
Ok(())
}
/// Open the on-disk .idx for scrubbing and apply Go openIndex's zero-size
/// guard: a zero-size index is only legal for a pre-allocated volume whose
/// .dat is superblock-only; an empty index over a populated .dat is
/// corruption. Returns the open file + size, or the messages to report.
fn open_index_for_scrub(&self) -> Result<(File, i64), Vec<String>> {
let idx_path = self.file_name(".idx");
let idx_file =
File::open(&idx_path).map_err(|e| vec![format!("open index file {}: {}", idx_path, e)])?;
let idx_file_size = idx_file
.metadata()
.map_err(|e| vec![format!("stat index file {}: {}", idx_path, e)])?
.len() as i64;
if idx_file_size == 0 {
match self.dat_file_size() {
Ok(dat_size) if dat_size > SUPER_BLOCK_SIZE as u64 => {
return Err(vec![format!(
"zero-size IDX file for volume {} with store size {}",
self.id.0, dat_size
)]);
}
Ok(_) => {}
Err(e) => {
return Err(vec![format!(
"stat data file for volume {}: {}",
self.id.0, e
)])
}
}
}
Ok((idx_file, idx_file_size))
}
/// Scrub the volume index: verify the on-disk .idx for overlapping needles
/// and a size that is a whole number of entries. Mirrors Go's Volume.ScrubIndex
/// → idx.CheckIndexFile. Index-only; does not read the .dat.
pub fn scrub_index(&self) -> Result<(u64, Vec<String>), VolumeError> {
let _guard = self.data_file_access_control.read_lock();
let idx_path = self.file_name(".idx");
let mut idx_file = match File::open(&idx_path) {
Ok(f) => f,
Err(e) => return Ok((0, vec![format!("open index file {}: {}", idx_path, e)])),
let (mut idx_file, idx_file_size) = match self.open_index_for_scrub() {
Ok(v) => v,
Err(msgs) => return Ok((0, msgs)),
};
let idx_file_size = match idx_file.metadata() {
Ok(m) => m.len() as i64,
Err(e) => return Ok((0, vec![format!("stat index file {}: {}", idx_path, e)])),
};
// A zero-size index is only legal for a pre-allocated volume without
// data (e.g. after volume.grow); a populated .dat with an empty index
// is corruption the scrub must catch.
if idx_file_size == 0 {
match self.dat_file_size() {
Ok(dat_size) if dat_size > SUPER_BLOCK_SIZE as u64 => {
return Ok((
0,
vec![format!(
"zero-size IDX file for volume {} with store size {}",
self.id.0, dat_size
)],
));
}
Ok(_) => {}
Err(e) => {
return Ok((
0,
vec![format!("stat data file for volume {}: {}", self.id.0, e)],
))
}
}
}
Ok(crate::storage::idx::check_index_file(
&mut idx_file,
idx_file_size,
@@ -2013,74 +2014,81 @@ impl Volume {
))
}
/// Scrub the volume by reading and verifying all needles.
/// Returns (files_checked, broken_needles) tuple.
/// Each needle is read from disk and its CRC checksum is verified.
/// Scrub the volume: walk the on-disk .idx (every physical row, including
/// superseded and deleted entries), CRC-verify each live needle, and reconcile
/// the .dat size against the needles it indexes. Mirrors Go's Volume.Scrub →
/// scrubVolumeData. Returns (rows_walked, broken_needles).
pub fn scrub(&self) -> Result<(u64, Vec<String>), VolumeError> {
if self.dat_file.is_none() && self.remote_dat_file.is_none() {
// Matches Go's scrubVolumeData: a volume with no data backend can't be
// scrubbed (and a zero dat_size would flag every needle out of range).
if !self.has_data_backend() {
return Err(VolumeError::NotFound);
}
let nm = self.nm_or_not_found()?;
let _guard = self.data_file_access_control.read_lock();
let (mut idx_file, idx_file_size) = match self.open_index_for_scrub() {
Ok(v) => v,
Err(msgs) => return Ok((0, msgs)),
};
let dat_size = self.dat_file_size().map_err(|e| VolumeError::Io(e))?;
let version = self.version();
let dat_size = self.dat_file_size().map_err(VolumeError::Io)?;
let mut files_checked: u64 = 0;
let mut broken = Vec::new();
// Full scrub also runs the index structural check.
let (_, mut broken) =
crate::storage::idx::check_index_file(&mut idx_file, idx_file_size, version);
let mut count: u64 = 0;
let mut total_read: i64 = 0;
for (needle_id, nv) in nm.iter_entries() {
if nv.offset.is_zero() {
continue;
let walk = crate::storage::idx::walk_index_file(&mut idx_file, 0, |needle_id, offset, size| {
count += 1;
// Deleted needles still occupy .dat space: count their size, don't read.
total_read += get_actual_size(size, version);
if size.is_deleted() {
return Ok(());
}
// Accumulate actual needle size for ALL entries including deleted ones
// (matches Go: deleted needles still occupy space in the .dat file).
total_read += get_actual_size(nv.size, version);
if nv.size.is_deleted() {
continue;
}
let offset = nv.offset.to_actual_offset();
if offset < 0 || offset as u64 >= dat_size {
let actual_offset = offset.to_actual_offset();
if actual_offset < 0 || actual_offset as u64 >= dat_size {
broken.push(format!(
"needle {} offset {} out of range (dat_size={})",
needle_id.0, offset, dat_size
needle_id.0, actual_offset, dat_size
));
continue;
return Ok(());
}
// Read and verify the needle (read_needle_data_at checks CRC via read_bytes/read_tail)
// Lock held above; read directly via the unlocked path (matches Go).
let mut n = Needle {
id: needle_id,
..Needle::default()
};
match self.read_needle_data_at(&mut n, offset, nv.size) {
Ok(_) => {}
Err(e) => {
broken.push(format!("needle {} error: {}", needle_id.0, e));
}
let mut read_option = ReadOption::default();
if let Err(e) =
self.read_needle_data_at_unlocked(&mut n, actual_offset, size, &mut read_option)
{
broken.push(format!(
"failed to read needle {} on volume {}: {}",
needle_id.0, self.id.0, e
));
}
files_checked += 1;
Ok(())
});
if let Err(e) = walk {
broken.push(format!("walk index file: {}", e));
}
// Validate total data size against .dat file size (matches Go's scrubVolumeData)
let expected_size = total_read + SUPER_BLOCK_SIZE as i64;
if (dat_size as i64) < expected_size {
// Reconcile the physical .dat size against the needles read.
let want_size = total_read + SUPER_BLOCK_SIZE as i64;
if (dat_size as i64) < want_size {
broken.push(format!(
"dat file size {} is smaller than expected {} (total_read {} + super_block {})",
dat_size, expected_size, total_read, SUPER_BLOCK_SIZE
"data file for volume {} is smaller ({}) than the {} needles it contains ({})",
self.id.0, dat_size, count, want_size
));
} else if dat_size as i64 != expected_size {
} else if dat_size as i64 != want_size {
broken.push(format!(
"warning: dat file size {} does not match expected {} (total_read {} + super_block {})",
dat_size, expected_size, total_read, SUPER_BLOCK_SIZE
"data file size for volume {} ({}) doesn't match the size for {} needles read ({})",
self.id.0, dat_size, count, want_size
));
}
Ok((files_checked, broken))
Ok((count, broken))
}
/// Scan raw needle entries from the .dat file starting at `from_offset`.
@@ -3899,6 +3907,36 @@ mod tests {
);
}
#[test]
fn test_scrub_overwritten_volume_scrubs_clean() {
// An overwrite/delete appends a second .dat record + .idx row. Walking
// the on-disk .idx (not the deduped map) makes total_read match the
// physical .dat, so a healthy overwritten volume is NOT flagged broken.
let tmp = TempDir::new().unwrap();
let dir = tmp.path().to_str().unwrap();
let mut v = make_test_volume(dir);
for (id, body) in [(1u64, "first version"), (1, "second version"), (2, "other")] {
let mut n = Needle {
id: NeedleId(id),
cookie: Cookie(id as u32),
data: body.as_bytes().to_vec(),
data_size: body.len() as u32,
..Needle::default()
};
v.write_needle(&mut n, true).unwrap();
}
v.sync_to_disk().unwrap();
let (count, broken) = v.scrub().unwrap();
assert!(
broken.is_empty(),
"overwritten volume should scrub clean, got {:?}",
broken
);
assert_eq!(count, 3, "scrub must count every .idx row");
}
#[test]
fn test_scrub_index_flags_zero_size_idx_with_data() {
// A populated .dat with an empty .idx is corruption — the pre-allocated