feat(tranquil-store): beginnings of the gauntlet test suite

Lewis: May this revision serve well! <lu5a@proton.me>
Lewis
2026-04-15 16:41:41 +03:00
parent ec273fa814
commit 7f2e83e92f
30 changed files with 1290 additions and 106 deletions

View File

@@ -175,10 +175,7 @@ pub async fn passkey_start(
}
}
async fn passkey_start_discoverable(
state: AppState,
request_id: RequestId,
) -> Response {
async fn passkey_start_discoverable(state: AppState, request_id: RequestId) -> Response {
let (rcr, auth_state) = match state.webauthn_config.start_discoverable_authentication() {
Ok(result) => result,
Err(e) => {
@@ -570,16 +567,13 @@ pub async fn passkey_finish(
Err(response) => return response,
},
None => {
let result = match passkey_finish_discoverable(
&state,
&credential,
&passkey_finish_request_id,
)
.await
{
Ok(result) => result,
Err(response) => return response,
};
let result =
match passkey_finish_discoverable(&state, &credential, &passkey_finish_request_id)
.await
{
Ok(result) => result,
Err(response) => return response,
};
if state
.repos
.oauth

View File

@@ -246,9 +246,8 @@ pub async fn finalize_repo_write(
let obsolete_cids = match original_settled.diff(&new_settled).await {
Ok(diff) => {
let mut obsolete: Vec<Cid> = Vec::with_capacity(
1 + diff.removed_mst_blocks.len() + diff.removed_cids.len(),
);
let mut obsolete: Vec<Cid> =
Vec::with_capacity(1 + diff.removed_mst_blocks.len() + diff.removed_cids.len());
obsolete.push(ctx.current_root_cid);
obsolete.extend(diff.removed_mst_blocks);
obsolete.extend(diff.removed_cids);

View File

@@ -13,12 +13,10 @@ fn run_compaction(store: &tranquil_store::blockstore::TranquilBlockStore) {
.map(|(&fid, _)| fid)
.collect::<Vec<_>>()
.into_iter()
.for_each(|fid| {
match store.compact_file(fid, 0) {
Ok(_) => {}
Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {}
Err(e) => eprintln!("compaction: {e}"),
}
.for_each(|fid| match store.compact_file(fid, 0) {
Ok(_) => {}
Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {}
Err(e) => eprintln!("compaction: {e}"),
});
}
@@ -84,10 +82,7 @@ async fn mst_blocks_survive_full_store_reopen() {
}
let data_dir = store.data_dir().to_path_buf();
let index_dir = data_dir
.parent()
.unwrap()
.join("index");
let index_dir = data_dir.parent().unwrap().join("index");
let store_clone = store.clone();
tokio::task::spawn_blocking(move || {
@@ -107,10 +102,9 @@ async fn mst_blocks_survive_full_store_reopen() {
let head_cid = cid::Cid::try_from(repo_root_str.as_str()).expect("invalid cid");
let car_blocks =
tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid)
.await
.expect("collect blocks");
let car_blocks = tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid)
.await
.expect("collect blocks");
let block_count_before = car_blocks.len();
@@ -131,8 +125,8 @@ async fn mst_blocks_survive_full_store_reopen() {
group_commit: tranquil_store::blockstore::GroupCommitConfig::default(),
shard_count: 1,
};
let fresh = tranquil_store::blockstore::TranquilBlockStore::open(config)
.expect("reopen failed");
let fresh =
tranquil_store::blockstore::TranquilBlockStore::open(config).expect("reopen failed");
let missing: Vec<String> = car_blocks
.iter()

View File

@@ -626,7 +626,12 @@ async fn create_app_password_session(
.send()
.await
.expect("Failed to login with app password");
assert_eq!(login_res.status(), StatusCode::OK, "App password login for '{}' failed", name);
assert_eq!(
login_res.status(),
StatusCode::OK,
"App password login for '{}' failed",
name
);
let session: Value = login_res.json().await.unwrap();
let jwt = session["accessJwt"].as_str().unwrap().to_string();
(jwt, scopes_response)
@@ -635,10 +640,7 @@ async fn create_app_password_session(
async fn try_chat_service_auth(client: &reqwest::Client, jwt: &str) -> StatusCode {
let base = base_url().await;
let res = client
.get(format!(
"{}/xrpc/com.atproto.server.getServiceAuth",
base
))
.get(format!("{}/xrpc/com.atproto.server.getServiceAuth", base))
.bearer_auth(jwt)
.query(&[
("aud", "did:web:api.bsky.app"),

View File

@@ -15,7 +15,9 @@ fn test_cid(n: u32) -> Cid {
Cid::new_v1(0x71, mh)
}
async fn compute_obsolete_full_walk<S: jacquard_repo::storage::BlockStore + Sync + Send + 'static>(
async fn compute_obsolete_full_walk<
S: jacquard_repo::storage::BlockStore + Sync + Send + 'static,
>(
old: &Mst<S>,
new: &Mst<S>,
) -> BTreeSet<Cid> {
@@ -34,9 +36,7 @@ async fn compute_obsolete_full_walk<S: jacquard_repo::storage::BlockStore + Sync
.collect()
}
fn compute_obsolete_from_diff(
diff: &jacquard_repo::mst::diff::MstDiff,
) -> BTreeSet<Cid> {
fn compute_obsolete_from_diff(diff: &jacquard_repo::mst::diff::MstDiff) -> BTreeSet<Cid> {
diff.removed_mst_blocks
.iter()
.copied()
@@ -74,12 +74,17 @@ async fn assert_equivalence(
let diff_obsolete = compute_obsolete_from_diff(&diff);
assert_eq!(
full_walk_obsolete, diff_obsolete,
full_walk_obsolete,
diff_obsolete,
"MISMATCH in scenario: {scenario}\n full_walk count: {}\n diff count: {}\n in full_walk but not diff: {:?}\n in diff but not full_walk: {:?}",
full_walk_obsolete.len(),
diff_obsolete.len(),
full_walk_obsolete.difference(&diff_obsolete).collect::<Vec<_>>(),
diff_obsolete.difference(&full_walk_obsolete).collect::<Vec<_>>(),
full_walk_obsolete
.difference(&diff_obsolete)
.collect::<Vec<_>>(),
diff_obsolete
.difference(&full_walk_obsolete)
.collect::<Vec<_>>(),
);
}
@@ -256,7 +261,12 @@ async fn massive_to_empty() {
async fn massive_complete_replacement() {
let old = generate_records("app.bsky.feed.post", 0..1000);
let new_rec = generate_records("app.bsky.feed.post", 1000..2000);
assert_equivalence(&old, &new_rec, "1000 records fully replaced with 1000 different").await;
assert_equivalence(
&old,
&new_rec,
"1000 records fully replaced with 1000 different",
)
.await;
}
#[tokio::test]
@@ -276,7 +286,12 @@ async fn multi_collection_5_collections_500_each() {
];
let old = generate_multi_collection_records(&collections, 500);
let new_rec = apply_scattered_updates(&old, 4, 30000);
assert_equivalence(&old, &new_rec, "5 collections x 500 records - update every 4th").await;
assert_equivalence(
&old,
&new_rec,
"5 collections x 500 records - update every 4th",
)
.await;
}
#[tokio::test]
@@ -294,7 +309,12 @@ async fn multi_collection_wipe_one_collection() {
.filter(|(key, _)| !key.starts_with("app.bsky.feed.repost"))
.cloned()
.collect();
assert_equivalence(&old, &new_rec, "4 collections x 400 - wipe repost collection").await;
assert_equivalence(
&old,
&new_rec,
"4 collections x 400 - wipe repost collection",
)
.await;
}
#[tokio::test]
@@ -313,10 +333,7 @@ async fn multi_collection_keep_only_one() {
#[tokio::test]
async fn multi_collection_add_new_collection() {
let old_collections = [
"app.bsky.feed.like",
"app.bsky.feed.post",
];
let old_collections = ["app.bsky.feed.like", "app.bsky.feed.post"];
let old = generate_multi_collection_records(&old_collections, 500);
let new_rec = append_records(&old, "app.bsky.graph.follow", 0..500, 40000);
assert_equivalence(&old, &new_rec, "2 collections x 500 + add 500 follows").await;
@@ -378,7 +395,12 @@ async fn interleaved_keys_disjoint_ranges() {
let new_rec: Vec<_> = (0..1000u32)
.map(|i| (make_key("app.bsky.feed.post", i * 2 + 1), i + 10000))
.collect();
assert_equivalence(&old, &new_rec, "1000 even-keyed records replaced by 1000 odd-keyed").await;
assert_equivalence(
&old,
&new_rec,
"1000 even-keyed records replaced by 1000 odd-keyed",
)
.await;
}
#[tokio::test]
@@ -426,7 +448,12 @@ async fn many_collections_few_records_each() {
})
.collect();
assert_equivalence(&old, &new_rec, "50 collections x 20 records - delete every 15th, update every 7th").await;
assert_equivalence(
&old,
&new_rec,
"50 collections x 20 records - delete every 15th, update every 7th",
)
.await;
}
#[tokio::test]
@@ -457,7 +484,12 @@ async fn one_to_massive() {
async fn delete_head_and_tail() {
let old = generate_records("app.bsky.feed.post", 0..2000);
let new_rec: Vec<_> = old[200..1800].to_vec();
assert_equivalence(&old, &new_rec, "2000 records - delete first 200 and last 200").await;
assert_equivalence(
&old,
&new_rec,
"2000 records - delete first 200 and last 200",
)
.await;
}
#[tokio::test]
@@ -465,7 +497,12 @@ async fn keep_head_and_tail_only() {
let old = generate_records("app.bsky.feed.post", 0..2000);
let mut new_rec: Vec<_> = old[..100].to_vec();
new_rec.extend_from_slice(&old[1900..]);
assert_equivalence(&old, &new_rec, "2000 records - keep only first 100 and last 100").await;
assert_equivalence(
&old,
&new_rec,
"2000 records - keep only first 100 and last 100",
)
.await;
}
#[tokio::test]
@@ -515,7 +552,12 @@ async fn swiss_cheese_deletions() {
})
.map(|(_, r)| r.clone())
.collect();
assert_equivalence(&old, &new_rec, "1500 records - delete every 3rd chunk of 50").await;
assert_equivalence(
&old,
&new_rec,
"1500 records - delete every 3rd chunk of 50",
)
.await;
}
#[tokio::test]
@@ -529,9 +571,7 @@ async fn mixed_ops_with_key_density_change() {
.filter(|(_, val)| val % 4 != 0)
.cloned()
.collect();
new_rec.extend((0..500u32).map(|i| {
(make_key("app.bsky.feed.post", i * 3 + 1), i + 100000)
}));
new_rec.extend((0..500u32).map(|i| (make_key("app.bsky.feed.post", i * 3 + 1), i + 100000)));
new_rec.sort_by(|(a, _), (b, _)| a.cmp(b));
assert_equivalence(

View File

@@ -164,7 +164,10 @@ impl ScopePermissions {
if self.has_transition_generic && !self.has_transition_chat {
return Err(ScopeError::InsufficientScope {
required: "transition:chat.bsky".to_string(),
message: format!("Chat access requires transition:chat.bsky scope to call {}", lxm),
message: format!(
"Chat access requires transition:chat.bsky scope to call {}",
lxm
),
});
}
}

View File

@@ -14,7 +14,7 @@ parking_lot = { workspace = true }
fjall = "3"
lsm-tree = "3"
flume = "0.11"
tokio = { workspace = true, features = ["sync", "rt"] }
tokio = { workspace = true, features = ["sync", "rt", "time"] }
bytes = "1"
memmap2 = "0.9"
tracing = { workspace = true }
@@ -34,9 +34,10 @@ dashmap = "6"
rayon = "1"
smallvec = "1"
uuid = { workspace = true }
tempfile = { version = "3", optional = true }
[features]
test-harness = []
test-harness = ["dep:tempfile"]
[dev-dependencies]
tranquil-store = { path = ".", features = ["test-harness"] }

View File

@@ -1202,6 +1202,15 @@ impl BlockIndex {
self.table.read().contains_live(cid)
}
pub fn live_entries_snapshot(&self) -> Vec<([u8; CID_SIZE], RefCount)> {
self.table
.read()
.iter()
.filter(|s| !s.refcount.is_zero())
.map(|s| (s.cid, s.refcount))
.collect()
}
pub fn batch_put(
&self,
entries: &[([u8; CID_SIZE], BlockLocation)],
@@ -1222,7 +1231,14 @@ impl BlockIndex {
now: WallClockMs,
position_update: PositionUpdate<'_>,
) -> Result<(), BlockIndexError> {
self.batch_put_inner(entries, decrements, cursor, epoch, now, Some(position_update))
self.batch_put_inner(
entries,
decrements,
cursor,
epoch,
now,
Some(position_update),
)
}
fn batch_put_inner(

View File

@@ -57,10 +57,8 @@ fn write_hint_record<S: StorageIO>(
fn encode_location_fields(record: &mut [u8; HINT_RECORD_SIZE], loc: &BlockLocation) {
record[FIELD_A_OFFSET..FIELD_A_OFFSET + 4].copy_from_slice(&loc.file_id.raw().to_le_bytes());
record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8]
.copy_from_slice(&loc.length.raw().to_le_bytes());
record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8]
.copy_from_slice(&loc.offset.raw().to_le_bytes());
record[FIELD_A_OFFSET + 4..FIELD_A_OFFSET + 8].copy_from_slice(&loc.length.raw().to_le_bytes());
record[FIELD_B_OFFSET..FIELD_B_OFFSET + 8].copy_from_slice(&loc.offset.raw().to_le_bytes());
}
pub(crate) fn encode_hint_record<S: StorageIO>(
@@ -841,7 +839,11 @@ mod tests {
let offset = BlockOffset::new(1024);
let length = BlockLength::new(256);
let loc = BlockLocation { file_id, offset, length };
let loc = BlockLocation {
file_id,
offset,
length,
};
encode_hint_record(&sim, fd, HintOffset::new(0), &cid, &loc).unwrap();
let file_size = sim.file_size(fd).unwrap();

View File

@@ -552,7 +552,7 @@ fn decode_mmap_event(
segment = %segment_id,
offset = raw,
file_size,
"decode offset past file size (corrupt index?)"
"decode offset past file size, index likely corrupt"
);
return Ok(MmapDecodeResult::Corrupted);
}

View File

@@ -305,7 +305,7 @@ impl<S: StorageIO> EventLogWriter<S> {
match self.build_sidecar_for_segment(old_id) {
Ok(()) => {}
Err(e) => warn!(segment = %old_id, error = %e, "sidecar build failed (non-fatal)"),
Err(e) => warn!(segment = %old_id, error = %e, "non-fatal sidecar build failure"),
}
let (new_id, new_fd) = self.manager.prepare_rotation(old_id)?;

View File

@@ -0,0 +1,41 @@
use std::cell::RefCell;
use rayon::prelude::*;
use tokio::runtime::Runtime;
use super::op::Seed;
use super::runner::{Gauntlet, GauntletConfig, GauntletReport};
thread_local! {
static RUNTIME: RefCell<Option<Runtime>> = const { RefCell::new(None) };
}
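// Lazily build one single-threaded tokio runtime per rayon worker thread so each
// parallel seed can block_on its gauntlet future without sharing a runtime.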
fn with_runtime<R>(f: impl FnOnce(&Runtime) -> R) -> R {
RUNTIME.with(|cell| {
let mut slot = cell.borrow_mut();
if slot.is_none() {
*slot = Some(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("build rt"),
);
}
f(slot.as_ref().expect("runtime present"))
})
}
pub fn run_many<F>(make_config: F, seeds: impl IntoIterator<Item = Seed>) -> Vec<GauntletReport>
where
F: Fn(Seed) -> GauntletConfig + Sync + Send,
{
let seeds: Vec<Seed> = seeds.into_iter().collect();
seeds
.into_par_iter()
.map(|s| {
let cfg = make_config(s);
let gauntlet = Gauntlet::new(cfg).expect("build gauntlet");
with_runtime(|rt| rt.block_on(gauntlet.run()))
})
.collect()
}

View File

@@ -0,0 +1,141 @@
use std::collections::{HashMap, HashSet};
use super::oracle::Oracle;
use crate::blockstore::{CidBytes, TranquilBlockStore};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvariantSet(u32);
impl InvariantSet {
pub const EMPTY: Self = Self(0);
pub const REFCOUNT_CONSERVATION: Self = Self(1 << 0);
pub const REACHABILITY: Self = Self(1 << 1);
const ALL_KNOWN: u32 = Self::REFCOUNT_CONSERVATION.0 | Self::REACHABILITY.0;
pub const fn contains(self, other: Self) -> bool {
(self.0 & other.0) == other.0
}
pub const fn union(self, other: Self) -> Self {
Self(self.0 | other.0)
}
pub const fn unknown_bits(self) -> u32 {
self.0 & !Self::ALL_KNOWN
}
}
impl std::ops::BitOr for InvariantSet {
type Output = Self;
fn bitor(self, rhs: Self) -> Self {
self.union(rhs)
}
}
#[derive(Debug)]
pub struct InvariantViolation {
pub invariant: &'static str,
pub detail: String,
}
pub trait Invariant {
fn name(&self) -> &'static str;
fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation>;
}
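// Forward check: every CID the oracle still considers live must appear in the block
// index with refcount >= 1. Inverse check: every live index entry must correspond to
// something the oracle tracks, otherwise it is reported as an orphan.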
pub struct RefcountConservation;
impl Invariant for RefcountConservation {
fn name(&self) -> &'static str {
"RefcountConservation"
}
fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> {
let live: Vec<(String, CidBytes)> = oracle.live_cids_labeled();
let live_set: HashSet<CidBytes> = live.iter().map(|(_, c)| *c).collect();
let index: HashMap<CidBytes, u32> = store
.block_index()
.live_entries_snapshot()
.into_iter()
.map(|(c, r)| (c, r.raw()))
.collect();
let forward: Vec<String> = live
.iter()
.filter_map(|(label, cid)| match index.get(cid) {
Some(&r) if r >= 1 => None,
Some(&r) => Some(format!("{label}: refcount {r}")),
None => Some(format!("{label}: missing from index")),
})
.collect();
let inverse: Vec<String> = index
.iter()
.filter(|(cid, _)| !live_set.contains(*cid))
.map(|(cid, r)| format!("orphan cid {} refcount {}", hex_short(cid), r))
.collect();
let violations: Vec<String> = forward.into_iter().chain(inverse).collect();
if violations.is_empty() {
Ok(())
} else {
Err(InvariantViolation {
invariant: "RefcountConservation",
detail: violations.join("; "),
})
}
}
}
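// Every oracle-live CID (MST nodes and record blocks) must still be readable from the store.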
pub struct Reachability;
impl Invariant for Reachability {
fn name(&self) -> &'static str {
"Reachability"
}
fn check(&self, store: &TranquilBlockStore, oracle: &Oracle) -> Result<(), InvariantViolation> {
let violations: Vec<String> = oracle
.live_cids_labeled()
.into_iter()
.filter_map(|(label, fixed)| match store.get_block_sync(&fixed) {
Ok(Some(_)) => None,
Ok(None) => Some(format!("{label}: missing")),
Err(e) => Some(format!("{label}: read error {e}")),
})
.collect();
if violations.is_empty() {
Ok(())
} else {
Err(InvariantViolation {
invariant: "Reachability",
detail: violations.join("; "),
})
}
}
}
fn hex_short(cid: &CidBytes) -> String {
let tail = &cid[cid.len() - 6..];
tail.iter().map(|b| format!("{b:02x}")).collect()
}
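// Expand an InvariantSet bitmask into checker instances. Unknown bits are a programming
// error and panic, so a newly added flag cannot silently go unchecked.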
pub fn invariants_for(set: InvariantSet) -> Vec<Box<dyn Invariant>> {
let unknown = set.unknown_bits();
assert!(
unknown == 0,
"invariants_for: unknown InvariantSet bits 0x{unknown:x}; all bits must map to an impl"
);
[
(
InvariantSet::REFCOUNT_CONSERVATION,
Box::new(RefcountConservation) as Box<dyn Invariant>,
),
(InvariantSet::REACHABILITY, Box::new(Reachability)),
]
.into_iter()
.filter_map(|(flag, inv)| set.contains(flag).then_some(inv))
.collect()
}

View File

@@ -0,0 +1,20 @@
pub mod farm;
pub mod invariants;
pub mod op;
pub mod oracle;
pub mod runner;
pub mod scenarios;
pub mod workload;
pub use invariants::{Invariant, InvariantSet, InvariantViolation, invariants_for};
pub use op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed};
pub use oracle::Oracle;
pub use runner::{
CompactInterval, Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend,
MaxFileSize, OpIndex, OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits,
ShardCount, StoreConfig, WallMs,
};
pub use scenarios::{Scenario, config_for};
pub use workload::{
ByteRange, KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel,
};

View File

@@ -0,0 +1,60 @@
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Seed(pub u64);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CollectionName(pub String);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RecordKey(pub String);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ValueSeed(pub u32);
#[derive(Debug, Clone)]
pub enum Op {
AddRecord {
collection: CollectionName,
rkey: RecordKey,
value_seed: ValueSeed,
},
DeleteRecord {
collection: CollectionName,
rkey: RecordKey,
},
Compact,
Checkpoint,
}
#[derive(Debug, Clone)]
pub struct OpStream {
ops: Vec<Op>,
}
impl OpStream {
pub fn from_vec(ops: Vec<Op>) -> Self {
Self { ops }
}
pub fn into_vec(self) -> Vec<Op> {
self.ops
}
pub fn iter(&self) -> impl Iterator<Item = &Op> {
self.ops.iter()
}
pub fn len(&self) -> usize {
self.ops.len()
}
pub fn is_empty(&self) -> bool {
self.ops.is_empty()
}
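// Naive shrinking step: keep only the first half of the ops, giving a smaller stream
// to retry while hunting for a minimal failing reproducer.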
pub fn shrink(&self) -> Option<OpStream> {
(self.ops.len() >= 2).then(|| {
let half = self.ops.len() / 2;
OpStream::from_vec(self.ops[..half].to_vec())
})
}
}

View File

@@ -0,0 +1,75 @@
use std::collections::HashMap;
use cid::Cid;
use super::op::{CollectionName, RecordKey};
use crate::blockstore::CidBytes;
#[derive(Debug, Default)]
pub struct Oracle {
live: HashMap<(CollectionName, RecordKey), CidBytes>,
current_root: Option<Cid>,
mst_node_cids: Vec<Cid>,
}
impl Oracle {
pub fn new() -> Self {
Self::default()
}
pub fn add(
&mut self,
coll: CollectionName,
rkey: RecordKey,
record_cid: CidBytes,
) -> Option<CidBytes> {
self.live.insert((coll, rkey), record_cid)
}
pub fn delete(&mut self, coll: &CollectionName, rkey: &RecordKey) -> Option<CidBytes> {
self.live.remove(&(coll.clone(), rkey.clone()))
}
pub fn set_root(&mut self, root: Cid) {
self.current_root = Some(root);
}
pub fn root(&self) -> Option<Cid> {
self.current_root
}
pub fn set_node_cids(&mut self, cids: Vec<Cid>) {
self.mst_node_cids = cids;
}
pub fn mst_node_cids(&self) -> &[Cid] {
&self.mst_node_cids
}
pub fn live_records(&self) -> impl Iterator<Item = (&CollectionName, &RecordKey, &CidBytes)> {
self.live.iter().map(|((c, r), v)| (c, r, v))
}
pub fn live_count(&self) -> usize {
self.live.len()
}
pub fn live_cids_labeled(&self) -> Vec<(String, CidBytes)> {
let nodes = self
.mst_node_cids
.iter()
.map(|cid| (format!("mst {cid}"), cid_to_fixed(cid)));
let records = self
.live_records()
.map(|(c, r, v)| (format!("record {}/{}", c.0, r.0), *v));
nodes.chain(records).collect()
}
}
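// A CIDv1 with a sha2-256 multihash encodes to exactly 36 bytes; copy it into the
// fixed-size array the block index uses as its key type.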
pub(super) fn cid_to_fixed(cid: &Cid) -> CidBytes {
let bytes = cid.to_bytes();
debug_assert_eq!(bytes.len(), 36, "expected 36 byte CIDv1+sha256");
let mut arr = [0u8; 36];
arr.copy_from_slice(&bytes[..36]);
arr
}

View File

@@ -0,0 +1,438 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use cid::Cid;
use jacquard_repo::mst::Mst;
use jacquard_repo::storage::BlockStore;
use super::invariants::{InvariantSet, InvariantViolation, invariants_for};
use super::op::{Op, OpStream, Seed, ValueSeed};
use super::oracle::{Oracle, cid_to_fixed};
use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel};
use crate::blockstore::{
BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore,
};
#[derive(Debug, Clone, Copy)]
pub enum IoBackend {
Real,
Simulated,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpInterval(pub usize);
#[derive(Debug, Clone, Copy)]
pub enum RestartPolicy {
Never,
EveryNOps(OpInterval),
PoissonByOps(OpInterval),
}
#[derive(Debug, Clone, Copy)]
pub struct WallMs(pub u64);
#[derive(Debug, Clone, Copy)]
pub struct RunLimits {
pub max_wall_ms: Option<WallMs>,
}
#[derive(Debug, Clone, Copy)]
pub struct MaxFileSize(pub u64);
#[derive(Debug, Clone, Copy)]
pub struct ShardCount(pub u8);
#[derive(Debug, Clone, Copy)]
pub struct CompactInterval(pub u32);
#[derive(Debug, Clone)]
pub struct StoreConfig {
pub max_file_size: MaxFileSize,
pub group_commit: GroupCommitConfig,
pub shard_count: ShardCount,
pub compact_every: CompactInterval,
}
#[derive(Debug, Clone)]
pub struct GauntletConfig {
pub seed: Seed,
pub io: IoBackend,
pub workload: WorkloadModel,
pub op_count: OpCount,
pub invariants: InvariantSet,
pub limits: RunLimits,
pub restart_policy: RestartPolicy,
pub store: StoreConfig,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpsExecuted(pub usize);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RestartCount(pub usize);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpIndex(pub usize);
#[derive(Debug)]
pub struct GauntletReport {
pub seed: Seed,
pub ops_executed: OpsExecuted,
pub restarts: RestartCount,
pub violations: Vec<InvariantViolation>,
}
impl GauntletReport {
pub fn is_clean(&self) -> bool {
self.violations.is_empty()
}
}
#[derive(Debug, thiserror::Error)]
enum OpError {
#[error("put record: {0}")]
PutRecord(String),
#[error("mst add: {0}")]
MstAdd(String),
#[error("mst delete: {0}")]
MstDelete(String),
#[error("mst persist: {0}")]
MstPersist(String),
#[error("mst diff: {0}")]
MstDiff(String),
#[error("apply commit: {0}")]
ApplyCommit(String),
#[error("compact_file: {0}")]
CompactFile(String),
#[error("join: {0}")]
Join(String),
}
pub struct Gauntlet {
config: GauntletConfig,
}
#[derive(Debug, thiserror::Error)]
pub enum GauntletBuildError {
#[error("IoBackend::Simulated not wired yet")]
UnsupportedIoBackend,
}
impl Gauntlet {
pub fn new(config: GauntletConfig) -> Result<Self, GauntletBuildError> {
match config.io {
IoBackend::Real => Ok(Self { config }),
IoBackend::Simulated => Err(GauntletBuildError::UnsupportedIoBackend),
}
}
pub async fn run(self) -> GauntletReport {
let deadline = self
.config
.limits
.max_wall_ms
.map(|WallMs(ms)| Duration::from_millis(ms));
let seed = self.config.seed;
let ops_counter = Arc::new(AtomicUsize::new(0));
let restarts_counter = Arc::new(AtomicUsize::new(0));
let fut = run_real_inner(self.config, ops_counter.clone(), restarts_counter.clone());
match deadline {
Some(d) => match tokio::time::timeout(d, fut).await {
Ok(r) => r,
Err(_) => GauntletReport {
seed,
ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)),
restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)),
violations: vec![InvariantViolation {
invariant: "WallClockBudget",
detail: format!("exceeded max_wall_ms ({} ms)", d.as_millis()),
}],
},
},
None => fut.await,
}
}
}
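// Drives the generated op stream against a real TranquilBlockStore in a temp dir,
// reopening the store whenever the restart policy fires and re-checking invariants
// after every reopen and once more at the end of the run.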
async fn run_real_inner(
config: GauntletConfig,
ops_counter: Arc<AtomicUsize>,
restarts_counter: Arc<AtomicUsize>,
) -> GauntletReport {
let dir = tempfile::TempDir::new().expect("tempdir");
let op_stream: OpStream = config.workload.generate(config.seed, config.op_count);
let mut oracle = Oracle::new();
let mut violations: Vec<InvariantViolation> = Vec::new();
let mut store = Arc::new(
TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)).expect("open store"),
);
let mut root: Option<Cid> = None;
let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5));
let mut halt_ops = false;
for (idx, op) in op_stream.iter().enumerate() {
if halt_ops {
break;
}
match apply_op(&store, &mut root, &mut oracle, op, &config.workload).await {
Ok(()) => {}
Err(e) => {
violations.push(InvariantViolation {
invariant: "OpExecution",
detail: format!("op {idx}: {e}"),
});
ops_counter.store(idx + 1, Ordering::Relaxed);
halt_ops = true;
continue;
}
}
ops_counter.store(idx + 1, Ordering::Relaxed);
if should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng) {
drop(store);
store = Arc::new(
TranquilBlockStore::open(blockstore_config(dir.path(), &config.store))
.expect("reopen store"),
);
let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1;
if let Err(e) = refresh_oracle_graph(&store, &mut oracle, root).await {
violations.push(InvariantViolation {
invariant: "OpExecution",
detail: format!("refresh after restart {n}: {e}"),
});
halt_ops = true;
continue;
}
violations.extend(check_all(&store, &oracle, config.invariants));
if !violations.is_empty() {
halt_ops = true;
}
}
}
match refresh_oracle_graph(&store, &mut oracle, root).await {
Ok(()) => violations.extend(check_all(&store, &oracle, config.invariants)),
Err(e) => violations.push(InvariantViolation {
invariant: "OpExecution",
detail: format!("refresh at end: {e}"),
}),
}
GauntletReport {
seed: config.seed,
ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)),
restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)),
violations,
}
}
fn check_all(
store: &TranquilBlockStore,
oracle: &Oracle,
set: InvariantSet,
) -> Vec<InvariantViolation> {
invariants_for(set)
.into_iter()
.filter_map(|inv| inv.check(store, oracle).err())
.collect()
}
async fn refresh_oracle_graph(
store: &Arc<TranquilBlockStore>,
oracle: &mut Oracle,
root: Option<Cid>,
) -> Result<(), String> {
match root {
None => {
oracle.set_node_cids(Vec::new());
Ok(())
}
Some(r) => {
let settled = Mst::load(store.clone(), r, None);
let cids = settled
.collect_node_cids()
.await
.map_err(|e| format!("collect_node_cids: {e}"))?;
oracle.set_root(r);
oracle.set_node_cids(cids);
Ok(())
}
}
}
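// EveryNOps restarts deterministically on every n-th op; PoissonByOps restarts each op
// with probability roughly 1/n, giving a mean interval of about n ops.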
fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> bool {
match policy {
RestartPolicy::Never => false,
RestartPolicy::EveryNOps(OpInterval(n)) => n > 0 && (idx.0 + 1).is_multiple_of(n),
RestartPolicy::PoissonByOps(OpInterval(n)) => {
if n == 0 {
false
} else {
rng.next_u64().is_multiple_of(n as u64)
}
}
}
}
fn blockstore_config(dir: &std::path::Path, s: &StoreConfig) -> BlockStoreConfig {
BlockStoreConfig {
data_dir: dir.join("data"),
index_dir: dir.join("index"),
max_file_size: s.max_file_size.0,
group_commit: s.group_commit.clone(),
shard_count: s.shard_count.0,
}
}
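// Encode a synthetic app.bsky.feed.post record padded toward the target size; the
// 64-byte discount on the padding approximates the fixed fields.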
fn make_record_bytes(value_seed: ValueSeed, dist: SizeDistribution) -> Vec<u8> {
let raw = value_seed.0;
let target_len: usize = match dist {
SizeDistribution::Fixed(ValueBytes(n)) => n as usize,
SizeDistribution::Uniform(range) => {
let ValueBytes(lo) = range.min();
let ValueBytes(hi) = range.max();
let span = u64::from(hi.saturating_sub(lo)).max(1);
let rng_state = u64::from(raw);
(lo as usize) + (rng_state % span) as usize
}
};
serde_ipld_dagcbor::to_vec(&serde_json::json!({
"$type": "app.bsky.feed.post",
"text": format!("record-{raw}"),
"createdAt": "2026-01-01T00:00:00Z",
"pad": "x".repeat(target_len.saturating_sub(64)),
}))
.expect("encode record")
}
async fn apply_op(
store: &Arc<TranquilBlockStore>,
root: &mut Option<Cid>,
oracle: &mut Oracle,
op: &Op,
workload: &WorkloadModel,
) -> Result<(), OpError> {
match op {
Op::AddRecord {
collection,
rkey,
value_seed,
} => {
let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
let record_cid = store
.put(&record_bytes)
.await
.map_err(|e| OpError::PutRecord(e.to_string()))?;
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = match *root {
None => Mst::new(store.clone()),
Some(r) => Mst::load(store.clone(), r, None),
};
let updated = loaded
.add(&key, record_cid)
.await
.map_err(|e| OpError::MstAdd(e.to_string()))?;
let new_root = updated
.persist()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
if let Some(old_root) = *root {
apply_mst_diff(store, old_root, new_root).await?;
}
*root = Some(new_root);
oracle.add(collection.clone(), rkey.clone(), cid_to_fixed(&record_cid));
Ok(())
}
Op::DeleteRecord { collection, rkey } => {
let Some(old_root) = *root else { return Ok(()) };
if oracle.delete(collection, rkey).is_none() {
return Ok(());
}
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = Mst::load(store.clone(), old_root, None);
let updated = loaded
.delete(&key)
.await
.map_err(|e| OpError::MstDelete(e.to_string()))?;
let new_root = updated
.persist()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
apply_mst_diff(store, old_root, new_root).await?;
*root = Some(new_root);
Ok(())
}
Op::Compact => {
let s = store.clone();
tokio::task::spawn_blocking(move || compact_by_liveness(&s))
.await
.map_err(|e| OpError::Join(e.to_string()))?
}
Op::Checkpoint => {
let s = store.clone();
tokio::task::spawn_blocking(move || {
s.apply_commit_blocking(vec![], vec![])
.map_err(|e| e.to_string())
})
.await
.map_err(|e| OpError::Join(e.to_string()))?
.map_err(OpError::ApplyCommit)
}
}
}
async fn apply_mst_diff(
store: &Arc<TranquilBlockStore>,
old_root: Cid,
new_root: Cid,
) -> Result<(), OpError> {
let old_m = Mst::load(store.clone(), old_root, None);
let new_m = Mst::load(store.clone(), new_root, None);
let diff = old_m
.diff(&new_m)
.await
.map_err(|e| OpError::MstDiff(e.to_string()))?;
let obsolete: Vec<CidBytes> = diff
.removed_mst_blocks
.into_iter()
.chain(diff.removed_cids.into_iter())
.map(|c| cid_to_fixed(&c))
.collect();
let s = store.clone();
tokio::task::spawn_blocking(move || {
s.apply_commit_blocking(vec![], obsolete)
.map_err(|e| e.to_string())
})
.await
.map_err(|e| OpError::Join(e.to_string()))?
.map_err(OpError::ApplyCommit)
}
const COMPACT_LIVENESS_CEILING: f64 = 0.99;
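// Compact every file whose live-block ratio is below the ceiling; the active file
// refuses compaction and is skipped rather than treated as an error.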
fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> {
let liveness = match store.compaction_liveness(0) {
Ok(l) => l,
Err(_) => return Ok(()),
};
let targets: Vec<_> = liveness
.iter()
.filter(|(_, info)| info.total_blocks > 0 && info.ratio() < COMPACT_LIVENESS_CEILING)
.map(|(&fid, _)| fid)
.collect();
targets
.into_iter()
.try_for_each(|fid| match store.compact_file(fid, 0) {
Ok(_) => Ok(()),
Err(CompactionError::ActiveFileCannotBeCompacted) => Ok(()),
Err(e) => Err(OpError::CompactFile(format!("{fid}: {e}"))),
})
}

View File

@@ -0,0 +1,120 @@
use super::invariants::InvariantSet;
use super::op::{CollectionName, Seed};
use super::runner::{
CompactInterval, GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits,
ShardCount, StoreConfig, WallMs,
};
use super::workload::{
KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel,
};
use crate::blockstore::GroupCommitConfig;
#[derive(Debug, Clone, Copy)]
pub enum Scenario {
SmokePR,
MstChurn,
MstRestartChurn,
}
pub fn config_for(scenario: Scenario, seed: Seed) -> GauntletConfig {
match scenario {
Scenario::SmokePR => smoke_pr(seed),
Scenario::MstChurn => mst_churn(seed),
Scenario::MstRestartChurn => mst_restart_churn(seed),
}
}
fn default_collections() -> Vec<CollectionName> {
vec![
CollectionName("app.bsky.feed.post".to_string()),
CollectionName("app.bsky.feed.like".to_string()),
]
}
fn tiny_store() -> StoreConfig {
StoreConfig {
max_file_size: MaxFileSize(300),
group_commit: GroupCommitConfig {
checkpoint_interval_ms: 100,
checkpoint_write_threshold: 10,
..GroupCommitConfig::default()
},
shard_count: ShardCount(1),
compact_every: CompactInterval(5),
}
}
fn smoke_pr(seed: Seed) -> GauntletConfig {
GauntletConfig {
seed,
io: IoBackend::Real,
workload: WorkloadModel {
weights: OpWeights {
add: 80,
delete: 0,
compact: 10,
checkpoint: 10,
},
size_distribution: SizeDistribution::Fixed(ValueBytes(64)),
collections: default_collections(),
key_space: KeySpaceSize(200),
},
op_count: OpCount(10_000),
invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
limits: RunLimits {
max_wall_ms: Some(WallMs(60_000)),
},
restart_policy: RestartPolicy::EveryNOps(OpInterval(2_000)),
store: tiny_store(),
}
}
fn mst_churn(seed: Seed) -> GauntletConfig {
GauntletConfig {
seed,
io: IoBackend::Real,
workload: WorkloadModel {
weights: OpWeights {
add: 85,
delete: 0,
compact: 10,
checkpoint: 5,
},
size_distribution: SizeDistribution::Fixed(ValueBytes(64)),
collections: default_collections(),
key_space: KeySpaceSize(2_000),
},
op_count: OpCount(100_000),
invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
limits: RunLimits {
max_wall_ms: Some(WallMs(600_000)),
},
restart_policy: RestartPolicy::Never,
store: tiny_store(),
}
}
fn mst_restart_churn(seed: Seed) -> GauntletConfig {
GauntletConfig {
seed,
io: IoBackend::Real,
workload: WorkloadModel {
weights: OpWeights {
add: 85,
delete: 0,
compact: 10,
checkpoint: 5,
},
size_distribution: SizeDistribution::Fixed(ValueBytes(64)),
collections: default_collections(),
key_space: KeySpaceSize(2_000),
},
op_count: OpCount(100_000),
invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
limits: RunLimits {
max_wall_ms: Some(WallMs(600_000)),
},
restart_policy: RestartPolicy::PoissonByOps(OpInterval(5_000)),
store: tiny_store(),
}
}

View File

@@ -0,0 +1,133 @@
use super::op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ValueBytes(pub u32);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct KeySpaceSize(pub u32);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OpCount(pub usize);
#[derive(Debug, Clone, Copy)]
pub struct OpWeights {
pub add: u32,
pub delete: u32,
pub compact: u32,
pub checkpoint: u32,
}
impl OpWeights {
pub const fn total(&self) -> u32 {
self.add + self.delete + self.compact + self.checkpoint
}
}
#[derive(Debug, Clone, Copy)]
pub struct ByteRange {
min: ValueBytes,
max: ValueBytes,
}
impl ByteRange {
pub fn new(min: ValueBytes, max: ValueBytes) -> Result<Self, String> {
if max.0 < min.0 {
Err(format!("ByteRange: max {} < min {}", max.0, min.0))
} else {
Ok(Self { min, max })
}
}
pub fn min(&self) -> ValueBytes {
self.min
}
pub fn max(&self) -> ValueBytes {
self.max
}
}
#[derive(Debug, Clone, Copy)]
pub enum SizeDistribution {
Fixed(ValueBytes),
Uniform(ByteRange),
}
#[derive(Debug, Clone)]
pub struct WorkloadModel {
pub weights: OpWeights,
pub size_distribution: SizeDistribution,
pub collections: Vec<CollectionName>,
pub key_space: KeySpaceSize,
}
impl WorkloadModel {
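// Deterministic generation: each op draws a bucket in [0, total) and maps it through
// cumulative weights (add, add+delete, add+delete+compact); anything above that is a
// Checkpoint. Collection and rkey are drawn up front so the stream depends only on the seed.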
pub fn generate(&self, seed: Seed, op_count: OpCount) -> OpStream {
let mut rng = Lcg::new(seed);
let total = self.weights.total();
assert!(total > 0, "workload weights must sum to > 0");
assert!(
!self.collections.is_empty(),
"workload needs at least 1 collection"
);
let ops: Vec<Op> = (0..op_count.0)
.map(|_| {
let bucket = rng.next_u32() % total;
let coll = self.collections[rng.next_usize() % self.collections.len()].clone();
let rkey = RecordKey(format!("{:06}", rng.next_u32() % self.key_space.0.max(1)));
let (a, d, c) = (
self.weights.add,
self.weights.add + self.weights.delete,
self.weights.add + self.weights.delete + self.weights.compact,
);
match bucket {
b if b < a => Op::AddRecord {
collection: coll,
rkey,
value_seed: ValueSeed(rng.next_u32()),
},
b if b < d => Op::DeleteRecord {
collection: coll,
rkey,
},
b if b < c => Op::Compact,
_ => Op::Checkpoint,
}
})
.collect();
OpStream::from_vec(ops)
}
}
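// Minimal 64-bit linear congruential generator (Knuth's MMIX multiplier and increment)
// so op streams are fully reproducible from a Seed.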
pub struct Lcg {
state: u64,
}
impl Lcg {
pub fn new(seed: Seed) -> Self {
Self {
state: seed
.0
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407),
}
}
pub fn next_u64(&mut self) -> u64 {
self.state = self
.state
.wrapping_mul(6364136223846793005)
.wrapping_add(1442695040888963407);
self.state
}
pub fn next_u32(&mut self) -> u32 {
(self.next_u64() >> 16) as u32
}
pub fn next_usize(&mut self) -> usize {
self.next_u32() as usize
}
}

View File

@@ -6,6 +6,8 @@ pub mod consistency;
pub mod eventlog;
pub mod fsync_order;
#[cfg(any(test, feature = "test-harness"))]
pub mod gauntlet;
#[cfg(any(test, feature = "test-harness"))]
mod harness;
mod io;
pub mod metastore;

View File

@@ -5964,7 +5964,7 @@ fn handler_loop<S: StorageIO + 'static>(
None => "unknown panic payload".to_owned(),
},
};
tracing::error!(thread_index, msg, "metastore handler panic (recovered)");
tracing::error!(thread_index, msg, "recovered metastore handler panic");
}
}
});

View File

@@ -4,10 +4,10 @@ use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tranquil_store::PostBlockstoreHook;
use tranquil_store::blockstore::{
BlockStoreConfig, BlocksSynced, CidBytes, GroupCommitConfig, TranquilBlockStore,
};
use tranquil_store::PostBlockstoreHook;
struct SlowHook;
@@ -72,9 +72,7 @@ fn write_phase(base: &std::path::Path, use_hook: bool) -> Vec<CidBytes> {
store
.put_blocks_blocking(vec![(cid, vec![shard; 60])])
.unwrap();
store
.apply_commit_blocking(vec![], vec![cid])
.unwrap();
store.apply_commit_blocking(vec![], vec![cid]).unwrap();
targets.push(cid);
seq += 1;
total_cycles.fetch_add(1, Ordering::Relaxed);
@@ -208,7 +206,9 @@ fn __crash_write_phase() {
Ok(d) => d,
Err(_) => return,
};
let use_hook = std::env::var("CRASH_TEST_HOOK").map(|v| v == "1").unwrap_or(false);
let use_hook = std::env::var("CRASH_TEST_HOOK")
.map(|v| v == "1")
.unwrap_or(false);
let base = std::path::Path::new(&dir);
let rt = tokio::runtime::Runtime::new().unwrap();

View File

@@ -226,7 +226,7 @@ fn stress_create_delete_restart_cycle_matches_bug_report() {
live.insert(seed_a);
live.insert(seed_b);
if rng.next_u32() % 2 == 0 {
if rng.next_u32().is_multiple_of(2) {
let victim: Option<u32> = live.iter().copied().next();
if let Some(v) = victim {
store

View File

@@ -26,9 +26,14 @@ fn open_full_stack(base_dir: &Path) -> FullStack {
let blockstore_data = base_dir.join("blockstore").join("data");
let blockstore_index = base_dir.join("blockstore").join("index");
[&metastore_dir, &segments_dir, &blockstore_data, &blockstore_index]
.iter()
.for_each(|d| std::fs::create_dir_all(d).unwrap());
[
&metastore_dir,
&segments_dir,
&blockstore_data,
&blockstore_index,
]
.iter()
.for_each(|d| std::fs::create_dir_all(d).unwrap());
let metastore = Metastore::open(&metastore_dir, MetastoreConfig::default()).unwrap();
@@ -59,9 +64,7 @@ fn open_full_stack(base_dir: &Path) -> FullStack {
let indexes = metastore.partition(Partition::Indexes).clone();
let event_ops = metastore.event_ops(Arc::clone(&bridge));
let recovered = event_ops
.recover_metastore_mutations(&indexes)
.unwrap();
let recovered = event_ops.recover_metastore_mutations(&indexes).unwrap();
if recovered > 0 {
eprintln!("replayed {recovered} metastore mutations from eventlog");
}
@@ -240,9 +243,7 @@ fn commit_style_decrements() {
if round > 0 {
let prev_mst = test_cid(6000 + round - 1);
store
.apply_commit_blocking(vec![], vec![prev_mst])
.unwrap();
store.apply_commit_blocking(vec![], vec![prev_mst]).unwrap();
}
prev_commit = new_commit;
@@ -408,8 +409,7 @@ fn multiple_restart_cycles_blockstore() {
(0..10u32).for_each(|cycle| {
{
let store =
TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
(0..50u32).for_each(|round| {
let churn = test_cid(9000 + cycle * 100 + round);

View File

@@ -579,7 +579,7 @@ fn index_checkpoint_accelerates_recovery() {
assert!(
reads_with_index < reads_without_index,
"read with index ({reads_with_index} reads) should require fewer reads than without ({reads_without_index} reads)"
"indexed read took {reads_with_index} reads but unindexed took only {reads_without_index}"
);
}
@@ -654,7 +654,7 @@ fn fsync_ordering_blocks_before_events() {
assert_eq!(
event_writer.synced_seq(),
EventSequence::BEFORE_ALL,
"crash between blockstore sync and eventlog sync must not persist the event (blocks exist, event does not = orphan, not inconsistency)"
"crash between blockstore sync and eventlog sync must leave blocks orphaned rather than persist the event"
);
drop(event_writer);

View File

@@ -72,7 +72,7 @@ fn run_retention_at_deletes_sealed_segments_past_cutoff() {
);
assert!(
segments_after >= 1,
"active segment must remain (got {segments_after})"
"active segment must remain, got {segments_after}"
);
}

View File

@@ -0,0 +1,111 @@
use tranquil_store::blockstore::GroupCommitConfig;
use tranquil_store::gauntlet::{
CollectionName, CompactInterval, Gauntlet, GauntletConfig, InvariantSet, IoBackend,
KeySpaceSize, MaxFileSize, OpCount, OpInterval, OpWeights, RestartPolicy, RunLimits, Scenario,
Seed, ShardCount, SizeDistribution, StoreConfig, ValueBytes, WallMs, WorkloadModel, config_for,
farm,
};
#[test]
#[ignore = "long running, 30 seeds of 10k ops each"]
fn smoke_pr_30_seeds() {
let reports = farm::run_many(
|seed| config_for(Scenario::SmokePR, seed),
(0..30).map(Seed),
);
let failures: Vec<String> = reports
.iter()
.filter(|r| !r.is_clean())
.map(|r| {
format!(
"seed {}: {} violations\n {}",
r.seed.0,
r.violations.len(),
r.violations
.iter()
.map(|v| format!("{}: {}", v.invariant, v.detail))
.collect::<Vec<_>>()
.join("\n ")
)
})
.collect();
assert!(failures.is_empty(), "{}", failures.join("\n---\n"));
}
fn fast_sanity_config(seed: Seed) -> GauntletConfig {
GauntletConfig {
seed,
io: IoBackend::Real,
workload: WorkloadModel {
weights: OpWeights {
add: 80,
delete: 0,
compact: 10,
checkpoint: 10,
},
size_distribution: SizeDistribution::Fixed(ValueBytes(64)),
collections: vec![CollectionName("app.bsky.feed.post".to_string())],
key_space: KeySpaceSize(100),
},
op_count: OpCount(200),
invariants: InvariantSet::REFCOUNT_CONSERVATION | InvariantSet::REACHABILITY,
limits: RunLimits {
max_wall_ms: Some(WallMs(30_000)),
},
restart_policy: RestartPolicy::EveryNOps(OpInterval(80)),
store: StoreConfig {
max_file_size: MaxFileSize(512),
group_commit: GroupCommitConfig {
checkpoint_interval_ms: 50,
checkpoint_write_threshold: 8,
..GroupCommitConfig::default()
},
shard_count: ShardCount(1),
compact_every: CompactInterval(5),
},
}
}
#[tokio::test]
async fn gauntlet_fast_sanity() {
let report = Gauntlet::new(fast_sanity_config(Seed(7)))
.expect("build gauntlet")
.run()
.await;
assert!(
report.is_clean(),
"violations: {:?}",
report
.violations
.iter()
.map(|v| format!("{}: {}", v.invariant, v.detail))
.collect::<Vec<_>>()
);
assert!(
report.restarts.0 >= 2,
"expected at least 2 restarts, got {}",
report.restarts.0
);
assert_eq!(report.ops_executed.0, 200);
}
#[tokio::test]
#[ignore = "long running, 100k ops with around 20 restarts"]
async fn mst_restart_churn_single_seed() {
let cfg = config_for(Scenario::MstRestartChurn, Seed(42));
let report = Gauntlet::new(cfg).expect("build gauntlet").run().await;
assert!(
report.is_clean(),
"violations: {:?}",
report
.violations
.iter()
.map(|v| format!("{}: {}", v.invariant, v.detail))
.collect::<Vec<_>>()
);
assert!(
report.restarts.0 >= 1,
"PoissonByOps(5000) over 100k ops should fire at least 1 restart, got {}",
report.restarts.0
);
}

View File

@@ -160,7 +160,7 @@ fn collect_dead_blocks_respects_epoch_gating() {
.collect();
assert!(
all_cids.contains(&cid_a),
"cid_a should be collectible (epoch advanced by subsequent commit)"
"cid_a should be collectible after subsequent commit advanced the epoch"
);
});
}

View File

@@ -80,8 +80,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() {
let obsolete =
compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await;
let obsolete_fixed: Vec<[u8; 36]> =
obsolete.iter().map(|c| cid_to_fixed(c)).collect();
let obsolete_fixed: Vec<[u8; 36]> = obsolete.iter().map(cid_to_fixed).collect();
let s = store.clone();
tokio::task::spawn_blocking(move || {
s.apply_commit_blocking(vec![], obsolete_fixed).unwrap();
@@ -115,8 +114,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() {
let obsolete =
compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await;
let obsolete_fixed: Vec<[u8; 36]> =
obsolete.iter().map(|c| cid_to_fixed(c)).collect();
let obsolete_fixed: Vec<[u8; 36]> = obsolete.iter().map(cid_to_fixed).collect();
let s = store.clone();
tokio::task::spawn_blocking(move || {
s.apply_commit_blocking(vec![], obsolete_fixed).unwrap();
@@ -168,9 +166,7 @@ async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() {
}
{
let store = Arc::new(
TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(),
);
let store = Arc::new(TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap());
let missing: Vec<String> = final_node_cids
.iter()

View File

@@ -74,9 +74,7 @@ impl SimHarness {
let cid = test_cid(seed as u32);
let data = vec![seed as u8; data_size];
let loc = writer.append_block(&cid, &data).unwrap();
hint_writer
.append_hint(&cid, &loc)
.unwrap();
hint_writer.append_hint(&cid, &loc).unwrap();
(cid, loc)
})
.collect();
@@ -538,9 +536,7 @@ fn sim_aggressive_faults_data_integrity() {
let cid = test_cid(i as u32);
let data = vec![i as u8; 64];
let loc = writer.append_block(&cid, &data).ok()?;
hint_writer
.append_hint(&cid, &loc)
.ok()?;
hint_writer.append_hint(&cid, &loc).ok()?;
Some(())
})?;