From 1815ddba9fb33eed031aa1c80a5617ddf03d8290 Mon Sep 17 00:00:00 2001 From: Lewis Date: Sat, 16 May 2026 23:24:55 +0300 Subject: [PATCH] feat(gauntlet): index-backed/hint-backed/readable invariants, ExternalCorruption scenario Lewis: May this revision serve well! --- .../fuzz/fuzz_targets/gauntlet_micro.rs | 1 + .../src/blockstore/hash_index.rs | 11 ++ .../src/gauntlet/chaos_walker.rs | 113 +++++++++++ .../tranquil-store/src/gauntlet/invariants.rs | 178 +++++++++++++++++- crates/tranquil-store/src/gauntlet/mod.rs | 8 +- crates/tranquil-store/src/gauntlet/op.rs | 6 + crates/tranquil-store/src/gauntlet/oracle.rs | 24 ++- crates/tranquil-store/src/gauntlet/runner.rs | 121 +++++++++--- .../tranquil-store/src/gauntlet/scenarios.rs | 60 ++++++ crates/tranquil-store/src/gauntlet/shrink.rs | 1 + .../tranquil-store/src/gauntlet/workload.rs | 13 +- .../tests/gauntlet_index_backed.rs | 130 +++++++++++++ crates/tranquil-store/tests/gauntlet_smoke.rs | 15 +- 13 files changed, 637 insertions(+), 44 deletions(-) create mode 100644 crates/tranquil-store/src/gauntlet/chaos_walker.rs create mode 100644 crates/tranquil-store/tests/gauntlet_index_backed.rs diff --git a/crates/tranquil-store/fuzz/fuzz_targets/gauntlet_micro.rs b/crates/tranquil-store/fuzz/fuzz_targets/gauntlet_micro.rs index 5680477..fd6a4c4 100644 --- a/crates/tranquil-store/fuzz/fuzz_targets/gauntlet_micro.rs +++ b/crates/tranquil-store/fuzz/fuzz_targets/gauntlet_micro.rs @@ -76,6 +76,7 @@ fn tiny_config() -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } diff --git a/crates/tranquil-store/src/blockstore/hash_index.rs b/crates/tranquil-store/src/blockstore/hash_index.rs index fe87211..9db10ee 100644 --- a/crates/tranquil-store/src/blockstore/hash_index.rs +++ b/crates/tranquil-store/src/blockstore/hash_index.rs @@ -606,6 +606,13 @@ impl HashTable { }); } + pub fn cids_in_file(&self, file_id: DataFileId) -> Vec { + self.iter() + .filter(|s| s.file_id == file_id) + .map(|s| s.cid) + .collect() + } + pub fn purge_by_file_id(&mut self, file_id: DataFileId) -> u64 { let victims: Vec<(CidBytes, RefCount)> = self .iter() @@ -1524,6 +1531,10 @@ impl BlockIndex { }) } + pub fn cids_in_file(&self, file_id: DataFileId) -> Vec { + self.table.read().cids_in_file(file_id) + } + pub fn purge_by_file_id(&self, file_id: DataFileId) -> u64 { self.table.write().purge_by_file_id(file_id) } diff --git a/crates/tranquil-store/src/gauntlet/chaos_walker.rs b/crates/tranquil-store/src/gauntlet/chaos_walker.rs new file mode 100644 index 0000000..6a9f890 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/chaos_walker.rs @@ -0,0 +1,113 @@ +use std::collections::HashSet; + +use cid::Cid; +use jacquard_repo::mst::NodeData; + +use super::oracle::{hex_short, try_cid_to_fixed}; +use crate::StorageIO; +use crate::blockstore::{CidBytes, TranquilBlockStore}; + +pub enum LookupResult { + Found(Cid), + NotFound, + LostPath, +} + +pub fn walk_mst_node_cids_tolerant( + store: &TranquilBlockStore, + root: Cid, + lost: &HashSet, +) -> Result, String> { + let mut visited: HashSet = HashSet::new(); + let mut to_visit: Vec = vec![root]; + let mut result: Vec = Vec::new(); + + while let Some(cid) = to_visit.pop() { + let cid_bytes = try_cid_to_fixed(&cid).map_err(|e| format!("cid format: {e}"))?; + if !visited.insert(cid_bytes) { + continue; + } + if lost.contains(&cid_bytes) { + continue; + } + let node = read_node(store, &cid_bytes)?; + result.push(cid_bytes); + if let Some(left) = node.left { + to_visit.push(left); + } + node.entries + .into_iter() + .filter_map(|e| e.tree) + .for_each(|t| to_visit.push(t)); + } + + Ok(result) +} + +pub fn mst_get_tolerant( + store: &TranquilBlockStore, + root: Cid, + target: &str, + lost: &HashSet, +) -> Result { + let mut cursor = root; + loop { + let cursor_bytes = try_cid_to_fixed(&cursor).map_err(|e| format!("cid format: {e}"))?; + if lost.contains(&cursor_bytes) { + return Ok(LookupResult::LostPath); + } + let node = read_node(store, &cursor_bytes)?; + let keys = full_keys(&node)?; + let index = keys + .iter() + .position(|k| k.as_str() >= target) + .unwrap_or(keys.len()); + if index < keys.len() && keys[index] == target { + return Ok(LookupResult::Found(node.entries[index].value)); + } + let subtree = match index { + 0 => node.left, + n => node.entries[n - 1].tree, + }; + match subtree { + Some(child) => cursor = child, + None => return Ok(LookupResult::NotFound), + } + } +} + +fn read_node( + store: &TranquilBlockStore, + cid_bytes: &CidBytes, +) -> Result { + let bytes = match store.get_block_sync(cid_bytes) { + Ok(Some(b)) => b, + Ok(None) => return Err(format!("missing block: {}", hex_short(cid_bytes))), + Err(e) => return Err(format!("read {}: {e}", hex_short(cid_bytes))), + }; + serde_ipld_dagcbor::from_slice(&bytes) + .map_err(|e| format!("deserialize node {}: {e}", hex_short(cid_bytes))) +} + +fn full_keys(node: &NodeData) -> Result, String> { + node.entries + .iter() + .scan(String::new(), |last_key, entry| { + let suffix = match std::str::from_utf8(&entry.key_suffix) { + Ok(s) => s, + Err(e) => return Some(Err(format!("invalid utf-8 in key suffix: {e}"))), + }; + let prefix_len = entry.prefix_len as usize; + if prefix_len > last_key.len() { + return Some(Err(format!( + "prefix length {} exceeds last key length {}", + prefix_len, + last_key.len() + ))); + } + let full = format!("{}{}", &last_key[..prefix_len], suffix); + *last_key = full.clone(); + Some(Ok(full)) + }) + .collect() +} diff --git a/crates/tranquil-store/src/gauntlet/invariants.rs b/crates/tranquil-store/src/gauntlet/invariants.rs index e1e2bb1..a2bcc51 100644 --- a/crates/tranquil-store/src/gauntlet/invariants.rs +++ b/crates/tranquil-store/src/gauntlet/invariants.rs @@ -31,6 +31,9 @@ impl InvariantSet { pub const MONOTONIC_SEQ: Self = Self(1 << 10); pub const FSYNC_ORDERING: Self = Self(1 << 11); pub const TOMBSTONE_BOUND: Self = Self(1 << 12); + pub const INDEX_BACKED_BY_DISK: Self = Self(1 << 13); + pub const HINT_BACKED_BY_DATA: Self = Self(1 << 14); + pub const INDEX_BLOCKS_READABLE: Self = Self(1 << 15); const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 | Self::REACHABILITY.0 @@ -44,7 +47,10 @@ impl InvariantSet { | Self::CHECKSUM_COVERAGE.0 | Self::MONOTONIC_SEQ.0 | Self::FSYNC_ORDERING.0 - | Self::TOMBSTONE_BOUND.0; + | Self::TOMBSTONE_BOUND.0 + | Self::INDEX_BACKED_BY_DISK.0 + | Self::HINT_BACKED_BY_DATA.0 + | Self::INDEX_BLOCKS_READABLE.0; pub const fn contains(self, other: Self) -> bool { (self.0 & other.0) == other.0 @@ -377,6 +383,164 @@ fn compact_by_liveness( }) } +pub struct HintBackedByData; + +#[async_trait] +impl Invariant for HintBackedByData { + fn name(&self) -> &'static str { + "HintBackedByData" + } + + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { + let store_c = ctx.store.clone(); + let result = tokio::task::spawn_blocking(move || { + let data: std::collections::HashSet<_> = store_c + .list_data_files() + .map_err(|e| e.to_string())? + .into_iter() + .collect(); + let hints = store_c.list_hint_files().map_err(|e| e.to_string())?; + let orphans: Vec = hints + .iter() + .filter(|fid| !data.contains(fid)) + .map(|fid| fid.to_string()) + .collect(); + Ok::<_, String>(orphans) + }) + .await + .map_err(|e| InvariantViolation { + invariant: "HintBackedByData", + detail: format!("join: {e}"), + })?; + + let orphans = result.map_err(|e| InvariantViolation { + invariant: "HintBackedByData", + detail: e, + })?; + + if orphans.is_empty() { + Ok(()) + } else { + Err(InvariantViolation { + invariant: "HintBackedByData", + detail: format!( + "hint files without matching data file (orphan hints): {}", + orphans.join(", ") + ), + }) + } + } +} + +pub struct IndexBlocksReadable; + +#[async_trait] +impl Invariant for IndexBlocksReadable { + fn name(&self) -> &'static str { + "IndexBlocksReadable" + } + + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { + let store_c = ctx.store.clone(); + let result = tokio::task::spawn_blocking(move || { + let entries = store_c.block_index().live_entries_snapshot(); + let unreadable: Vec = entries + .iter() + .take(INDEX_READABLE_SAMPLE_CAP) + .filter_map(|(cid, _)| match store_c.get_block_sync(cid) { + Ok(Some(_)) => None, + Ok(None) => Some(format!( + "{}: index says present but reader missed", + hex_short(cid) + )), + Err(e) => Some(format!("{}: read error {e}", hex_short(cid))), + }) + .take(INDEX_READABLE_REPORT_CAP) + .collect(); + Ok::<_, String>(unreadable) + }) + .await + .map_err(|e| InvariantViolation { + invariant: "IndexBlocksReadable", + detail: format!("join: {e}"), + })?; + + let unreadable = result.map_err(|e| InvariantViolation { + invariant: "IndexBlocksReadable", + detail: e, + })?; + + if unreadable.is_empty() { + Ok(()) + } else { + Err(InvariantViolation { + invariant: "IndexBlocksReadable", + detail: format!( + "live index entries cannot be read back (first {INDEX_READABLE_REPORT_CAP}): {}", + unreadable.join("; ") + ), + }) + } + } +} + +const INDEX_READABLE_SAMPLE_CAP: usize = 512; +const INDEX_READABLE_REPORT_CAP: usize = 20; + +pub struct IndexBackedByDisk; + +#[async_trait] +impl Invariant for IndexBackedByDisk { + fn name(&self) -> &'static str { + "IndexBackedByDisk" + } + + async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> { + let store_c = ctx.store.clone(); + let result = tokio::task::spawn_blocking(move || { + let disk: std::collections::HashSet<_> = store_c + .list_data_files() + .map_err(|e| e.to_string())? + .into_iter() + .collect(); + let liveness = store_c.compaction_liveness(0).map_err(|e| e.to_string())?; + let missing: Vec = liveness + .iter() + .filter(|(fid, _)| !disk.contains(fid)) + .map(|(fid, info)| { + format!( + "{fid} (live_blocks={}, total_blocks={})", + info.live_blocks, info.total_blocks + ) + }) + .collect(); + Ok::<_, String>(missing) + }) + .await + .map_err(|e| InvariantViolation { + invariant: "IndexBackedByDisk", + detail: format!("join: {e}"), + })?; + + let missing = result.map_err(|e| InvariantViolation { + invariant: "IndexBackedByDisk", + detail: e, + })?; + + if missing.is_empty() { + Ok(()) + } else { + Err(InvariantViolation { + invariant: "IndexBackedByDisk", + detail: format!( + "index references data files missing on disk (iris-shaped corruption): {}", + missing.join(", ") + ), + }) + } + } +} + pub struct NoOrphanFiles; #[async_trait] @@ -766,6 +930,18 @@ pub fn invariants_for( Box::new(CompactionIdempotent), ), (InvariantSet::NO_ORPHAN_FILES, Box::new(NoOrphanFiles)), + ( + InvariantSet::INDEX_BACKED_BY_DISK, + Box::new(IndexBackedByDisk), + ), + ( + InvariantSet::HINT_BACKED_BY_DATA, + Box::new(HintBackedByData), + ), + ( + InvariantSet::INDEX_BLOCKS_READABLE, + Box::new(IndexBlocksReadable), + ), (InvariantSet::BYTE_BUDGET, Box::new(ByteBudget::default())), ( InvariantSet::MANIFEST_EQUALS_REALITY, diff --git a/crates/tranquil-store/src/gauntlet/mod.rs b/crates/tranquil-store/src/gauntlet/mod.rs index caf4cac..c45c53b 100644 --- a/crates/tranquil-store/src/gauntlet/mod.rs +++ b/crates/tranquil-store/src/gauntlet/mod.rs @@ -1,3 +1,4 @@ +pub mod chaos_walker; pub mod farm; pub mod flaky; pub mod invariants; @@ -17,13 +18,14 @@ pub use flaky::{ BackingMegabytes, DownIntervalSecs, FlakyConfig, FlakyError, FlakyMount, UpIntervalSecs, }; pub use invariants::{ - EventLogSnapshot, Invariant, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, + EventLogSnapshot, HintBackedByData, IndexBackedByDisk, IndexBlocksReadable, Invariant, + InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, }; pub use leak::{LeakGateBuildError, LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate}; pub use metrics::{MetricName, MetricsSample, sample_harness}; pub use op::{ - CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed, - ValueSeed, + CollectionName, DidSeed, EventKind, FileChoice, Op, OpStream, PayloadSeed, RecordKey, + RetentionSecs, Seed, ValueSeed, }; pub use oracle::{EventExpectation, Oracle}; pub use overrides::{ConfigOverrides, GroupCommitOverrides, StoreOverrides}; diff --git a/crates/tranquil-store/src/gauntlet/op.rs b/crates/tranquil-store/src/gauntlet/op.rs index 0c047f4..2a527c1 100644 --- a/crates/tranquil-store/src/gauntlet/op.rs +++ b/crates/tranquil-store/src/gauntlet/op.rs @@ -29,6 +29,9 @@ pub enum EventKind { Sync, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct FileChoice(pub u32); + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Op { AddRecord { @@ -58,6 +61,9 @@ pub enum Op { ReadBlock { value_seed: ValueSeed, }, + ExternalDeleteDataFile { + choice: FileChoice, + }, } impl Op { diff --git a/crates/tranquil-store/src/gauntlet/oracle.rs b/crates/tranquil-store/src/gauntlet/oracle.rs index 08beaa2..08c2b72 100644 --- a/crates/tranquil-store/src/gauntlet/oracle.rs +++ b/crates/tranquil-store/src/gauntlet/oracle.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use cid::Cid; @@ -29,6 +29,7 @@ pub struct Oracle { unsynced_events: Vec, last_synced_seq: Option, last_retention_cutoff_us: Option, + lost_blocks: HashSet, } impl Oracle { @@ -93,6 +94,27 @@ impl Oracle { self.unsynced_events.push(event); } + pub fn mark_blocks_lost(&mut self, cids: impl IntoIterator) -> usize { + let added: HashSet = cids.into_iter().collect(); + let added_count = added.len(); + self.live + .retain(|_, record_cid| !added.contains(record_cid)); + self.lost_blocks.extend(added); + added_count + } + + pub fn lost_blocks(&self) -> &HashSet { + &self.lost_blocks + } + + pub fn is_block_lost(&self, cid: &CidBytes) -> bool { + self.lost_blocks.contains(cid) + } + + pub fn has_lost_blocks(&self) -> bool { + !self.lost_blocks.is_empty() + } + pub fn record_event_sync(&mut self, synced_through: EventSequence) { let (promoted, remaining): (Vec<_>, Vec<_>) = self .unsynced_events diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index b8ff830..0aa1c1a 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -93,6 +93,7 @@ pub struct GauntletConfig { pub store: StoreConfig, pub eventlog: Option, pub writer_concurrency: WriterConcurrency, + pub tolerate_op_errors: bool, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -287,6 +288,7 @@ async fn run_inner_real( None => tempfile::TempDir::new().expect("tempdir"), }; let root = dir.path().to_path_buf(); + let tolerate = config.tolerate_op_errors; let report = run_inner_real_on_root( config, root, @@ -294,7 +296,7 @@ async fn run_inner_real( ops_counter, op_errors_counter, restarts_counter, - false, + tolerate, Duration::ZERO, ) .await; @@ -432,7 +434,7 @@ async fn run_inner_simulated( ) -> GauntletReport { let dir = tempfile::TempDir::new().expect("tempdir"); let cfg = blockstore_config(dir.path(), &config.store); - let tolerate_errors = fault.injects_errors(); + let tolerate_errors = fault.injects_errors() || config.tolerate_op_errors; let eventlog_cfg = config.eventlog; let segments_dir = segments_subdir(dir.path()); let sim: Arc = Arc::new(SimulatedIO::new(config.seed.0, fault)); @@ -828,7 +830,6 @@ async fn run_quick_check( }; }; - let mst = Mst::load(store.clone(), r, None); let live: Vec<(super::op::CollectionName, super::op::RecordKey, CidBytes)> = oracle .live_records() .map(|(c, k, v)| (c.clone(), k.clone(), *v)) @@ -840,24 +841,39 @@ async fn run_quick_check( sample_distinct(rng, total, sample_size) }; - let mut violations: Vec = Vec::new(); - for idx in picks { - let (coll, rkey, expected) = &live[idx]; - let key = format!("{}/{}", coll.0, rkey.0); - match mst.get(&key).await { - Ok(Some(cid)) => match try_cid_to_fixed(&cid) { - Ok(actual) if actual == *expected => {} - Ok(actual) => violations.push(format!( - "{key}: MST cid {} != oracle cid {}", - hex_short(&actual), - hex_short(expected) - )), - Err(e) => violations.push(format!("{key}: cid format: {e}")), - }, - Ok(None) => violations.push(format!("{key}: missing after reopen")), - Err(e) => violations.push(format!("{key}: mst.get error: {e}")), - } - } + let store_c = store.clone(); + let lost_clone = oracle.lost_blocks().clone(); + let live_clone = live.clone(); + let picks_c = picks.clone(); + let violations: Vec = tokio::task::spawn_blocking(move || { + picks_c + .iter() + .filter_map(|&idx| { + let (coll, rkey, expected) = &live_clone[idx]; + let key = format!("{}/{}", coll.0, rkey.0); + match super::chaos_walker::mst_get_tolerant(&store_c, r, &key, &lost_clone) { + Ok(super::chaos_walker::LookupResult::Found(cid)) => { + match try_cid_to_fixed(&cid) { + Ok(actual) if actual == *expected => None, + Ok(actual) => Some(format!( + "{key}: MST cid {} != oracle cid {}", + hex_short(&actual), + hex_short(expected) + )), + Err(e) => Some(format!("{key}: cid format: {e}")), + } + } + Ok(super::chaos_walker::LookupResult::NotFound) => { + Some(format!("{key}: missing after reopen")) + } + Ok(super::chaos_walker::LookupResult::LostPath) => None, + Err(e) => Some(format!("{key}: mst.get error: {e}")), + } + }) + .collect() + }) + .await + .unwrap_or_else(|e| vec![format!("quick_check join: {e}")]); if violations.is_empty() { Vec::new() @@ -967,16 +983,13 @@ pub(super) async fn refresh_oracle_graph( Ok(()) } Some(r) => { - let settled = Mst::load(store.clone(), r, None); - let cids = settled - .collect_node_cids() - .await - .map_err(|e| format!("collect_node_cids: {e}"))?; - let fixed: Vec = cids - .iter() - .map(try_cid_to_fixed) - .collect::>() - .map_err(|e| format!("mst node cid: {e}"))?; + let store_c = store.clone(); + let lost_clone = oracle.lost_blocks().clone(); + let fixed = tokio::task::spawn_blocking(move || { + super::chaos_walker::walk_mst_node_cids_tolerant(&store_c, r, &lost_clone) + }) + .await + .map_err(|e| format!("refresh join: {e}"))??; oracle.set_root(r); oracle.set_mst_node_cids(fixed); Ok(()) @@ -1200,6 +1213,39 @@ pub(super) async fn apply_op( let _ = harness.store.get_block_sync(&record_cid); Ok(()) } + Op::ExternalDeleteDataFile { choice } => { + let s = harness.store.clone(); + let pick = choice.0; + let lost_cids = tokio::task::spawn_blocking(move || externally_delete_data_file(&s, pick)) + .await + .map_err(|e| OpError::Join(e.to_string()))??; + if !lost_cids.is_empty() { + oracle.mark_blocks_lost(lost_cids); + } + Ok(()) + } + } +} + +fn externally_delete_data_file( + store: &std::sync::Arc>, + pick: u32, +) -> Result, OpError> { + let active = store.block_index().read_write_cursor().map(|c| c.file_id); + let mut candidates = match store.list_data_files() { + Ok(files) => files, + Err(_) => return Ok(Vec::new()), + }; + candidates.retain(|fid| active.is_none_or(|a| *fid < a)); + if candidates.is_empty() { + return Ok(Vec::new()); + } + let idx = (pick as usize) % candidates.len(); + let victim = candidates[idx]; + let cids = store.block_index().cids_in_file(victim); + match std::fs::remove_file(store.data_file_path(victim)) { + Ok(()) => Ok(cids), + Err(_) => Ok(Vec::new()), } } @@ -1511,6 +1557,18 @@ async fn apply_op_concurrent( let _ = shared.store.get_block_sync(&record_cid); Ok(()) } + Op::ExternalDeleteDataFile { choice } => { + let mut guard = shared.write.lock().await; + let s = shared.store.clone(); + let pick = choice.0; + let lost_cids = tokio::task::spawn_blocking(move || externally_delete_data_file(&s, pick)) + .await + .map_err(|e| OpError::Join(e.to_string()))??; + if !lost_cids.is_empty() { + guard.oracle.mark_blocks_lost(lost_cids); + } + Ok(()) + } } } @@ -1848,6 +1906,7 @@ mod tests { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs index 30a9cad..de094af 100644 --- a/crates/tranquil-store/src/gauntlet/scenarios.rs +++ b/crates/tranquil-store/src/gauntlet/scenarios.rs @@ -31,6 +31,7 @@ pub enum Scenario { ContendedReaders, ContendedWriters, FlakyDevice, + ExternalCorruption, } impl Scenario { @@ -53,6 +54,7 @@ impl Scenario { Self::ContendedReaders => "ContendedReaders", Self::ContendedWriters => "ContendedWriters", Self::FlakyDevice => "FlakyDevice", + Self::ExternalCorruption => "ExternalCorruption", } } @@ -75,6 +77,7 @@ impl Scenario { Self::ContendedReaders => "contended-readers", Self::ContendedWriters => "contended-writers", Self::FlakyDevice => "flaky-device", + Self::ExternalCorruption => "external-corruption", } } @@ -109,6 +112,9 @@ impl Scenario { Self::FlakyDevice => { "Real IO on ext4 atop dm-flakey. Requires root with dm-flakey available, skips otherwise." } + Self::ExternalCorruption => { + "Rare external data-file deletion mid-workload. Validates phantom-purge self-heal under chaos." + } } } @@ -138,6 +144,7 @@ impl Scenario { Self::ContendedReaders, Self::ContendedWriters, Self::FlakyDevice, + Self::ExternalCorruption, ]; } @@ -211,6 +218,7 @@ pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig { Scenario::ContendedReaders => contended_readers(seed), Scenario::ContendedWriters => contended_writers(seed), Scenario::FlakyDevice => flaky_device(seed), + Scenario::ExternalCorruption => external_corruption(seed), } } @@ -280,6 +288,7 @@ fn smoke_pr(seed: Seed) -> GauntletConfig { store: tiny_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -305,6 +314,7 @@ fn mst_churn(seed: Seed) -> GauntletConfig { store: tiny_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -330,6 +340,7 @@ fn mst_restart_churn(seed: Seed) -> GauntletConfig { store: tiny_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -359,6 +370,7 @@ fn full_stack_restart(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -372,6 +384,9 @@ fn phase2_invariants() -> InvariantSet { | InvariantSet::BYTE_BUDGET | InvariantSet::MANIFEST_EQUALS_REALITY | InvariantSet::CHECKSUM_COVERAGE + | InvariantSet::INDEX_BACKED_BY_DISK + | InvariantSet::HINT_BACKED_BY_DATA + | InvariantSet::INDEX_BLOCKS_READABLE } fn catastrophic_churn(seed: Seed) -> GauntletConfig { @@ -392,6 +407,7 @@ fn catastrophic_churn(seed: Seed) -> GauntletConfig { store: tiny_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -424,6 +440,7 @@ fn huge_values(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -454,6 +471,7 @@ fn tiny_batches(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -484,6 +502,7 @@ fn giant_batches(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -509,6 +528,7 @@ fn many_files(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -521,6 +541,9 @@ fn sim_invariants() -> InvariantSet { | InvariantSet::NO_ORPHAN_FILES | InvariantSet::BYTE_BUDGET | InvariantSet::CHECKSUM_COVERAGE + | InvariantSet::INDEX_BACKED_BY_DISK + | InvariantSet::HINT_BACKED_BY_DATA + | InvariantSet::INDEX_BLOCKS_READABLE } fn sim_microbench_workload() -> WorkloadModel { @@ -558,6 +581,7 @@ fn moderate_faults(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -577,6 +601,7 @@ fn aggressive_faults(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -596,6 +621,7 @@ fn torn_pages(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -615,6 +641,7 @@ fn fsyncgate(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -654,6 +681,7 @@ fn firehose_fanout(seed: Seed) -> GauntletConfig { max_segment_size: MaxSegmentSize(64 * 1024), }), writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -688,6 +716,7 @@ fn contended_readers(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(64), + tolerate_op_errors: false, } } @@ -719,6 +748,7 @@ fn flaky_device(seed: Seed) -> GauntletConfig { store: tiny_store(), eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -753,5 +783,35 @@ fn contended_writers(seed: Seed) -> GauntletConfig { store: sim_store(), eventlog: None, writer_concurrency: WriterConcurrency(32), + tolerate_op_errors: false, + } +} + +fn external_corruption(seed: Seed) -> GauntletConfig { + GauntletConfig { + seed, + io: IoBackend::Real, + workload: block_workload( + OpWeights { + add: 50, + delete: 30, + compact: 18, + checkpoint: 1, + external_delete_data_file: 1, + ..OpWeights::default() + }, + SizeDistribution::Fixed(ValueBytes(128)), + KeySpaceSize(200), + ), + op_count: OpCount(2_000), + invariants: InvariantSet::NO_ORPHAN_FILES | InvariantSet::BYTE_BUDGET, + limits: RunLimits { + max_wall_ms: Some(WallMs(60_000)), + }, + restart_policy: RestartPolicy::EveryNOps(OpInterval(1_000)), + store: tiny_store(), + eventlog: None, + writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: true, } } diff --git a/crates/tranquil-store/src/gauntlet/shrink.rs b/crates/tranquil-store/src/gauntlet/shrink.rs index 745a240..15fbeb8 100644 --- a/crates/tranquil-store/src/gauntlet/shrink.rs +++ b/crates/tranquil-store/src/gauntlet/shrink.rs @@ -142,6 +142,7 @@ mod tests { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } diff --git a/crates/tranquil-store/src/gauntlet/workload.rs b/crates/tranquil-store/src/gauntlet/workload.rs index a08fb85..8563a2e 100644 --- a/crates/tranquil-store/src/gauntlet/workload.rs +++ b/crates/tranquil-store/src/gauntlet/workload.rs @@ -1,6 +1,6 @@ use super::op::{ - CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed, - ValueSeed, + CollectionName, DidSeed, EventKind, FileChoice, Op, OpStream, PayloadSeed, RecordKey, + RetentionSecs, Seed, ValueSeed, }; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -23,6 +23,7 @@ pub struct OpWeights { pub run_retention: u32, pub read_record: u32, pub read_block: u32, + pub external_delete_data_file: u32, } impl OpWeights { @@ -36,6 +37,7 @@ impl OpWeights { + self.run_retention + self.read_record + self.read_block + + self.external_delete_data_file } pub const fn touches_eventlog(&self) -> bool { @@ -103,6 +105,7 @@ impl Default for WorkloadModel { run_retention: 0, read_record: 0, read_block: 0, + external_delete_data_file: 0, }, size_distribution: SizeDistribution::Fixed(ValueBytes(64)), collections: vec![CollectionName("app.bsky.feed.post".to_string())], @@ -138,6 +141,7 @@ impl WorkloadModel { let t6 = t5 + w.sync_event_log; let t7 = t6 + w.run_retention; let t8 = t7 + w.read_record; + let t9 = t8 + w.read_block; match bucket { b if b < t1 => Op::AddRecord { @@ -166,9 +170,12 @@ impl WorkloadModel { collection: coll, rkey, }, - _ => Op::ReadBlock { + b if b < t9 => Op::ReadBlock { value_seed: ValueSeed(rng.next_u32()), }, + _ => Op::ExternalDeleteDataFile { + choice: FileChoice(rng.next_u32()), + }, } }) .collect(); diff --git a/crates/tranquil-store/tests/gauntlet_index_backed.rs b/crates/tranquil-store/tests/gauntlet_index_backed.rs new file mode 100644 index 0000000..cedf448 --- /dev/null +++ b/crates/tranquil-store/tests/gauntlet_index_backed.rs @@ -0,0 +1,130 @@ +mod common; + +use std::sync::Arc; + +use common::with_runtime; +use tranquil_store::blockstore::{BlockStoreConfig, GroupCommitConfig, TranquilBlockStore}; +use tranquil_store::gauntlet::{ + Gauntlet, IndexBackedByDisk, Invariant, InvariantCtx, InvariantSet, Oracle, Scenario, Seed, + config_for, +}; + +#[test] +fn index_backed_by_disk_invariant_catches_phantom_after_external_delete() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + let cfg = BlockStoreConfig { + data_dir: dir.path().join("data"), + index_dir: dir.path().join("index"), + max_file_size: 4096, + group_commit: GroupCommitConfig::default(), + shard_count: 1, + }; + let store = Arc::new(TranquilBlockStore::open(cfg).expect("open store")); + + let cids: Vec<[u8; 36]> = (0..20u32) + .map(|seed| { + let le = seed.to_le_bytes(); + std::array::from_fn(|i| match i { + 0 => 0x01, + 1 => 0x71, + 2 => 0x12, + 3 => 0x20, + 4..8 => le[i - 4], + _ => (seed as u8).wrapping_add(i as u8), + }) + }) + .collect(); + + cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, vec![0xAA; 96])]) + .expect("put block"); + }); + + let victim_fid = store + .compaction_liveness(0) + .unwrap() + .iter() + .filter(|(_, info)| info.live_blocks > 0) + .map(|(&fid, _)| fid) + .next() + .expect("expected at least one indexed file"); + + let victim_path = dir.path().join("data").join(format!("{victim_fid}.tqb")); + std::fs::remove_file(&victim_path).unwrap(); + + let oracle = Oracle::new(); + let ctx = InvariantCtx { + store: &store, + oracle: &oracle, + root: None, + eventlog: None, + }; + let runtime = tokio::runtime::Handle::current(); + let result = runtime.block_on(IndexBackedByDisk.check(&ctx)); + + let violation = result.expect_err("phantom index entry must trigger violation"); + assert_eq!(violation.invariant, "IndexBackedByDisk"); + assert!( + violation.detail.contains(&victim_fid.to_string()), + "violation detail must reference the deleted file_id: {}", + violation.detail + ); + }); +} + +#[tokio::test] +async fn external_corruption_scenario_survives_many_seeds() { + let failures: Vec = futures::future::join_all((0..5).map(Seed).map(|seed| async move { + let cfg = config_for(Scenario::ExternalCorruption, seed); + let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; + (seed, report) + })) + .await + .into_iter() + .filter(|(_, r)| !r.is_clean()) + .map(|(seed, r)| { + format!( + "seed {}: {} violations\n {}", + seed.0, + r.violations.len(), + r.violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect::>() + .join("\n ") + ) + }) + .collect(); + assert!(failures.is_empty(), "{}", failures.join("\n---\n")); +} + +#[tokio::test] +#[ignore = "long running, validates iris-class regression over many seeds"] +async fn iris_class_regression_30_seeds() { + let failures: Vec = + futures::future::join_all((0..30).map(Seed).map(|seed| async move { + let mut cfg = config_for(Scenario::SmokePR, seed); + cfg.invariants = cfg.invariants | InvariantSet::INDEX_BACKED_BY_DISK; + let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; + (seed, report) + })) + .await + .into_iter() + .filter(|(_, r)| !r.is_clean()) + .map(|(seed, r)| { + format!( + "seed {}: {} violations\n {}", + seed.0, + r.violations.len(), + r.violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect::>() + .join("\n ") + ) + }) + .collect(); + assert!(failures.is_empty(), "{}", failures.join("\n---\n")); +} diff --git a/crates/tranquil-store/tests/gauntlet_smoke.rs b/crates/tranquil-store/tests/gauntlet_smoke.rs index 50327b7..db7ff68 100644 --- a/crates/tranquil-store/tests/gauntlet_smoke.rs +++ b/crates/tranquil-store/tests/gauntlet_smoke.rs @@ -82,6 +82,7 @@ fn fast_sanity_config(seed: Seed) -> GauntletConfig { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, } } @@ -142,6 +143,7 @@ async fn compaction_idempotent_sanity() { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -181,6 +183,7 @@ async fn no_orphan_files_sanity() { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -225,6 +228,7 @@ async fn simulated_pristine_roundtrip() { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -279,6 +283,7 @@ async fn firehose_fanout_pristine_smoke() { max_segment_size: MaxSegmentSize(32 * 1024), }), writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -325,6 +330,7 @@ async fn contended_readers_pristine_smoke() { }, eventlog: None, writer_concurrency: WriterConcurrency(16), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -372,6 +378,7 @@ async fn contended_writers_pristine_smoke() { }, eventlog: None, writer_concurrency: WriterConcurrency(8), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; assert_clean(&report); @@ -505,6 +512,7 @@ async fn torn_pages_only_completes_within_budget() { }, eventlog: None, writer_concurrency: WriterConcurrency(1), + tolerate_op_errors: false, }; let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; let budget_violations: Vec<&str> = report @@ -553,11 +561,8 @@ fn farm_run_many_timed_with_scratch_roots_honors_assignment() { std::fs::create_dir_all(&root_a).expect("mkdir a"); std::fs::create_dir_all(&root_b).expect("mkdir b"); let roots = vec![root_a.clone(), root_b.clone()]; - let reports = farm::run_many_timed_with_scratch_roots( - fast_sanity_config, - &roots, - (0..2).map(Seed), - ); + let reports = + farm::run_many_timed_with_scratch_roots(fast_sanity_config, &roots, (0..2).map(Seed)); assert_eq!(reports.len(), 2); reports.iter().for_each(|(r, _)| assert_clean(r)); [&root_a, &root_b].iter().for_each(|root| {