diff --git a/crates/tranquil-oauth-server/src/endpoints/authorize/passkey.rs b/crates/tranquil-oauth-server/src/endpoints/authorize/passkey.rs index 521a69e..f7684ce 100644 --- a/crates/tranquil-oauth-server/src/endpoints/authorize/passkey.rs +++ b/crates/tranquil-oauth-server/src/endpoints/authorize/passkey.rs @@ -175,10 +175,7 @@ pub async fn passkey_start( } } -async fn passkey_start_discoverable( - state: AppState, - request_id: RequestId, -) -> Response { +async fn passkey_start_discoverable(state: AppState, request_id: RequestId) -> Response { let (rcr, auth_state) = match state.webauthn_config.start_discoverable_authentication() { Ok(result) => result, Err(e) => { @@ -570,16 +567,13 @@ pub async fn passkey_finish( Err(response) => return response, }, None => { - let result = match passkey_finish_discoverable( - &state, - &credential, - &passkey_finish_request_id, - ) - .await - { - Ok(result) => result, - Err(response) => return response, - }; + let result = + match passkey_finish_discoverable(&state, &credential, &passkey_finish_request_id) + .await + { + Ok(result) => result, + Err(response) => return response, + }; if state .repos .oauth diff --git a/crates/tranquil-pds/src/repo_ops.rs b/crates/tranquil-pds/src/repo_ops.rs index 086b049..2171890 100644 --- a/crates/tranquil-pds/src/repo_ops.rs +++ b/crates/tranquil-pds/src/repo_ops.rs @@ -246,9 +246,8 @@ pub async fn finalize_repo_write( let obsolete_cids = match original_settled.diff(&new_settled).await { Ok(diff) => { - let mut obsolete: Vec = Vec::with_capacity( - 1 + diff.removed_mst_blocks.len() + diff.removed_cids.len(), - ); + let mut obsolete: Vec = + Vec::with_capacity(1 + diff.removed_mst_blocks.len() + diff.removed_cids.len()); obsolete.push(ctx.current_root_cid); obsolete.extend(diff.removed_mst_blocks); obsolete.extend(diff.removed_cids); diff --git a/crates/tranquil-pds/tests/gc_compaction_restart.rs b/crates/tranquil-pds/tests/gc_compaction_restart.rs index aedef91..f351c73 100644 --- a/crates/tranquil-pds/tests/gc_compaction_restart.rs +++ b/crates/tranquil-pds/tests/gc_compaction_restart.rs @@ -13,12 +13,10 @@ fn run_compaction(store: &tranquil_store::blockstore::TranquilBlockStore) { .map(|(&fid, _)| fid) .collect::>() .into_iter() - .for_each(|fid| { - match store.compact_file(fid, 0) { - Ok(_) => {} - Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {} - Err(e) => eprintln!("compaction: {e}"), - } + .for_each(|fid| match store.compact_file(fid, 0) { + Ok(_) => {} + Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {} + Err(e) => eprintln!("compaction: {e}"), }); } @@ -84,10 +82,7 @@ async fn mst_blocks_survive_full_store_reopen() { } let data_dir = store.data_dir().to_path_buf(); - let index_dir = data_dir - .parent() - .unwrap() - .join("index"); + let index_dir = data_dir.parent().unwrap().join("index"); let store_clone = store.clone(); tokio::task::spawn_blocking(move || { @@ -107,10 +102,9 @@ async fn mst_blocks_survive_full_store_reopen() { let head_cid = cid::Cid::try_from(repo_root_str.as_str()).expect("invalid cid"); - let car_blocks = - tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid) - .await - .expect("collect blocks"); + let car_blocks = tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid) + .await + .expect("collect blocks"); let block_count_before = car_blocks.len(); @@ -131,8 +125,8 @@ async fn mst_blocks_survive_full_store_reopen() { group_commit: tranquil_store::blockstore::GroupCommitConfig::default(), shard_count: 1, }; - let fresh = tranquil_store::blockstore::TranquilBlockStore::open(config) - .expect("reopen failed"); + let fresh = + tranquil_store::blockstore::TranquilBlockStore::open(config).expect("reopen failed"); let missing: Vec = car_blocks .iter() diff --git a/crates/tranquil-pds/tests/lifecycle_session.rs b/crates/tranquil-pds/tests/lifecycle_session.rs index 8f04162..8285d82 100644 --- a/crates/tranquil-pds/tests/lifecycle_session.rs +++ b/crates/tranquil-pds/tests/lifecycle_session.rs @@ -626,7 +626,12 @@ async fn create_app_password_session( .send() .await .expect("Failed to login with app password"); - assert_eq!(login_res.status(), StatusCode::OK, "App password login for '{}' failed", name); + assert_eq!( + login_res.status(), + StatusCode::OK, + "App password login for '{}' failed", + name + ); let session: Value = login_res.json().await.unwrap(); let jwt = session["accessJwt"].as_str().unwrap().to_string(); (jwt, scopes_response) @@ -635,10 +640,7 @@ async fn create_app_password_session( async fn try_chat_service_auth(client: &reqwest::Client, jwt: &str) -> StatusCode { let base = base_url().await; let res = client - .get(format!( - "{}/xrpc/com.atproto.server.getServiceAuth", - base - )) + .get(format!("{}/xrpc/com.atproto.server.getServiceAuth", base)) .bearer_auth(jwt) .query(&[ ("aud", "did:web:api.bsky.app"), diff --git a/crates/tranquil-pds/tests/mst_diff_equivalence.rs b/crates/tranquil-pds/tests/mst_diff_equivalence.rs index 76877aa..f532f31 100644 --- a/crates/tranquil-pds/tests/mst_diff_equivalence.rs +++ b/crates/tranquil-pds/tests/mst_diff_equivalence.rs @@ -15,7 +15,9 @@ fn test_cid(n: u32) -> Cid { Cid::new_v1(0x71, mh) } -async fn compute_obsolete_full_walk( +async fn compute_obsolete_full_walk< + S: jacquard_repo::storage::BlockStore + Sync + Send + 'static, +>( old: &Mst, new: &Mst, ) -> BTreeSet { @@ -34,9 +36,7 @@ async fn compute_obsolete_full_walk BTreeSet { +fn compute_obsolete_from_diff(diff: &jacquard_repo::mst::diff::MstDiff) -> BTreeSet { diff.removed_mst_blocks .iter() .copied() @@ -74,12 +74,17 @@ async fn assert_equivalence( let diff_obsolete = compute_obsolete_from_diff(&diff); assert_eq!( - full_walk_obsolete, diff_obsolete, + full_walk_obsolete, + diff_obsolete, "MISMATCH in scenario: {scenario}\n full_walk count: {}\n diff count: {}\n in full_walk but not diff: {:?}\n in diff but not full_walk: {:?}", full_walk_obsolete.len(), diff_obsolete.len(), - full_walk_obsolete.difference(&diff_obsolete).collect::>(), - diff_obsolete.difference(&full_walk_obsolete).collect::>(), + full_walk_obsolete + .difference(&diff_obsolete) + .collect::>(), + diff_obsolete + .difference(&full_walk_obsolete) + .collect::>(), ); } @@ -256,7 +261,12 @@ async fn massive_to_empty() { async fn massive_complete_replacement() { let old = generate_records("app.bsky.feed.post", 0..1000); let new_rec = generate_records("app.bsky.feed.post", 1000..2000); - assert_equivalence(&old, &new_rec, "1000 records fully replaced with 1000 different").await; + assert_equivalence( + &old, + &new_rec, + "1000 records fully replaced with 1000 different", + ) + .await; } #[tokio::test] @@ -276,7 +286,12 @@ async fn multi_collection_5_collections_500_each() { ]; let old = generate_multi_collection_records(&collections, 500); let new_rec = apply_scattered_updates(&old, 4, 30000); - assert_equivalence(&old, &new_rec, "5 collections x 500 records - update every 4th").await; + assert_equivalence( + &old, + &new_rec, + "5 collections x 500 records - update every 4th", + ) + .await; } #[tokio::test] @@ -294,7 +309,12 @@ async fn multi_collection_wipe_one_collection() { .filter(|(key, _)| !key.starts_with("app.bsky.feed.repost")) .cloned() .collect(); - assert_equivalence(&old, &new_rec, "4 collections x 400 - wipe repost collection").await; + assert_equivalence( + &old, + &new_rec, + "4 collections x 400 - wipe repost collection", + ) + .await; } #[tokio::test] @@ -313,10 +333,7 @@ async fn multi_collection_keep_only_one() { #[tokio::test] async fn multi_collection_add_new_collection() { - let old_collections = [ - "app.bsky.feed.like", - "app.bsky.feed.post", - ]; + let old_collections = ["app.bsky.feed.like", "app.bsky.feed.post"]; let old = generate_multi_collection_records(&old_collections, 500); let new_rec = append_records(&old, "app.bsky.graph.follow", 0..500, 40000); assert_equivalence(&old, &new_rec, "2 collections x 500 + add 500 follows").await; @@ -378,7 +395,12 @@ async fn interleaved_keys_disjoint_ranges() { let new_rec: Vec<_> = (0..1000u32) .map(|i| (make_key("app.bsky.feed.post", i * 2 + 1), i + 10000)) .collect(); - assert_equivalence(&old, &new_rec, "1000 even-keyed records replaced by 1000 odd-keyed").await; + assert_equivalence( + &old, + &new_rec, + "1000 even-keyed records replaced by 1000 odd-keyed", + ) + .await; } #[tokio::test] @@ -426,7 +448,12 @@ async fn many_collections_few_records_each() { }) .collect(); - assert_equivalence(&old, &new_rec, "50 collections x 20 records - delete every 15th, update every 7th").await; + assert_equivalence( + &old, + &new_rec, + "50 collections x 20 records - delete every 15th, update every 7th", + ) + .await; } #[tokio::test] @@ -457,7 +484,12 @@ async fn one_to_massive() { async fn delete_head_and_tail() { let old = generate_records("app.bsky.feed.post", 0..2000); let new_rec: Vec<_> = old[200..1800].to_vec(); - assert_equivalence(&old, &new_rec, "2000 records - delete first 200 and last 200").await; + assert_equivalence( + &old, + &new_rec, + "2000 records - delete first 200 and last 200", + ) + .await; } #[tokio::test] @@ -465,7 +497,12 @@ async fn keep_head_and_tail_only() { let old = generate_records("app.bsky.feed.post", 0..2000); let mut new_rec: Vec<_> = old[..100].to_vec(); new_rec.extend_from_slice(&old[1900..]); - assert_equivalence(&old, &new_rec, "2000 records - keep only first 100 and last 100").await; + assert_equivalence( + &old, + &new_rec, + "2000 records - keep only first 100 and last 100", + ) + .await; } #[tokio::test] @@ -515,7 +552,12 @@ async fn swiss_cheese_deletions() { }) .map(|(_, r)| r.clone()) .collect(); - assert_equivalence(&old, &new_rec, "1500 records - delete every 3rd chunk of 50").await; + assert_equivalence( + &old, + &new_rec, + "1500 records - delete every 3rd chunk of 50", + ) + .await; } #[tokio::test] @@ -529,9 +571,7 @@ async fn mixed_ops_with_key_density_change() { .filter(|(_, val)| val % 4 != 0) .cloned() .collect(); - new_rec.extend((0..500u32).map(|i| { - (make_key("app.bsky.feed.post", i * 3 + 1), i + 100000) - })); + new_rec.extend((0..500u32).map(|i| (make_key("app.bsky.feed.post", i * 3 + 1), i + 100000))); new_rec.sort_by(|(a, _), (b, _)| a.cmp(b)); assert_equivalence( diff --git a/crates/tranquil-scopes/src/permissions.rs b/crates/tranquil-scopes/src/permissions.rs index 7342ded..7835e2d 100644 --- a/crates/tranquil-scopes/src/permissions.rs +++ b/crates/tranquil-scopes/src/permissions.rs @@ -164,7 +164,10 @@ impl ScopePermissions { if self.has_transition_generic && !self.has_transition_chat { return Err(ScopeError::InsufficientScope { required: "transition:chat.bsky".to_string(), - message: format!("Chat access requires transition:chat.bsky scope to call {}", lxm), + message: format!( + "Chat access requires transition:chat.bsky scope to call {}", + lxm + ), }); } } diff --git a/crates/tranquil-store/Cargo.toml b/crates/tranquil-store/Cargo.toml index 91e09ef..3f2b4dc 100644 --- a/crates/tranquil-store/Cargo.toml +++ b/crates/tranquil-store/Cargo.toml @@ -14,7 +14,7 @@ parking_lot = { workspace = true } fjall = "3" lsm-tree = "3" flume = "0.11" -tokio = { workspace = true, features = ["sync", "rt"] } +tokio = { workspace = true, features = ["sync", "rt", "time"] } bytes = "1" memmap2 = "0.9" tracing = { workspace = true } @@ -34,9 +34,10 @@ dashmap = "6" rayon = "1" smallvec = "1" uuid = { workspace = true } +tempfile = { version = "3", optional = true } [features] -test-harness = [] +test-harness = ["dep:tempfile"] [dev-dependencies] tranquil-store = { path = ".", features = ["test-harness"] } diff --git a/crates/tranquil-store/src/blockstore/hash_index.rs b/crates/tranquil-store/src/blockstore/hash_index.rs index e9668ec..a0e3be7 100644 --- a/crates/tranquil-store/src/blockstore/hash_index.rs +++ b/crates/tranquil-store/src/blockstore/hash_index.rs @@ -1202,6 +1202,15 @@ impl BlockIndex { self.table.read().contains_live(cid) } + pub fn live_entries_snapshot(&self) -> Vec<([u8; CID_SIZE], RefCount)> { + self.table + .read() + .iter() + .filter(|s| !s.refcount.is_zero()) + .map(|s| (s.cid, s.refcount)) + .collect() + } + pub fn batch_put( &self, entries: &[([u8; CID_SIZE], BlockLocation)], @@ -1222,7 +1231,14 @@ impl BlockIndex { now: WallClockMs, position_update: PositionUpdate<'_>, ) -> Result<(), BlockIndexError> { - self.batch_put_inner(entries, decrements, cursor, epoch, now, Some(position_update)) + self.batch_put_inner( + entries, + decrements, + cursor, + epoch, + now, + Some(position_update), + ) } fn batch_put_inner( diff --git a/crates/tranquil-store/src/blockstore/hint.rs b/crates/tranquil-store/src/blockstore/hint.rs index e07150c..36af998 100644 --- a/crates/tranquil-store/src/blockstore/hint.rs +++ b/crates/tranquil-store/src/blockstore/hint.rs @@ -57,10 +57,8 @@ fn write_hint_record( fn encode_location_fields(record: &mut [u8; HINT_RECORD_SIZE], loc: &BlockLocation) { record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&loc.file_id.raw().to_le_bytes()); - record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8] - .copy_from_slice(&loc.length.raw().to_le_bytes()); - record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8] - .copy_from_slice(&loc.offset.raw().to_le_bytes()); + record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8].copy_from_slice(&loc.length.raw().to_le_bytes()); + record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8].copy_from_slice(&loc.offset.raw().to_le_bytes()); } pub(crate) fn encode_hint_record( @@ -841,7 +839,11 @@ mod tests { let offset = BlockOffset::new(1024); let length = BlockLength::new(256); - let loc = BlockLocation { file_id, offset, length }; + let loc = BlockLocation { + file_id, + offset, + length, + }; encode_hint_record(&sim, fd, HintOffset::new(0), &cid, &loc).unwrap(); let file_size = sim.file_size(fd).unwrap(); diff --git a/crates/tranquil-store/src/eventlog/reader.rs b/crates/tranquil-store/src/eventlog/reader.rs index 309a6fc..7890e34 100644 --- a/crates/tranquil-store/src/eventlog/reader.rs +++ b/crates/tranquil-store/src/eventlog/reader.rs @@ -552,7 +552,7 @@ fn decode_mmap_event( segment = %segment_id, offset = raw, file_size, - "decode offset past file size (corrupt index?)" + "decode offset past file size, index likely corrupt" ); return Ok(MmapDecodeResult::Corrupted); } diff --git a/crates/tranquil-store/src/eventlog/writer.rs b/crates/tranquil-store/src/eventlog/writer.rs index 0e18f64..288d01c 100644 --- a/crates/tranquil-store/src/eventlog/writer.rs +++ b/crates/tranquil-store/src/eventlog/writer.rs @@ -305,7 +305,7 @@ impl EventLogWriter { match self.build_sidecar_for_segment(old_id) { Ok(()) => {} - Err(e) => warn!(segment = %old_id, error = %e, "sidecar build failed (non-fatal)"), + Err(e) => warn!(segment = %old_id, error = %e, "non-fatal sidecar build failure"), } let (new_id, new_fd) = self.manager.prepare_rotation(old_id)?; diff --git a/crates/tranquil-store/src/gauntlet/farm.rs b/crates/tranquil-store/src/gauntlet/farm.rs new file mode 100644 index 0000000..dd3e791 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/farm.rs @@ -0,0 +1,41 @@ +use std::cell::RefCell; + +use rayon::prelude::*; +use tokio::runtime::Runtime; + +use super::op::Seed; +use super::runner::{Gauntlet, GauntletConfig, GauntletReport}; + +thread_local! { + static RUNTIME: RefCell> = const { RefCell::new(None) }; +} + +fn with_runtime(f: impl FnOnce(&Runtime) -> R) -> R { + RUNTIME.with(|cell| { + let mut slot = cell.borrow_mut(); + if slot.is_none() { + *slot = Some( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build rt"), + ); + } + f(slot.as_ref().expect("runtime present")) + }) +} + +pub fn run_many(make_config: F, seeds: impl IntoIterator) -> Vec +where + F: Fn(Seed) -> GauntletConfig + Sync + Send, +{ + let seeds: Vec = seeds.into_iter().collect(); + seeds + .into_par_iter() + .map(|s| { + let cfg = make_config(s); + let gauntlet = Gauntlet::new(cfg).expect("build gauntlet"); + with_runtime(|rt| rt.block_on(gauntlet.run())) + }) + .collect() +} diff --git a/crates/tranquil-store/src/gauntlet/invariants.rs b/crates/tranquil-store/src/gauntlet/invariants.rs new file mode 100644 index 0000000..9448419 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/invariants.rs @@ -0,0 +1,141 @@ +use std::collections::{HashMap, HashSet}; + +use super::oracle::Oracle; +use crate::blockstore::{CidBytes, TranquilBlockStore}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InvariantSet(u32); + +impl InvariantSet { + pub const EMPTY: Self = Self(0); + pub const REFCOUNT_CONSERVATION: Self = Self(1 << 0); + pub const REACHABILITY: Self = Self(1 << 1); + + const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 | Self::REACHABILITY.0; + + pub const fn contains(self, other: Self) -> bool { + (self.0 & other.0) == other.0 + } + + pub const fn union(self, other: Self) -> Self { + Self(self.0 | other.0) + } + + pub const fn unknown_bits(self) -> u32 { + self.0 & !Self::ALL_KNOWN + } +} + +impl std::ops::BitOr for InvariantSet { + type Output = Self; + fn bitor(self, rhs: Self) -> Self { + self.union(rhs) + } +} + +#[derive(Debug)] +pub struct InvariantViolation { + pub invariant: &'static str, + pub detail: String, +} + +pub trait Invariant { + fn name(&self) -> &'static str; + fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation>; +} + +pub struct RefcountConservation; + +impl Invariant for RefcountConservation { + fn name(&self) -> &'static str { + "RefcountConservation" + } + + fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> { + let live: Vec<(String, CidBytes)> = oracle.live_cids_labeled(); + let live_set: HashSet = live.iter().map(|(_, c)| *c).collect(); + let index: HashMap = store + .block_index() + .live_entries_snapshot() + .into_iter() + .map(|(c, r)| (c, r.raw())) + .collect(); + + let forward: Vec = live + .iter() + .filter_map(|(label, cid)| match index.get(cid) { + Some(&r) if r >= 1 => None, + Some(&r) => Some(format!("{label}: refcount {r}")), + None => Some(format!("{label}: missing from index")), + }) + .collect(); + + let inverse: Vec = index + .iter() + .filter(|(cid, _)| !live_set.contains(*cid)) + .map(|(cid, r)| format!("orphan cid {} refcount {}", hex_short(cid), r)) + .collect(); + + let violations: Vec = forward.into_iter().chain(inverse).collect(); + if violations.is_empty() { + Ok(()) + } else { + Err(InvariantViolation { + invariant: "RefcountConservation", + detail: violations.join("; "), + }) + } + } +} + +pub struct Reachability; + +impl Invariant for Reachability { + fn name(&self) -> &'static str { + "Reachability" + } + + fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> { + let violations: Vec = oracle + .live_cids_labeled() + .into_iter() + .filter_map(|(label, fixed)| match store.get_block_sync(&fixed) { + Ok(Some(_)) => None, + Ok(None) => Some(format!("{label}: missing")), + Err(e) => Some(format!("{label}: read error {e}")), + }) + .collect(); + + if violations.is_empty() { + Ok(()) + } else { + Err(InvariantViolation { + invariant: "Reachability", + detail: violations.join("; "), + }) + } + } +} + +fn hex_short(cid: &CidBytes) -> String { + let tail = &cid[cid.len() - 6..]; + tail.iter().map(|b| format!("{b:02x}")).collect() +} + +pub fn invariants_for(set: InvariantSet) -> Vec> { + let unknown = set.unknown_bits(); + assert!( + unknown == 0, + "invariants_for: unknown InvariantSet bits 0x{unknown:x}; all bits must map to an impl" + ); + [ + ( + InvariantSet::REFCOUNT_CONSERVATION, + Box::new(RefcountConservation) as Box, + ), + (InvariantSet::REACHABILITY, Box::new(Reachability)), + ] + .into_iter() + .filter_map(|(flag, inv)| set.contains(flag).then_some(inv)) + .collect() +} diff --git a/crates/tranquil-store/src/gauntlet/mod.rs b/crates/tranquil-store/src/gauntlet/mod.rs new file mode 100644 index 0000000..0db375c --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/mod.rs @@ -0,0 +1,20 @@ +pub mod farm; +pub mod invariants; +pub mod op; +pub mod oracle; +pub mod runner; +pub mod scenarios; +pub mod workload; + +pub use invariants::{Invariant, InvariantSet, InvariantViolation, invariants_for}; +pub use op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; +pub use oracle::Oracle; +pub use runner::{ + CompactInterval, Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend, + MaxFileSize, OpIndex, OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits, + ShardCount, StoreConfig, WallMs, +}; +pub use scenarios::{Scenario, config_for}; +pub use workload::{ + ByteRange, KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel, +}; diff --git a/crates/tranquil-store/src/gauntlet/op.rs b/crates/tranquil-store/src/gauntlet/op.rs new file mode 100644 index 0000000..6645ab4 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/op.rs @@ -0,0 +1,60 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct Seed(pub u64); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct CollectionName(pub String); + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RecordKey(pub String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ValueSeed(pub u32); + +#[derive(Debug, Clone)] +pub enum Op { + AddRecord { + collection: CollectionName, + rkey: RecordKey, + value_seed: ValueSeed, + }, + DeleteRecord { + collection: CollectionName, + rkey: RecordKey, + }, + Compact, + Checkpoint, +} + +#[derive(Debug, Clone)] +pub struct OpStream { + ops: Vec, +} + +impl OpStream { + pub fn from_vec(ops: Vec) -> Self { + Self { ops } + } + + pub fn into_vec(self) -> Vec { + self.ops + } + + pub fn iter(&self) -> impl Iterator { + self.ops.iter() + } + + pub fn len(&self) -> usize { + self.ops.len() + } + + pub fn is_empty(&self) -> bool { + self.ops.is_empty() + } + + pub fn shrink(&self) -> Option { + (self.ops.len() >= 2).then(|| { + let half = self.ops.len() / 2; + OpStream::from_vec(self.ops[..half].to_vec()) + }) + } +} diff --git a/crates/tranquil-store/src/gauntlet/oracle.rs b/crates/tranquil-store/src/gauntlet/oracle.rs new file mode 100644 index 0000000..e5aa985 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/oracle.rs @@ -0,0 +1,75 @@ +use std::collections::HashMap; + +use cid::Cid; + +use super::op::{CollectionName, RecordKey}; +use crate::blockstore::CidBytes; + +#[derive(Debug, Default)] +pub struct Oracle { + live: HashMap<(CollectionName, RecordKey), CidBytes>, + current_root: Option, + mst_node_cids: Vec, +} + +impl Oracle { + pub fn new() -> Self { + Self::default() + } + + pub fn add( + &mut self, + coll: CollectionName, + rkey: RecordKey, + record_cid: CidBytes, + ) -> Option { + self.live.insert((coll, rkey), record_cid) + } + + pub fn delete(&mut self, coll: &CollectionName, rkey: &RecordKey) -> Option { + self.live.remove(&(coll.clone(), rkey.clone())) + } + + pub fn set_root(&mut self, root: Cid) { + self.current_root = Some(root); + } + + pub fn root(&self) -> Option { + self.current_root + } + + pub fn set_node_cids(&mut self, cids: Vec) { + self.mst_node_cids = cids; + } + + pub fn mst_node_cids(&self) -> &[Cid] { + &self.mst_node_cids + } + + pub fn live_records(&self) -> impl Iterator { + self.live.iter().map(|((c, r), v)| (c, r, v)) + } + + pub fn live_count(&self) -> usize { + self.live.len() + } + + pub fn live_cids_labeled(&self) -> Vec<(String, CidBytes)> { + let nodes = self + .mst_node_cids + .iter() + .map(|cid| (format!("mst {cid}"), cid_to_fixed(cid))); + let records = self + .live_records() + .map(|(c, r, v)| (format!("record {}/{}", c.0, r.0), *v)); + nodes.chain(records).collect() + } +} + +pub(super) fn cid_to_fixed(cid: &Cid) -> CidBytes { + let bytes = cid.to_bytes(); + debug_assert_eq!(bytes.len(), 36, "expected 36 byte CIDv1+sha256"); + let mut arr = [0u8; 36]; + arr.copy_from_slice(&bytes[..36]); + arr +} diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs new file mode 100644 index 0000000..eb6abc8 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -0,0 +1,438 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; + +use cid::Cid; +use jacquard_repo::mst::Mst; +use jacquard_repo::storage::BlockStore; + +use super::invariants::{InvariantSet, InvariantViolation, invariants_for}; +use super::op::{Op, OpStream, Seed, ValueSeed}; +use super::oracle::{Oracle, cid_to_fixed}; +use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel}; +use crate::blockstore::{ + BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore, +}; + +#[derive(Debug, Clone, Copy)] +pub enum IoBackend { + Real, + Simulated, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpInterval(pub usize); + +#[derive(Debug, Clone, Copy)] +pub enum RestartPolicy { + Never, + EveryNOps(OpInterval), + PoissonByOps(OpInterval), +} + +#[derive(Debug, Clone, Copy)] +pub struct WallMs(pub u64); + +#[derive(Debug, Clone, Copy)] +pub struct RunLimits { + pub max_wall_ms: Option, +} + +#[derive(Debug, Clone, Copy)] +pub struct MaxFileSize(pub u64); + +#[derive(Debug, Clone, Copy)] +pub struct ShardCount(pub u8); + +#[derive(Debug, Clone, Copy)] +pub struct CompactInterval(pub u32); + +#[derive(Debug, Clone)] +pub struct StoreConfig { + pub max_file_size: MaxFileSize, + pub group_commit: GroupCommitConfig, + pub shard_count: ShardCount, + pub compact_every: CompactInterval, +} + +#[derive(Debug, Clone)] +pub struct GauntletConfig { + pub seed: Seed, + pub io: IoBackend, + pub workload: WorkloadModel, + pub op_count: OpCount, + pub invariants: InvariantSet, + pub limits: RunLimits, + pub restart_policy: RestartPolicy, + pub store: StoreConfig, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpsExecuted(pub usize); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct RestartCount(pub usize); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpIndex(pub usize); + +#[derive(Debug)] +pub struct GauntletReport { + pub seed: Seed, + pub ops_executed: OpsExecuted, + pub restarts: RestartCount, + pub violations: Vec, +} + +impl GauntletReport { + pub fn is_clean(&self) -> bool { + self.violations.is_empty() + } +} + +#[derive(Debug, thiserror::Error)] +enum OpError { + #[error("put record: {0}")] + PutRecord(String), + #[error("mst add: {0}")] + MstAdd(String), + #[error("mst delete: {0}")] + MstDelete(String), + #[error("mst persist: {0}")] + MstPersist(String), + #[error("mst diff: {0}")] + MstDiff(String), + #[error("apply commit: {0}")] + ApplyCommit(String), + #[error("compact_file: {0}")] + CompactFile(String), + #[error("join: {0}")] + Join(String), +} + +pub struct Gauntlet { + config: GauntletConfig, +} + +#[derive(Debug, thiserror::Error)] +pub enum GauntletBuildError { + #[error("IoBackend::Simulated not wired yet")] + UnsupportedIoBackend, +} + +impl Gauntlet { + pub fn new(config: GauntletConfig) -> Result { + match config.io { + IoBackend::Real => Ok(Self { config }), + IoBackend::Simulated => Err(GauntletBuildError::UnsupportedIoBackend), + } + } + + pub async fn run(self) -> GauntletReport { + let deadline = self + .config + .limits + .max_wall_ms + .map(|WallMs(ms)| Duration::from_millis(ms)); + + let seed = self.config.seed; + let ops_counter = Arc::new(AtomicUsize::new(0)); + let restarts_counter = Arc::new(AtomicUsize::new(0)); + let fut = run_real_inner(self.config, ops_counter.clone(), restarts_counter.clone()); + match deadline { + Some(d) => match tokio::time::timeout(d, fut).await { + Ok(r) => r, + Err(_) => GauntletReport { + seed, + ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), + restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), + violations: vec![InvariantViolation { + invariant: "WallClockBudget", + detail: format!("exceeded max_wall_ms ({} ms)", d.as_millis()), + }], + }, + }, + None => fut.await, + } + } +} + +async fn run_real_inner( + config: GauntletConfig, + ops_counter: Arc, + restarts_counter: Arc, +) -> GauntletReport { + let dir = tempfile::TempDir::new().expect("tempdir"); + let op_stream: OpStream = config.workload.generate(config.seed, config.op_count); + + let mut oracle = Oracle::new(); + let mut violations: Vec = Vec::new(); + + let mut store = Arc::new( + TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)).expect("open store"), + ); + let mut root: Option = None; + let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); + let mut halt_ops = false; + + for (idx, op) in op_stream.iter().enumerate() { + if halt_ops { + break; + } + match apply_op(&store, &mut root, &mut oracle, op, &config.workload).await { + Ok(()) => {} + Err(e) => { + violations.push(InvariantViolation { + invariant: "OpExecution", + detail: format!("op {idx}: {e}"), + }); + ops_counter.store(idx + 1, Ordering::Relaxed); + halt_ops = true; + continue; + } + } + ops_counter.store(idx + 1, Ordering::Relaxed); + + if should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng) { + drop(store); + store = Arc::new( + TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) + .expect("reopen store"), + ); + let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; + + if let Err(e) = refresh_oracle_graph(&store, &mut oracle, root).await { + violations.push(InvariantViolation { + invariant: "OpExecution", + detail: format!("refresh after restart {n}: {e}"), + }); + halt_ops = true; + continue; + } + violations.extend(check_all(&store, &oracle, config.invariants)); + if !violations.is_empty() { + halt_ops = true; + } + } + } + + match refresh_oracle_graph(&store, &mut oracle, root).await { + Ok(()) => violations.extend(check_all(&store, &oracle, config.invariants)), + Err(e) => violations.push(InvariantViolation { + invariant: "OpExecution", + detail: format!("refresh at end: {e}"), + }), + } + + GauntletReport { + seed: config.seed, + ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), + restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), + violations, + } +} + +fn check_all( + store: &TranquilBlockStore, + oracle: &Oracle, + set: InvariantSet, +) -> Vec { + invariants_for(set) + .into_iter() + .filter_map(|inv| inv.check(store, oracle).err()) + .collect() +} + +async fn refresh_oracle_graph( + store: &Arc, + oracle: &mut Oracle, + root: Option, +) -> Result<(), String> { + match root { + None => { + oracle.set_node_cids(Vec::new()); + Ok(()) + } + Some(r) => { + let settled = Mst::load(store.clone(), r, None); + let cids = settled + .collect_node_cids() + .await + .map_err(|e| format!("collect_node_cids: {e}"))?; + oracle.set_root(r); + oracle.set_node_cids(cids); + Ok(()) + } + } +} + +fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> bool { + match policy { + RestartPolicy::Never => false, + RestartPolicy::EveryNOps(OpInterval(n)) => n > 0 && (idx.0 + 1).is_multiple_of(n), + RestartPolicy::PoissonByOps(OpInterval(n)) => { + if n == 0 { + false + } else { + rng.next_u64().is_multiple_of(n as u64) + } + } + } +} + +fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig { + BlockStoreConfig { + data_dir: dir.join("data"), + index_dir: dir.join("index"), + max_file_size: s.max_file_size.0, + group_commit: s.group_commit.clone(), + shard_count: s.shard_count.0, + } +} + +fn make_record_bytes(value_seed: ValueSeed, dist: SizeDistribution) -> Vec { + let raw = value_seed.0; + let target_len: usize = match dist { + SizeDistribution::Fixed(ValueBytes(n)) => n as usize, + SizeDistribution::Uniform(range) => { + let ValueBytes(lo) = range.min(); + let ValueBytes(hi) = range.max(); + let span = u64::from(hi.saturating_sub(lo)).max(1); + let rng_state = u64::from(raw); + (lo as usize) + (rng_state % span) as usize + } + }; + serde_ipld_dagcbor::to_vec(&serde_json::json!({ + "$type": "app.bsky.feed.post", + "text": format!("record-{raw}"), + "createdAt": "2026-01-01T00:00:00Z", + "pad": "x".repeat(target_len.saturating_sub(64)), + })) + .expect("encode record") +} + +async fn apply_op( + store: &Arc, + root: &mut Option, + oracle: &mut Oracle, + op: &Op, + workload: &WorkloadModel, +) -> Result<(), OpError> { + match op { + Op::AddRecord { + collection, + rkey, + value_seed, + } => { + let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); + let record_cid = store + .put(&record_bytes) + .await + .map_err(|e| OpError::PutRecord(e.to_string()))?; + let key = format!("{}/{}", collection.0, rkey.0); + let loaded = match *root { + None => Mst::new(store.clone()), + Some(r) => Mst::load(store.clone(), r, None), + }; + let updated = loaded + .add(&key, record_cid) + .await + .map_err(|e| OpError::MstAdd(e.to_string()))?; + let new_root = updated + .persist() + .await + .map_err(|e| OpError::MstPersist(e.to_string()))?; + + if let Some(old_root) = *root { + apply_mst_diff(store, old_root, new_root).await?; + } + + *root = Some(new_root); + oracle.add(collection.clone(), rkey.clone(), cid_to_fixed(&record_cid)); + Ok(()) + } + Op::DeleteRecord { collection, rkey } => { + let Some(old_root) = *root else { return Ok(()) }; + if oracle.delete(collection, rkey).is_none() { + return Ok(()); + } + let key = format!("{}/{}", collection.0, rkey.0); + let loaded = Mst::load(store.clone(), old_root, None); + let updated = loaded + .delete(&key) + .await + .map_err(|e| OpError::MstDelete(e.to_string()))?; + let new_root = updated + .persist() + .await + .map_err(|e| OpError::MstPersist(e.to_string()))?; + apply_mst_diff(store, old_root, new_root).await?; + *root = Some(new_root); + Ok(()) + } + Op::Compact => { + let s = store.clone(); + tokio::task::spawn_blocking(move || compact_by_liveness(&s)) + .await + .map_err(|e| OpError::Join(e.to_string()))? + } + Op::Checkpoint => { + let s = store.clone(); + tokio::task::spawn_blocking(move || { + s.apply_commit_blocking(vec![], vec![]) + .map_err(|e| e.to_string()) + }) + .await + .map_err(|e| OpError::Join(e.to_string()))? + .map_err(OpError::ApplyCommit) + } + } +} + +async fn apply_mst_diff( + store: &Arc, + old_root: Cid, + new_root: Cid, +) -> Result<(), OpError> { + let old_m = Mst::load(store.clone(), old_root, None); + let new_m = Mst::load(store.clone(), new_root, None); + let diff = old_m + .diff(&new_m) + .await + .map_err(|e| OpError::MstDiff(e.to_string()))?; + let obsolete: Vec = diff + .removed_mst_blocks + .into_iter() + .chain(diff.removed_cids.into_iter()) + .map(|c| cid_to_fixed(&c)) + .collect(); + let s = store.clone(); + tokio::task::spawn_blocking(move || { + s.apply_commit_blocking(vec![], obsolete) + .map_err(|e| e.to_string()) + }) + .await + .map_err(|e| OpError::Join(e.to_string()))? + .map_err(OpError::ApplyCommit) +} + +const COMPACT_LIVENESS_CEILING: f64 = 0.99; + +fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> { + let liveness = match store.compaction_liveness(0) { + Ok(l) => l, + Err(_) => return Ok(()), + }; + let targets: Vec<_> = liveness + .iter() + .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < COMPACT_LIVENESS_CEILING) + .map(|(&fid, _)| fid) + .collect(); + targets + .into_iter() + .try_for_each(|fid| match store.compact_file(fid, 0) { + Ok(_) => Ok(()), + Err(CompactionError::ActiveFileCannotBeCompacted) => Ok(()), + Err(e) => Err(OpError::CompactFile(format!("{fid}: {e}"))), + }) +} diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs new file mode 100644 index 0000000..81a0186 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/scenarios.rs @@ -0,0 +1,120 @@ +use super::invariants::InvariantSet; +use super::op::{CollectionName, Seed}; +use super::runner::{ + CompactInterval, GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, + ShardCount, StoreConfig, WallMs, +}; +use super::workload::{ + KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel, +}; +use crate::blockstore::GroupCommitConfig; + +#[derive(Debug, Clone, Copy)] +pub enum Scenario { + SmokePR, + MstChurn, + MstRestartChurn, +} + +pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig { + match scenario { + Scenario::SmokePR => smoke_pr(seed), + Scenario::MstChurn => mst_churn(seed), + Scenario::MstRestartChurn => mst_restart_churn(seed), + } +} + +fn default_collections() -> Vec { + vec![ + CollectionName("app.bsky.feed.post".to_string()), + CollectionName("app.bsky.feed.like".to_string()), + ] +} + +fn tiny_store() -> StoreConfig { + StoreConfig { + max_file_size: MaxFileSize(300), + group_commit: GroupCommitConfig { + checkpoint_interval_ms: 100, + checkpoint_write_threshold: 10, + ..GroupCommitConfig::default() + }, + shard_count: ShardCount(1), + compact_every: CompactInterval(5), + } +} + +fn smoke_pr(seed: Seed) -> GauntletConfig { + GauntletConfig { + seed, + io: IoBackend::Real, + workload: WorkloadModel { + weights: OpWeights { + add: 80, + delete: 0, + compact: 10, + checkpoint: 10, + }, + size_distribution: SizeDistribution::Fixed(ValueBytes(64)), + collections: default_collections(), + key_space: KeySpaceSize(200), + }, + op_count: OpCount(10_000), + invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, + limits: RunLimits { + max_wall_ms: Some(WallMs(60_000)), + }, + restart_policy: RestartPolicy::EveryNOps(OpInterval(2_000)), + store: tiny_store(), + } +} + +fn mst_churn(seed: Seed) -> GauntletConfig { + GauntletConfig { + seed, + io: IoBackend::Real, + workload: WorkloadModel { + weights: OpWeights { + add: 85, + delete: 0, + compact: 10, + checkpoint: 5, + }, + size_distribution: SizeDistribution::Fixed(ValueBytes(64)), + collections: default_collections(), + key_space: KeySpaceSize(2_000), + }, + op_count: OpCount(100_000), + invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, + limits: RunLimits { + max_wall_ms: Some(WallMs(600_000)), + }, + restart_policy: RestartPolicy::Never, + store: tiny_store(), + } +} + +fn mst_restart_churn(seed: Seed) -> GauntletConfig { + GauntletConfig { + seed, + io: IoBackend::Real, + workload: WorkloadModel { + weights: OpWeights { + add: 85, + delete: 0, + compact: 10, + checkpoint: 5, + }, + size_distribution: SizeDistribution::Fixed(ValueBytes(64)), + collections: default_collections(), + key_space: KeySpaceSize(2_000), + }, + op_count: OpCount(100_000), + invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, + limits: RunLimits { + max_wall_ms: Some(WallMs(600_000)), + }, + restart_policy: RestartPolicy::PoissonByOps(OpInterval(5_000)), + store: tiny_store(), + } +} diff --git a/crates/tranquil-store/src/gauntlet/workload.rs b/crates/tranquil-store/src/gauntlet/workload.rs new file mode 100644 index 0000000..9277d53 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/workload.rs @@ -0,0 +1,133 @@ +use super::op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ValueBytes(pub u32); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct KeySpaceSize(pub u32); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpCount(pub usize); + +#[derive(Debug, Clone, Copy)] +pub struct OpWeights { + pub add: u32, + pub delete: u32, + pub compact: u32, + pub checkpoint: u32, +} + +impl OpWeights { + pub const fn total(&self) -> u32 { + self.add + self.delete + self.compact + self.checkpoint + } +} + +#[derive(Debug, Clone, Copy)] +pub struct ByteRange { + min: ValueBytes, + max: ValueBytes, +} + +impl ByteRange { + pub fn new(min: ValueBytes, max: ValueBytes) -> Result { + if max.0 < min.0 { + Err(format!("ByteRange: max {} < min {}", max.0, min.0)) + } else { + Ok(Self { min, max }) + } + } + + pub fn min(&self) -> ValueBytes { + self.min + } + + pub fn max(&self) -> ValueBytes { + self.max + } +} + +#[derive(Debug, Clone, Copy)] +pub enum SizeDistribution { + Fixed(ValueBytes), + Uniform(ByteRange), +} + +#[derive(Debug, Clone)] +pub struct WorkloadModel { + pub weights: OpWeights, + pub size_distribution: SizeDistribution, + pub collections: Vec, + pub key_space: KeySpaceSize, +} + +impl WorkloadModel { + pub fn generate(&self, seed: Seed, op_count: OpCount) -> OpStream { + let mut rng = Lcg::new(seed); + let total = self.weights.total(); + assert!(total > 0, "workload weights must sum to > 0"); + assert!( + !self.collections.is_empty(), + "workload needs at least 1 collection" + ); + + let ops: Vec = (0..op_count.0) + .map(|_| { + let bucket = rng.next_u32() % total; + let coll = self.collections[rng.next_usize() % self.collections.len()].clone(); + let rkey = RecordKey(format!("{:06}", rng.next_u32() % self.key_space.0.max(1))); + + let (a, d, c) = ( + self.weights.add, + self.weights.add + self.weights.delete, + self.weights.add + self.weights.delete + self.weights.compact, + ); + match bucket { + b if b < a => Op::AddRecord { + collection: coll, + rkey, + value_seed: ValueSeed(rng.next_u32()), + }, + b if b < d => Op::DeleteRecord { + collection: coll, + rkey, + }, + b if b < c => Op::Compact, + _ => Op::Checkpoint, + } + }) + .collect(); + OpStream::from_vec(ops) + } +} + +pub struct Lcg { + state: u64, +} + +impl Lcg { + pub fn new(seed: Seed) -> Self { + Self { + state: seed + .0 + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407), + } + } + + pub fn next_u64(&mut self) -> u64 { + self.state = self + .state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + self.state + } + + pub fn next_u32(&mut self) -> u32 { + (self.next_u64() >> 16) as u32 + } + + pub fn next_usize(&mut self) -> usize { + self.next_u32() as usize + } +} diff --git a/crates/tranquil-store/src/lib.rs b/crates/tranquil-store/src/lib.rs index ffebff7..0abad04 100644 --- a/crates/tranquil-store/src/lib.rs +++ b/crates/tranquil-store/src/lib.rs @@ -6,6 +6,8 @@ pub mod consistency; pub mod eventlog; pub mod fsync_order; #[cfg(any(test, feature = "test-harness"))] +pub mod gauntlet; +#[cfg(any(test, feature = "test-harness"))] mod harness; mod io; pub mod metastore; diff --git a/crates/tranquil-store/src/metastore/handler.rs b/crates/tranquil-store/src/metastore/handler.rs index d651158..b8e98d1 100644 --- a/crates/tranquil-store/src/metastore/handler.rs +++ b/crates/tranquil-store/src/metastore/handler.rs @@ -5964,7 +5964,7 @@ fn handler_loop( None => "unknown panic payload".to_owned(), }, }; - tracing::error!(thread_index, msg, "metastore handler panic (recovered)"); + tracing::error!(thread_index, msg, "recovered metastore handler panic"); } } }); diff --git a/crates/tranquil-store/tests/checkpoint_race.rs b/crates/tranquil-store/tests/checkpoint_race.rs index 18137c5..f902d74 100644 --- a/crates/tranquil-store/tests/checkpoint_race.rs +++ b/crates/tranquil-store/tests/checkpoint_race.rs @@ -4,10 +4,10 @@ use std::io; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use tranquil_store::PostBlockstoreHook; use tranquil_store::blockstore::{ BlockStoreConfig, BlocksSynced, CidBytes, GroupCommitConfig, TranquilBlockStore, }; -use tranquil_store::PostBlockstoreHook; struct SlowHook; @@ -72,9 +72,7 @@ fn write_phase(base: &std::path::Path, use_hook: bool) -> Vec { store .put_blocks_blocking(vec![(cid, vec![shard; 60])]) .unwrap(); - store - .apply_commit_blocking(vec![], vec![cid]) - .unwrap(); + store.apply_commit_blocking(vec![], vec![cid]).unwrap(); targets.push(cid); seq += 1; total_cycles.fetch_add(1, Ordering::Relaxed); @@ -208,7 +206,9 @@ fn __crash_write_phase() { Ok(d) => d, Err(_) => return, }; - let use_hook = std::env::var("CRASH_TEST_HOOK").map(|v| v == "1").unwrap_or(false); + let use_hook = std::env::var("CRASH_TEST_HOOK") + .map(|v| v == "1") + .unwrap_or(false); let base = std::path::Path::new(&dir); let rt = tokio::runtime::Runtime::new().unwrap(); diff --git a/crates/tranquil-store/tests/compaction_liveness.rs b/crates/tranquil-store/tests/compaction_liveness.rs index 3f06b4b..289d80c 100644 --- a/crates/tranquil-store/tests/compaction_liveness.rs +++ b/crates/tranquil-store/tests/compaction_liveness.rs @@ -226,7 +226,7 @@ fn stress_create_delete_restart_cycle_matches_bug_report() { live.insert(seed_a); live.insert(seed_b); - if rng.next_u32() % 2 == 0 { + if rng.next_u32().is_multiple_of(2) { let victim: Option = live.iter().copied().next(); if let Some(v) = victim { store diff --git a/crates/tranquil-store/tests/compaction_restart.rs b/crates/tranquil-store/tests/compaction_restart.rs index 08f38b5..c160793 100644 --- a/crates/tranquil-store/tests/compaction_restart.rs +++ b/crates/tranquil-store/tests/compaction_restart.rs @@ -26,9 +26,14 @@ fn open_full_stack(base_dir: &Path) -> FullStack { let blockstore_data = base_dir.join("blockstore").join("data"); let blockstore_index = base_dir.join("blockstore").join("index"); - [&metastore_dir, &segments_dir, &blockstore_data, &blockstore_index] - .iter() - .for_each(|d| std::fs::create_dir_all(d).unwrap()); + [ + &metastore_dir, + &segments_dir, + &blockstore_data, + &blockstore_index, + ] + .iter() + .for_each(|d| std::fs::create_dir_all(d).unwrap()); let metastore = Metastore::open(&metastore_dir, MetastoreConfig::default()).unwrap(); @@ -59,9 +64,7 @@ fn open_full_stack(base_dir: &Path) -> FullStack { let indexes = metastore.partition(Partition::Indexes).clone(); let event_ops = metastore.event_ops(Arc::clone(&bridge)); - let recovered = event_ops - .recover_metastore_mutations(&indexes) - .unwrap(); + let recovered = event_ops.recover_metastore_mutations(&indexes).unwrap(); if recovered > 0 { eprintln!("replayed {recovered} metastore mutations from eventlog"); } @@ -240,9 +243,7 @@ fn commit_style_decrements() { if round > 0 { let prev_mst = test_cid(6000 + round - 1); - store - .apply_commit_blocking(vec![], vec![prev_mst]) - .unwrap(); + store.apply_commit_blocking(vec![], vec![prev_mst]).unwrap(); } prev_commit = new_commit; @@ -408,8 +409,7 @@ fn multiple_restart_cycles_blockstore() { (0..10u32).for_each(|cycle| { { - let store = - TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); (0..50u32).for_each(|round| { let churn = test_cid(9000 + cycle * 100 + round); diff --git a/crates/tranquil-store/tests/eventlog_properties.rs b/crates/tranquil-store/tests/eventlog_properties.rs index 75d7833..84b72fa 100644 --- a/crates/tranquil-store/tests/eventlog_properties.rs +++ b/crates/tranquil-store/tests/eventlog_properties.rs @@ -579,7 +579,7 @@ fn index_checkpoint_accelerates_recovery() { assert!( reads_with_index < reads_without_index, - "read with index ({reads_with_index} reads) should require fewer reads than without ({reads_without_index} reads)" + "indexed read took {reads_with_index} reads but unindexed took only {reads_without_index}" ); } @@ -654,7 +654,7 @@ fn fsync_ordering_blocks_before_events() { assert_eq!( event_writer.synced_seq(), EventSequence::BEFORE_ALL, - "crash between blockstore sync and eventlog sync must not persist the event (blocks exist, event does not = orphan, not inconsistency)" + "crash between blockstore sync and eventlog sync must leave blocks orphaned rather than persist the event" ); drop(event_writer); diff --git a/crates/tranquil-store/tests/eventlog_retention.rs b/crates/tranquil-store/tests/eventlog_retention.rs index 5251417..d4250d7 100644 --- a/crates/tranquil-store/tests/eventlog_retention.rs +++ b/crates/tranquil-store/tests/eventlog_retention.rs @@ -72,7 +72,7 @@ fn run_retention_at_deletes_sealed_segments_past_cutoff() { ); assert!( segments_after >= 1, - "active segment must remain (got {segments_after})" + "active segment must remain, got {segments_after}" ); } diff --git a/crates/tranquil-store/tests/gauntlet_smoke.rs b/crates/tranquil-store/tests/gauntlet_smoke.rs new file mode 100644 index 0000000..bb93249 --- /dev/null +++ b/crates/tranquil-store/tests/gauntlet_smoke.rs @@ -0,0 +1,111 @@ +use tranquil_store::blockstore::GroupCommitConfig; +use tranquil_store::gauntlet::{ + CollectionName, CompactInterval, Gauntlet, GauntletConfig, InvariantSet, IoBackend, + KeySpaceSize, MaxFileSize, OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario, + Seed, ShardCount, SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for, + farm, +}; + +#[test] +#[ignore = "long running, 30 seeds of 10k ops each"] +fn smoke_pr_30_seeds() { + let reports = farm::run_many( + |seed| config_for(Scenario::SmokePR, seed), + (0..30).map(Seed), + ); + let failures: Vec = reports + .iter() + .filter(|r| !r.is_clean()) + .map(|r| { + format!( + "seed {}: {} violations\n {}", + r.seed.0, + r.violations.len(), + r.violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect::>() + .join("\n ") + ) + }) + .collect(); + assert!(failures.is_empty(), "{}", failures.join("\n---\n")); +} + +fn fast_sanity_config(seed: Seed) -> GauntletConfig { + GauntletConfig { + seed, + io: IoBackend::Real, + workload: WorkloadModel { + weights: OpWeights { + add: 80, + delete: 0, + compact: 10, + checkpoint: 10, + }, + size_distribution: SizeDistribution::Fixed(ValueBytes(64)), + collections: vec![CollectionName("app.bsky.feed.post".to_string())], + key_space: KeySpaceSize(100), + }, + op_count: OpCount(200), + invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY, + limits: RunLimits { + max_wall_ms: Some(WallMs(30_000)), + }, + restart_policy: RestartPolicy::EveryNOps(OpInterval(80)), + store: StoreConfig { + max_file_size: MaxFileSize(512), + group_commit: GroupCommitConfig { + checkpoint_interval_ms: 50, + checkpoint_write_threshold: 8, + ..GroupCommitConfig::default() + }, + shard_count: ShardCount(1), + compact_every: CompactInterval(5), + }, + } +} + +#[tokio::test] +async fn gauntlet_fast_sanity() { + let report = Gauntlet::new(fast_sanity_config(Seed(7))) + .expect("build gauntlet") + .run() + .await; + assert!( + report.is_clean(), + "violations: {:?}", + report + .violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect::>() + ); + assert!( + report.restarts.0 >= 2, + "expected at least 2 restarts, got {}", + report.restarts.0 + ); + assert_eq!(report.ops_executed.0, 200); +} + +#[tokio::test] +#[ignore = "long running, 100k ops with around 20 restarts"] +async fn mst_restart_churn_single_seed() { + let cfg = config_for(Scenario::MstRestartChurn, Seed(42)); + let report = Gauntlet::new(cfg).expect("build gauntlet").run().await; + assert!( + report.is_clean(), + "violations: {:?}", + report + .violations + .iter() + .map(|v| format!("{}: {}", v.invariant, v.detail)) + .collect::>() + ); + assert!( + report.restarts.0 >= 1, + "PoissonByOps(5000) over 100k ops should fire at least 1 restart, got {}", + report.restarts.0 + ); +} diff --git a/crates/tranquil-store/tests/gc.rs b/crates/tranquil-store/tests/gc.rs index 3ab1bac..cde23e9 100644 --- a/crates/tranquil-store/tests/gc.rs +++ b/crates/tranquil-store/tests/gc.rs @@ -160,7 +160,7 @@ fn collect_dead_blocks_respects_epoch_gating() { .collect(); assert!( all_cids.contains(&cid_a), - "cid_a should be collectible (epoch advanced by subsequent commit)" + "cid_a should be collectible after subsequent commit advanced the epoch" ); }); } diff --git a/crates/tranquil-store/tests/mst_refcount_integrity.rs b/crates/tranquil-store/tests/mst_refcount_integrity.rs index 8421ed7..ad797f9 100644 --- a/crates/tranquil-store/tests/mst_refcount_integrity.rs +++ b/crates/tranquil-store/tests/mst_refcount_integrity.rs @@ -80,8 +80,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() { let obsolete = compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await; - let obsolete_fixed: Vec<[u8; 36]> = - obsolete.iter().map(|c| cid_to_fixed(c)).collect(); + let obsolete_fixed: Vec<[u8; 36]> = obsolete.iter().map(cid_to_fixed).collect(); let s = store.clone(); tokio::task::spawn_blocking(move || { s.apply_commit_blocking(vec![], obsolete_fixed).unwrap(); @@ -115,8 +114,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() { let obsolete = compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await; - let obsolete_fixed: Vec<[u8; 36]> = - obsolete.iter().map(|c| cid_to_fixed(c)).collect(); + let obsolete_fixed: Vec<[u8; 36]> = obsolete.iter().map(cid_to_fixed).collect(); let s = store.clone(); tokio::task::spawn_blocking(move || { s.apply_commit_blocking(vec![], obsolete_fixed).unwrap(); @@ -168,9 +166,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() { } { - let store = Arc::new( - TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(), - ); + let store = Arc::new(TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap()); let missing: Vec = final_node_cids .iter() diff --git a/crates/tranquil-store/tests/sim_blockstore.rs b/crates/tranquil-store/tests/sim_blockstore.rs index 57db979..1821e23 100644 --- a/crates/tranquil-store/tests/sim_blockstore.rs +++ b/crates/tranquil-store/tests/sim_blockstore.rs @@ -74,9 +74,7 @@ impl SimHarness { let cid = test_cid(seed as u32); let data = vec![seed as u8; data_size]; let loc = writer.append_block(&cid, &data).unwrap(); - hint_writer - .append_hint(&cid, &loc) - .unwrap(); + hint_writer.append_hint(&cid, &loc).unwrap(); (cid, loc) }) .collect(); @@ -538,9 +536,7 @@ fn sim_aggressive_faults_data_integrity() { let cid = test_cid(i as u32); let data = vec![i as u8; 64]; let loc = writer.append_block(&cid, &data).ok()?; - hint_writer - .append_hint(&cid, &loc) - .ok()?; + hint_writer.append_hint(&cid, &loc).ok()?; Some(()) })?;