diff --git a/crates/tranquil-store/src/bin/tranquil_gauntlet.rs b/crates/tranquil-store/src/bin/tranquil_gauntlet.rs index 75a98b0..d3bff4b 100644 --- a/crates/tranquil-store/src/bin/tranquil_gauntlet.rs +++ b/crates/tranquil-store/src/bin/tranquil_gauntlet.rs @@ -15,6 +15,7 @@ use tranquil_store::gauntlet::{ }; const MAX_HOURS: f64 = 1.0e6; +const DEFAULT_SWEEP_RUN_CAP: u64 = 10_000; /// Deterministic storage-engine gauntlet: scenario fuzzing, shrinking, regression replay. /// @@ -71,6 +72,41 @@ enum Cmd { #[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")] shrink_budget: usize, }, + /// Fan out a scenario across the cartesian product of declared axes. + /// + /// Reads a Toml config with [axes] lists (writer_concurrency, key_space, value_bytes, + /// fault_density_scale, fault_density_uniform, restart_every_n_ops, commit_batch_size, + /// max_file_size). For each combination, runs --seeds seeds and emits one NDjson record + /// per (combo, seed). + Sweep { + /// Toml config declaring scenario & axes. + #[arg(long)] + config: PathBuf, + + /// First seed in the batch range. Default 0. + #[arg(long)] + seed_start: Option<u64>, + + /// Seeds per axis combination. Default 8. Must be > 0. + #[arg(long)] + seeds: Option<u64>, + + /// Directory to dump regression Json on failure. + #[arg(long)] + dump_regressions: Option<PathBuf>, + + /// Skip shrinking when dumping regressions. + #[arg(long)] + no_shrink: bool, + + /// Max shrink attempts per failing seed. + #[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")] + shrink_budget: usize, + + /// Hard cap on total (combinations x seeds). Default 10_000. Set 0 to disable. + #[arg(long, default_value_t = DEFAULT_SWEEP_RUN_CAP)] + max_runs: u64, + }, /// Replay a single seed or a saved regression file. /// /// With --from, replays a regression Json produced by `farm --dump-regressions`. 
@@ -131,6 +167,134 @@ fn load_config_file(path: &Path) -> Result { toml::from_str(&raw).map_err(|e| format!("parse {}: {e}", path.display())) } +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct SweepConfigFile { + scenario: Scenario, + #[serde(default)] + seed_start: Option, + #[serde(default)] + seeds: Option, + #[serde(default)] + dump_regressions: Option, + #[serde(default)] + base_overrides: ConfigOverrides, + #[serde(default)] + axes: SweepAxes, +} + +#[derive(Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct SweepAxes { + #[serde(default)] + writer_concurrency: Vec, + #[serde(default)] + key_space: Vec, + #[serde(default)] + value_bytes: Vec, + #[serde(default)] + fault_density_scale: Vec, + #[serde(default)] + fault_density_uniform: Vec, + #[serde(default)] + restart_every_n_ops: Vec, + #[serde(default)] + commit_batch_size: Vec, + #[serde(default)] + max_file_size: Vec, +} + +#[derive(Debug, Clone, Copy, Default)] +struct SweepAxisValues { + writer_concurrency: Option, + key_space: Option, + value_bytes: Option, + fault_density_scale: Option, + fault_density_uniform: Option, + restart_every_n_ops: Option, + commit_batch_size: Option, + max_file_size: Option, +} + +impl SweepAxisValues { + fn apply_to(self, o: &mut ConfigOverrides) { + if let Some(v) = self.writer_concurrency { + o.writer_concurrency = Some(v); + } + if let Some(v) = self.key_space { + o.key_space = Some(v); + } + if let Some(v) = self.value_bytes { + o.value_bytes = Some(v); + } + if let Some(v) = self.fault_density_scale { + o.fault_density_scale = Some(v); + } + if let Some(v) = self.fault_density_uniform { + o.fault_density_uniform = Some(v); + } + if let Some(v) = self.restart_every_n_ops { + o.restart_every_n_ops = Some(v); + } + if let Some(v) = self.commit_batch_size { + o.store.group_commit.max_batch_size = Some(v); + } + if let Some(v) = self.max_file_size { + o.store.max_file_size = Some(v); + } + } +} + +impl SweepAxes { + fn 
axis_values(&self) -> Vec { + expand(&self.writer_concurrency) + .into_iter() + .flat_map(|wc| { + expand(&self.key_space).into_iter().flat_map(move |ks| { + expand(&self.value_bytes).into_iter().flat_map(move |vb| { + expand(&self.fault_density_scale) + .into_iter() + .flat_map(move |fds| { + expand(&self.fault_density_uniform).into_iter().flat_map( + move |fdu| { + expand(&self.restart_every_n_ops).into_iter().flat_map( + move |rc| { + expand(&self.commit_batch_size) + .into_iter() + .flat_map(move |cb| { + expand(&self.max_file_size).into_iter().map( + move |mfs| SweepAxisValues { + writer_concurrency: wc, + key_space: ks, + value_bytes: vb, + fault_density_scale: fds, + fault_density_uniform: fdu, + restart_every_n_ops: rc, + commit_batch_size: cb, + max_file_size: mfs, + }, + ) + }) + }, + ) + }, + ) + }) + }) + }) + }) + .collect() + } +} + +fn expand(values: &[T]) -> Vec> { + if values.is_empty() { + vec![None] + } else { + values.iter().copied().map(Some).collect() + } +} + #[derive(Debug, Serialize)] struct NdjsonResult { scenario: &'static str, @@ -142,6 +306,8 @@ struct NdjsonResult { violations: Vec, wall_ms: u64, ops_in_stream: usize, + #[serde(skip_serializing_if = "serde_json::Value::is_null")] + overrides: serde_json::Value, } #[derive(Debug, Serialize)] @@ -180,7 +346,20 @@ fn emit_summary(summary: &NdjsonSummary) { } } -fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::Result<()> { +fn overrides_json(overrides: &ConfigOverrides) -> serde_json::Value { + match serde_json::to_value(overrides) { + Ok(v) if v.as_object().map(|m| m.is_empty()).unwrap_or(true) => serde_json::Value::Null, + Ok(v) => v, + Err(_) => serde_json::Value::Null, + } +} + +fn emit( + scenario: Scenario, + report: &GauntletReport, + elapsed: Duration, + overrides: &ConfigOverrides, +) -> io::Result<()> { let result = NdjsonResult { scenario: scenario.cli_name(), seed: report.seed.0, @@ -198,6 +377,7 @@ fn emit(scenario: Scenario, report: 
&GauntletReport, elapsed: Duration) -> io::R .collect(), wall_ms: u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX), ops_in_stream: report.ops.len(), + overrides: overrides_json(overrides), }; let line = serde_json::to_string(&result).map_err(io::Error::other)?; let stdout = io::stdout(); @@ -206,8 +386,13 @@ fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::R w.flush() } -fn emit_or_log(scenario: Scenario, report: &GauntletReport, elapsed: Duration) { - if let Err(e) = emit(scenario, report, elapsed) +fn emit_or_log( + scenario: Scenario, + report: &GauntletReport, + elapsed: Duration, + overrides: &ConfigOverrides, +) { + if let Err(e) = emit(scenario, report, elapsed, overrides) && e.kind() != io::ErrorKind::BrokenPipe { eprintln!("ndjson emit failed: {e}"); @@ -353,9 +538,7 @@ fn install_interrupt(rt: &Runtime) -> Arc<AtomicBool> { return; } f.store(true, Ordering::Relaxed); - eprintln!( - "interrupt received, stopping after current batch; press Ctrl-C again to abort" - ); + eprintln!("interrupt received, stopping after current batch; press Ctrl-C again to abort"); if tokio::signal::ctrl_c().await.is_ok() { eprintln!("second interrupt, aborting"); std::process::exit(130); @@ -437,6 +620,216 @@ fn main() -> ExitCode { }; run_repro(plan, &rt) } + Cmd::Sweep { + config, + seed_start, + seeds, + dump_regressions, + no_shrink, + shrink_budget, + max_runs, + } => { + let plan = match resolve_sweep( + config, + seed_start, + seeds, + dump_regressions, + !no_shrink, + shrink_budget, + max_runs, + ) { + Ok(p) => p, + Err(e) => { + eprintln!("{e}"); + return ExitCode::from(2); + } + }; + let rt = match build_runtime() { + Ok(rt) => rt, + Err(code) => return code, + }; + let interrupt = install_interrupt(&rt); + run_sweep(plan, &rt, interrupt) + } + } +} + +struct SweepPlan { + scenario: Scenario, + seed_start: u64, + seeds: u64, + dump_regressions: Option<PathBuf>, + shrink: bool, + shrink_budget: usize, + base_overrides: ConfigOverrides, + axes: Vec<SweepAxisValues>, +} + 
+fn resolve_sweep( + config: PathBuf, + seed_start: Option<u64>, + seeds: Option<u64>, + dump_regressions: Option<PathBuf>, + shrink: bool, + shrink_budget: usize, + max_runs: u64, +) -> Result<SweepPlan, String> { + let raw = + std::fs::read_to_string(&config).map_err(|e| format!("read {}: {e}", config.display()))?; + let file: SweepConfigFile = + toml::from_str(&raw).map_err(|e| format!("parse {}: {e}", config.display()))?; + let seed_start = seed_start.or(file.seed_start).unwrap_or(0); + let seeds = seeds.or(file.seeds).unwrap_or(8); + if seeds == 0 { + return Err("--seeds must be greater than zero".to_string()); + } + if shrink && shrink_budget == 0 { + return Err("--shrink-budget must be greater than zero".to_string()); + } + let dump_regressions = dump_regressions.or(file.dump_regressions.clone()); + let axes = file.axes.axis_values(); + if axes.is_empty() { + return Err("sweep produced no combinations".to_string()); + } + if max_runs > 0 { + let combos = axes.len() as u64; + let total = combos.saturating_mul(seeds); + if total > max_runs { + return Err(format!( + "sweep would run {total} cases (combinations={combos} x seeds={seeds}), exceeds --max-runs {max_runs}; raise --max-runs or shrink axes" + )); + } + } + Ok(SweepPlan { + scenario: file.scenario, + seed_start, + seeds, + dump_regressions, + shrink, + shrink_budget, + base_overrides: file.base_overrides, + axes, + }) +} + +#[derive(Debug, Serialize)] +struct NdjsonSweepSummary { + #[serde(rename = "type")] + kind: &'static str, + scenario: &'static str, + combinations: u64, + seeds_per_combination: u64, + total_seeds: u64, + clean: u64, + failed: u64, + wall_ms: u64, + interrupted: bool, +} + +fn emit_sweep_summary(summary: &NdjsonSweepSummary) { + let line = match serde_json::to_string(summary) { + Ok(s) => s, + Err(e) => { + eprintln!("sweep summary serialize failed: {e}"); + return; + } + }; + let stdout = io::stdout(); + let mut w = stdout.lock(); + if let Err(e) = writeln!(w, "{line}").and_then(|()| w.flush()) + && e.kind() != 
io::ErrorKind::BrokenPipe + { + eprintln!("sweep summary emit failed: {e}"); + } +} + +fn run_sweep(plan: SweepPlan, rt: &Runtime, interrupt: Arc) -> ExitCode { + let SweepPlan { + scenario, + seed_start, + seeds, + dump_regressions, + shrink, + shrink_budget, + base_overrides, + axes, + } = plan; + let run_start = Instant::now(); + let mut any_failed = false; + let mut total_seeds: u64 = 0; + let mut total_clean: u64 = 0; + let mut total_failed: u64 = 0; + let combos = axes.len() as u64; + eprintln!( + "sweep {}: {} combinations x {} seeds = {} runs", + scenario.cli_name(), + combos, + seeds, + combos.saturating_mul(seeds), + ); + let end = match seed_start.checked_add(seeds) { + Some(e) => e, + None => { + eprintln!("seed range overflowed u64: seed_start={seed_start} seeds={seeds}"); + return ExitCode::from(2); + } + }; + for (combo_idx, axis_values) in axes.iter().enumerate() { + if interrupt.load(Ordering::Relaxed) { + break; + } + let mut overrides = base_overrides.clone(); + axis_values.apply_to(&mut overrides); + let combo_start = Instant::now(); + let overrides_for_farm = overrides.clone(); + let reports = farm::run_many_timed( + move |s| { + let mut cfg = config_for(scenario, s); + overrides_for_farm.apply_to(&mut cfg); + cfg + }, + (seed_start..end).map(Seed), + ); + let combo_wall = combo_start.elapsed(); + let combo_failed = reports.iter().filter(|(r, _)| !r.is_clean()).count(); + let combo_clean = reports.len().saturating_sub(combo_failed); + reports.iter().for_each(|(r, elapsed)| { + if !r.is_clean() { + any_failed = true; + if let Some(root) = &dump_regressions { + dump_regression(scenario, r, root, &overrides, shrink, shrink_budget, rt); + } + } + emit_or_log(scenario, r, *elapsed, &overrides); + }); + total_seeds += reports.len() as u64; + total_clean += combo_clean as u64; + total_failed += combo_failed as u64; + eprintln!( + "combo {}/{}: {} clean, {} failed, {:.1}s", + combo_idx + 1, + combos, + combo_clean, + combo_failed, + 
combo_wall.as_secs_f64(), + ); + } + let wall_ms = u64::try_from(run_start.elapsed().as_millis()).unwrap_or(u64::MAX); + emit_sweep_summary(&NdjsonSweepSummary { + kind: "sweep_summary", + scenario: scenario.cli_name(), + combinations: combos, + seeds_per_combination: seeds, + total_seeds, + clean: total_clean, + failed: total_failed, + wall_ms, + interrupted: interrupt.load(Ordering::Relaxed), + }); + if any_failed { + ExitCode::from(1) + } else { + ExitCode::SUCCESS } } @@ -489,10 +882,7 @@ fn run_farm(plan: FarmPlan, rt: &Runtime, interrupt: Arc) -> ExitCod let batch_wall = batch_start.elapsed(); let batch_failed = reports.iter().filter(|(r, _)| !r.is_clean()).count(); let batch_clean = reports.len().saturating_sub(batch_failed); - let batch_ops: u64 = reports - .iter() - .map(|(r, _)| r.ops_executed.0 as u64) - .sum(); + let batch_ops: u64 = reports.iter().map(|(r, _)| r.ops_executed.0 as u64).sum(); reports.iter().for_each(|(r, elapsed)| { if !r.is_clean() { any_failed = true; @@ -500,7 +890,7 @@ fn run_farm(plan: FarmPlan, rt: &Runtime, interrupt: Arc) -> ExitCod dump_regression(scenario, r, root, &overrides, shrink, shrink_budget, rt); } } - emit_or_log(scenario, r, *elapsed); + emit_or_log(scenario, r, *elapsed, &overrides); }); total_seeds += reports.len() as u64; total_clean += batch_clean as u64; @@ -624,7 +1014,7 @@ fn run_repro(plan: ReproPlan, rt: &Runtime) -> ExitCode { rt, ); } - emit_or_log(scenario, &report, elapsed); + emit_or_log(scenario, &report, elapsed, &overrides); if report.is_clean() { ExitCode::SUCCESS } else { @@ -696,7 +1086,7 @@ fn run_repro_from_record( rt, ); } - emit_or_log(scenario, &report, elapsed); + emit_or_log(scenario, &report, elapsed, &overrides); if report.is_clean() { ExitCode::SUCCESS } else { diff --git a/crates/tranquil-store/src/blockstore/compaction.rs b/crates/tranquil-store/src/blockstore/compaction.rs index 406e6d7..11af311 100644 --- a/crates/tranquil-store/src/blockstore/compaction.rs +++ 
b/crates/tranquil-store/src/blockstore/compaction.rs @@ -201,10 +201,8 @@ fn stream_compact( Ok::<_, CompactionError>(()) }); - let record_count = u32::try_from( - (live_count as u128).saturating_add(dead_count as u128), - ) - .unwrap_or(u32::MAX); + let record_count = + u32::try_from((live_count as u128).saturating_add(dead_count as u128)).unwrap_or(u32::MAX); let writer_position = writer.position(); let finalize_result = scan_result .and_then(|()| writer.sync().map_err(CompactionError::from)) @@ -219,7 +217,12 @@ fn stream_compact( .map_err(CompactionError::from) }) .and_then(|()| hint_writer.sync().map_err(CompactionError::from)) - .and_then(|()| manager.io().sync_dir(manager.data_dir()).map_err(CompactionError::from)); + .and_then(|()| { + manager + .io() + .sync_dir(manager.data_dir()) + .map_err(CompactionError::from) + }); let _ = manager.io().close(hint_fd); diff --git a/crates/tranquil-store/src/blockstore/group_commit.rs b/crates/tranquil-store/src/blockstore/group_commit.rs index 8967202..5027b4b 100644 --- a/crates/tranquil-store/src/blockstore/group_commit.rs +++ b/crates/tranquil-store/src/blockstore/group_commit.rs @@ -1087,9 +1087,8 @@ fn verify_persisted_blocks( entries: &[([u8; CID_SIZE], BlockLocation)], ) -> Result<(), CommitError> { use std::collections::BTreeMap; - let by_file: BTreeMap> = entries - .iter() - .fold(BTreeMap::new(), |mut acc, (cid, loc)| { + let by_file: BTreeMap> = + entries.iter().fold(BTreeMap::new(), |mut acc, (cid, loc)| { acc.entry(loc.file_id).or_default().push((cid, *loc)); acc }); diff --git a/crates/tranquil-store/src/blockstore/manager.rs b/crates/tranquil-store/src/blockstore/manager.rs index f6434d2..1f29aa7 100644 --- a/crates/tranquil-store/src/blockstore/manager.rs +++ b/crates/tranquil-store/src/blockstore/manager.rs @@ -223,10 +223,7 @@ mod tests { assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0); mgr.io().sync_dir(mgr.data_dir()).unwrap(); mgr.commit_rotation(next_id, &next_handle); - assert_eq!( 
- mgr.open_for_read(next_id).unwrap().fd(), - next_handle.fd() - ); + assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd()); } #[test] @@ -236,10 +233,7 @@ mod tests { let (next_id, next_handle) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); mgr.commit_rotation(next_id, &next_handle); - assert_eq!( - mgr.open_for_read(next_id).unwrap().fd(), - next_handle.fd() - ); + assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd()); drop(next_handle); mgr.rollback_rotation(next_id); diff --git a/crates/tranquil-store/src/blockstore/store.rs b/crates/tranquil-store/src/blockstore/store.rs index b44fd15..2859644 100644 --- a/crates/tranquil-store/src/blockstore/store.rs +++ b/crates/tranquil-store/src/blockstore/store.rs @@ -49,10 +49,7 @@ fn commit_error_to_repo(e: CommitError) -> RepoError { )), CommitError::VerifyFailed { file_id, offset } => RepoError::storage(io::Error::new( io::ErrorKind::InvalidData, - format!( - "post-sync verify failed at {file_id}:{}", - offset.raw() - ), + format!("post-sync verify failed at {file_id}:{}", offset.raw()), )), } } diff --git a/crates/tranquil-store/src/eventlog/manager.rs b/crates/tranquil-store/src/eventlog/manager.rs index 1666173..d347b66 100644 --- a/crates/tranquil-store/src/eventlog/manager.rs +++ b/crates/tranquil-store/src/eventlog/manager.rs @@ -418,10 +418,7 @@ mod tests { assert_eq!(next_id, SegmentId::new(2)); assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0); mgr.commit_rotation(next_id, &next_handle); - assert_eq!( - mgr.open_for_read(next_id).unwrap().fd(), - next_handle.fd() - ); + assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd()); } #[test] @@ -431,10 +428,7 @@ mod tests { let (next_id, next_handle) = mgr.prepare_rotation(SegmentId::new(1)).unwrap(); mgr.commit_rotation(next_id, &next_handle); - assert_eq!( - mgr.open_for_read(next_id).unwrap().fd(), - next_handle.fd() - ); + assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd()); 
drop(next_handle); mgr.rollback_rotation(next_id); diff --git a/crates/tranquil-store/src/eventlog/writer.rs b/crates/tranquil-store/src/eventlog/writer.rs index b08df0a..0476915 100644 --- a/crates/tranquil-store/src/eventlog/writer.rs +++ b/crates/tranquil-store/src/eventlog/writer.rs @@ -68,7 +68,8 @@ impl EventLogWriter { ) -> io::Result { let handle = manager.open_for_append(segment_id)?; manager.io().truncate(handle.fd(), 0)?; - let writer = SegmentWriter::new(manager.io(), handle.fd(), segment_id, next_seq, max_payload)?; + let writer = + SegmentWriter::new(manager.io(), handle.fd(), segment_id, next_seq, max_payload)?; writer.sync(manager.io())?; manager.io().sync_dir(manager.segments_dir())?; @@ -400,8 +401,12 @@ fn find_last_seq_from_segments( Err(e) if e.kind() != io::ErrorKind::InvalidData => Err(e), _ => { let handle = manager.open_for_read(seg_id)?; - let (_, last_seq) = - rebuild_from_segment(manager.io(), handle.fd(), DEFAULT_INDEX_INTERVAL, max_payload)?; + let (_, last_seq) = rebuild_from_segment( + manager.io(), + handle.fd(), + DEFAULT_INDEX_INTERVAL, + max_payload, + )?; Ok(last_seq) } } diff --git a/crates/tranquil-store/src/gauntlet/overrides.rs b/crates/tranquil-store/src/gauntlet/overrides.rs index 97bd118..3cbdc7d 100644 --- a/crates/tranquil-store/src/gauntlet/overrides.rs +++ b/crates/tranquil-store/src/gauntlet/overrides.rs @@ -1,15 +1,31 @@ use serde::{Deserialize, Serialize}; -use super::runner::{GauntletConfig, MaxFileSize, RunLimits, ShardCount, WallMs}; -use super::workload::OpCount; +use super::runner::{ + GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, ShardCount, + WallMs, WriterConcurrency, +}; +use super::workload::{KeySpaceSize, OpCount, SizeDistribution, ValueBytes}; +use crate::sim::FaultConfig; -#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] pub struct ConfigOverrides { 
#[serde(default, skip_serializing_if = "Option::is_none")] pub op_count: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub max_wall_ms: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub writer_concurrency: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub key_space: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub value_bytes: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub fault_density_scale: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub fault_density_uniform: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub restart_every_n_ops: Option, #[serde(default, skip_serializing_if = "StoreOverrides::is_empty")] pub store: StoreOverrides, } @@ -66,6 +82,34 @@ impl ConfigOverrides { max_wall_ms: Some(WallMs(ms)), }; } + if let Some(n) = self.writer_concurrency { + cfg.writer_concurrency = WriterConcurrency(n.max(1)); + } + if let Some(n) = self.key_space { + cfg.workload.key_space = KeySpaceSize(n.max(1)); + } + if let Some(n) = self.value_bytes { + cfg.workload.size_distribution = SizeDistribution::Fixed(ValueBytes(n)); + } + if let Some(m) = self.fault_density_scale + && let IoBackend::Simulated { fault } = cfg.io + { + cfg.io = IoBackend::Simulated { + fault: fault.scale_probabilities(m), + }; + } + if let Some(d) = self.fault_density_uniform { + cfg.io = IoBackend::Simulated { + fault: FaultConfig::uniform_density(d.clamp(0.0, 1.0)), + }; + } + if let Some(n) = self.restart_every_n_ops { + cfg.restart_policy = if n == 0 { + RestartPolicy::Never + } else { + RestartPolicy::EveryNOps(OpInterval(n)) + }; + } if let Some(n) = self.store.max_file_size { cfg.store.max_file_size = MaxFileSize(n); } @@ -106,6 +150,12 @@ mod tests { fn round_trip_preserves_set_fields() { let o = ConfigOverrides { op_count: Some(42), + writer_concurrency: Some(16), + key_space: Some(1_000_000), + value_bytes: 
Some(4096), + fault_density_scale: Some(1e-3), + fault_density_uniform: Some(5e-4), + restart_every_n_ops: Some(10_000), store: StoreOverrides { max_file_size: Some(4096), group_commit: GroupCommitOverrides { @@ -120,4 +170,78 @@ mod tests { let back: ConfigOverrides = serde_json::from_str(&json).unwrap(); assert_eq!(o, back); } + + #[test] + fn fault_density_scale_scales_moderate() { + use crate::gauntlet::op::Seed; + use crate::gauntlet::scenarios::{Scenario, config_for}; + let mut cfg = config_for(Scenario::ModerateFaults, Seed(1)); + let o = ConfigOverrides { + fault_density_scale: Some(0.1), + ..ConfigOverrides::default() + }; + o.apply_to(&mut cfg); + match cfg.io { + IoBackend::Simulated { fault } => { + assert!(fault.torn_page_probability.raw() < 0.02); + assert!(fault.torn_page_probability.raw() > 0.0); + } + _ => panic!("expected simulated io"), + } + } + + #[test] + fn fault_density_scale_zero_kills_probabilities() { + use crate::gauntlet::op::Seed; + use crate::gauntlet::scenarios::{Scenario, config_for}; + let mut cfg = config_for(Scenario::ModerateFaults, Seed(1)); + let o = ConfigOverrides { + fault_density_scale: Some(0.0), + ..ConfigOverrides::default() + }; + o.apply_to(&mut cfg); + match cfg.io { + IoBackend::Simulated { fault } => { + assert_eq!(fault.partial_write_probability.raw(), 0.0); + assert_eq!(fault.torn_page_probability.raw(), 0.0); + assert_eq!(fault.io_error_probability.raw(), 0.0); + assert_eq!(fault.sync_failure_probability.raw(), 0.0); + } + _ => panic!("expected simulated io"), + } + } + + #[test] + fn fault_density_scale_is_noop_on_real_backend() { + use crate::gauntlet::op::Seed; + use crate::gauntlet::scenarios::{Scenario, config_for}; + let mut cfg = config_for(Scenario::SmokePR, Seed(1)); + assert!(matches!(cfg.io, IoBackend::Real)); + let o = ConfigOverrides { + fault_density_scale: Some(0.5), + ..ConfigOverrides::default() + }; + o.apply_to(&mut cfg); + assert!(matches!(cfg.io, IoBackend::Real)); + } + + #[test] + fn 
fault_density_uniform_forces_simulated_backend() { + use crate::gauntlet::op::Seed; + use crate::gauntlet::scenarios::{Scenario, config_for}; + let mut cfg = config_for(Scenario::SmokePR, Seed(1)); + assert!(matches!(cfg.io, IoBackend::Real)); + let o = ConfigOverrides { + fault_density_uniform: Some(0.25), + ..ConfigOverrides::default() + }; + o.apply_to(&mut cfg); + match cfg.io { + IoBackend::Simulated { fault } => { + assert_eq!(fault.torn_page_probability.raw(), 0.25); + assert_eq!(fault.io_error_probability.raw(), 0.25); + } + _ => panic!("expected simulated io"), + } + } } diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index 3f1400b..7ee61e2 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -595,7 +595,8 @@ fn eventlog_snapshot( let mut segment_last_ts: Vec<(SegmentId, u64)> = Vec::new(); segments.iter().for_each(|&id| { let per_segment: Vec = match s.manager.open_for_read(id) { - Ok(handle) => match SegmentReader::open(s.manager.io(), handle.fd(), MAX_EVENT_PAYLOAD) { + Ok(handle) => match SegmentReader::open(s.manager.io(), handle.fd(), MAX_EVENT_PAYLOAD) + { Ok(reader) => reader.valid_prefix().unwrap_or_default(), Err(_) => Vec::new(), }, @@ -991,8 +992,7 @@ async fn apply_op( if !oracle.contains_record(collection, rkey) { return Ok(()); } - let new_root = - delete_record_atomic(&harness.store, old_root, collection, rkey).await?; + let new_root = delete_record_atomic(&harness.store, old_root, collection, rkey).await?; oracle.delete(collection, rkey); *root = Some(new_root); Ok(()) @@ -1288,8 +1288,7 @@ async fn apply_op_concurrent( if !state.oracle.contains_record(collection, rkey) { return Ok(()); } - let new_root = - delete_record_atomic(&shared.store, old_root, collection, rkey).await?; + let new_root = delete_record_atomic(&shared.store, old_root, collection, rkey).await?; state.oracle.delete(collection, rkey); state.root = 
Some(new_root); Ok(()) diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs index 59cb743..a0bcae7 100644 --- a/crates/tranquil-store/src/gauntlet/scenarios.rs +++ b/crates/tranquil-store/src/gauntlet/scenarios.rs @@ -99,7 +99,9 @@ impl Scenario { "Eventlog-heavy workload with FSYNC_ORDERING / MONOTONIC_SEQ / TOMBSTONE_BOUND invariants." } Self::ContendedReaders => "60% reads, 64 writer tasks, simulated moderate faults.", - Self::ContendedWriters => "Add/delete heavy, 32 writer tasks, simulated moderate faults.", + Self::ContendedWriters => { + "Add/delete heavy, 32 writer tasks, simulated moderate faults." + } } } diff --git a/crates/tranquil-store/src/sim.rs b/crates/tranquil-store/src/sim.rs index 49e830d..f467366 100644 --- a/crates/tranquil-store/src/sim.rs +++ b/crates/tranquil-store/src/sim.rs @@ -129,6 +129,44 @@ impl FaultConfig { || self.delayed_io_error_probability.is_nonzero() || self.sync_reorder_window.0 > 0 } + + pub fn scale_probabilities(self, factor: f64) -> Self { + let scale = |p: Probability| Probability::new((p.raw() * factor).clamp(0.0, 1.0)); + Self { + partial_write_probability: scale(self.partial_write_probability), + bit_flip_on_read_probability: scale(self.bit_flip_on_read_probability), + sync_failure_probability: scale(self.sync_failure_probability), + dir_sync_failure_probability: scale(self.dir_sync_failure_probability), + misdirected_write_probability: scale(self.misdirected_write_probability), + io_error_probability: scale(self.io_error_probability), + torn_page_probability: scale(self.torn_page_probability), + misdirected_read_probability: scale(self.misdirected_read_probability), + delayed_io_error_probability: scale(self.delayed_io_error_probability), + sync_reorder_window: self.sync_reorder_window, + latency_distribution_ns: self.latency_distribution_ns, + } + } + + pub fn uniform_density(density: f64) -> Self { + assert!( + density.is_finite() && 
(0.0..=1.0).contains(&density), + "fault density out of range: {density}" + ); + let p = Probability::new(density); + Self { + partial_write_probability: p, + bit_flip_on_read_probability: p, + sync_failure_probability: p, + dir_sync_failure_probability: p, + misdirected_write_probability: p, + io_error_probability: p, + torn_page_probability: p, + misdirected_read_probability: p, + delayed_io_error_probability: p, + sync_reorder_window: SyncReorderWindow(0), + latency_distribution_ns: LatencyNs(0), + } + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] diff --git a/crates/tranquil-store/tests/eventlog_manager_race.rs b/crates/tranquil-store/tests/eventlog_manager_race.rs index 5d58261..ff40985 100644 --- a/crates/tranquil-store/tests/eventlog_manager_race.rs +++ b/crates/tranquil-store/tests/eventlog_manager_race.rs @@ -10,15 +10,18 @@ fn concurrent_reader_survives_evict_on_segment_delete() { let sim: Arc = Arc::new(SimulatedIO::pristine(0x1eed7a11)); let segments_dir = PathBuf::from("/segments"); - let manager = Arc::new( - SegmentManager::new(Arc::clone(&sim), segments_dir.clone(), 1 << 20).unwrap(), - ); + let manager = + Arc::new(SegmentManager::new(Arc::clone(&sim), segments_dir.clone(), 1 << 20).unwrap()); let seg_id = SegmentId::new(1); let write_handle = manager.open_for_append(seg_id).unwrap(); - sim.write_at(write_handle.fd(), 0, b"arbitrary seed bytes for the segment") - .unwrap(); + sim.write_at( + write_handle.fd(), + 0, + b"arbitrary seed bytes for the segment", + ) + .unwrap(); sim.sync(write_handle.fd()).unwrap(); sim.sync_dir(&segments_dir).unwrap(); drop(write_handle); diff --git a/crates/tranquil-store/tests/fd_lifecycle.rs b/crates/tranquil-store/tests/fd_lifecycle.rs index 5d36a5c..0673888 100644 --- a/crates/tranquil-store/tests/fd_lifecycle.rs +++ b/crates/tranquil-store/tests/fd_lifecycle.rs @@ -40,7 +40,9 @@ fn config_for(dir: &Path, max_file_size: u64) -> BlockStoreConfig { fn tiny_block(seed: u64) -> Vec { let bytes = 
seed.to_le_bytes(); - (0..64).map(|i| bytes[i % 8] ^ (i as u8).wrapping_mul(31)).collect() + (0..64) + .map(|i| bytes[i % 8] ^ (i as u8).wrapping_mul(31)) + .collect() } #[tokio::test] diff --git a/crates/tranquil-store/tests/rotation_robustness.rs b/crates/tranquil-store/tests/rotation_robustness.rs index f60b1ed..97eae32 100644 --- a/crates/tranquil-store/tests/rotation_robustness.rs +++ b/crates/tranquil-store/tests/rotation_robustness.rs @@ -3,8 +3,8 @@ mod common; use std::collections::HashMap; use std::io; use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; use tranquil_store::blockstore::{ BlockStoreConfig, DataFileId, DataFileManager, DataFileWriter, GroupCommitConfig, @@ -59,7 +59,10 @@ impl FailingIO { impl StorageIO for FailingIO { fn open(&self, path: &Path, opts: OpenOptions) -> io::Result { let fd = self.inner.open(path, opts)?; - self.fd_to_path.lock().unwrap().insert(fd, path.to_path_buf()); + self.fd_to_path + .lock() + .unwrap() + .insert(fd, path.to_path_buf()); Ok(fd) } diff --git a/crates/tranquil-store/tests/sim_blockstore.rs b/crates/tranquil-store/tests/sim_blockstore.rs index d35e72e..70ed787 100644 --- a/crates/tranquil-store/tests/sim_blockstore.rs +++ b/crates/tranquil-store/tests/sim_blockstore.rs @@ -657,15 +657,13 @@ fn sim_multi_file_rotation_crash_recovery() { }; let block_count = ((seed % 25) + 10) as u32; - let all_cids: Vec = - (0..block_count).map(test_cid).collect(); + let all_cids: Vec = (0..block_count).map(test_cid).collect(); { let store = TranquilBlockStore::open(config.clone()).unwrap(); - (0..block_count).try_for_each(|i| { - store.put_blocks_blocking(vec![(test_cid(i), block_data(i))]) - }) - .unwrap(); + (0..block_count) + .try_for_each(|i| store.put_blocks_blocking(vec![(test_cid(i), block_data(i))])) + .unwrap(); let files = store.list_data_files().unwrap(); assert!( diff --git a/crates/tranquil-store/tests/sim_eventlog.rs 
b/crates/tranquil-store/tests/sim_eventlog.rs index 51e771b..73dd94e 100644 --- a/crates/tranquil-store/tests/sim_eventlog.rs +++ b/crates/tranquil-store/tests/sim_eventlog.rs @@ -39,7 +39,8 @@ fn read_all_events(mgr: &SegmentManager, seed: u64) -> Vec