From 09d437b3e32f42a71a0337d33134f074d89f1399 Mon Sep 17 00:00:00 2001
From: Lewis
Date: Wed, 15 Apr 2026 21:20:55 +0300
Subject: [PATCH] feat(tranquil-store): gauntlet persistence & restart
 invariants

Lewis: May this revision serve well!
---
 .config/nextest.toml                          |   4 +
 .../tranquil-store/src/gauntlet/invariants.rs | 168 +++++++++--
 crates/tranquil-store/src/gauntlet/mod.rs     |   6 +-
 crates/tranquil-store/src/gauntlet/oracle.rs  |  32 ++-
 crates/tranquil-store/src/gauntlet/runner.rs  | 269 ++++++++++++++----
 .../tranquil-store/src/gauntlet/scenarios.rs  |  60 +++-
 crates/tranquil-store/tests/gauntlet_smoke.rs |  33 ++-
 7 files changed, 470 insertions(+), 102 deletions(-)

diff --git a/.config/nextest.toml b/.config/nextest.toml
index cfcfcee..aa8a6ed 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -72,6 +72,10 @@ test-group = "io-heavy-sim"
 filter = "test(/test_scale_/) | test(/full_backup_and_restore/)"
 slow-timeout = { period = "120s", terminate-after = 4 }
 
+[[profile.default.overrides]]
+filter = "binary(gauntlet_smoke)"
+slow-timeout = { period = "300s", terminate-after = 8 }
+
 [[profile.default.overrides]]
 filter = "binary(compaction_restart) | binary(mst_refcount_integrity) | binary(gc_compaction_restart)"
 slow-timeout = { period = "120s", terminate-after = 4 }
diff --git a/crates/tranquil-store/src/gauntlet/invariants.rs b/crates/tranquil-store/src/gauntlet/invariants.rs
index 9448419..a8fe7f8 100644
--- a/crates/tranquil-store/src/gauntlet/invariants.rs
+++ b/crates/tranquil-store/src/gauntlet/invariants.rs
@@ -1,6 +1,11 @@
 use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
 
