From ace105899fbf55e8961a102bda3b83c2872292ef Mon Sep 17 00:00:00 2001 From: Lewis Date: Sun, 19 Apr 2026 00:19:08 +0300 Subject: [PATCH] feat(tranquil-store/gauntlet): concurrent executor, eventlog, fault recovery Lewis: May this revision serve well! --- crates/tranquil-store/src/gauntlet/runner.rs | 1313 ++++++++++++++++-- 1 file changed, 1215 insertions(+), 98 deletions(-) diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index b768f09..7e8e026 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -1,3 +1,5 @@ +use std::ops::Range; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; @@ -6,18 +8,27 @@ use cid::Cid; use jacquard_repo::mst::Mst; use jacquard_repo::storage::BlockStore; -use super::invariants::{InvariantCtx, InvariantSet, InvariantViolation, invariants_for}; -use super::op::{Op, OpStream, Seed, ValueSeed}; -use super::oracle::{CidFormatError, Oracle, hex_short, try_cid_to_fixed}; +use super::invariants::{ + EventLogSnapshot, InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, +}; +use super::op::{DidSeed, EventKind, Op, OpStream, PayloadSeed, RetentionSecs, Seed, ValueSeed}; +use super::oracle::{CidFormatError, EventExpectation, Oracle, hex_short, try_cid_to_fixed}; use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel}; use crate::blockstore::{ BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore, + hash_to_cid_bytes, }; +use crate::eventlog::{ + DEFAULT_INDEX_INTERVAL, DidHash, EventLogWriter, EventTypeTag, MAX_EVENT_PAYLOAD, SegmentId, + SegmentManager, SegmentReader, TimestampMicros, ValidEvent, +}; +use crate::io::{RealIO, StorageIO}; +use crate::sim::{FaultConfig, SimulatedIO}; #[derive(Debug, Clone, Copy)] pub enum IoBackend { Real, - Simulated, + Simulated { fault: FaultConfig 
}, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -28,6 +39,7 @@ pub enum RestartPolicy { Never, EveryNOps(OpInterval), PoissonByOps(OpInterval), + CrashAtSyscall(OpInterval), } #[derive(Debug, Clone, Copy)] @@ -41,6 +53,9 @@ pub struct RunLimits { #[derive(Debug, Clone, Copy)] pub struct MaxFileSize(pub u64); +#[derive(Debug, Clone, Copy)] +pub struct MaxSegmentSize(pub u64); + #[derive(Debug, Clone, Copy)] pub struct ShardCount(pub u8); @@ -51,6 +66,20 @@ pub struct StoreConfig { pub shard_count: ShardCount, } +#[derive(Debug, Clone, Copy)] +pub struct EventLogConfig { + pub max_segment_size: MaxSegmentSize, +} + +#[derive(Debug, Clone, Copy)] +pub struct WriterConcurrency(pub usize); + +impl Default for WriterConcurrency { + fn default() -> Self { + Self(1) + } +} + #[derive(Debug, Clone)] pub struct GauntletConfig { pub seed: Seed, @@ -61,29 +90,40 @@ pub struct GauntletConfig { pub limits: RunLimits, pub restart_policy: RestartPolicy, pub store: StoreConfig, + pub eventlog: Option, + pub writer_concurrency: WriterConcurrency, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct OpsExecuted(pub usize); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct OpErrorCount(pub usize); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct RestartCount(pub usize); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct OpIndex(pub usize); -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct GauntletReport { pub seed: Seed, pub ops_executed: OpsExecuted, + pub op_errors: OpErrorCount, pub restarts: RestartCount, pub violations: Vec, + pub ops: OpStream, } impl GauntletReport { pub fn is_clean(&self) -> bool { self.violations.is_empty() } + + pub fn violation_invariants(&self) -> std::collections::BTreeSet<&'static str> { + self.violations.iter().map(|v| v.invariant).collect() + } } #[derive(Debug, thiserror::Error)] @@ -106,6 +146,35 @@ enum OpError { Join(String), #[error("cid format: {0}")] CidFormat(#[from] 
CidFormatError), + #[error("eventlog append: {0}")] + EventLogAppend(String), + #[error("eventlog sync: {0}")] + EventLogSync(String), + #[error("eventlog retention: {0}")] + EventLogRetention(String), +} + +pub struct EventLogState { + pub writer: EventLogWriter, + pub manager: Arc>, + pub segments_dir: PathBuf, + pub max_segment_size: u64, +} + +pub struct Harness { + pub store: Arc>, + pub eventlog: Option>, +} + +pub struct WriteState { + pub root: Option, + pub oracle: Oracle, + pub eventlog: Option>, +} + +pub struct SharedState { + pub store: Arc>, + pub write: tokio::sync::Mutex>, } pub struct Gauntlet { @@ -113,20 +182,25 @@ pub struct Gauntlet { } #[derive(Debug, thiserror::Error)] -pub enum GauntletBuildError { - #[error("IoBackend::Simulated not wired yet")] - UnsupportedIoBackend, -} +pub enum GauntletBuildError {} impl Gauntlet { pub fn new(config: GauntletConfig) -> Result { - match config.io { - IoBackend::Real => Ok(Self { config }), - IoBackend::Simulated => Err(GauntletBuildError::UnsupportedIoBackend), - } + Ok(Self { config }) + } + + pub fn generate_ops(&self) -> OpStream { + self.config + .workload + .generate(self.config.seed, self.config.op_count) } pub async fn run(self) -> GauntletReport { + let ops = self.generate_ops(); + self.run_with_ops(ops).await + } + + pub async fn run_with_ops(self, ops: OpStream) -> GauntletReport { let deadline = self .config .limits @@ -134,58 +208,254 @@ impl Gauntlet { .map(|WallMs(ms)| Duration::from_millis(ms)); let seed = self.config.seed; + let ops_for_report = ops.clone(); let ops_counter = Arc::new(AtomicUsize::new(0)); + let op_errors_counter = Arc::new(AtomicUsize::new(0)); let restarts_counter = Arc::new(AtomicUsize::new(0)); - let fut = run_real_inner(self.config, ops_counter.clone(), restarts_counter.clone()); - match deadline { + let fut: std::pin::Pin + Send>> = + match self.config.io { + IoBackend::Real => Box::pin(run_inner_real( + self.config, + ops, + ops_counter.clone(), + 
op_errors_counter.clone(), + restarts_counter.clone(), + )), + IoBackend::Simulated { fault } => Box::pin(run_inner_simulated( + self.config, + fault, + ops, + ops_counter.clone(), + op_errors_counter.clone(), + restarts_counter.clone(), + )), + }; + let mut report = match deadline { Some(d) => match tokio::time::timeout(d, fut).await { Ok(r) => r, Err(_) => GauntletReport { seed, ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), violations: vec![InvariantViolation { invariant: "WallClockBudget", detail: format!("exceeded max_wall_ms of {} ms", d.as_millis()), }], + ops: OpStream::empty(), }, }, None => fut.await, - } + }; + report.ops = ops_for_report; + report } } -async fn run_real_inner( +fn segments_subdir(root: &Path) -> PathBuf { + root.join("segments") +} + +async fn run_inner_real( config: GauntletConfig, + ops: OpStream, ops_counter: Arc, + op_errors_counter: Arc, restarts_counter: Arc, ) -> GauntletReport { let dir = tempfile::TempDir::new().expect("tempdir"); - let op_stream: OpStream = config.workload.generate(config.seed, config.op_count); + let cfg = blockstore_config(dir.path(), &config.store); + let eventlog_cfg = config.eventlog; + let segments_dir = segments_subdir(dir.path()); + let open = { + let segments_dir = segments_dir.clone(); + move || -> Result, String> { + let store = TranquilBlockStore::open(cfg.clone()) + .map(Arc::new) + .map_err(|e| e.to_string())?; + let eventlog = match eventlog_cfg { + None => None, + Some(elc) => Some( + open_eventlog(RealIO::new(), segments_dir.clone(), elc.max_segment_size.0) + .map_err(|e| format!("eventlog: {e}"))?, + ), + }; + Ok(Harness { store, eventlog }) + } + }; + if config.writer_concurrency.0 > 1 { + run_inner_generic_concurrent::( + config, + ops, + ops_counter, + op_errors_counter, + restarts_counter, + open, + || {}, + false, + ) + .await + } else { 
+ run_inner_generic::( + config, + ops, + ops_counter, + op_errors_counter, + restarts_counter, + open, + || {}, + false, + ) + .await + } +} +async fn run_inner_simulated( + config: GauntletConfig, + fault: FaultConfig, + ops: OpStream, + ops_counter: Arc, + op_errors_counter: Arc, + restarts_counter: Arc, +) -> GauntletReport { + let dir = tempfile::TempDir::new().expect("tempdir"); + let cfg = blockstore_config(dir.path(), &config.store); + let tolerate_errors = fault.injects_errors(); + let eventlog_cfg = config.eventlog; + let segments_dir = segments_subdir(dir.path()); + let sim: Arc = Arc::new(SimulatedIO::new(config.seed.0, fault)); + let sim_for_open = Arc::clone(&sim); + let open = { + let segments_dir = segments_dir.clone(); + move || -> Result>, String> { + let factory_sim = Arc::clone(&sim_for_open); + let make_io = move || Arc::clone(&factory_sim); + let store = TranquilBlockStore::>::open_with_io(cfg.clone(), make_io) + .map(Arc::new) + .map_err(|e| e.to_string())?; + let eventlog = match eventlog_cfg { + None => None, + Some(elc) => Some( + open_eventlog( + Arc::clone(&sim_for_open), + segments_dir.clone(), + elc.max_segment_size.0, + ) + .map_err(|e| format!("eventlog: {e}"))?, + ), + }; + Ok(Harness { store, eventlog }) + } + }; + let sim_for_crash = Arc::clone(&sim); + let crash = move || sim_for_crash.crash(); + if config.writer_concurrency.0 > 1 { + run_inner_generic_concurrent::, _, _>( + config, + ops, + ops_counter, + op_errors_counter, + restarts_counter, + open, + crash, + tolerate_errors, + ) + .await + } else { + run_inner_generic::, _, _>( + config, + ops, + ops_counter, + op_errors_counter, + restarts_counter, + open, + crash, + tolerate_errors, + ) + .await + } +} + +fn open_eventlog( + io: S, + segments_dir: PathBuf, + max_segment_size: u64, +) -> std::io::Result> { + let manager = Arc::new(SegmentManager::new( + io, + segments_dir.clone(), + max_segment_size, + )?); + let writer = EventLogWriter::open( + Arc::clone(&manager), + 
DEFAULT_INDEX_INTERVAL, + MAX_EVENT_PAYLOAD, + )?; + Ok(EventLogState { + writer, + manager, + segments_dir, + max_segment_size, + }) +} + +#[allow(clippy::too_many_arguments)] +async fn run_inner_generic( + config: GauntletConfig, + op_stream: OpStream, + ops_counter: Arc, + op_errors_counter: Arc, + restarts_counter: Arc, + mut open: Open, + mut crash: Crash, + tolerate_op_errors: bool, +) -> GauntletReport +where + S: StorageIO + Send + Sync + 'static, + Open: FnMut() -> Result, String>, + Crash: FnMut(), +{ let mut oracle = Oracle::new(); let mut violations: Vec = Vec::new(); - let mut store = Arc::new( - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)).expect("open store"), - ); + let mut harness: Option> = match open() { + Ok(h) => Some(h), + Err(e) => { + return GauntletReport { + seed: config.seed, + ops_executed: OpsExecuted(0), + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), + restarts: RestartCount(0), + violations: vec![InvariantViolation { + invariant: "OpenStore", + detail: format!("initial open: {e}"), + }], + ops: OpStream::empty(), + }; + } + }; let mut root: Option = None; let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); + let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); let mut halt_ops = false; - let mid_run_set = config - .invariants - .without(InvariantSet::RESTART_IDEMPOTENT) - .without(InvariantSet::ACKED_WRITE_PERSISTENCE); let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); for (idx, op) in op_stream.iter().enumerate() { if halt_ops { break; } - match apply_op(&store, &mut root, &mut oracle, op, &config.workload).await { + let live = harness + .as_mut() + .expect("harness invariant: present when halt_ops is false"); + let root_before = root; + match apply_op(live, &mut root, &mut oracle, op, &config.workload).await { Ok(()) => {} Err(e) => { + if tolerate_op_errors { + op_errors_counter.fetch_add(1, 
Ordering::Relaxed); + continue; + } violations.push(InvariantViolation { invariant: "OpExecution", detail: format!("op {idx}: {e}"), @@ -194,90 +464,305 @@ async fn run_real_inner( continue; } } + let _ = root_before; ops_counter.store(idx + 1, Ordering::Relaxed); - if should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng) { - drop(store); - store = Arc::new( - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) - .expect("reopen store"), - ); - let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; - - if let Err(e) = refresh_oracle_graph(&store, &mut oracle, root).await { + let action = should_restart(config.restart_policy, OpIndex(idx), &mut restart_rng); + let crashing = matches!(action, RestartAction::Crash); + if matches!(action, RestartAction::None) { + continue; + } + if crashing { + crash(); + oracle.record_crash(); + } + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { + Ok(reopened) => { + harness = Some(reopened); + let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; + let live = harness.as_ref().expect("just reopened"); + let before = violations.len(); + violations.extend( + run_quick_check( + &live.store, + &oracle, + root, + &mut sample_rng, + QUICK_SAMPLE_SIZE, + n, + ) + .await, + ); + if violations.len() > before { + halt_ops = true; + } + } + Err(detail) => { violations.push(InvariantViolation { - invariant: "OpExecution", - detail: format!("refresh after restart {n}: {e}"), + invariant: "ReopenFailed", + detail: format!("reopen after op {idx}: {detail}"), }); halt_ops = true; - continue; + break; } - let before = violations.len(); - violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); - if violations.len() > before { + } + } + + if !halt_ops && tolerate_op_errors && harness.is_some() { + crash(); + oracle.record_crash(); + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, 
tolerate_op_errors) { + Ok(reopened) => harness = Some(reopened), + Err(detail) => { + violations.push(InvariantViolation { + invariant: "ReopenFailed", + detail: format!("reopen after post-run crash: {detail}"), + }); halt_ops = true; } } } - if !halt_ops { - match refresh_oracle_graph(&store, &mut oracle, root).await { + let end_of_run_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); + if !halt_ops && let Some(live) = harness.as_ref() { + match refresh_oracle_graph(&live.store, &mut oracle, root).await { Ok(()) => { let before = violations.len(); - violations.extend(run_invariants(&store, &oracle, root, mid_run_set).await); + let snapshot = eventlog_snapshot(live.eventlog.as_ref()); + violations.extend( + run_invariants(&live.store, &oracle, root, snapshot, end_of_run_set).await, + ); if violations.len() > before { halt_ops = true; } } Err(e) => { violations.push(InvariantViolation { - invariant: "OpExecution", - detail: format!("refresh at end: {e}"), + invariant: "MstRootDurability", + detail: format!("refresh after final reopen: {e}"), }); halt_ops = true; } } } - if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) && !halt_ops { - let pre_snapshot = snapshot_block_index(&store); - drop(store); - let reopened = Arc::new( - TranquilBlockStore::open(blockstore_config(dir.path(), &config.store)) - .expect("reopen for RestartIdempotent"), - ); - let post_snapshot = snapshot_block_index(&reopened); - if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { - violations.push(InvariantViolation { - invariant: "RestartIdempotent", - detail, - }); - } else { - violations.extend(run_invariants(&reopened, &oracle, root, post_reopen_set).await); + if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) + && !halt_ops + && let Some(live) = harness.as_ref() + { + let pre_snapshot = snapshot_block_index(&live.store); + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { + 
Ok(reopened) => { + let post_snapshot = snapshot_block_index(&reopened.store); + if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { + violations.push(InvariantViolation { + invariant: "RestartIdempotent", + detail, + }); + } else { + let snapshot = eventlog_snapshot(reopened.eventlog.as_ref()); + violations.extend( + run_invariants(&reopened.store, &oracle, root, snapshot, post_reopen_set) + .await, + ); + } + } + Err(detail) => { + violations.push(InvariantViolation { + invariant: "ReopenFailed", + detail: format!("reopen for idempotency check: {detail}"), + }); + } } } GauntletReport { seed: config.seed, ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), violations, + ops: OpStream::empty(), } } -async fn run_invariants( - store: &Arc, +fn eventlog_snapshot( + state: Option<&EventLogState>, +) -> Option { + let s = state?; + let segments = s.manager.list_segments().unwrap_or_default(); + let mut events: Vec = Vec::new(); + let mut segment_last_ts: Vec<(SegmentId, u64)> = Vec::new(); + segments.iter().for_each(|&id| { + let per_segment: Vec = match s.manager.open_for_read(id) { + Ok(fd) => match SegmentReader::open(s.manager.io(), fd, MAX_EVENT_PAYLOAD) { + Ok(reader) => reader.valid_prefix().unwrap_or_default(), + Err(_) => Vec::new(), + }, + Err(_) => Vec::new(), + }; + if let Some(last) = per_segment.last() { + segment_last_ts.push((id, last.timestamp.raw())); + } + per_segment.into_iter().for_each(|e| { + events.push(SnapshotEvent { + seq: e.seq, + timestamp_us: e.timestamp.raw(), + event_type_raw: e.event_type.raw(), + did_hash: e.did_hash.raw(), + }); + }); + }); + Some(EventLogSnapshot { + segments_dir: s.segments_dir.clone(), + max_segment_size: s.max_segment_size, + synced_seq: s.writer.synced_seq(), + segments, + events, + segment_last_ts, + }) +} + +fn shutdown_harness(harness: &mut 
Option>) { + if let Some(h) = harness.as_mut() + && let Some(el) = h.eventlog.as_mut() + { + let _ = el.writer.shutdown(); + el.manager.shutdown(); + } + let _ = harness.take(); +} + +const MAX_REOPEN_ATTEMPTS: usize = 5; + +fn reopen_with_recovery( + open: &mut Open, + crash: &mut Crash, + tolerate: bool, +) -> Result, String> +where + S: StorageIO + Send + Sync + 'static, + Open: FnMut() -> Result, String>, + Crash: FnMut(), +{ + let mut errors: Vec = Vec::new(); + (0..MAX_REOPEN_ATTEMPTS) + .find_map(|attempt| match open() { + Ok(h) => Some(Ok(h)), + Err(e) => { + errors.push(format!("attempt {attempt}: {e}")); + if !tolerate { + return Some(Err(errors.join(" | "))); + } + crash(); + None + } + }) + .unwrap_or_else(|| Err(errors.join(" | "))) +} + +const QUICK_SAMPLE_SIZE: usize = 32; + +fn sample_distinct(rng: &mut Lcg, n: usize, k: usize) -> Vec { + assert!(k <= n, "sample_distinct: k {k} > n {n}"); + let mut selected: std::collections::HashSet = + std::collections::HashSet::with_capacity(k); + ((n - k)..n) + .map(|i| { + let t = (rng.next_u64() as usize) % (i + 1); + let pick = if selected.contains(&t) { i } else { t }; + selected.insert(pick); + pick + }) + .collect() +} + +async fn run_quick_check( + store: &Arc>, oracle: &Oracle, root: Option, + rng: &mut Lcg, + sample_size: usize, + restart_seq: usize, +) -> Vec { + let Some(r) = root else { + return if oracle.live_count() == 0 { + Vec::new() + } else { + vec![InvariantViolation { + invariant: "QuickHealth", + detail: format!( + "restart {restart_seq}: oracle has {} live records but reopened store has no root", + oracle.live_count() + ), + }] + }; + }; + + let mst = Mst::load(store.clone(), r, None); + let live: Vec<(super::op::CollectionName, super::op::RecordKey, CidBytes)> = oracle + .live_records() + .map(|(c, k, v)| (c.clone(), k.clone(), *v)) + .collect(); + let total = live.len(); + let picks: Vec = if total <= sample_size { + (0..total).collect() + } else { + sample_distinct(rng, total, 
sample_size) + }; + + let mut violations: Vec = Vec::new(); + for idx in picks { + let (coll, rkey, expected) = &live[idx]; + let key = format!("{}/{}", coll.0, rkey.0); + match mst.get(&key).await { + Ok(Some(cid)) => match try_cid_to_fixed(&cid) { + Ok(actual) if actual == *expected => {} + Ok(actual) => violations.push(format!( + "{key}: MST cid {} != oracle cid {}", + hex_short(&actual), + hex_short(expected) + )), + Err(e) => violations.push(format!("{key}: cid format: {e}")), + }, + Ok(None) => violations.push(format!("{key}: missing after reopen")), + Err(e) => violations.push(format!("{key}: mst.get error: {e}")), + } + } + + if violations.is_empty() { + Vec::new() + } else { + vec![InvariantViolation { + invariant: "QuickHealth", + detail: format!( + "restart {restart_seq}, sampled {}/{}: {}", + violations.len(), + sample_size.min(total), + violations.join("; ") + ), + }] + } +} + +async fn run_invariants( + store: &Arc>, + oracle: &Oracle, + root: Option, + eventlog: Option, set: InvariantSet, ) -> Vec { - let ctx = InvariantCtx { + let ctx = InvariantCtx:: { store, oracle, root, + eventlog: eventlog.as_ref(), }; let mut out = Vec::new(); - for inv in invariants_for(set) { + for inv in invariants_for::(set) { if let Err(v) = inv.check(&ctx).await { out.push(v); } @@ -285,7 +770,9 @@ async fn run_invariants( out } -fn snapshot_block_index(store: &TranquilBlockStore) -> Vec<(CidBytes, u32)> { +fn snapshot_block_index( + store: &TranquilBlockStore, +) -> Vec<(CidBytes, u32)> { let mut v: Vec<(CidBytes, u32)> = store .block_index() .live_entries_snapshot() @@ -302,8 +789,8 @@ fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option = pre.iter().copied().collect(); - let post_map: std::collections::HashMap = post.iter().copied().collect(); + let pre_map: std::collections::BTreeMap = pre.iter().copied().collect(); + let post_map: std::collections::BTreeMap = post.iter().copied().collect(); let only_pre: Vec = pre_map .iter() @@ -336,15 
+823,15 @@ fn diff_snapshots(pre: &[(CidBytes, u32)], post: &[(CidBytes, u32)]) -> Option {} entries; {}", pre.len(), post.len(), items.join("; "), )) } -async fn refresh_oracle_graph( - store: &Arc, +async fn refresh_oracle_graph( + store: &Arc>, oracle: &mut Oracle, root: Option, ) -> Result<(), String> { @@ -371,15 +858,34 @@ async fn refresh_oracle_graph( } } -fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> bool { +enum RestartAction { + None, + Clean, + Crash, +} + +fn should_restart(policy: RestartPolicy, idx: OpIndex, rng: &mut Lcg) -> RestartAction { match policy { - RestartPolicy::Never => false, - RestartPolicy::EveryNOps(OpInterval(n)) => n > 0 && (idx.0 + 1).is_multiple_of(n), - RestartPolicy::PoissonByOps(OpInterval(n)) => { - if n == 0 { - false + RestartPolicy::Never => RestartAction::None, + RestartPolicy::EveryNOps(OpInterval(n)) => { + if n > 0 && (idx.0 + 1).is_multiple_of(n) { + RestartAction::Clean } else { - rng.next_u64().is_multiple_of(n as u64) + RestartAction::None + } + } + RestartPolicy::PoissonByOps(OpInterval(n)) => { + if n > 0 && rng.next_u64().is_multiple_of(n as u64) { + RestartAction::Clean + } else { + RestartAction::None + } + } + RestartPolicy::CrashAtSyscall(OpInterval(n)) => { + if n > 0 && rng.next_u64().is_multiple_of(n as u64) { + RestartAction::Crash + } else { + RestartAction::None } } } @@ -405,6 +911,21 @@ fn make_record_bytes(value_seed: ValueSeed, dist: SizeDistribution) -> Vec { let span = u64::from(hi.saturating_sub(lo)).max(1); (lo as usize) + (u64::from(raw) % span) as usize } + SizeDistribution::HeavyTail(range) => { + let ValueBytes(lo) = range.min(); + let ValueBytes(hi) = range.max(); + let lo64 = u64::from(lo); + let hi64 = u64::from(hi); + let span = hi64.saturating_sub(lo64).max(1); + let roll = u64::from(raw) % 1024; + let extra = match roll { + 0..=820 => span / 64, + 821..=1000 => span / 8, + 1001..=1015 => span / 2, + _ => span, + }; + (lo64 + (extra.min(span))) as usize + } }; 
let target_len = target_len.max(8); let seed_bytes = raw.to_le_bytes(); @@ -413,8 +934,30 @@ fn make_record_bytes(value_seed: ValueSeed, dist: SizeDistribution) -> Vec { .collect() } -async fn apply_op( - store: &Arc, +fn event_payload_bytes(payload_seed: PayloadSeed) -> Vec { + let raw = payload_seed.0; + let len: usize = 48 + ((raw as usize) % 256); + let seed_bytes = raw.to_le_bytes(); + (0..len) + .map(|i| seed_bytes[i % 4] ^ (i as u8).wrapping_mul(17)) + .collect() +} + +fn event_kind_to_tag(kind: EventKind) -> EventTypeTag { + match kind { + EventKind::Commit => EventTypeTag::COMMIT, + EventKind::Identity => EventTypeTag::IDENTITY, + EventKind::Account => EventTypeTag::ACCOUNT, + EventKind::Sync => EventTypeTag::SYNC, + } +} + +fn did_hash_for_seed(seed: DidSeed) -> DidHash { + DidHash::from_did(&format!("did:plc:gauntlet{:08x}", seed.0)) +} + +async fn apply_op( + harness: &mut Harness, root: &mut Option, oracle: &mut Oracle, op: &Op, @@ -427,15 +970,22 @@ async fn apply_op( value_seed, } => { let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); - let record_cid = store + let record_cid = harness + .store .put(&record_bytes) .await .map_err(|e| OpError::PutRecord(e.to_string()))?; let record_cid_bytes = try_cid_to_fixed(&record_cid)?; - let outcome = - add_record_inner(store, *root, collection, rkey, record_cid, record_cid_bytes) - .await; + let outcome = add_record_inner( + &harness.store, + *root, + collection, + rkey, + record_cid, + record_cid_bytes, + ) + .await; match outcome { Ok((new_root, applied)) => { *root = Some(new_root); @@ -446,12 +996,12 @@ async fn apply_op( } Err(e) => { if let Err(cleanup_err) = - decrement_obsolete(store, vec![record_cid_bytes]).await + decrement_obsolete(&harness.store, vec![record_cid_bytes]).await { tracing::warn!( op_error = %e, cleanup_error = %cleanup_err, - "AddRecord cleanup decrement failed; refcount may leak", + "AddRecord cleanup decrement failed", ); } Err(e) @@ -460,11 +1010,11 @@ 
async fn apply_op( } Op::DeleteRecord { collection, rkey } => { let Some(old_root) = *root else { return Ok(()) }; - if oracle.delete(collection, rkey).is_none() { + if !oracle.contains_record(collection, rkey) { return Ok(()); } let key = format!("{}/{}", collection.0, rkey.0); - let loaded = Mst::load(store.clone(), old_root, None); + let loaded = Mst::load(harness.store.clone(), old_root, None); let updated = loaded .delete(&key) .await @@ -473,18 +1023,19 @@ async fn apply_op( .persist() .await .map_err(|e| OpError::MstPersist(e.to_string()))?; - apply_mst_diff(store, old_root, new_root).await?; + apply_mst_diff(&harness.store, old_root, new_root).await?; + oracle.delete(collection, rkey); *root = Some(new_root); Ok(()) } Op::Compact => { - let s = store.clone(); + let s = harness.store.clone(); tokio::task::spawn_blocking(move || compact_by_liveness(&s)) .await .map_err(|e| OpError::Join(e.to_string()))? } Op::Checkpoint => { - let s = store.clone(); + let s = harness.store.clone(); tokio::task::spawn_blocking(move || { s.apply_commit_blocking(vec![], vec![]) .map_err(|e| e.to_string()) @@ -493,11 +1044,109 @@ async fn apply_op( .map_err(|e| OpError::Join(e.to_string()))? 
.map_err(OpError::ApplyCommit) } + Op::AppendEvent { + did_seed, + event_kind, + payload_seed, + } => { + let Some(el) = harness.eventlog.as_mut() else { + return Ok(()); + }; + let did_hash = did_hash_for_seed(*did_seed); + let tag = event_kind_to_tag(*event_kind); + let payload = event_payload_bytes(*payload_seed); + let ts_before = TimestampMicros::now().raw(); + match el.writer.append(did_hash, tag, payload) { + Ok(seq) => { + oracle.record_event_append(EventExpectation { + seq, + timestamp_us: ts_before, + kind: *event_kind, + did_hash: did_hash.raw(), + }); + let _ = el.writer.rotate_if_needed(); + Ok(()) + } + Err(e) => Err(OpError::EventLogAppend(e.to_string())), + } + } + Op::SyncEventLog => { + let Some(el) = harness.eventlog.as_mut() else { + return Ok(()); + }; + match el.writer.sync() { + Ok(result) => { + let _ = el.manager.io().sync_dir(el.segments_dir.as_path()); + let _ = el.writer.rotate_if_needed(); + oracle.record_event_sync(result.synced_through); + Ok(()) + } + Err(e) => Err(OpError::EventLogSync(e.to_string())), + } + } + Op::RunRetention { max_age_secs } => { + let Some(el) = harness.eventlog.as_mut() else { + return Ok(()); + }; + run_retention(el, oracle, *max_age_secs).map_err(OpError::EventLogRetention) + } + Op::ReadRecord { collection, rkey } => { + let Some(r) = *root else { return Ok(()) }; + let key = format!("{}/{}", collection.0, rkey.0); + let mst = Mst::load(harness.store.clone(), r, None); + let _ = mst.get(&key).await; + Ok(()) + } + Op::ReadBlock { value_seed } => { + let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); + let record_cid = hash_to_cid_bytes(&record_bytes); + let _ = harness.store.get_block_sync(&record_cid); + Ok(()) + } } } -async fn add_record_inner( - store: &Arc, +fn run_retention( + el: &mut EventLogState, + oracle: &mut Oracle, + max_age: RetentionSecs, +) -> Result<(), String> { + let sync_result = el.writer.sync().map_err(|e| e.to_string())?; + let _ = 
el.writer.rotate_if_needed(); + oracle.record_event_sync(sync_result.synced_through); + let active_id = sync_result.segment_id; + let now_us = TimestampMicros::now().raw(); + let max_age_us = u64::from(max_age.0).saturating_mul(1_000_000); + let cutoff_us = now_us.saturating_sub(max_age_us); + let segments = el.manager.list_segments().map_err(|e| e.to_string())?; + segments + .iter() + .take_while(|&&id| id != active_id) + .try_for_each(|&id| -> Result<(), String> { + let last_ts = segment_last_timestamp(&el.manager, id).map_err(|e| e.to_string())?; + match last_ts { + Some(ts) if ts < cutoff_us => { + el.manager.delete_segment(id).map_err(|e| e.to_string()) + } + _ => Ok(()), + } + })?; + oracle.record_retention(cutoff_us); + Ok(()) +} + +fn segment_last_timestamp( + manager: &SegmentManager, + id: SegmentId, +) -> std::io::Result> { + let fd = manager.open_for_read(id)?; + let reader = SegmentReader::open(manager.io(), fd, MAX_EVENT_PAYLOAD)?; + let events = reader.valid_prefix()?; + Ok(events.last().map(|e: &ValidEvent| e.timestamp.raw())) +} + +async fn add_record_inner( + store: &Arc>, root: Option, collection: &super::op::CollectionName, rkey: &super::op::RecordKey, @@ -531,8 +1180,8 @@ async fn add_record_inner( } } -async fn decrement_obsolete( - store: &Arc, +async fn decrement_obsolete( + store: &Arc>, obsolete: Vec, ) -> Result<(), OpError> { let s = store.clone(); @@ -545,8 +1194,8 @@ async fn decrement_obsolete( .map_err(OpError::ApplyCommit) } -async fn apply_mst_diff( - store: &Arc, +async fn apply_mst_diff( + store: &Arc>, old_root: Cid, new_root: Cid, ) -> Result<(), OpError> { @@ -574,7 +1223,9 @@ async fn apply_mst_diff( const COMPACT_LIVENESS_CEILING: f64 = 0.99; -fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> { +fn compact_by_liveness( + store: &TranquilBlockStore, +) -> Result<(), OpError> { let liveness = store .compaction_liveness(0) .map_err(|e| OpError::CompactFile(format!("compaction_liveness: {e}")))?; @@ 
@@ -591,3 +1242,469 @@ fn compact_by_liveness(store: &TranquilBlockStore) -> Result<(), OpError> {
         Err(e) => Err(OpError::CompactFile(format!("{fid}: {e}"))),
     })
 }
+
+/// Apply one [`Op`] against the shared concurrent harness state.
+///
+/// Mutating ops hold the `shared.write` mutex for their whole critical
+/// section so the oracle, MST root, and event log stay in lockstep; work
+/// that does not need exclusivity (hashing/storing record bytes) happens
+/// before the lock is taken.
+async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
+    shared: &Arc<SharedState<S>>,
+    op: &Op,
+    workload: &WorkloadModel,
+) -> Result<(), OpError> {
+    match op {
+        Op::AddRecord {
+            collection,
+            rkey,
+            value_seed,
+        } => {
+            // Store the record block outside the write lock.
+            let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
+            let record_cid = shared
+                .store
+                .put(&record_bytes)
+                .await
+                .map_err(|e| OpError::PutRecord(e.to_string()))?;
+            let record_cid_bytes = try_cid_to_fixed(&record_cid)?;
+
+            let mut state = shared.write.lock().await;
+            let outcome = add_record_inner(
+                &shared.store,
+                state.root,
+                collection,
+                rkey,
+                record_cid,
+                record_cid_bytes,
+            )
+            .await;
+            match outcome {
+                Ok((new_root, applied)) => {
+                    state.root = Some(new_root);
+                    if applied {
+                        state
+                            .oracle
+                            .add(collection.clone(), rkey.clone(), record_cid_bytes);
+                    }
+                    Ok(())
+                }
+                Err(e) => {
+                    // Release the lock before the best-effort cleanup of the
+                    // now-orphaned record block; surface the original error.
+                    drop(state);
+                    if let Err(cleanup) =
+                        decrement_obsolete(&shared.store, vec![record_cid_bytes]).await
+                    {
+                        tracing::warn!(
+                            op_error = %e,
+                            cleanup_error = %cleanup,
+                            "AddRecord concurrent cleanup decrement failed",
+                        );
+                    }
+                    Err(e)
+                }
+            }
+        }
+        Op::DeleteRecord { collection, rkey } => {
+            let mut state = shared.write.lock().await;
+            // Deleting from an empty repo, or a key the oracle never saw, is
+            // a deliberate no-op.
+            let Some(old_root) = state.root else {
+                return Ok(());
+            };
+            if !state.oracle.contains_record(collection, rkey) {
+                return Ok(());
+            }
+            let key = format!("{}/{}", collection.0, rkey.0);
+            let loaded = Mst::load(shared.store.clone(), old_root, None);
+            let updated = loaded
+                .delete(&key)
+                .await
+                .map_err(|e| OpError::MstDelete(e.to_string()))?;
+            let new_root = updated
+                .persist()
+                .await
+                .map_err(|e| OpError::MstPersist(e.to_string()))?;
+            apply_mst_diff(&shared.store, old_root, new_root).await?;
+            state.oracle.delete(collection, rkey);
+            state.root = Some(new_root);
+            Ok(())
+        }
+        Op::Compact => {
+            // Intentionally hold the (tokio, await-safe) write lock across
+            // the blocking compaction so no writer races the rewrite.
+            let _guard = shared.write.lock().await;
+            let s = shared.store.clone();
+            tokio::task::spawn_blocking(move || compact_by_liveness(&s))
+                .await
+                .map_err(|e| OpError::Join(e.to_string()))?
+        }
+        Op::Checkpoint => {
+            // An empty commit forces a group-commit flush.
+            let s = shared.store.clone();
+            tokio::task::spawn_blocking(move || {
+                s.apply_commit_blocking(vec![], vec![])
+                    .map_err(|e| e.to_string())
+            })
+            .await
+            .map_err(|e| OpError::Join(e.to_string()))?
+            .map_err(OpError::ApplyCommit)
+        }
+        Op::AppendEvent {
+            did_seed,
+            event_kind,
+            payload_seed,
+        } => {
+            let did_hash = did_hash_for_seed(*did_seed);
+            let tag = event_kind_to_tag(*event_kind);
+            let payload = event_payload_bytes(*payload_seed);
+            // Captured before the append so the oracle's lower bound on the
+            // event timestamp is conservative.
+            let ts_before = TimestampMicros::now().raw();
+            let mut state = shared.write.lock().await;
+            let Some(el) = state.eventlog.as_mut() else {
+                return Ok(());
+            };
+            match el.writer.append(did_hash, tag, payload) {
+                Ok(seq) => {
+                    let _ = el.writer.rotate_if_needed();
+                    state.oracle.record_event_append(EventExpectation {
+                        seq,
+                        timestamp_us: ts_before,
+                        kind: *event_kind,
+                        did_hash: did_hash.raw(),
+                    });
+                    Ok(())
+                }
+                Err(e) => Err(OpError::EventLogAppend(e.to_string())),
+            }
+        }
+        Op::SyncEventLog => {
+            let mut state = shared.write.lock().await;
+            let Some(el) = state.eventlog.as_mut() else {
+                return Ok(());
+            };
+            match el.writer.sync() {
+                Ok(result) => {
+                    // Directory sync and rotation are best-effort here; the
+                    // durability point the oracle tracks is `synced_through`.
+                    let _ = el.manager.io().sync_dir(el.segments_dir.as_path());
+                    let _ = el.writer.rotate_if_needed();
+                    state.oracle.record_event_sync(result.synced_through);
+                    Ok(())
+                }
+                Err(e) => Err(OpError::EventLogSync(e.to_string())),
+            }
+        }
+        Op::RunRetention { max_age_secs } => {
+            let mut state_guard = shared.write.lock().await;
+            // Destructure through the guard so retention can borrow the
+            // oracle and the event log mutably at the same time.
+            let state = &mut *state_guard;
+            let WriteState {
+                oracle, eventlog, ..
+            } = state;
+            let Some(el) = eventlog.as_mut() else {
+                return Ok(());
+            };
+            run_retention(el, oracle, *max_age_secs).map_err(OpError::EventLogRetention)
+        }
+        Op::ReadRecord { collection, rkey } => {
+            // Snapshot the root under the lock, then read without it.
+            let r = { shared.write.lock().await.root };
+            let Some(r) = r else {
+                return Ok(());
+            };
+            let key = format!("{}/{}", collection.0, rkey.0);
+            let mst = Mst::load(shared.store.clone(), r, None);
+            let _ = mst.get(&key).await;
+            Ok(())
+        }
+        Op::ReadBlock { value_seed } => {
+            // Recompute the CID the AddRecord path would have produced; a
+            // miss is acceptable (the record may never have been written).
+            let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
+            let record_cid = hash_to_cid_bytes(&record_bytes);
+            let _ = shared.store.get_block_sync(&record_cid);
+            Ok(())
+        }
+    }
+}
+
+/// Writer loop shared by every concurrent writer task: claim the next op
+/// index, apply it, and either tolerate or surface op errors.
+///
+/// Returns `None` when the op range is exhausted, or the first violation
+/// that should halt the run.
+#[allow(clippy::too_many_arguments)]
+async fn writer_task<S: StorageIO + Send + Sync + 'static>(
+    shared: Arc<SharedState<S>>,
+    ops: Arc<Vec<Op>>,
+    index: Arc<AtomicUsize>,
+    end: usize,
+    workload: Arc<WorkloadModel>,
+    ops_counter: Arc<AtomicUsize>,
+    op_errors_counter: Arc<AtomicUsize>,
+    tolerate_op_errors: bool,
+) -> Option<InvariantViolation> {
+    loop {
+        // Work-claiming by atomic index: each op is executed exactly once.
+        let idx = index.fetch_add(1, Ordering::Relaxed);
+        if idx >= end {
+            return None;
+        }
+        let op = &ops[idx];
+        match apply_op_concurrent(&shared, op, &workload).await {
+            Ok(()) => {
+                // fetch_max keeps the counter monotonic across racing tasks.
+                ops_counter.fetch_max(idx + 1, Ordering::Relaxed);
+            }
+            Err(e) => {
+                if tolerate_op_errors {
+                    op_errors_counter.fetch_add(1, Ordering::Relaxed);
+                    continue;
+                }
+                return Some(InvariantViolation {
+                    invariant: "OpExecution",
+                    detail: format!("op {idx}: {e}"),
+                });
+            }
+        }
+    }
+}
+
+/// Pre-compute the op ranges between restart points, pairing each range
+/// with the restart action that follows it. The final chunk (if any)
+/// always carries `RestartAction::None`.
+fn compute_chunks(
+    policy: RestartPolicy,
+    total_ops: usize,
+    restart_rng: &mut Lcg,
+) -> Vec<(Range<usize>, RestartAction)> {
+    let points: Vec<(usize, RestartAction)> = (0..total_ops)
+        .filter_map(|i| match should_restart(policy, OpIndex(i), restart_rng) {
+            RestartAction::None => None,
+            a => Some((i + 1, a)),
+        })
+        .collect();
+    let mut chunks = Vec::new();
+    let mut start = 0;
+    for (end, action) in points {
+        chunks.push((start..end, action));
+        start = end;
+    }
+    if start < total_ops {
+        chunks.push((start..total_ops, RestartAction::None));
+    }
+    chunks
+}
+
+#[allow(clippy::too_many_arguments)] +async fn run_inner_generic_concurrent( + config: GauntletConfig, + op_stream: OpStream, + ops_counter: Arc, + op_errors_counter: Arc, + restarts_counter: Arc, + mut open: Open, + mut crash: Crash, + tolerate_op_errors: bool, +) -> GauntletReport +where + S: StorageIO + Send + Sync + 'static, + Open: FnMut() -> Result, String>, + Crash: FnMut(), +{ + let ops: Vec = op_stream.into_vec(); + let total_ops = ops.len(); + let ops_arc = Arc::new(ops); + let workload_arc = Arc::new(config.workload.clone()); + + let mut violations: Vec = Vec::new(); + let mut restart_rng = Lcg::new(Seed(config.seed.0 ^ 0xA5A5_A5A5_A5A5_A5A5)); + let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); + let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng); + + let mut harness: Option> = match open() { + Ok(h) => Some(h), + Err(e) => { + return GauntletReport { + seed: config.seed, + ops_executed: OpsExecuted(0), + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), + restarts: RestartCount(0), + violations: vec![InvariantViolation { + invariant: "OpenStore", + detail: format!("initial open: {e}"), + }], + ops: OpStream::empty(), + }; + } + }; + let mut root: Option = None; + let mut oracle = Oracle::new(); + let mut halt_ops = false; + + let writer_n = config.writer_concurrency.0.max(1); + + for (chunk_i, (chunk_range, action)) in chunks.iter().enumerate() { + if halt_ops { + break; + } + let current = harness.take().expect("harness present before chunk"); + let taken_oracle = std::mem::take(&mut oracle); + let shared = Arc::new(SharedState { + store: Arc::clone(¤t.store), + write: tokio::sync::Mutex::new(WriteState { + root, + oracle: taken_oracle, + eventlog: current.eventlog, + }), + }); + + let index = Arc::new(AtomicUsize::new(chunk_range.start)); + let end = chunk_range.end; + let mut handles: Vec>> = Vec::new(); + for _ in 0..writer_n { + handles.push(tokio::spawn(writer_task( + 
Arc::clone(&shared), + Arc::clone(&ops_arc), + Arc::clone(&index), + end, + Arc::clone(&workload_arc), + Arc::clone(&ops_counter), + Arc::clone(&op_errors_counter), + tolerate_op_errors, + ))); + } + for h in handles.drain(..) { + match h.await { + Ok(None) => {} + Ok(Some(v)) => { + violations.push(v); + halt_ops = true; + } + Err(join) => { + violations.push(InvariantViolation { + invariant: "TaskJoin", + detail: join.to_string(), + }); + halt_ops = true; + } + } + } + + let shared = match Arc::try_unwrap(shared) { + Ok(s) => s, + Err(still_held) => { + violations.push(InvariantViolation { + invariant: "ConcurrencyInvariant", + detail: format!( + "SharedState still held by {} refs after task join", + Arc::strong_count(&still_held) + ), + }); + halt_ops = true; + break; + } + }; + let store = shared.store; + let write_state = shared.write.into_inner(); + root = write_state.root; + oracle = write_state.oracle; + let eventlog = write_state.eventlog; + harness = Some(Harness { store, eventlog }); + + if halt_ops { + break; + } + + match action { + RestartAction::None => {} + RestartAction::Clean | RestartAction::Crash => { + if matches!(action, RestartAction::Crash) { + crash(); + oracle.record_crash(); + } + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { + Ok(reopened) => { + harness = Some(reopened); + let n = restarts_counter.fetch_add(1, Ordering::Relaxed) + 1; + let live = harness.as_ref().expect("just reopened"); + let before = violations.len(); + violations.extend( + run_quick_check( + &live.store, + &oracle, + root, + &mut sample_rng, + QUICK_SAMPLE_SIZE, + n, + ) + .await, + ); + if violations.len() > before { + halt_ops = true; + } + } + Err(detail) => { + violations.push(InvariantViolation { + invariant: "ReopenFailed", + detail: format!("reopen after chunk {chunk_i}: {detail}"), + }); + halt_ops = true; + } + } + } + } + } + + if !halt_ops && tolerate_op_errors && harness.is_some() { + crash(); + 
oracle.record_crash(); + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { + Ok(reopened) => harness = Some(reopened), + Err(detail) => { + violations.push(InvariantViolation { + invariant: "ReopenFailed", + detail: format!("reopen after post-run crash: {detail}"), + }); + halt_ops = true; + } + } + } + + let end_of_run_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); + if !halt_ops && let Some(live) = harness.as_ref() { + match refresh_oracle_graph(&live.store, &mut oracle, root).await { + Ok(()) => { + let before = violations.len(); + let snapshot = eventlog_snapshot(live.eventlog.as_ref()); + violations.extend( + run_invariants(&live.store, &oracle, root, snapshot, end_of_run_set).await, + ); + if violations.len() > before { + halt_ops = true; + } + } + Err(e) => { + violations.push(InvariantViolation { + invariant: "MstRootDurability", + detail: format!("refresh after final reopen: {e}"), + }); + halt_ops = true; + } + } + } + + let post_reopen_set = config.invariants.without(InvariantSet::RESTART_IDEMPOTENT); + if config.invariants.contains(InvariantSet::RESTART_IDEMPOTENT) + && !halt_ops + && let Some(live) = harness.as_ref() + { + let pre_snapshot = snapshot_block_index(&live.store); + shutdown_harness(&mut harness); + match reopen_with_recovery(&mut open, &mut crash, tolerate_op_errors) { + Ok(reopened) => { + let post_snapshot = snapshot_block_index(&reopened.store); + if let Some(detail) = diff_snapshots(&pre_snapshot, &post_snapshot) { + violations.push(InvariantViolation { + invariant: "RestartIdempotent", + detail, + }); + } else { + let snapshot = eventlog_snapshot(reopened.eventlog.as_ref()); + violations.extend( + run_invariants(&reopened.store, &oracle, root, snapshot, post_reopen_set) + .await, + ); + } + } + Err(detail) => { + violations.push(InvariantViolation { + invariant: "ReopenFailed", + detail: format!("reopen for idempotency check: {detail}"), + }); + } + } + } + + 
GauntletReport { + seed: config.seed, + ops_executed: OpsExecuted(ops_counter.load(Ordering::Relaxed)), + op_errors: OpErrorCount(op_errors_counter.load(Ordering::Relaxed)), + restarts: RestartCount(restarts_counter.load(Ordering::Relaxed)), + violations, + ops: OpStream::empty(), + } +}