feat(tranquil-store): sweep subcommand with axis override fan-out

May this revision serve well!

Lewis <lu5a@proton.me>
2026-04-22 13:08:50 +03:00
parent 00c9eb732f
commit 4fe01cff72
17 changed files with 643 additions and 75 deletions

View File

@@ -15,6 +15,7 @@ use tranquil_store::gauntlet::{
};
const MAX_HOURS: f64 = 1.0e6;
const DEFAULT_SWEEP_RUN_CAP: u64 = 10_000;
/// Deterministic storage-engine gauntlet: scenario fuzzing, shrinking, regression replay.
///
@@ -71,6 +72,41 @@ enum Cmd {
#[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")]
shrink_budget: usize,
},
/// Fan out a scenario across the cartesian product of declared axes.
///
/// Reads a TOML config with [axes] lists (writer_concurrency, key_space, value_bytes,
/// fault_density_scale, fault_density_uniform, restart_every_n_ops, commit_batch_size,
/// max_file_size). For each combination, runs --seeds seeds and emits one NDJSON record
/// per (combo, seed). A sample config is sketched below.
Sweep {
/// TOML config declaring scenario & axes.
#[arg(long)]
config: PathBuf,
/// First seed in the batch range. Default 0.
#[arg(long)]
seed_start: Option<u64>,
/// Seeds per axis combination. Default 8. Must be > 0.
#[arg(long)]
seeds: Option<u64>,
/// Directory to dump regression JSON on failure.
#[arg(long)]
dump_regressions: Option<PathBuf>,
/// Skip shrinking when dumping regressions.
#[arg(long)]
no_shrink: bool,
/// Max shrink attempts per failing seed.
#[arg(long, default_value_t = DEFAULT_MAX_SHRINK_ITERATIONS, conflicts_with = "no_shrink")]
shrink_budget: usize,
/// Hard cap on total (combinations x seeds). Default 10_000. Set 0 to disable.
#[arg(long, default_value_t = DEFAULT_SWEEP_RUN_CAP)]
max_runs: u64,
},
/// Replay a single seed or a saved regression file.
///
/// With --from, replays a regression JSON produced by `farm --dump-regressions`.
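A minimal sweep config, matching the SweepConfigFile / SweepAxes fields added below. The scenario name, path, and axis values here are illustrative, and the kebab-case scenario string assumes the same spelling the repro --scenario flag already accepts:

scenario = "moderate-faults"
seeds = 8
dump_regressions = "proptest-regressions"

[base_overrides]
op_count = 5_000

[axes]
writer_concurrency = [1, 4, 16]
key_space = [1_000, 1_000_000]
fault_density_scale = [0.1, 1.0]

Unset axes pass through unchanged, so this declares 3 x 2 x 2 = 12 combinations; at 8 seeds each that is 96 runs, comfortably under the default --max-runs cap of 10_000.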
@@ -131,6 +167,134 @@ fn load_config_file(path: &Path) -> Result<ConfigFile, String> {
toml::from_str(&raw).map_err(|e| format!("parse {}: {e}", path.display()))
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct SweepConfigFile {
scenario: Scenario,
#[serde(default)]
seed_start: Option<u64>,
#[serde(default)]
seeds: Option<u64>,
#[serde(default)]
dump_regressions: Option<PathBuf>,
#[serde(default)]
base_overrides: ConfigOverrides,
#[serde(default)]
axes: SweepAxes,
}
#[derive(Debug, Default, Deserialize)]
#[serde(deny_unknown_fields)]
struct SweepAxes {
#[serde(default)]
writer_concurrency: Vec<usize>,
#[serde(default)]
key_space: Vec<u32>,
#[serde(default)]
value_bytes: Vec<u32>,
#[serde(default)]
fault_density_scale: Vec<f64>,
#[serde(default)]
fault_density_uniform: Vec<f64>,
#[serde(default)]
restart_every_n_ops: Vec<usize>,
#[serde(default)]
commit_batch_size: Vec<usize>,
#[serde(default)]
max_file_size: Vec<u64>,
}
#[derive(Debug, Clone, Copy, Default)]
struct SweepAxisValues {
writer_concurrency: Option<usize>,
key_space: Option<u32>,
value_bytes: Option<u32>,
fault_density_scale: Option<f64>,
fault_density_uniform: Option<f64>,
restart_every_n_ops: Option<usize>,
commit_batch_size: Option<usize>,
max_file_size: Option<u64>,
}
impl SweepAxisValues {
fn apply_to(self, o: &mut ConfigOverrides) {
if let Some(v) = self.writer_concurrency {
o.writer_concurrency = Some(v);
}
if let Some(v) = self.key_space {
o.key_space = Some(v);
}
if let Some(v) = self.value_bytes {
o.value_bytes = Some(v);
}
if let Some(v) = self.fault_density_scale {
o.fault_density_scale = Some(v);
}
if let Some(v) = self.fault_density_uniform {
o.fault_density_uniform = Some(v);
}
if let Some(v) = self.restart_every_n_ops {
o.restart_every_n_ops = Some(v);
}
if let Some(v) = self.commit_batch_size {
o.store.group_commit.max_batch_size = Some(v);
}
if let Some(v) = self.max_file_size {
o.store.max_file_size = Some(v);
}
}
}
impl SweepAxes {
fn axis_values(&self) -> Vec<SweepAxisValues> {
expand(&self.writer_concurrency)
.into_iter()
.flat_map(|wc| {
expand(&self.key_space).into_iter().flat_map(move |ks| {
expand(&self.value_bytes).into_iter().flat_map(move |vb| {
expand(&self.fault_density_scale)
.into_iter()
.flat_map(move |fds| {
expand(&self.fault_density_uniform).into_iter().flat_map(
move |fdu| {
expand(&self.restart_every_n_ops).into_iter().flat_map(
move |rc| {
expand(&self.commit_batch_size)
.into_iter()
.flat_map(move |cb| {
expand(&self.max_file_size).into_iter().map(
move |mfs| SweepAxisValues {
writer_concurrency: wc,
key_space: ks,
value_bytes: vb,
fault_density_scale: fds,
fault_density_uniform: fdu,
restart_every_n_ops: rc,
commit_batch_size: cb,
max_file_size: mfs,
},
)
})
},
)
},
)
})
})
})
})
.collect()
}
}
fn expand<T: Copy>(values: &[T]) -> Vec<Option<T>> {
if values.is_empty() {
vec![None]
} else {
values.iter().copied().map(Some).collect()
}
}
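// Editorial sketch, not part of this commit: `expand` maps an unset axis to the
// single value `None`, so `axis_values` is a cartesian product over
// max(1, len) choices per axis and always yields at least one combination.
#[cfg(test)]
mod axis_expansion_sketch {
    use super::*;

    #[test]
    fn combination_count_is_the_product_of_declared_axes() {
        let axes = SweepAxes {
            writer_concurrency: vec![1, 8],
            key_space: vec![100],
            ..SweepAxes::default()
        };
        // Two writer_concurrency values x one key_space value; the six unset
        // axes each contribute a single `None` and do not multiply the count.
        assert_eq!(axes.axis_values().len(), 2);
        // Declaring no axes still produces the single all-None combination.
        assert_eq!(SweepAxes::default().axis_values().len(), 1);
    }
}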
#[derive(Debug, Serialize)]
struct NdjsonResult {
scenario: &'static str,
@@ -142,6 +306,8 @@ struct NdjsonResult {
violations: Vec<NdjsonViolation>,
wall_ms: u64,
ops_in_stream: usize,
#[serde(skip_serializing_if = "serde_json::Value::is_null")]
overrides: serde_json::Value,
}
#[derive(Debug, Serialize)]
@@ -180,7 +346,20 @@ fn emit_summary(summary: &NdjsonSummary) {
}
}
fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::Result<()> {
fn overrides_json(overrides: &ConfigOverrides) -> serde_json::Value {
match serde_json::to_value(overrides) {
Ok(v) if v.as_object().map(|m| m.is_empty()).unwrap_or(true) => serde_json::Value::Null,
Ok(v) => v,
Err(_) => serde_json::Value::Null,
}
}
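// Editorial note, not part of this commit: an all-default ConfigOverrides
// serializes to `{}` (every field carries skip_serializing_if), so
// overrides_json returns Null and the skip_serializing_if attribute on
// NdjsonResult::overrides then drops the field from the record entirely.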
fn emit(
scenario: Scenario,
report: &GauntletReport,
elapsed: Duration,
overrides: &ConfigOverrides,
) -> io::Result<()> {
let result = NdjsonResult {
scenario: scenario.cli_name(),
seed: report.seed.0,
@@ -198,6 +377,7 @@ fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::R
.collect(),
wall_ms: u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
ops_in_stream: report.ops.len(),
overrides: overrides_json(overrides),
};
let line = serde_json::to_string(&result).map_err(io::Error::other)?;
let stdout = io::stdout();
@@ -206,8 +386,13 @@ fn emit(scenario: Scenario, report: &GauntletReport, elapsed: Duration) -> io::R
w.flush()
}
fn emit_or_log(scenario: Scenario, report: &GauntletReport, elapsed: Duration) {
if let Err(e) = emit(scenario, report, elapsed)
fn emit_or_log(
scenario: Scenario,
report: &GauntletReport,
elapsed: Duration,
overrides: &ConfigOverrides,
) {
if let Err(e) = emit(scenario, report, elapsed, overrides)
&& e.kind() != io::ErrorKind::BrokenPipe
{
eprintln!("ndjson emit failed: {e}");
@@ -353,9 +538,7 @@ fn install_interrupt(rt: &Runtime) -> Arc<AtomicBool> {
return;
}
f.store(true, Ordering::Relaxed);
eprintln!(
"interrupt received, stopping after current batch; press Ctrl-C again to abort"
);
eprintln!("interrupt received, stopping after current batch; press Ctrl-C again to abort");
if tokio::signal::ctrl_c().await.is_ok() {
eprintln!("second interrupt, aborting");
std::process::exit(130);
@@ -437,6 +620,216 @@ fn main() -> ExitCode {
};
run_repro(plan, &rt)
}
Cmd::Sweep {
config,
seed_start,
seeds,
dump_regressions,
no_shrink,
shrink_budget,
max_runs,
} => {
let plan = match resolve_sweep(
config,
seed_start,
seeds,
dump_regressions,
!no_shrink,
shrink_budget,
max_runs,
) {
Ok(p) => p,
Err(e) => {
eprintln!("{e}");
return ExitCode::from(2);
}
};
let rt = match build_runtime() {
Ok(rt) => rt,
Err(code) => return code,
};
let interrupt = install_interrupt(&rt);
run_sweep(plan, &rt, interrupt)
}
}
}
struct SweepPlan {
scenario: Scenario,
seed_start: u64,
seeds: u64,
dump_regressions: Option<PathBuf>,
shrink: bool,
shrink_budget: usize,
base_overrides: ConfigOverrides,
axes: Vec<SweepAxisValues>,
}
fn resolve_sweep(
config: PathBuf,
seed_start: Option<u64>,
seeds: Option<u64>,
dump_regressions: Option<PathBuf>,
shrink: bool,
shrink_budget: usize,
max_runs: u64,
) -> Result<SweepPlan, String> {
let raw =
std::fs::read_to_string(&config).map_err(|e| format!("read {}: {e}", config.display()))?;
let file: SweepConfigFile =
toml::from_str(&raw).map_err(|e| format!("parse {}: {e}", config.display()))?;
let seed_start = seed_start.or(file.seed_start).unwrap_or(0);
let seeds = seeds.or(file.seeds).unwrap_or(8);
if seeds == 0 {
return Err("--seeds must be greater than zero".to_string());
}
if shrink && shrink_budget == 0 {
return Err("--shrink-budget must be greater than zero".to_string());
}
let dump_regressions = dump_regressions.or(file.dump_regressions.clone());
let axes = file.axes.axis_values();
if axes.is_empty() {
return Err("sweep produced no combinations".to_string());
}
if max_runs > 0 {
let combos = axes.len() as u64;
let total = combos.saturating_mul(seeds);
if total > max_runs {
return Err(format!(
"sweep would run {total} cases (combinations={combos} x seeds={seeds}), exceeds --max-runs {max_runs}; raise --max-runs or shrink axes"
));
}
}
Ok(SweepPlan {
scenario: file.scenario,
seed_start,
seeds,
dump_regressions,
shrink,
shrink_budget,
base_overrides: file.base_overrides,
axes,
})
}
#[derive(Debug, Serialize)]
struct NdjsonSweepSummary {
#[serde(rename = "type")]
kind: &'static str,
scenario: &'static str,
combinations: u64,
seeds_per_combination: u64,
total_seeds: u64,
clean: u64,
failed: u64,
wall_ms: u64,
interrupted: bool,
}
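// A hypothetical sweep_summary line (field order follows the struct above; the
// numbers are illustrative):
// {"type":"sweep_summary","scenario":"moderate-faults","combinations":12,"seeds_per_combination":8,"total_seeds":96,"clean":95,"failed":1,"wall_ms":48210,"interrupted":false}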
fn emit_sweep_summary(summary: &NdjsonSweepSummary) {
let line = match serde_json::to_string(summary) {
Ok(s) => s,
Err(e) => {
eprintln!("sweep summary serialize failed: {e}");
return;
}
};
let stdout = io::stdout();
let mut w = stdout.lock();
if let Err(e) = writeln!(w, "{line}").and_then(|()| w.flush())
&& e.kind() != io::ErrorKind::BrokenPipe
{
eprintln!("sweep summary emit failed: {e}");
}
}
fn run_sweep(plan: SweepPlan, rt: &Runtime, interrupt: Arc<AtomicBool>) -> ExitCode {
let SweepPlan {
scenario,
seed_start,
seeds,
dump_regressions,
shrink,
shrink_budget,
base_overrides,
axes,
} = plan;
let run_start = Instant::now();
let mut any_failed = false;
let mut total_seeds: u64 = 0;
let mut total_clean: u64 = 0;
let mut total_failed: u64 = 0;
let combos = axes.len() as u64;
eprintln!(
"sweep {}: {} combinations x {} seeds = {} runs",
scenario.cli_name(),
combos,
seeds,
combos.saturating_mul(seeds),
);
let end = match seed_start.checked_add(seeds) {
Some(e) => e,
None => {
eprintln!("seed range overflowed u64: seed_start={seed_start} seeds={seeds}");
return ExitCode::from(2);
}
};
for (combo_idx, axis_values) in axes.iter().enumerate() {
if interrupt.load(Ordering::Relaxed) {
break;
}
let mut overrides = base_overrides.clone();
axis_values.apply_to(&mut overrides);
let combo_start = Instant::now();
let overrides_for_farm = overrides.clone();
let reports = farm::run_many_timed(
move |s| {
let mut cfg = config_for(scenario, s);
overrides_for_farm.apply_to(&mut cfg);
cfg
},
(seed_start..end).map(Seed),
);
let combo_wall = combo_start.elapsed();
let combo_failed = reports.iter().filter(|(r, _)| !r.is_clean()).count();
let combo_clean = reports.len().saturating_sub(combo_failed);
reports.iter().for_each(|(r, elapsed)| {
if !r.is_clean() {
any_failed = true;
if let Some(root) = &dump_regressions {
dump_regression(scenario, r, root, &overrides, shrink, shrink_budget, rt);
}
}
emit_or_log(scenario, r, *elapsed, &overrides);
});
total_seeds += reports.len() as u64;
total_clean += combo_clean as u64;
total_failed += combo_failed as u64;
eprintln!(
"combo {}/{}: {} clean, {} failed, {:.1}s",
combo_idx + 1,
combos,
combo_clean,
combo_failed,
combo_wall.as_secs_f64(),
);
}
let wall_ms = u64::try_from(run_start.elapsed().as_millis()).unwrap_or(u64::MAX);
emit_sweep_summary(&NdjsonSweepSummary {
kind: "sweep_summary",
scenario: scenario.cli_name(),
combinations: combos,
seeds_per_combination: seeds,
total_seeds,
clean: total_clean,
failed: total_failed,
wall_ms,
interrupted: interrupt.load(Ordering::Relaxed),
});
if any_failed {
ExitCode::from(1)
} else {
ExitCode::SUCCESS
}
}
@@ -489,10 +882,7 @@ fn run_farm(plan: FarmPlan, rt: &Runtime, interrupt: Arc<AtomicBool>) -> ExitCod
let batch_wall = batch_start.elapsed();
let batch_failed = reports.iter().filter(|(r, _)| !r.is_clean()).count();
let batch_clean = reports.len().saturating_sub(batch_failed);
let batch_ops: u64 = reports
.iter()
.map(|(r, _)| r.ops_executed.0 as u64)
.sum();
let batch_ops: u64 = reports.iter().map(|(r, _)| r.ops_executed.0 as u64).sum();
reports.iter().for_each(|(r, elapsed)| {
if !r.is_clean() {
any_failed = true;
@@ -500,7 +890,7 @@ fn run_farm(plan: FarmPlan, rt: &Runtime, interrupt: Arc<AtomicBool>) -> ExitCod
dump_regression(scenario, r, root, &overrides, shrink, shrink_budget, rt);
}
}
emit_or_log(scenario, r, *elapsed);
emit_or_log(scenario, r, *elapsed, &overrides);
});
total_seeds += reports.len() as u64;
total_clean += batch_clean as u64;
@@ -624,7 +1014,7 @@ fn run_repro(plan: ReproPlan, rt: &Runtime) -> ExitCode {
rt,
);
}
emit_or_log(scenario, &report, elapsed);
emit_or_log(scenario, &report, elapsed, &overrides);
if report.is_clean() {
ExitCode::SUCCESS
} else {
@@ -696,7 +1086,7 @@ fn run_repro_from_record(
rt,
);
}
emit_or_log(scenario, &report, elapsed);
emit_or_log(scenario, &report, elapsed, &overrides);
if report.is_clean() {
ExitCode::SUCCESS
} else {

View File

@@ -201,10 +201,8 @@ fn stream_compact<S: StorageIO>(
Ok::<_, CompactionError>(())
});
let record_count = u32::try_from(
(live_count as u128).saturating_add(dead_count as u128),
)
.unwrap_or(u32::MAX);
let record_count =
u32::try_from((live_count as u128).saturating_add(dead_count as u128)).unwrap_or(u32::MAX);
let writer_position = writer.position();
let finalize_result = scan_result
.and_then(|()| writer.sync().map_err(CompactionError::from))
@@ -219,7 +217,12 @@ fn stream_compact<S: StorageIO>(
.map_err(CompactionError::from)
})
.and_then(|()| hint_writer.sync().map_err(CompactionError::from))
.and_then(|()| manager.io().sync_dir(manager.data_dir()).map_err(CompactionError::from));
.and_then(|()| {
manager
.io()
.sync_dir(manager.data_dir())
.map_err(CompactionError::from)
});
let _ = manager.io().close(hint_fd);

View File

@@ -1087,9 +1087,8 @@ fn verify_persisted_blocks<S: StorageIO>(
entries: &[([u8; CID_SIZE], BlockLocation)],
) -> Result<(), CommitError> {
use std::collections::BTreeMap;
let by_file: BTreeMap<DataFileId, Vec<(&[u8; CID_SIZE], BlockLocation)>> = entries
.iter()
.fold(BTreeMap::new(), |mut acc, (cid, loc)| {
let by_file: BTreeMap<DataFileId, Vec<(&[u8; CID_SIZE], BlockLocation)>> =
entries.iter().fold(BTreeMap::new(), |mut acc, (cid, loc)| {
acc.entry(loc.file_id).or_default().push((cid, *loc));
acc
});

View File

@@ -223,10 +223,7 @@ mod tests {
assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0);
mgr.io().sync_dir(mgr.data_dir()).unwrap();
mgr.commit_rotation(next_id, &next_handle);
assert_eq!(
mgr.open_for_read(next_id).unwrap().fd(),
next_handle.fd()
);
assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd());
}
#[test]
@@ -236,10 +233,7 @@ mod tests {
let (next_id, next_handle) = mgr.prepare_rotation(DataFileId::new(0)).unwrap();
mgr.commit_rotation(next_id, &next_handle);
assert_eq!(
mgr.open_for_read(next_id).unwrap().fd(),
next_handle.fd()
);
assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd());
drop(next_handle);
mgr.rollback_rotation(next_id);

View File

@@ -49,10 +49,7 @@ fn commit_error_to_repo(e: CommitError) -> RepoError {
)),
CommitError::VerifyFailed { file_id, offset } => RepoError::storage(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"post-sync verify failed at {file_id}:{}",
offset.raw()
),
format!("post-sync verify failed at {file_id}:{}", offset.raw()),
)),
}
}

View File

@@ -418,10 +418,7 @@ mod tests {
assert_eq!(next_id, SegmentId::new(2));
assert_eq!(mgr.io().file_size(next_handle.fd()).unwrap(), 0);
mgr.commit_rotation(next_id, &next_handle);
assert_eq!(
mgr.open_for_read(next_id).unwrap().fd(),
next_handle.fd()
);
assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd());
}
#[test]
@@ -431,10 +428,7 @@ mod tests {
let (next_id, next_handle) = mgr.prepare_rotation(SegmentId::new(1)).unwrap();
mgr.commit_rotation(next_id, &next_handle);
assert_eq!(
mgr.open_for_read(next_id).unwrap().fd(),
next_handle.fd()
);
assert_eq!(mgr.open_for_read(next_id).unwrap().fd(), next_handle.fd());
drop(next_handle);
mgr.rollback_rotation(next_id);

View File

@@ -68,7 +68,8 @@ impl<S: StorageIO> EventLogWriter<S> {
) -> io::Result<Self> {
let handle = manager.open_for_append(segment_id)?;
manager.io().truncate(handle.fd(), 0)?;
let writer = SegmentWriter::new(manager.io(), handle.fd(), segment_id, next_seq, max_payload)?;
let writer =
SegmentWriter::new(manager.io(), handle.fd(), segment_id, next_seq, max_payload)?;
writer.sync(manager.io())?;
manager.io().sync_dir(manager.segments_dir())?;
@@ -400,8 +401,12 @@ fn find_last_seq_from_segments<S: StorageIO>(
Err(e) if e.kind() != io::ErrorKind::InvalidData => Err(e),
_ => {
let handle = manager.open_for_read(seg_id)?;
let (_, last_seq) =
rebuild_from_segment(manager.io(), handle.fd(), DEFAULT_INDEX_INTERVAL, max_payload)?;
let (_, last_seq) = rebuild_from_segment(
manager.io(),
handle.fd(),
DEFAULT_INDEX_INTERVAL,
max_payload,
)?;
Ok(last_seq)
}
}

View File

@@ -1,15 +1,31 @@
use serde::{Deserialize, Serialize};
use super::runner::{GauntletConfig, MaxFileSize, RunLimits, ShardCount, WallMs};
use super::workload::OpCount;
use super::runner::{
GauntletConfig, IoBackend, MaxFileSize, OpInterval, RestartPolicy, RunLimits, ShardCount,
WallMs, WriterConcurrency,
};
use super::workload::{KeySpaceSize, OpCount, SizeDistribution, ValueBytes};
use crate::sim::FaultConfig;
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct ConfigOverrides {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub op_count: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub max_wall_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub writer_concurrency: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key_space: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub value_bytes: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fault_density_scale: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub fault_density_uniform: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub restart_every_n_ops: Option<usize>,
#[serde(default, skip_serializing_if = "StoreOverrides::is_empty")]
pub store: StoreOverrides,
}
@@ -66,6 +82,34 @@ impl ConfigOverrides {
max_wall_ms: Some(WallMs(ms)),
};
}
if let Some(n) = self.writer_concurrency {
cfg.writer_concurrency = WriterConcurrency(n.max(1));
}
if let Some(n) = self.key_space {
cfg.workload.key_space = KeySpaceSize(n.max(1));
}
if let Some(n) = self.value_bytes {
cfg.workload.size_distribution = SizeDistribution::Fixed(ValueBytes(n));
}
if let Some(m) = self.fault_density_scale
&& let IoBackend::Simulated { fault } = cfg.io
{
cfg.io = IoBackend::Simulated {
fault: fault.scale_probabilities(m),
};
}
if let Some(d) = self.fault_density_uniform {
cfg.io = IoBackend::Simulated {
fault: FaultConfig::uniform_density(d.clamp(0.0, 1.0)),
};
}
if let Some(n) = self.restart_every_n_ops {
cfg.restart_policy = if n == 0 {
RestartPolicy::Never
} else {
RestartPolicy::EveryNOps(OpInterval(n))
};
}
if let Some(n) = self.store.max_file_size {
cfg.store.max_file_size = MaxFileSize(n);
}
@@ -106,6 +150,12 @@ mod tests {
fn round_trip_preserves_set_fields() {
let o = ConfigOverrides {
op_count: Some(42),
writer_concurrency: Some(16),
key_space: Some(1_000_000),
value_bytes: Some(4096),
fault_density_scale: Some(1e-3),
fault_density_uniform: Some(5e-4),
restart_every_n_ops: Some(10_000),
store: StoreOverrides {
max_file_size: Some(4096),
group_commit: GroupCommitOverrides {
@@ -120,4 +170,78 @@ mod tests {
let back: ConfigOverrides = serde_json::from_str(&json).unwrap();
assert_eq!(o, back);
}
#[test]
fn fault_density_scale_scales_moderate() {
use crate::gauntlet::op::Seed;
use crate::gauntlet::scenarios::{Scenario, config_for};
let mut cfg = config_for(Scenario::ModerateFaults, Seed(1));
let o = ConfigOverrides {
fault_density_scale: Some(0.1),
..ConfigOverrides::default()
};
o.apply_to(&mut cfg);
match cfg.io {
IoBackend::Simulated { fault } => {
assert!(fault.torn_page_probability.raw() < 0.02);
assert!(fault.torn_page_probability.raw() > 0.0);
}
_ => panic!("expected simulated io"),
}
}
#[test]
fn fault_density_scale_zero_kills_probabilities() {
use crate::gauntlet::op::Seed;
use crate::gauntlet::scenarios::{Scenario, config_for};
let mut cfg = config_for(Scenario::ModerateFaults, Seed(1));
let o = ConfigOverrides {
fault_density_scale: Some(0.0),
..ConfigOverrides::default()
};
o.apply_to(&mut cfg);
match cfg.io {
IoBackend::Simulated { fault } => {
assert_eq!(fault.partial_write_probability.raw(), 0.0);
assert_eq!(fault.torn_page_probability.raw(), 0.0);
assert_eq!(fault.io_error_probability.raw(), 0.0);
assert_eq!(fault.sync_failure_probability.raw(), 0.0);
}
_ => panic!("expected simulated io"),
}
}
#[test]
fn fault_density_scale_is_noop_on_real_backend() {
use crate::gauntlet::op::Seed;
use crate::gauntlet::scenarios::{Scenario, config_for};
let mut cfg = config_for(Scenario::SmokePR, Seed(1));
assert!(matches!(cfg.io, IoBackend::Real));
let o = ConfigOverrides {
fault_density_scale: Some(0.5),
..ConfigOverrides::default()
};
o.apply_to(&mut cfg);
assert!(matches!(cfg.io, IoBackend::Real));
}
#[test]
fn fault_density_uniform_forces_simulated_backend() {
use crate::gauntlet::op::Seed;
use crate::gauntlet::scenarios::{Scenario, config_for};
let mut cfg = config_for(Scenario::SmokePR, Seed(1));
assert!(matches!(cfg.io, IoBackend::Real));
let o = ConfigOverrides {
fault_density_uniform: Some(0.25),
..ConfigOverrides::default()
};
o.apply_to(&mut cfg);
match cfg.io {
IoBackend::Simulated { fault } => {
assert_eq!(fault.torn_page_probability.raw(), 0.25);
assert_eq!(fault.io_error_probability.raw(), 0.25);
}
_ => panic!("expected simulated io"),
}
}
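#[test]
fn uniform_overrides_scale_when_both_are_set() {
    // Editorial sketch, not part of this commit: apply_to evaluates the
    // scale branch first and the uniform branch second, and the uniform
    // branch replaces cfg.io wholesale, so uniform_density wins.
    use crate::gauntlet::op::Seed;
    use crate::gauntlet::scenarios::{Scenario, config_for};
    let mut cfg = config_for(Scenario::ModerateFaults, Seed(1));
    let o = ConfigOverrides {
        fault_density_scale: Some(0.5),
        fault_density_uniform: Some(0.25),
        ..ConfigOverrides::default()
    };
    o.apply_to(&mut cfg);
    match cfg.io {
        IoBackend::Simulated { fault } => {
            assert_eq!(fault.torn_page_probability.raw(), 0.25);
            assert_eq!(fault.sync_failure_probability.raw(), 0.25);
        }
        _ => panic!("expected simulated io"),
    }
}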
}

View File

@@ -595,7 +595,8 @@ fn eventlog_snapshot<S: StorageIO + Send + Sync + 'static>(
let mut segment_last_ts: Vec<(SegmentId, u64)> = Vec::new();
segments.iter().for_each(|&id| {
let per_segment: Vec<ValidEvent> = match s.manager.open_for_read(id) {
Ok(handle) => match SegmentReader::open(s.manager.io(), handle.fd(), MAX_EVENT_PAYLOAD) {
Ok(handle) => match SegmentReader::open(s.manager.io(), handle.fd(), MAX_EVENT_PAYLOAD)
{
Ok(reader) => reader.valid_prefix().unwrap_or_default(),
Err(_) => Vec::new(),
},
@@ -991,8 +992,7 @@ async fn apply_op<S: StorageIO + Send + Sync + 'static>(
if !oracle.contains_record(collection, rkey) {
return Ok(());
}
let new_root =
delete_record_atomic(&harness.store, old_root, collection, rkey).await?;
let new_root = delete_record_atomic(&harness.store, old_root, collection, rkey).await?;
oracle.delete(collection, rkey);
*root = Some(new_root);
Ok(())
@@ -1288,8 +1288,7 @@ async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
if !state.oracle.contains_record(collection, rkey) {
return Ok(());
}
let new_root =
delete_record_atomic(&shared.store, old_root, collection, rkey).await?;
let new_root = delete_record_atomic(&shared.store, old_root, collection, rkey).await?;
state.oracle.delete(collection, rkey);
state.root = Some(new_root);
Ok(())

View File

@@ -99,7 +99,9 @@ impl Scenario {
"Eventlog-heavy workload with FSYNC_ORDERING / MONOTONIC_SEQ / TOMBSTONE_BOUND invariants."
}
Self::ContendedReaders => "60% reads, 64 writer tasks, simulated moderate faults.",
Self::ContendedWriters => "Add/delete heavy, 32 writer tasks, simulated moderate faults.",
Self::ContendedWriters => {
"Add/delete heavy, 32 writer tasks, simulated moderate faults."
}
}
}

View File

@@ -129,6 +129,44 @@ impl FaultConfig {
|| self.delayed_io_error_probability.is_nonzero()
|| self.sync_reorder_window.0 > 0
}
pub fn scale_probabilities(self, factor: f64) -> Self {
let scale = |p: Probability| Probability::new((p.raw() * factor).clamp(0.0, 1.0));
Self {
partial_write_probability: scale(self.partial_write_probability),
bit_flip_on_read_probability: scale(self.bit_flip_on_read_probability),
sync_failure_probability: scale(self.sync_failure_probability),
dir_sync_failure_probability: scale(self.dir_sync_failure_probability),
misdirected_write_probability: scale(self.misdirected_write_probability),
io_error_probability: scale(self.io_error_probability),
torn_page_probability: scale(self.torn_page_probability),
misdirected_read_probability: scale(self.misdirected_read_probability),
delayed_io_error_probability: scale(self.delayed_io_error_probability),
sync_reorder_window: self.sync_reorder_window,
latency_distribution_ns: self.latency_distribution_ns,
}
}
pub fn uniform_density(density: f64) -> Self {
assert!(
density.is_finite() && (0.0..=1.0).contains(&density),
"fault density out of range: {density}"
);
let p = Probability::new(density);
Self {
partial_write_probability: p,
bit_flip_on_read_probability: p,
sync_failure_probability: p,
dir_sync_failure_probability: p,
misdirected_write_probability: p,
io_error_probability: p,
torn_page_probability: p,
misdirected_read_probability: p,
delayed_io_error_probability: p,
sync_reorder_window: SyncReorderWindow(0),
latency_distribution_ns: LatencyNs(0),
}
}
}
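// Editorial sketch, not part of this commit: the two helpers compose, since
// scale_probabilities takes self by value and clamps into [0.0, 1.0].
#[cfg(test)]
mod density_helpers_sketch {
    use super::*;

    #[test]
    fn uniform_then_scale_halves_every_probability() {
        let cfg = FaultConfig::uniform_density(0.25).scale_probabilities(0.5);
        // 0.25 and 0.5 are exact in f64, so exact comparison is safe.
        assert_eq!(cfg.torn_page_probability.raw(), 0.125);
        assert_eq!(cfg.io_error_probability.raw(), 0.125);
        // uniform_density zeroes the reorder window; scaling leaves it alone.
        assert_eq!(cfg.sync_reorder_window.0, 0);
    }
}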
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

View File

@@ -10,15 +10,18 @@ fn concurrent_reader_survives_evict_on_segment_delete() {
let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::pristine(0x1eed7a11));
let segments_dir = PathBuf::from("/segments");
let manager = Arc::new(
SegmentManager::new(Arc::clone(&sim), segments_dir.clone(), 1 << 20).unwrap(),
);
let manager =
Arc::new(SegmentManager::new(Arc::clone(&sim), segments_dir.clone(), 1 << 20).unwrap());
let seg_id = SegmentId::new(1);
let write_handle = manager.open_for_append(seg_id).unwrap();
sim.write_at(write_handle.fd(), 0, b"arbitrary seed bytes for the segment")
.unwrap();
sim.write_at(
write_handle.fd(),
0,
b"arbitrary seed bytes for the segment",
)
.unwrap();
sim.sync(write_handle.fd()).unwrap();
sim.sync_dir(&segments_dir).unwrap();
drop(write_handle);

View File

@@ -40,7 +40,9 @@ fn config_for(dir: &Path, max_file_size: u64) -> BlockStoreConfig {
fn tiny_block(seed: u64) -> Vec<u8> {
let bytes = seed.to_le_bytes();
(0..64).map(|i| bytes[i % 8] ^ (i as u8).wrapping_mul(31)).collect()
(0..64)
.map(|i| bytes[i % 8] ^ (i as u8).wrapping_mul(31))
.collect()
}
#[tokio::test]

View File

@@ -3,8 +3,8 @@ mod common;
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tranquil_store::blockstore::{
BlockStoreConfig, DataFileId, DataFileManager, DataFileWriter, GroupCommitConfig,
@@ -59,7 +59,10 @@ impl FailingIO {
impl StorageIO for FailingIO {
fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> {
let fd = self.inner.open(path, opts)?;
self.fd_to_path.lock().unwrap().insert(fd, path.to_path_buf());
self.fd_to_path
.lock()
.unwrap()
.insert(fd, path.to_path_buf());
Ok(fd)
}

View File

@@ -657,15 +657,13 @@ fn sim_multi_file_rotation_crash_recovery() {
};
let block_count = ((seed % 25) + 10) as u32;
let all_cids: Vec<CidBytes> =
(0..block_count).map(test_cid).collect();
let all_cids: Vec<CidBytes> = (0..block_count).map(test_cid).collect();
{
let store = TranquilBlockStore::open(config.clone()).unwrap();
(0..block_count).try_for_each(|i| {
store.put_blocks_blocking(vec![(test_cid(i), block_data(i))])
})
.unwrap();
(0..block_count)
.try_for_each(|i| store.put_blocks_blocking(vec![(test_cid(i), block_data(i))]))
.unwrap();
let files = store.list_data_files().unwrap();
assert!(

View File

@@ -39,7 +39,8 @@ fn read_all_events(mgr: &SegmentManager<SimulatedIO>, seed: u64) -> Vec<ValidEve
.flat_map(|&seg_id| {
let fd = mgr
.open_for_read(seg_id)
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read({seg_id}) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read({seg_id}) failed: {e}"))
.fd();
SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!(
@@ -256,7 +257,8 @@ fn segment_deletion_does_not_corrupt_neighbors() {
let seg2_fd = mgr
.open_for_read(SegmentId::new(2))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(2) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(2) failed: {e}"))
.fd();
let seg2_events = SegmentReader::open(mgr.io(), seg2_fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(2, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -271,7 +273,8 @@ fn segment_deletion_does_not_corrupt_neighbors() {
let seg3_fd = mgr
.open_for_read(SegmentId::new(3))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(3) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(3) failed: {e}"))
.fd();
let seg3_events = SegmentReader::open(mgr.io(), seg3_fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(3, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -388,7 +391,8 @@ fn fsync_ordering_unsynced_events_never_durable() {
let fd = mgr
.open_for_read(SegmentId::new(1))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}"))
.fd();
let recovered = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -458,7 +462,8 @@ fn fsync_ordering_proof_sync_before_blockstore_ack() {
let fd = mgr
.open_for_read(SegmentId::new(1))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}"))
.fd();
let recovered = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -579,7 +584,8 @@ fn group_sync_crash_mid_sync_partial_fsync() {
let fd = mgr
.open_for_read(SegmentId::new(1))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}"))
.fd();
let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -652,7 +658,8 @@ fn group_sync_no_double_sync_no_skipped_events() {
let fd = mgr
.open_for_read(SegmentId::new(1))
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: open_for_read(1) failed: {e}"))
.fd();
let events = SegmentReader::open(mgr.io(), fd, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| {
panic!("seed {seed}: SegmentReader::open(1, MAX_EVENT_PAYLOAD) failed: {e}")
@@ -938,7 +945,8 @@ fn aggressive_faults_group_sync_recovery() {
let pristine_fd = pristine_mgr
.open_for_read(SegmentId::new(1))
.unwrap_or_else(|e| panic!("seed {seed}: pristine open_for_read(1) failed: {e}")).fd();
.unwrap_or_else(|e| panic!("seed {seed}: pristine open_for_read(1) failed: {e}"))
.fd();
let pristine_events = SegmentReader::open(
pristine_mgr.io(),
pristine_fd,

View File

@@ -37,9 +37,18 @@ gauntlet-farm SCENARIO HOURS="6" DUMP="proptest-regressions":
gauntlet-repro SEED SCENARIO="smoke-pr":
SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --scenario {{SCENARIO}} --seed {{SEED}}
gauntlet-repro-config CONFIG SEED:
SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --config {{CONFIG}} --seed {{SEED}}
gauntlet-repro-from FILE:
SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- repro --from {{FILE}}
gauntlet-sweep CONFIG SEEDS="8" DUMP="proptest-regressions":
SQLX_OFFLINE=true cargo run --release -p tranquil-store --bin tranquil-gauntlet --features tranquil-store/gauntlet-cli -- sweep --config {{CONFIG}} --seeds {{SEEDS}} --dump-regressions {{DUMP}}
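# Example (hypothetical config path), 16 seeds per combination:
#   just gauntlet-sweep sweeps/moderate_faults.toml 16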
gauntlet-soak HOURS="24" OUTPUT="":
SQLX_OFFLINE=true GAUNTLET_SOAK_HOURS={{HOURS}} GAUNTLET_SOAK_OUTPUT={{OUTPUT}} cargo nextest run -p tranquil-store --features tranquil-store/test-harness --profile gauntlet-soak --test gauntlet_soak --run-ignored all -- soak_long_leak_gate
test-unit:
SQLX_OFFLINE=true cargo test --test dpop_unit --test validation_edge_cases --test scope_edge_cases