-use super::oracle::Oracle;
+use async_trait::async_trait;
+use cid::Cid;
+use jacquard_repo::mst::Mst;
+
+use super::oracle::{Oracle, hex_short, try_cid_to_fixed};
 use crate::blockstore::{CidBytes, TranquilBlockStore};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -10,8 +15,15 @@ impl InvariantSet {
     pub const EMPTY: Self = Self(0);
     pub const REFCOUNT_CONSERVATION: Self = Self(1 << 0);
     pub const REACHABILITY: Self = Self(1 << 1);
+    pub const ACKED_WRITE_PERSISTENCE: Self = Self(1 << 2);
+    pub const READ_AFTER_WRITE: Self = Self(1 << 3);
+    pub const RESTART_IDEMPOTENT: Self = Self(1 << 4);
 
-    const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 | Self::REACHABILITY.0;
+    const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0
+        | Self::REACHABILITY.0
+        | Self::ACKED_WRITE_PERSISTENCE.0
+        | Self::READ_AFTER_WRITE.0
+        | Self::RESTART_IDEMPOTENT.0;
 
     pub const fn contains(self, other: Self) -> bool {
         (self.0 & other.0) == other.0
@@ -21,6 +33,10 @@
         Self(self.0 | other.0)
     }
 
+    pub const fn without(self, other: Self) -> Self {
+        Self(self.0 & !other.0)
+    }
+
     pub const fn unknown_bits(self) -> u32 {
         self.0 & !Self::ALL_KNOWN
     }
@@ -39,22 +55,31 @@ pub struct InvariantViolation {
     pub detail: String,
 }
 
-pub trait Invariant {
+pub struct InvariantCtx<'a> {
+    pub store: &'a Arc<TranquilBlockStore>,
+    pub oracle: &'a Oracle,
+    pub root: Option<Cid>,
+}
+
+#[async_trait]
+pub trait Invariant: Send + Sync {
     fn name(&self) -> &'static str;
-    fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation>;
+    async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation>;
 }
 
 pub struct RefcountConservation;
 
+#[async_trait]
 impl Invariant for RefcountConservation {
     fn name(&self) -> &'static str {
         "RefcountConservation"
     }
 
-    fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> {
-        let live: Vec<(String, CidBytes)> = oracle.live_cids_labeled();
+    async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> {
+        let live: Vec<(String, CidBytes)> = ctx.oracle.live_cids_labeled();
         let live_set: HashSet<CidBytes> = live.iter().map(|(_, c)| *c).collect();
-        let index: HashMap = store
+        let index: HashMap = ctx
+            .store
             .block_index()
             .live_entries_snapshot()
             .into_iter()
@@ -90,16 +115,18 @@
 
 pub struct Reachability;
 
+#[async_trait]
 impl Invariant for Reachability {
     fn name(&self) -> &'static str {
         "Reachability"
     }
 
-    fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> {
-        let violations: Vec<String> = oracle
+    async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> {
+        let violations: Vec<String> = ctx
+            .oracle
             .live_cids_labeled()
             .into_iter()
-            .filter_map(|(label, fixed)| match store.get_block_sync(&fixed) {
+            .filter_map(|(label, fixed)| match ctx.store.get_block_sync(&fixed) {
                 Ok(Some(_)) => None,
                 Ok(None) => Some(format!("{label}: missing")),
                 Err(e) => Some(format!("{label}: read error {e}")),
@@ -117,9 +144,106 @@
     }
 }
 
-fn hex_short(cid: &CidBytes) -> String {
-    let tail = &cid[cid.len() - 6..];
-    tail.iter().map(|b| format!("{b:02x}")).collect()
+pub struct AckedWritePersistence;
+
+#[async_trait]
+impl Invariant for AckedWritePersistence {
+    fn name(&self) -> &'static str {
+        "AckedWritePersistence"
+    }
+
+    async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> {
+        let Some(root) = ctx.root else {
+            if ctx.oracle.live_count() == 0 {
+                return Ok(());
+            }
+            return Err(InvariantViolation {
+                invariant: "AckedWritePersistence",
+                detail: format!(
+                    "oracle has {} live records but reopened store has no root",
+                    ctx.oracle.live_count()
+                ),
+            });
+        };
+        let mst = Mst::load(ctx.store.clone(), root, None);
+        let keys: Vec<String> = ctx
+            .oracle
+            .live_records()
+            .map(|(c, r, _)| format!("{}/{}", c.0, r.0))
+            .collect();
+
+        let mut missing: Vec<String> = Vec::new();
+        for key in &keys {
+            match mst.get(key).await {
+                Ok(Some(_)) => {}
+                Ok(None) => missing.push(format!("{key}: missing after reopen")),
+                Err(e) => missing.push(format!("{key}: mst.get error after reopen: {e}")),
+            }
+        }
+
+        if missing.is_empty() {
+            Ok(())
+        } else {
+            Err(InvariantViolation {
+                invariant: "AckedWritePersistence",
+                detail: missing.join("; "),
+            })
+        }
+    }
+}
+
+pub struct ReadAfterWrite;
+
+#[async_trait]
+impl Invariant for ReadAfterWrite {
+    fn name(&self) -> &'static str {
+        "ReadAfterWrite"
+    }
+
+    async fn check(&self, ctx: &InvariantCtx<'_>) -> Result<(), InvariantViolation> {
+        let Some(root) = ctx.root else {
+            return Ok(());
+        };
+        let mst = Mst::load(ctx.store.clone(), root, None);
+
+        let entries: Vec<(String, CidBytes)> = ctx
+            .oracle
+            .live_records()
+            .map(|(c, r, v)| (format!("{}/{}", c.0, r.0), *v))
+            .collect();
+
+        let mut violations: Vec<String> = Vec::new();
+        for (key, expected) in &entries {
+            match mst.get(key).await {
+                Ok(Some(cid)) => match try_cid_to_fixed(&cid) {
+                    Ok(actual) if actual == *expected => match ctx.store.get_block_sync(&actual) {
+                        Ok(Some(_)) => {}
+                        Ok(None) => violations.push(format!("{key}: block missing for cid")),
+                        Err(e) => violations.push(format!("{key}: block read error {e}")),
+                    },
+                    Ok(actual) => violations.push(format!(
+                        "{key}: MST cid {} != oracle cid {}",
+                        hex_short(&actual),
+                        hex_short(expected),
+                    )),
+                    Err(e) => {
+                        violations.push(format!("{key}: unexpected CID format from MST: {e}"))
+                    }
+                },
+                Ok(None) => violations.push(format!("{key}: MST returned None")),
+                Err(e) => violations.push(format!("{key}: mst.get error {e}")),
+            }
+        }
+
+        if violations.is_empty() {
+            Ok(())
+        } else {
+            Err(InvariantViolation {
+                invariant: "ReadAfterWrite",
+                detail: violations.join("; "),
+            })
+        }
+    }
 }
 
 pub fn invariants_for(set: InvariantSet) -> Vec<Box<dyn Invariant>> {
@@ -128,14 +252,20 @@
         unknown == 0,
         "invariants_for: unknown InvariantSet bits 0x{unknown:x}; all bits must map to an impl"
     );
-    [
+    let candidates: Vec<(InvariantSet, Box<dyn Invariant>)> = vec![
         (
             InvariantSet::REFCOUNT_CONSERVATION,
-            Box::new(RefcountConservation) as Box<dyn Invariant>,
+            Box::new(RefcountConservation),
         ),
         (InvariantSet::REACHABILITY, Box::new(Reachability)),
-    ]
-    .into_iter()
-    .filter_map(|(flag, inv)| set.contains(flag).then_some(inv))
-    .collect()
+        (
+            InvariantSet::ACKED_WRITE_PERSISTENCE,
+            Box::new(AckedWritePersistence),
+        ),
+        (InvariantSet::READ_AFTER_WRITE, Box::new(ReadAfterWrite)),
+    ];
+    candidates
+        .into_iter()
+        .filter_map(|(flag, inv)| set.contains(flag).then_some(inv))
+        .collect()
 }
diff --git a/crates/tranquil-store/src/gauntlet/mod.rs b/crates/tranquil-store/src/gauntlet/mod.rs
index 0db375c..57c650a 100644
--- a/crates/tranquil-store/src/gauntlet/mod.rs
+++ b/crates/tranquil-store/src/gauntlet/mod.rs
@@ -10,9 +10,9 @@ pub use invariants::{Invariant, InvariantSet, InvariantViolation, invariants_for
 pub use op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed};
 pub use oracle::Oracle;
 pub use runner::{
-    CompactInterval, Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend,
-    MaxFileSize, OpIndex, OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits,
-    ShardCount, StoreConfig, WallMs,
+    Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend, MaxFileSize, OpIndex,
+    OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits, ShardCount, StoreConfig,
+    WallMs,
 };
 pub use scenarios::{Scenario, config_for};
 pub use workload::{
diff --git a/crates/tranquil-store/src/gauntlet/oracle.rs b/crates/tranquil-store/src/gauntlet/oracle.rs
index e5aa985..0d80567 100644
--- a/crates/tranquil-store/src/gauntlet/oracle.rs
+++ b/crates/tranquil-store/src/gauntlet/oracle.rs
@@ -5,11 +5,17 @@
 use cid::Cid;
 
 use super::op::{CollectionName, RecordKey};
 use crate::blockstore::CidBytes;
 
+#[derive(Debug, thiserror::Error, PartialEq, Eq)]
+#[error("unexpected CID encoding: got {actual} bytes, expected 36 for sha256 CIDv1")]
+pub struct CidFormatError {
+    pub actual: usize,
+}
+
 #[derive(Debug, Default)]
 pub struct Oracle {
     live: HashMap<(CollectionName, RecordKey), CidBytes>,
     current_root: Option<Cid>,
-    mst_node_cids: Vec<Cid>,
+    mst_node_cids: Vec<CidBytes>,
 }
@@ -38,12 +44,13 @@
         self.current_root
     }
 
-    pub fn set_node_cids(&mut self, cids: Vec<Cid>) {
+    pub fn set_mst_node_cids(&mut self, cids: Vec<CidBytes>) {
         self.mst_node_cids = cids;
     }
 
-    pub fn mst_node_cids(&self) -> &[Cid] {
-        &self.mst_node_cids
+    pub fn clear_mst_state(&mut self) {
+        self.current_root = None;
+        self.mst_node_cids.clear();
     }
 
     pub fn live_records(&self) -> impl Iterator {
@@ -58,7 +65,7 @@
         let nodes = self
             .mst_node_cids
             .iter()
-            .map(|cid| (format!("mst {cid}"), cid_to_fixed(cid)));
+            .map(|bytes| (format!("mst {}", hex_short(bytes)), *bytes));
         let records = self
             .live_records()
             .map(|(c, r, v)| (format!("record {}/{}", c.0, r.0), *v));
@@ -66,10 +73,15 @@
     }
 }
 
-pub(super) fn cid_to_fixed(cid: &Cid) -> CidBytes {
+pub(super) fn try_cid_to_fixed(cid: &Cid) -> Result<CidBytes, CidFormatError> {
     let bytes = cid.to_bytes();
-    debug_assert_eq!(bytes.len(), 36, "expected 36 byte CIDv1+sha256");
-    let mut arr = [0u8; 36];
-    arr.copy_from_slice(&bytes[..36]);
-    arr
+    let actual = bytes.len();
+    bytes.try_into().map_err(|_| CidFormatError { actual })
+}
+
+pub(super) fn hex_short(cid: &CidBytes) -> String {
+    cid[cid.len() - 6..]
+        .iter()
+        .map(|b| format!("{b:02x}"))
+        .collect()
 }
diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs
index eb6abc8..b768f09 100644
--- a/crates/tranquil-store/src/gauntlet/runner.rs
+++ b/crates/tranquil-store/src/gauntlet/runner.rs
@@ -6,9 +6,9 @@ use cid::Cid;
 use jacquard_repo::mst::Mst;
 use jacquard_repo::storage::BlockStore;
 
-use super::invariants::{InvariantSet, InvariantViolation, invariants_for};
+use super::invariants::{InvariantCtx, InvariantSet, InvariantViolation, invariants_for};
 use super::op::{Op, OpStream, Seed, ValueSeed};
-use super::oracle::{Oracle, cid_to_fixed};
+use super::oracle::{CidFormatError, Oracle, hex_short, try_cid_to_fixed};
 use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel};
 use crate::blockstore::{
     BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore,
@@ -44,15 +44,11 @@ pub struct MaxFileSize(pub u64);
 #[derive(Debug, Clone, Copy)]
 pub struct ShardCount(pub u8);
 
-#[derive(Debug, Clone, Copy)]
-pub struct CompactInterval(pub u32);
-
 #[derive(Debug, Clone)]
 pub struct StoreConfig {
     pub max_file_size: MaxFileSize,
     pub group_commit: GroupCommitConfig,
     pub shard_count: ShardCount,
-    pub compact_every: CompactInterval,
 }
 
 #[derive(Debug, Clone)]
@@ -108,6 +104,8 @@ enum OpError {
     CompactFile(String),
     #[error("join: {0}")]
     Join(String),
+    #[error("cid format: {0}")]
+    CidFormat(#[from] CidFormatError),
 }
 
 pub struct Gauntlet {
@@ -148,7 +146,7 @@
                 restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)),
                 violations: vec![InvariantViolation {
                     invariant: "WallClockBudget",
-                    detail: format!("exceeded max_wall_ms ({} ms)", d.as_millis()),
+                    detail: format!("exceeded max_wall_ms of {} ms", d.as_millis()),
                 }],
             },
         },
@@ -175,6 +173,12 @@ async fn run_real_inner(
     let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5));
     let mut halt_ops = false;
 
+    let mid_run_set = config
+        .invariants
+        .without(InvariantSet::RESTART_IDEMPOTENT)
+        .without(InvariantSet::ACKED_WRITE_PERSISTENCE);
+    let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT);
+
     for (idx, op) in op_stream.iter().enumerate() {
         if halt_ops {
             break;
@@ -186,7 +190,6 @@
                 invariant: "OpExecution",
                 detail: format!("op {idx}: {e}"),
             });
-            ops_counter.store(idx + 1, Ordering::Relaxed);
            halt_ops = true;
            continue;
        }
@@ -209,19 +212,49 @@
                halt_ops = true;
                continue;
            }
-            violations.extend(check_all(&store, &oracle, config.invariants));
-            if !violations.is_empty() {
+            let before = violations.len();
+            violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await);
+            if violations.len() > before {
                halt_ops = true;
            }
        }
    }
 
