feat(gauntlet): index-backed/hint-backed/readable invariants, ExternalCorruption scenario

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-05-16 23:24:55 +03:00
parent a7517ed5c9
commit 1815ddba9f
13 changed files with 637 additions and 44 deletions

View File

@@ -76,6 +76,7 @@ fn tiny_config() -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}

View File

@@ -606,6 +606,13 @@ impl HashTable {
});
}
pub fn cids_in_file(&self, file_id: DataFileId) -> Vec<CidBytes> {
self.iter()
.filter(|s| s.file_id == file_id)
.map(|s| s.cid)
.collect()
}
pub fn purge_by_file_id(&mut self, file_id: DataFileId) -> u64 {
let victims: Vec<(CidBytes, RefCount)> = self
.iter()
@@ -1524,6 +1531,10 @@ impl BlockIndex {
})
}
pub fn cids_in_file(&self, file_id: DataFileId) -> Vec<CidBytes> {
self.table.read().cids_in_file(file_id)
}
pub fn purge_by_file_id(&self, file_id: DataFileId) -> u64 {
self.table.write().purge_by_file_id(file_id)
}

View File

@@ -0,0 +1,113 @@
use std::collections::HashSet;
use cid::Cid;
use jacquard_repo::mst::NodeData;
use super::oracle::{hex_short, try_cid_to_fixed};
use crate::StorageIO;
use crate::blockstore::{CidBytes, TranquilBlockStore};
pub enum LookupResult {
Found(Cid),
NotFound,
LostPath,
}
pub fn walk_mst_node_cids_tolerant<S: StorageIO + Send + Sync + 'static>(
store: &TranquilBlockStore<S>,
root: Cid,
lost: &HashSet<CidBytes>,
) -> Result<Vec<CidBytes>, String> {
let mut visited: HashSet<CidBytes> = HashSet::new();
let mut to_visit: Vec<Cid> = vec![root];
let mut result: Vec<CidBytes> = Vec::new();
while let Some(cid) = to_visit.pop() {
let cid_bytes = try_cid_to_fixed(&cid).map_err(|e| format!("cid format: {e}"))?;
if !visited.insert(cid_bytes) {
continue;
}
if lost.contains(&cid_bytes) {
continue;
}
let node = read_node(store, &cid_bytes)?;
result.push(cid_bytes);
if let Some(left) = node.left {
to_visit.push(left);
}
node.entries
.into_iter()
.filter_map(|e| e.tree)
.for_each(|t| to_visit.push(t));
}
Ok(result)
}
pub fn mst_get_tolerant<S: StorageIO + Send + Sync + 'static>(
store: &TranquilBlockStore<S>,
root: Cid,
target: &str,
lost: &HashSet<CidBytes>,
) -> Result<LookupResult, String> {
let mut cursor = root;
loop {
let cursor_bytes = try_cid_to_fixed(&cursor).map_err(|e| format!("cid format: {e}"))?;
if lost.contains(&cursor_bytes) {
return Ok(LookupResult::LostPath);
}
let node = read_node(store, &cursor_bytes)?;
let keys = full_keys(&node)?;
let index = keys
.iter()
.position(|k| k.as_str() >= target)
.unwrap_or(keys.len());
if index < keys.len() && keys[index] == target {
return Ok(LookupResult::Found(node.entries[index].value));
}
let subtree = match index {
0 => node.left,
n => node.entries[n - 1].tree,
};
match subtree {
Some(child) => cursor = child,
None => return Ok(LookupResult::NotFound),
}
}
}
fn read_node<S: StorageIO + Send + Sync + 'static>(
store: &TranquilBlockStore<S>,
cid_bytes: &CidBytes,
) -> Result<NodeData, String> {
let bytes = match store.get_block_sync(cid_bytes) {
Ok(Some(b)) => b,
Ok(None) => return Err(format!("missing block: {}", hex_short(cid_bytes))),
Err(e) => return Err(format!("read {}: {e}", hex_short(cid_bytes))),
};
serde_ipld_dagcbor::from_slice(&bytes)
.map_err(|e| format!("deserialize node {}: {e}", hex_short(cid_bytes)))
}
fn full_keys(node: &NodeData) -> Result<Vec<String>, String> {
node.entries
.iter()
.scan(String::new(), |last_key, entry| {
let suffix = match std::str::from_utf8(&entry.key_suffix) {
Ok(s) => s,
Err(e) => return Some(Err(format!("invalid utf-8 in key suffix: {e}"))),
};
let prefix_len = entry.prefix_len as usize;
if prefix_len > last_key.len() {
return Some(Err(format!(
"prefix length {} exceeds last key length {}",
prefix_len,
last_key.len()
)));
}
let full = format!("{}{}", &last_key[..prefix_len], suffix);
*last_key = full.clone();
Some(Ok(full))
})
.collect()
}