-    match refresh_oracle_graph(&store, &mut oracle, root).await {
-        Ok(()) => violations.extend(check_all(&store, &oracle, config.invariants)),
-        Err(e) => violations.push(InvariantViolation {
-            invariant: "OpExecution",
-            detail: format!("refresh at end: {e}"),
-        }),
+    if !halt_ops {
+        match refresh_oracle_graph(&store, &mut oracle, root).await {
+            Ok(()) => {
+                let before = violations.len();
+                violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await);
+                if violations.len() > before {
+                    halt_ops = true;
+                }
+            }
+            Err(e) => {
+                violations.push(InvariantViolation {
+                    invariant: "OpExecution",
+                    detail: format!("refresh at end: {e}"),
+                });
+                halt_ops = true;
+            }
+        }
+    }
+
+    if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) && !halt_ops {
+        let pre_snapshot = snapshot_block_index(&store);
+        drop(store);
+        let reopened = Arc::new(
+            TranquilBlockStore::open(blockstore_config(dir.path(), &config.store))
+                .expect("reopen for RestartIdempotent"),
+        );
+        let post_snapshot = snapshot_block_index(&reopened);
+        if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) {
+            violations.push(InvariantViolation {
+                invariant: "RestartIdempotent",
+                detail,
+            });
+        } else {
+            violations.extend(run_invariants(&reopened, &oracle, root, post_reopen_set).await);
+        }
    }
 
    GauntletReport {
@@ -232,15 +265,82 @@
    }
 }
 
-fn check_all(
-    store: &TranquilBlockStore,
+async fn run_invariants(
+    store: &Arc<TranquilBlockStore>,
     oracle: &Oracle,
+    root: Option<Cid>,
     set: InvariantSet,
 ) -> Vec<InvariantViolation> {
-    invariants_for(set)
+    let ctx = InvariantCtx {
+        store,
+        oracle,
+        root,
+    };
+    let mut out = Vec::new();
+    for inv in invariants_for(set) {
+        if let Err(v) = inv.check(&ctx).await {
+            out.push(v);
+        }
+    }
+    out
+}
+
+fn snapshot_block_index(store: &TranquilBlockStore) -> Vec<(CidBytes, u32)> {
+    let mut v: Vec<(CidBytes, u32)> = store
+        .block_index()
+        .live_entries_snapshot()
         .into_iter()
-        .filter_map(|inv| inv.check(store, oracle).err())
-        .collect()
+        .map(|(c, r)| (c, r.raw()))
+        .collect();
+    v.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+    v
+}
+
+const SNAPSHOT_DIFF_ITEMS: usize = 16;
+
+fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option<String> {
+    if pre == post {
+        return None;
+    }
+    let pre_map: std::collections::HashMap<CidBytes, u32> = pre.iter().copied().collect();
+    let post_map: std::collections::HashMap<CidBytes, u32> = post.iter().copied().collect();
+
+    let only_pre: Vec<String> = pre_map
+        .iter()
+        .filter(|(c, _)| !post_map.contains_key(*c))
+        .map(|(c, r)| format!("lost {} refcount {}", hex_short(c), r))
+        .collect();
+    let only_post: Vec<String> = post_map
+        .iter()
+        .filter(|(c, _)| !pre_map.contains_key(*c))
+        .map(|(c, r)| format!("gained {} refcount {}", hex_short(c), r))
+        .collect();
+    let changed: Vec<String> = pre_map
+        .iter()
+        .filter_map(|(c, pre_r)| match post_map.get(c) {
+            Some(post_r) if post_r != pre_r => {
+                Some(format!("{} refcount {} -> {}", hex_short(c), pre_r, post_r))
+            }
+            _ => None,
+        })
+        .collect();
+
+    let total = only_pre.len() + only_post.len() + changed.len();
+    let mut items: Vec<String> = only_pre
+        .into_iter()
+        .chain(only_post)
+        .chain(changed)
+        .take(SNAPSHOT_DIFF_ITEMS)
+        .collect();
+    if total > items.len() {
+        items.push(format!("+{} more", total - items.len()));
+    }
+    Some(format!(
+        "block index changed across clean reopen: pre={} entries, post={} entries; {}",
+        pre.len(),
+        post.len(),
+        items.join("; "),
+    ))
 }
 
 async fn refresh_oracle_graph(
@@ -250,7 +350,7 @@
 ) -> Result<(), String> {
     match root {
         None => {
-            oracle.set_node_cids(Vec::new());
+            oracle.clear_mst_state();
             Ok(())
         }
         Some(r) => {
@@ -259,8 +359,13 @@
                 .collect_node_cids()
                 .await
                 .map_err(|e| format!("collect_node_cids: {e}"))?;
+            let fixed: Vec<CidBytes> = cids
+                .iter()
+                .map(try_cid_to_fixed)
+                .collect::<Result<Vec<_>, _>>()
+                .map_err(|e| format!("mst node cid: {e}"))?;
             oracle.set_root(r);
-            oracle.set_node_cids(cids);
+            oracle.set_mst_node_cids(fixed);
             Ok(())
         }
     }
 }
@@ -298,17 +403,14 @@ fn make_record_bytes(value_seed: ValueSeed, dist: SizeDistribution) -> Vec<u8> {
             let ValueBytes(lo) = range.min();
             let ValueBytes(hi) = range.max();
             let span = u64::from(hi.saturating_sub(lo)).max(1);
-            let rng_state = u64::from(raw);
-            (lo as usize) + (rng_state % span) as usize
+            (lo as usize) + (u64::from(raw) % span) as usize
         }
     };