View File

@@ -31,6 +31,9 @@ impl InvariantSet {
pub const MONOTONIC_SEQ: Self = Self(1 << 10);
pub const FSYNC_ORDERING: Self = Self(1 << 11);
pub const TOMBSTONE_BOUND: Self = Self(1 << 12);
pub const INDEX_BACKED_BY_DISK: Self = Self(1 << 13);
pub const HINT_BACKED_BY_DATA: Self = Self(1 << 14);
pub const INDEX_BLOCKS_READABLE: Self = Self(1 << 15);
const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0
| Self::REACHABILITY.0
@@ -44,7 +47,10 @@ impl InvariantSet {
| Self::CHECKSUM_COVERAGE.0
| Self::MONOTONIC_SEQ.0
| Self::FSYNC_ORDERING.0
| Self::TOMBSTONE_BOUND.0;
| Self::TOMBSTONE_BOUND.0
| Self::INDEX_BACKED_BY_DISK.0
| Self::HINT_BACKED_BY_DATA.0
| Self::INDEX_BLOCKS_READABLE.0;
pub const fn contains(self, other: Self) -> bool {
(self.0 & other.0) == other.0
@@ -377,6 +383,164 @@ fn compact_by_liveness<S: StorageIO + Send + Sync + 'static>(
})
}
pub struct HintBackedByData;
#[async_trait]
impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for HintBackedByData {
fn name(&self) -> &'static str {
"HintBackedByData"
}
async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> {
let store_c = ctx.store.clone();
let result = tokio::task::spawn_blocking(move || {
let data: std::collections::HashSet<_> = store_c
.list_data_files()
.map_err(|e| e.to_string())?
.into_iter()
.collect();
let hints = store_c.list_hint_files().map_err(|e| e.to_string())?;
let orphans: Vec<String> = hints
.iter()
.filter(|fid| !data.contains(fid))
.map(|fid| fid.to_string())
.collect();
Ok::<_, String>(orphans)
})
.await
.map_err(|e| InvariantViolation {
invariant: "HintBackedByData",
detail: format!("join: {e}"),
})?;
let orphans = result.map_err(|e| InvariantViolation {
invariant: "HintBackedByData",
detail: e,
})?;
if orphans.is_empty() {
Ok(())
} else {
Err(InvariantViolation {
invariant: "HintBackedByData",
detail: format!(
"hint files without matching data file (orphan hints): {}",
orphans.join(", ")
),
})
}
}
}
pub struct IndexBlocksReadable;
#[async_trait]
impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for IndexBlocksReadable {
fn name(&self) -> &'static str {
"IndexBlocksReadable"
}
async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> {
let store_c = ctx.store.clone();
let result = tokio::task::spawn_blocking(move || {
let entries = store_c.block_index().live_entries_snapshot();
let unreadable: Vec<String> = entries
.iter()
.take(INDEX_READABLE_SAMPLE_CAP)
.filter_map(|(cid, _)| match store_c.get_block_sync(cid) {
Ok(Some(_)) => None,
Ok(None) => Some(format!(
"{}: index says present but reader missed",
hex_short(cid)
)),
Err(e) => Some(format!("{}: read error {e}", hex_short(cid))),
})
.take(INDEX_READABLE_REPORT_CAP)
.collect();
Ok::<_, String>(unreadable)
})
.await
.map_err(|e| InvariantViolation {
invariant: "IndexBlocksReadable",
detail: format!("join: {e}"),
})?;
let unreadable = result.map_err(|e| InvariantViolation {
invariant: "IndexBlocksReadable",
detail: e,
})?;
if unreadable.is_empty() {
Ok(())
} else {
Err(InvariantViolation {
invariant: "IndexBlocksReadable",
detail: format!(
"live index entries cannot be read back (first {INDEX_READABLE_REPORT_CAP}): {}",
unreadable.join("; ")
),
})
}
}
}
const INDEX_READABLE_SAMPLE_CAP: usize = 512;
const INDEX_READABLE_REPORT_CAP: usize = 20;
pub struct IndexBackedByDisk;
#[async_trait]
impl<S: StorageIO + Send + Sync + 'static> Invariant<S> for IndexBackedByDisk {
fn name(&self) -> &'static str {
"IndexBackedByDisk"
}
async fn check(&self, ctx: &InvariantCtx<'_, S>) -> Result<(), InvariantViolation> {
let store_c = ctx.store.clone();
let result = tokio::task::spawn_blocking(move || {
let disk: std::collections::HashSet<_> = store_c
.list_data_files()
.map_err(|e| e.to_string())?
.into_iter()
.collect();
let liveness = store_c.compaction_liveness(0).map_err(|e| e.to_string())?;
let missing: Vec<String> = liveness
.iter()
.filter(|(fid, _)| !disk.contains(fid))
.map(|(fid, info)| {
format!(
"{fid} (live_blocks={}, total_blocks={})",
info.live_blocks, info.total_blocks
)
})
.collect();
Ok::<_, String>(missing)
})
.await
.map_err(|e| InvariantViolation {
invariant: "IndexBackedByDisk",
detail: format!("join: {e}"),
})?;
let missing = result.map_err(|e| InvariantViolation {
invariant: "IndexBackedByDisk",
detail: e,
})?;
if missing.is_empty() {
Ok(())
} else {
Err(InvariantViolation {
invariant: "IndexBackedByDisk",
detail: format!(
"index references data files missing on disk (iris-shaped corruption): {}",
missing.join(", ")
),
})
}
}
}
pub struct NoOrphanFiles;
#[async_trait]
@@ -766,6 +930,18 @@ pub fn invariants_for<S: StorageIO + Send + Sync + 'static>(
Box::new(CompactionIdempotent),
),
(InvariantSet::NO_ORPHAN_FILES, Box::new(NoOrphanFiles)),
(
InvariantSet::INDEX_BACKED_BY_DISK,
Box::new(IndexBackedByDisk),
),
(
InvariantSet::HINT_BACKED_BY_DATA,
Box::new(HintBackedByData),
),
(
InvariantSet::INDEX_BLOCKS_READABLE,
Box::new(IndexBlocksReadable),
),
(InvariantSet::BYTE_BUDGET, Box::new(ByteBudget::default())),
(
InvariantSet::MANIFEST_EQUALS_REALITY,

View File

@@ -1,3 +1,4 @@
pub mod chaos_walker;
pub mod farm;
pub mod flaky;
pub mod invariants;
@@ -17,13 +18,14 @@ pub use flaky::{
BackingMegabytes, DownIntervalSecs, FlakyConfig, FlakyError, FlakyMount, UpIntervalSecs,
};
pub use invariants::{
EventLogSnapshot, Invariant, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for,
EventLogSnapshot, HintBackedByData, IndexBackedByDisk, IndexBlocksReadable, Invariant,
InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for,
};
pub use leak::{LeakGateBuildError, LeakGateConfig, LeakViolation, evaluate as evaluate_leak_gate};
pub use metrics::{MetricName, MetricsSample, sample_harness};
pub use op::{
CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed,
ValueSeed,
CollectionName, DidSeed, EventKind, FileChoice, Op, OpStream, PayloadSeed, RecordKey,
RetentionSecs, Seed, ValueSeed,
};
pub use oracle::{EventExpectation, Oracle};
pub use overrides::{ConfigOverrides, GroupCommitOverrides, StoreOverrides};

View File

@@ -29,6 +29,9 @@ pub enum EventKind {
Sync,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileChoice(pub u32);
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Op {
AddRecord {
@@ -58,6 +61,9 @@ pub enum Op {
ReadBlock {
value_seed: ValueSeed,
},
ExternalDeleteDataFile {
choice: FileChoice,
},
}
impl Op {

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use cid::Cid;
@@ -29,6 +29,7 @@ pub struct Oracle {
unsynced_events: Vec<EventExpectation>,
last_synced_seq: Option<EventSequence>,
last_retention_cutoff_us: Option<u64>,
lost_blocks: HashSet<CidBytes>,
}
impl Oracle {
@@ -93,6 +94,27 @@ impl Oracle {
self.unsynced_events.push(event);
}
pub fn mark_blocks_lost(&mut self, cids: impl IntoIterator<Item = CidBytes>) -> usize {
let added: HashSet<CidBytes> = cids.into_iter().collect();
let added_count = added.len();
self.live
.retain(|_, record_cid| !added.contains(record_cid));
self.lost_blocks.extend(added);
added_count
}
pub fn lost_blocks(&self) -> &HashSet<CidBytes> {
&self.lost_blocks
}
pub fn is_block_lost(&self, cid: &CidBytes) -> bool {
self.lost_blocks.contains(cid)
}
pub fn has_lost_blocks(&self) -> bool {
!self.lost_blocks.is_empty()
}
pub fn record_event_sync(&mut self, synced_through: EventSequence) {
let (promoted, remaining): (Vec<_>, Vec<_>) = self
.unsynced_events

View File

@@ -93,6 +93,7 @@ pub struct GauntletConfig {
pub store: StoreConfig,
pub eventlog: Option<EventLogConfig>,
pub writer_concurrency: WriterConcurrency,
pub tolerate_op_errors: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -287,6 +288,7 @@ async fn run_inner_real(
None => tempfile::TempDir::new().expect("tempdir"),
};
let root = dir.path().to_path_buf();
let tolerate = config.tolerate_op_errors;
let report = run_inner_real_on_root(
config,
root,
@@ -294,7 +296,7 @@ async fn run_inner_real(
ops_counter,
op_errors_counter,
restarts_counter,
false,
tolerate,
Duration::ZERO,
)
.await;
@@ -432,7 +434,7 @@ async fn run_inner_simulated(
) -> GauntletReport {
let dir = tempfile::TempDir::new().expect("tempdir");
let cfg = blockstore_config(dir.path(), &config.store);
let tolerate_errors = fault.injects_errors();
let tolerate_errors = fault.injects_errors() || config.tolerate_op_errors;
let eventlog_cfg = config.eventlog;
let segments_dir = segments_subdir(dir.path());
let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::new(config.seed.0, fault));
@@ -828,7 +830,6 @@ async fn run_quick_check<S: StorageIO + Send + Sync + 'static>(
};
};
let mst = Mst::load(store.clone(), r, None);
let live: Vec<(super::op::CollectionName, super::op::RecordKey, CidBytes)> = oracle
.live_records()
.map(|(c, k, v)| (c.clone(), k.clone(), *v))
@@ -840,24 +841,39 @@ async fn run_quick_check<S: StorageIO + Send + Sync + 'static>(
sample_distinct(rng, total, sample_size)
};
let mut violations: Vec<String> = Vec::new();
for idx in picks {
let (coll, rkey, expected) = &live[idx];
let key = format!("{}/{}", coll.0, rkey.0);
match mst.get(&key).await {
Ok(Some(cid)) => match try_cid_to_fixed(&cid) {
Ok(actual) if actual == *expected => {}
Ok(actual) => violations.push(format!(
"{key}: MST cid {} != oracle cid {}",
hex_short(&actual),
hex_short(expected)
)),
Err(e) => violations.push(format!("{key}: cid format: {e}")),
},
Ok(None) => violations.push(format!("{key}: missing after reopen")),
Err(e) => violations.push(format!("{key}: mst.get error: {e}")),
}
}
let store_c = store.clone();
let lost_clone = oracle.lost_blocks().clone();
let live_clone = live.clone();
let picks_c = picks.clone();
let violations: Vec<String> = tokio::task::spawn_blocking(move || {
picks_c
.iter()
.filter_map(|&idx| {
let (coll, rkey, expected) = &live_clone[idx];
let key = format!("{}/{}", coll.0, rkey.0);
match super::chaos_walker::mst_get_tolerant(&store_c, r, &key, &lost_clone) {
Ok(super::chaos_walker::LookupResult::Found(cid)) => {
match try_cid_to_fixed(&cid) {
Ok(actual) if actual == *expected => None,
Ok(actual) => Some(format!(
"{key}: MST cid {} != oracle cid {}",
hex_short(&actual),
hex_short(expected)
)),
Err(e) => Some(format!("{key}: cid format: {e}")),
}
}
Ok(super::chaos_walker::LookupResult::NotFound) => {
Some(format!("{key}: missing after reopen"))
}
Ok(super::chaos_walker::LookupResult::LostPath) => None,
Err(e) => Some(format!("{key}: mst.get error: {e}")),
}
})
.collect()
})
.await
.unwrap_or_else(|e| vec![format!("quick_check join: {e}")]);
if violations.is_empty() {
Vec::new()
@@ -967,16 +983,13 @@ pub(super) async fn refresh_oracle_graph<S: StorageIO + Send + Sync + 'static>(
Ok(())
}
Some(r) => {
let settled = Mst::load(store.clone(), r, None);
let cids = settled
.collect_node_cids()
.await
.map_err(|e| format!("collect_node_cids: {e}"))?;
let fixed: Vec<CidBytes> = cids
.iter()
.map(try_cid_to_fixed)
.collect::<Result<_, _>>()
.map_err(|e| format!("mst node cid: {e}"))?;
let store_c = store.clone();
let lost_clone = oracle.lost_blocks().clone();
let fixed = tokio::task::spawn_blocking(move || {
super::chaos_walker::walk_mst_node_cids_tolerant(&store_c, r, &lost_clone)
})
.await
.map_err(|e| format!("refresh join: {e}"))??;
oracle.set_root(r);
oracle.set_mst_node_cids(fixed);
Ok(())
@@ -1200,6 +1213,39 @@ pub(super) async fn apply_op<S: StorageIO + Send + Sync + 'static>(
let _ = harness.store.get_block_sync(&record_cid);
Ok(())
}
Op::ExternalDeleteDataFile { choice } => {
let s = harness.store.clone();
let pick = choice.0;
let lost_cids = tokio::task::spawn_blocking(move || externally_delete_data_file(&s, pick))
.await
.map_err(|e| OpError::Join(e.to_string()))??;
if !lost_cids.is_empty() {
oracle.mark_blocks_lost(lost_cids);
}
Ok(())
}
}
}
fn externally_delete_data_file(
store: &std::sync::Arc<TranquilBlockStore<impl StorageIO + 'static>>,
pick: u32,
) -> Result<Vec<CidBytes>, OpError> {
let active = store.block_index().read_write_cursor().map(|c| c.file_id);
let mut candidates = match store.list_data_files() {
Ok(files) => files,
Err(_) => return Ok(Vec::new()),
};
candidates.retain(|fid| active.is_none_or(|a| *fid < a));
if candidates.is_empty() {
return Ok(Vec::new());
}
let idx = (pick as usize) % candidates.len();
let victim = candidates[idx];
let cids = store.block_index().cids_in_file(victim);
match std::fs::remove_file(store.data_file_path(victim)) {
Ok(()) => Ok(cids),
Err(_) => Ok(Vec::new()),
}
}
@@ -1511,6 +1557,18 @@ async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
let _ = shared.store.get_block_sync(&record_cid);
Ok(())
}
Op::ExternalDeleteDataFile { choice } => {
let mut guard = shared.write.lock().await;
let s = shared.store.clone();
let pick = choice.0;
let lost_cids = tokio::task::spawn_blocking(move || externally_delete_data_file(&s, pick))
.await
.map_err(|e| OpError::Join(e.to_string()))??;
if !lost_cids.is_empty() {
guard.oracle.mark_blocks_lost(lost_cids);
}
Ok(())
}
}
}
@@ -1848,6 +1906,7 @@ mod tests {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}

View File

@@ -31,6 +31,7 @@ pub enum Scenario {
ContendedReaders,
ContendedWriters,
FlakyDevice,
ExternalCorruption,
}
impl Scenario {
@@ -53,6 +54,7 @@ impl Scenario {
Self::ContendedReaders => "ContendedReaders",
Self::ContendedWriters => "ContendedWriters",
Self::FlakyDevice => "FlakyDevice",
Self::ExternalCorruption => "ExternalCorruption",
}
}
@@ -75,6 +77,7 @@ impl Scenario {
Self::ContendedReaders => "contended-readers",
Self::ContendedWriters => "contended-writers",
Self::FlakyDevice => "flaky-device",
Self::ExternalCorruption => "external-corruption",
}
}
@@ -109,6 +112,9 @@ impl Scenario {
Self::FlakyDevice => {
"Real IO on ext4 atop dm-flakey. Requires root with dm-flakey available, skips otherwise."
}
Self::ExternalCorruption => {
"Rare external data-file deletion mid-workload. Validates phantom-purge self-heal under chaos."
}
}
}
@@ -138,6 +144,7 @@ impl Scenario {
Self::ContendedReaders,
Self::ContendedWriters,
Self::FlakyDevice,
Self::ExternalCorruption,
];
}
@@ -211,6 +218,7 @@ pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig {
Scenario::ContendedReaders => contended_readers(seed),
Scenario::ContendedWriters => contended_writers(seed),
Scenario::FlakyDevice => flaky_device(seed),
Scenario::ExternalCorruption => external_corruption(seed),
}
}
@@ -280,6 +288,7 @@ fn smoke_pr(seed: Seed) -> GauntletConfig {
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -305,6 +314,7 @@ fn mst_churn(seed: Seed) -> GauntletConfig {
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -330,6 +340,7 @@ fn mst_restart_churn(seed: Seed) -> GauntletConfig {
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -359,6 +370,7 @@ fn full_stack_restart(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -372,6 +384,9 @@ fn phase2_invariants() -> InvariantSet {
| InvariantSet::BYTE_BUDGET
| InvariantSet::MANIFEST_EQUALS_REALITY
| InvariantSet::CHECKSUM_COVERAGE
| InvariantSet::INDEX_BACKED_BY_DISK
| InvariantSet::HINT_BACKED_BY_DATA
| InvariantSet::INDEX_BLOCKS_READABLE
}
fn catastrophic_churn(seed: Seed) -> GauntletConfig {
@@ -392,6 +407,7 @@ fn catastrophic_churn(seed: Seed) -> GauntletConfig {
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -424,6 +440,7 @@ fn huge_values(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -454,6 +471,7 @@ fn tiny_batches(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -484,6 +502,7 @@ fn giant_batches(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -509,6 +528,7 @@ fn many_files(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -521,6 +541,9 @@ fn sim_invariants() -> InvariantSet {
| InvariantSet::NO_ORPHAN_FILES
| InvariantSet::BYTE_BUDGET
| InvariantSet::CHECKSUM_COVERAGE
| InvariantSet::INDEX_BACKED_BY_DISK
| InvariantSet::HINT_BACKED_BY_DATA
| InvariantSet::INDEX_BLOCKS_READABLE
}
fn sim_microbench_workload() -> WorkloadModel {
@@ -558,6 +581,7 @@ fn moderate_faults(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -577,6 +601,7 @@ fn aggressive_faults(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -596,6 +621,7 @@ fn torn_pages(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -615,6 +641,7 @@ fn fsyncgate(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -654,6 +681,7 @@ fn firehose_fanout(seed: Seed) -> GauntletConfig {
max_segment_size: MaxSegmentSize(64 * 1024),
}),
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -688,6 +716,7 @@ fn contended_readers(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(64),
tolerate_op_errors: false,
}
}
@@ -719,6 +748,7 @@ fn flaky_device(seed: Seed) -> GauntletConfig {
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -753,5 +783,35 @@ fn contended_writers(seed: Seed) -> GauntletConfig {
store: sim_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(32),
tolerate_op_errors: false,
}
}
fn external_corruption(seed: Seed) -> GauntletConfig {
GauntletConfig {
seed,
io: IoBackend::Real,
workload: block_workload(
OpWeights {
add: 50,
delete: 30,
compact: 18,
checkpoint: 1,
external_delete_data_file: 1,
..OpWeights::default()
},
SizeDistribution::Fixed(ValueBytes(128)),
KeySpaceSize(200),
),
op_count: OpCount(2_000),
invariants: InvariantSet::NO_ORPHAN_FILES | InvariantSet::BYTE_BUDGET,
limits: RunLimits {
max_wall_ms: Some(WallMs(60_000)),
},
restart_policy: RestartPolicy::EveryNOps(OpInterval(1_000)),
store: tiny_store(),
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: true,
}
}

View File

@@ -142,6 +142,7 @@ mod tests {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}

View File

@@ -1,6 +1,6 @@
use super::op::{
CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed,
ValueSeed,
CollectionName, DidSeed, EventKind, FileChoice, Op, OpStream, PayloadSeed, RecordKey,
RetentionSecs, Seed, ValueSeed,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -23,6 +23,7 @@ pub struct OpWeights {
pub run_retention: u32,
pub read_record: u32,
pub read_block: u32,
pub external_delete_data_file: u32,
}
impl OpWeights {
@@ -36,6 +37,7 @@ impl OpWeights {
+ self.run_retention
+ self.read_record
+ self.read_block
+ self.external_delete_data_file
}
pub const fn touches_eventlog(&self) -> bool {
@@ -103,6 +105,7 @@ impl Default for WorkloadModel {
run_retention: 0,
read_record: 0,
read_block: 0,
external_delete_data_file: 0,
},
size_distribution: SizeDistribution::Fixed(ValueBytes(64)),
collections: vec![CollectionName("app.bsky.feed.post".to_string())],
@@ -138,6 +141,7 @@ impl WorkloadModel {
let t6 = t5 + w.sync_event_log;
let t7 = t6 + w.run_retention;
let t8 = t7 + w.read_record;
let t9 = t8 + w.read_block;
match bucket {
b if b < t1 => Op::AddRecord {
@@ -166,9 +170,12 @@ impl WorkloadModel {
collection: coll,
rkey,
},
_ => Op::ReadBlock {
b if b < t9 => Op::ReadBlock {
value_seed: ValueSeed(rng.next_u32()),
},
_ => Op::ExternalDeleteDataFile {
choice: FileChoice(rng.next_u32()),
},
}
})
.collect();

View File

@@ -0,0 +1,130 @@
mod common;
use std::sync::Arc;
use common::with_runtime;
use tranquil_store::blockstore::{BlockStoreConfig, GroupCommitConfig, TranquilBlockStore};
use tranquil_store::gauntlet::{
Gauntlet, IndexBackedByDisk, Invariant, InvariantCtx, InvariantSet, Oracle, Scenario, Seed,
config_for,
};
#[test]
fn index_backed_by_disk_invariant_catches_phantom_after_external_delete() {
with_runtime(|| {
let dir = tempfile::TempDir::new().unwrap();
let cfg = BlockStoreConfig {
data_dir: dir.path().join("data"),
index_dir: dir.path().join("index"),
max_file_size: 4096,
group_commit: GroupCommitConfig::default(),
shard_count: 1,
};
let store = Arc::new(TranquilBlockStore::open(cfg).expect("open store"));
let cids: Vec<[u8; 36]> = (0..20u32)
.map(|seed| {
let le = seed.to_le_bytes();
std::array::from_fn(|i| match i {
0 => 0x01,
1 => 0x71,
2 => 0x12,
3 => 0x20,
4..8 => le[i - 4],
_ => (seed as u8).wrapping_add(i as u8),
})
})
.collect();
cids.iter().for_each(|cid| {
store
.put_blocks_blocking(vec![(*cid, vec![0xAA; 96])])
.expect("put block");
});
let victim_fid = store
.compaction_liveness(0)
.unwrap()
.iter()
.filter(|(_, info)| info.live_blocks > 0)
.map(|(&fid, _)| fid)
.next()
.expect("expected at least one indexed file");
let victim_path = dir.path().join("data").join(format!("{victim_fid}.tqb"));
std::fs::remove_file(&victim_path).unwrap();
let oracle = Oracle::new();
let ctx = InvariantCtx {
store: &store,
oracle: &oracle,
root: None,
eventlog: None,
};
let runtime = tokio::runtime::Handle::current();
let result = runtime.block_on(IndexBackedByDisk.check(&ctx));
let violation = result.expect_err("phantom index entry must trigger violation");
assert_eq!(violation.invariant, "IndexBackedByDisk");
assert!(
violation.detail.contains(&victim_fid.to_string()),
"violation detail must reference the deleted file_id: {}",
violation.detail
);
});
}
#[tokio::test]
async fn external_corruption_scenario_survives_many_seeds() {
let failures: Vec<String> = futures::future::join_all((0..5).map(Seed).map(|seed| async move {
let cfg = config_for(Scenario::ExternalCorruption, seed);
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
(seed, report)
}))
.await
.into_iter()
.filter(|(_, r)| !r.is_clean())
.map(|(seed, r)| {
format!(
"seed {}: {} violations\n {}",
seed.0,
r.violations.len(),
r.violations
.iter()
.map(|v| format!("{}: {}", v.invariant, v.detail))
.collect::<Vec<_>>()
.join("\n ")
)
})
.collect();
assert!(failures.is_empty(), "{}", failures.join("\n---\n"));
}
#[tokio::test]
#[ignore = "long running, validates iris-class regression over many seeds"]
async fn iris_class_regression_30_seeds() {
let failures: Vec<String> =
futures::future::join_all((0..30).map(Seed).map(|seed| async move {
let mut cfg = config_for(Scenario::SmokePR, seed);
cfg.invariants = cfg.invariants | InvariantSet::INDEX_BACKED_BY_DISK;
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
(seed, report)
}))
.await
.into_iter()
.filter(|(_, r)| !r.is_clean())
.map(|(seed, r)| {
format!(
"seed {}: {} violations\n {}",
seed.0,
r.violations.len(),
r.violations
.iter()
.map(|v| format!("{}: {}", v.invariant, v.detail))
.collect::<Vec<_>>()
.join("\n ")
)
})
.collect();
assert!(failures.is_empty(), "{}", failures.join("\n---\n"));
}

View File

@@ -82,6 +82,7 @@ fn fast_sanity_config(seed: Seed) -> GauntletConfig {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
}
}
@@ -142,6 +143,7 @@ async fn compaction_idempotent_sanity() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -181,6 +183,7 @@ async fn no_orphan_files_sanity() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -225,6 +228,7 @@ async fn simulated_pristine_roundtrip() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -279,6 +283,7 @@ async fn firehose_fanout_pristine_smoke() {
max_segment_size: MaxSegmentSize(32 * 1024),
}),
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -325,6 +330,7 @@ async fn contended_readers_pristine_smoke() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(16),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -372,6 +378,7 @@ async fn contended_writers_pristine_smoke() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(8),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert_clean(&report);
@@ -505,6 +512,7 @@ async fn torn_pages_only_completes_within_budget() {
},
eventlog: None,
writer_concurrency: WriterConcurrency(1),
tolerate_op_errors: false,
};
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
let budget_violations: Vec<&str> = report
@@ -553,11 +561,8 @@ fn farm_run_many_timed_with_scratch_roots_honors_assignment() {
std::fs::create_dir_all(&root_a).expect("mkdir a");
std::fs::create_dir_all(&root_b).expect("mkdir b");
let roots = vec![root_a.clone(), root_b.clone()];
let reports = farm::run_many_timed_with_scratch_roots(
fast_sanity_config,
&roots,
(0..2).map(Seed),
);
let reports =
farm::run_many_timed_with_scratch_roots(fast_sanity_config, &roots, (0..2).map(Seed));
assert_eq!(reports.len(), 2);
reports.iter().for_each(|(r, _)| assert_clean(r));
[&root_a, &root_b].iter().for_each(|root| {