-    serde_ipld_dagcbor::to_vec(&serde_json::json!({
-        "$type": "app.bsky.feed.post",
-        "text": format!("record-{raw}"),
-        "createdAt": "2026-01-01T00:00:00Z",
-        "pad": "x".repeat(target_len.saturating_sub(64)),
-    }))
-    .expect("encode record")
+    let target_len = target_len.max(8);
+    let seed_bytes = raw.to_le_bytes();
+    (0..target_len)
+        .map(|i| seed_bytes[i % 4] ^ (i as u8).wrapping_mul(31))
+        .collect()
 }
 
 async fn apply_op(
@@ -329,27 +431,32 @@
                 .put(&record_bytes)
                 .await
                 .map_err(|e| OpError::PutRecord(e.to_string()))?;
-            let key = format!("{}/{}", collection.0, rkey.0);
-            let loaded = match *root {
-                None => Mst::new(store.clone()),
-                Some(r) => Mst::load(store.clone(), r, None),
-            };
-            let updated = loaded
-                .add(&key, record_cid)
-                .await
-                .map_err(|e| OpError::MstAdd(e.to_string()))?;
-            let new_root = updated
-                .persist()
-                .await
-                .map_err(|e| OpError::MstPersist(e.to_string()))?;
+            let record_cid_bytes = try_cid_to_fixed(&record_cid)?;
 
-            if let Some(old_root) = *root {
-                apply_mst_diff(store, old_root, new_root).await?;
+            let outcome =
+                add_record_inner(store, *root, collection, rkey, record_cid, record_cid_bytes)
+                    .await;
+            match outcome {
+                Ok((new_root, applied)) => {
+                    *root = Some(new_root);
+                    if applied {
+                        oracle.add(collection.clone(), rkey.clone(), record_cid_bytes);
+                    }
+                    Ok(())
+                }
+                Err(e) => {
+                    if let Err(cleanup_err) =
+                        decrement_obsolete(store, vec![record_cid_bytes]).await
+                    {
+                        tracing::warn!(
+                            op_error = %e,
+                            cleanup_error = %cleanup_err,
+                            "AddRecord cleanup decrement failed; refcount may leak",
+                        );
+                    }
+                    Err(e)
+                }
             }
-
-            *root = Some(new_root);
-            oracle.add(collection.clone(), rkey.clone(), cid_to_fixed(&record_cid));
-            Ok(())
         }
         Op::DeleteRecord { collection, rkey } => {
             let Some(old_root) = *root else { return Ok(()) };
@@ -389,6 +496,55 @@
     }
 }
 
+async fn add_record_inner(
+    store: &Arc<TranquilBlockStore>,
+    root: Option<Cid>,
+    collection: &super::op::CollectionName,
+    rkey: &super::op::RecordKey,
+    record_cid: Cid,
+    record_cid_bytes: CidBytes,
+) -> Result<(Cid, bool), OpError> {
+    let key = format!("{}/{}", collection.0, rkey.0);
+    let loaded = match root {
+        None => Mst::new(store.clone()),
+        Some(r) => Mst::load(store.clone(), r, None),
+    };
+    let updated = loaded
+        .add(&key, record_cid)
+        .await
+        .map_err(|e| OpError::MstAdd(e.to_string()))?;
+    let new_root = updated
+        .persist()
+        .await
+        .map_err(|e| OpError::MstPersist(e.to_string()))?;
+
+    match root {
+        Some(old_root) if old_root == new_root => {
+            decrement_obsolete(store, vec![record_cid_bytes]).await?;
+            Ok((new_root, false))
+        }
+        Some(old_root) => {
+            apply_mst_diff(store, old_root, new_root).await?;
+            Ok((new_root, true))
+        }
+        None => Ok((new_root, true)),
+    }
+}
+
+async fn decrement_obsolete(
+    store: &Arc<TranquilBlockStore>,
+    obsolete: Vec<CidBytes>,
+) -> Result<(), OpError> {
+    let s = store.clone();
+    tokio::task::spawn_blocking(move || {
+        s.apply_commit_blocking(vec![], obsolete)
+            .map_err(|e| e.to_string())
+    })
+    .await
+    .map_err(|e| OpError::Join(e.to_string()))?
+    .map_err(OpError::ApplyCommit)
+}
+
 async fn apply_mst_diff(
     store: &Arc<TranquilBlockStore>,
     old_root: Cid,
@@ -404,8 +560,8 @@
         .removed_mst_blocks
         .into_iter()
         .chain(diff.removed_cids.into_iter())
-        .map(|c| cid_to_fixed(&c))
-        .collect();
+        .map(|c| try_cid_to_fixed(&c))
+        .collect::<Result<Vec<_>, _>>()?;
     let s = store.clone();
     tokio::task::spawn_blocking(move || {
         s.apply_commit_blocking(vec![], obsolete)
@@ -419,10 +575,9 @@
 const COMPACT_LIVENESS_CEILING: f64 = 0.99;
 
 fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> {
-    let liveness = match store.compaction_liveness(0) {
-        Ok(l) => l,
-        Err(_) => return Ok(()),
-    };
+    let liveness = store
+        .compaction_liveness(0)
+        .map_err(|e| OpError::CompactFile(format!("compaction_liveness: {e}")))?;
     let targets: Vec<_> = liveness
         .iter()
         .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < COMPACT_LIVENESS_CEILING)
diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs
index 81a0186..ebb031c 100644
--- a/crates/tranquil-store/src/gauntlet/scenarios.rs
+++ b/crates/tranquil-store/src/gauntlet/scenarios.rs
@@ -1,8 +1,8 @@
 use super::invariants::InvariantSet;
 use super::op::{CollectionName, Seed};
 use super::runner::{
-    CompactInterval, GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits,
-    ShardCount, StoreConfig, WallMs,
+    GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, ShardCount,
+    StoreConfig, WallMs,
 };
 use super::workload::{
     KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel,
@@ -14,6 +14,7 @@ pub enum Scenario {
     SmokePR,
     MstChurn,
     MstRestartChurn,
+    FullStackRestart,
 }
 
 pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig {
@@ -21,6 +22,7 @@
         Scenario::SmokePR => smoke_pr(seed),
         Scenario::MstChurn => mst_churn(seed),
         Scenario::MstRestartChurn => mst_restart_churn(seed),
+        Scenario::FullStackRestart => full_stack_restart(seed),
     }
 }
 
@@ -33,14 +35,13 @@ fn default_collections() -> Vec<CollectionName> {
 
 fn tiny_store() -> StoreConfig {
     StoreConfig {
-        max_file_size: MaxFileSize(300),
+        max_file_size: MaxFileSize(4096),
         group_commit: GroupCommitConfig {
             checkpoint_interval_ms: 100,
             checkpoint_write_threshold: 10,
             ..GroupCommitConfig::default()
         },
         shard_count: ShardCount(1),
-        compact_every: CompactInterval(5),
     }
 }
 
@@ -60,7 +61,11 @@ fn smoke_pr(seed: Seed) -> GauntletConfig {
             key_space: KeySpaceSize(200),
         },
         op_count: OpCount(10_000),
-        invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
+        invariants: InvariantSet::REFCOUNT_CONSERVATION
+            | InvariantSet::REACHABILITY
+            | InvariantSet::ACKED_WRITE_PERSISTENCE
+            | InvariantSet::READ_AFTER_WRITE
+            | InvariantSet::RESTART_IDEMPOTENT,
         limits: RunLimits {
             max_wall_ms: Some(WallMs(60_000)),
         },
@@ -85,7 +90,11 @@ fn mst_churn(seed: Seed) -> GauntletConfig {
             key_space: KeySpaceSize(2_000),
         },
         op_count: OpCount(100_000),
-        invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
+        invariants: InvariantSet::REFCOUNT_CONSERVATION
+            | InvariantSet::REACHABILITY
+            | InvariantSet::ACKED_WRITE_PERSISTENCE
+            | InvariantSet::READ_AFTER_WRITE
+            | InvariantSet::RESTART_IDEMPOTENT,
         limits: RunLimits {
             max_wall_ms: Some(WallMs(600_000)),
         },
@@ -110,7 +119,11 @@ fn mst_restart_churn(seed: Seed) -> GauntletConfig {
             key_space: KeySpaceSize(2_000),
         },
         op_count: OpCount(100_000),
-        invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
+        invariants: InvariantSet::REFCOUNT_CONSERVATION
+            | InvariantSet::REACHABILITY
+            | InvariantSet::ACKED_WRITE_PERSISTENCE
+            | InvariantSet::READ_AFTER_WRITE
+            | InvariantSet::RESTART_IDEMPOTENT,
         limits: RunLimits {
             max_wall_ms: Some(WallMs(600_000)),
         },
@@ -118,3 +131,36 @@ fn mst_restart_churn(seed: Seed) -> GauntletConfig {
         store: tiny_store(),
     }
 }
+
+fn full_stack_restart(seed: Seed) -> GauntletConfig {
+    GauntletConfig {
+        seed,
+        io: IoBackend::Real,
+        workload: WorkloadModel {
+            weights: OpWeights {
+                add: 80,
+                delete: 0,
+                compact: 15,
+                checkpoint: 5,
+            },
+            size_distribution: SizeDistribution::Fixed(ValueBytes(80)),
+            collections: default_collections(),
+            key_space: KeySpaceSize(500),
+        },
+        op_count: OpCount(5_000),
+        invariants: InvariantSet::REFCOUNT_CONSERVATION
+            | InvariantSet::REACHABILITY
+            | InvariantSet::ACKED_WRITE_PERSISTENCE
+            | InvariantSet::READ_AFTER_WRITE
+            | InvariantSet::RESTART_IDEMPOTENT,
+        limits: RunLimits {
+            max_wall_ms: Some(WallMs(120_000)),
+        },
+        restart_policy: RestartPolicy::EveryNOps(OpInterval(500)),
+        store: StoreConfig {
+            max_file_size: MaxFileSize(4096),
+            group_commit: GroupCommitConfig::default(),
+            shard_count: ShardCount(1),
+        },
+    }
+}
diff --git a/crates/tranquil-store/tests/gauntlet_smoke.rs b/crates/tranquil-store/tests/gauntlet_smoke.rs
index bb93249..1037979 100644
--- a/crates/tranquil-store/tests/gauntlet_smoke.rs
+++ b/crates/tranquil-store/tests/gauntlet_smoke.rs
@@ -1,9 +1,8 @@
 use tranquil_store::blockstore::GroupCommitConfig;
 use tranquil_store::gauntlet::{
-    CollectionName, CompactInterval, Gauntlet, GauntletConfig, InvariantSet, IoBackend,
-    KeySpaceSize, MaxFileSize, OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario,
-    Seed, ShardCount, SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for,
-    farm,
+    CollectionName, Gauntlet, GauntletConfig, InvariantSet, IoBackend, KeySpaceSize, MaxFileSize,
+    OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario, Seed, ShardCount,
+    SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for, farm,
 };
 
 #[test]
@@ -48,7 +47,11 @@ fn fast_sanity_config(seed: Seed) -> GauntletConfig {
             key_space: KeySpaceSize(100),
         },
         op_count: OpCount(200),
-        invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
+        invariants: InvariantSet::REFCOUNT_CONSERVATION
+            | InvariantSet::REACHABILITY
+            | InvariantSet::ACKED_WRITE_PERSISTENCE
+            | InvariantSet::READ_AFTER_WRITE
+            | InvariantSet::RESTART_IDEMPOTENT,
         limits: RunLimits {
             max_wall_ms: Some(WallMs(30_000)),
         },
@@ -61,7 +64,6 @@ fn fast_sanity_config(seed: Seed) -> GauntletConfig {
                 ..GroupCommitConfig::default()
             },
             shard_count: ShardCount(1),
-            compact_every: CompactInterval(5),
         },
     }
 }
@@ -89,6 +91,25 @@ async fn gauntlet_fast_sanity() {
     assert_eq!(report.ops_executed.0, 200);
 }
 
+#[tokio::test]
+async fn full_stack_restart_port() {
+    let cfg = config_for(Scenario::FullStackRestart, Seed(1));
+    let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
+    assert!(
+        report.is_clean(),
+        "violations: {:?}",
+        report
+            .violations
+            .iter()
+            .map(|v| format!("{}: {}", v.invariant, v.detail))
+            .collect::<Vec<_>>()
+    );
+    assert_eq!(
+        report.restarts.0, 10,
+        "FullStackRestart with EveryNOps(500) over 5000 ops must restart exactly 10 times",
+    );
+}
+
 #[tokio::test]
 #[ignore = "long running, 100k ops with around 20 restarts"]
 async fn mst_restart_churn_single_seed() {