diff --git a/crates/tranquil-store/src/gauntlet/regression.rs b/crates/tranquil-store/src/gauntlet/regression.rs
new file mode 100644
index 0000000..ebf73b9
--- /dev/null
+++ b/crates/tranquil-store/src/gauntlet/regression.rs
@@ -0,0 +1,290 @@
+use std::io;
+use std::path::{Path, PathBuf};
+
+use serde::{Deserialize, Serialize};
+
+use super::invariants::InvariantViolation;
+use super::op::{Op, OpStream, Seed};
+use super::overrides::ConfigOverrides;
+use super::runner::{GauntletConfig, GauntletReport};
+use super::scenarios::{Scenario, UnknownScenario, config_for};
+
+pub const SCHEMA_VERSION: u32 = 1;
+pub const MIN_SUPPORTED_SCHEMA_VERSION: u32 = 1;
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct RegressionViolation {
+    pub invariant: String,
+    pub detail: String,
+}
+
+impl From<&InvariantViolation> for RegressionViolation {
+    fn from(v: &InvariantViolation) -> Self {
+        Self {
+            invariant: v.invariant.to_string(),
+            detail: v.detail.clone(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct RegressionRecord {
+    pub schema_version: u32,
+    pub scenario: String,
+    pub seed: Seed,
+    #[serde(default)]
+    pub overrides: ConfigOverrides,
+    pub violations: Vec<RegressionViolation>,
+    pub ops: Vec<Op>,
+    #[serde(default)]
+    pub original_ops_len: usize,
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum RegressionLoadError {
+    #[error("read {path}: {source}")]
+    Read { path: PathBuf, source: io::Error },
+    #[error("parse {path}: {source}")]
+    Parse {
+        path: PathBuf,
+        source: serde_json::Error,
+    },
+    #[error("schema version {found} outside supported range {min}..={max}")]
+    UnsupportedVersion { found: u32, min: u32, max: u32 },
+    #[error(transparent)]
+    UnknownScenario(#[from] UnknownScenario),
+}
+
+impl RegressionRecord {
+    pub fn from_report(
+        scenario: Scenario,
+        overrides: ConfigOverrides,
+        report: &GauntletReport,
+        original_ops_len: usize,
+        shrunk_ops: OpStream,
+    ) -> Self {
+        Self {
+            schema_version: SCHEMA_VERSION,
+            scenario: scenario.name().to_string(),
+            seed: report.seed,
+            overrides,
+            violations: report
+                .violations
+                .iter()
+                .map(RegressionViolation::from)
+                .collect(),
+            ops: shrunk_ops.into_vec(),
+            original_ops_len,
+        }
+    }
+
+    pub fn file_path(&self, root: &Path) -> PathBuf {
+        root.join("gauntlet")
+            .join(sanitize(&self.scenario))
+            .join(format!("{:016x}.json", self.seed.0))
+    }
+
+    pub fn write_to(&self, root: &Path) -> io::Result<PathBuf> {
+        let path = self.file_path(root);
+        if let Some(parent) = path.parent() {
+            std::fs::create_dir_all(parent)?;
+        }
+        let json = serde_json::to_vec_pretty(self).map_err(io::Error::other)?;
+        let tmp = path.with_extension("json.tmp");
+        {
+            let mut f = std::fs::File::create(&tmp)?;
+            io::Write::write_all(&mut f, &json)?;
+            f.sync_all()?;
+        }
+        std::fs::rename(&tmp, &path)?;
+        if let Some(parent) = path.parent() {
+            if let Ok(dir) = std::fs::File::open(parent) {
+                let _ = dir.sync_all();
+            }
+        }
+        Ok(path)
+    }
+
+    pub fn load(path: &Path) -> Result<Self, RegressionLoadError> {
+        let raw = std::fs::read(path).map_err(|source| RegressionLoadError::Read {
+            path: path.to_path_buf(),
+            source,
+        })?;
+        let record: RegressionRecord =
+            serde_json::from_slice(&raw).map_err(|source| RegressionLoadError::Parse {
+                path: path.to_path_buf(),
+                source,
+            })?;
+        if record.schema_version < MIN_SUPPORTED_SCHEMA_VERSION
+            || record.schema_version > SCHEMA_VERSION
+        {
+            return Err(RegressionLoadError::UnsupportedVersion {
+                found: record.schema_version,
+                min: MIN_SUPPORTED_SCHEMA_VERSION,
+                max: SCHEMA_VERSION,
+            });
+        }
+        Ok(record)
+    }
+
+    pub fn scenario_enum(&self) -> Result<Scenario, UnknownScenario> {
+        self.scenario.parse::<Scenario>()
+    }
+
+    pub fn build_config(&self) -> Result<GauntletConfig, UnknownScenario> {
+        let scenario = self.scenario_enum()?;
+        let mut cfg = config_for(scenario, self.seed);
+        self.overrides.apply_to(&mut cfg);
+        Ok(cfg)
+    }
+
+    pub fn op_stream(&self) -> OpStream {
+        OpStream::from_vec(self.ops.clone())
+    }
+}
+
+fn sanitize(s: &str) -> String {
+    s.chars()
+        .map(|c| match c {
+            'a'..='z' | 'A'..='Z' | '0'..='9' | '_'
| '-' => c, + _ => '_', + }) + .collect() +} + +pub fn default_root() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("proptest-regressions") +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::gauntlet::op::{CollectionName, RecordKey, ValueSeed}; + use crate::gauntlet::overrides::ConfigOverrides; + + fn sample_record() -> RegressionRecord { + use crate::gauntlet::overrides::StoreOverrides; + + let ops = vec![ + Op::AddRecord { + collection: CollectionName("c".into()), + rkey: RecordKey("r".into()), + value_seed: ValueSeed(1), + }, + Op::Compact, + ]; + let overrides = ConfigOverrides { + op_count: Some(128), + store: StoreOverrides { + max_file_size: Some(4096), + ..StoreOverrides::default() + }, + ..ConfigOverrides::default() + }; + RegressionRecord { + schema_version: SCHEMA_VERSION, + scenario: "HugeValues".to_string(), + seed: Seed(0xdeadbeef), + overrides, + violations: vec![RegressionViolation { + invariant: "ByteBudget".to_string(), + detail: "exceeded".to_string(), + }], + ops, + original_ops_len: 500, + } + } + + #[test] + fn round_trip_preserves_all_fields() { + let dir = tempfile::TempDir::new().unwrap(); + let original = sample_record(); + let path = original.write_to(dir.path()).unwrap(); + assert!(path.exists()); + let loaded = RegressionRecord::load(&path).unwrap(); + assert_eq!(loaded.schema_version, original.schema_version); + assert_eq!(loaded.scenario, original.scenario); + assert_eq!(loaded.seed.0, original.seed.0); + assert_eq!(loaded.overrides, original.overrides); + assert_eq!(loaded.violations, original.violations); + assert_eq!(loaded.ops.len(), original.ops.len()); + assert_eq!(loaded.original_ops_len, original.original_ops_len); + } + + #[test] + fn build_config_applies_overrides() { + let record = sample_record(); + let cfg = record.build_config().unwrap(); + assert_eq!(cfg.op_count.0, 128); + assert_eq!(cfg.store.max_file_size.0, 4096); + } + + #[test] + fn rejects_future_schema_version() { + let dir = 
tempfile::TempDir::new().unwrap(); + let mut r = sample_record(); + r.schema_version = SCHEMA_VERSION + 1; + let path = r.write_to(dir.path()).unwrap(); + match RegressionRecord::load(&path) { + Err(RegressionLoadError::UnsupportedVersion { found, min, max }) => { + assert_eq!(found, SCHEMA_VERSION + 1); + assert_eq!(min, MIN_SUPPORTED_SCHEMA_VERSION); + assert_eq!(max, SCHEMA_VERSION); + } + other => panic!("expected UnsupportedVersion, got {other:?}"), + } + } + + #[test] + fn rejects_past_schema_version_below_min() { + let dir = tempfile::TempDir::new().unwrap(); + let mut r = sample_record(); + r.schema_version = MIN_SUPPORTED_SCHEMA_VERSION.saturating_sub(1); + let path = r.write_to(dir.path()).unwrap(); + match RegressionRecord::load(&path) { + Err(RegressionLoadError::UnsupportedVersion { found, min, max }) => { + assert_eq!(found, MIN_SUPPORTED_SCHEMA_VERSION.saturating_sub(1)); + assert_eq!(min, MIN_SUPPORTED_SCHEMA_VERSION); + assert_eq!(max, SCHEMA_VERSION); + } + other => panic!("expected UnsupportedVersion, got {other:?}"), + } + } + + #[test] + fn atomic_write_leaves_no_tmp_file() { + let dir = tempfile::TempDir::new().unwrap(); + let r = sample_record(); + let path = r.write_to(dir.path()).unwrap(); + assert!(path.exists()); + let tmp = path.with_extension("json.tmp"); + assert!( + !tmp.exists(), + "tmp sibling {tmp:?} should have been renamed" + ); + } + + #[test] + fn rejects_malformed_json() { + let dir = tempfile::TempDir::new().unwrap(); + let path = dir.path().join("bad.json"); + std::fs::write(&path, b"{not json").unwrap(); + assert!(matches!( + RegressionRecord::load(&path), + Err(RegressionLoadError::Parse { .. 
}) + )); + } + + #[test] + fn sanitize_strips_slashes_and_traversal() { + assert_eq!(sanitize("foo/bar baz"), "foo_bar_baz"); + assert_eq!(sanitize("../etc"), "___etc"); + } + + #[test] + fn unknown_scenario_name_errors() { + let mut r = sample_record(); + r.scenario = "BogusScenario".to_string(); + assert!(r.build_config().is_err()); + } +} diff --git a/crates/tranquil-store/src/gauntlet/shrink.rs b/crates/tranquil-store/src/gauntlet/shrink.rs new file mode 100644 index 0000000..745a240 --- /dev/null +++ b/crates/tranquil-store/src/gauntlet/shrink.rs @@ -0,0 +1,191 @@ +use std::collections::BTreeSet; + +use super::op::OpStream; +use super::runner::{Gauntlet, GauntletConfig, GauntletReport}; + +pub const DEFAULT_MAX_SHRINK_ITERATIONS: usize = 256; + +#[derive(Debug)] +pub struct ShrinkOutcome { + pub ops: OpStream, + pub report: GauntletReport, + pub iterations: usize, +} + +pub async fn shrink_failure( + config: GauntletConfig, + initial_ops: OpStream, + initial_report: GauntletReport, + max_iterations: usize, +) -> ShrinkOutcome { + let target: BTreeSet<&'static str> = initial_report.violation_invariants(); + if target.is_empty() { + return ShrinkOutcome { + ops: initial_ops, + report: initial_report, + iterations: 0, + }; + } + + let mut current_ops = initial_ops; + let mut current_report = initial_report; + let mut iterations = 0usize; + + while iterations < max_iterations { + match try_one_shrink_round(&config, ¤t_ops, &target, max_iterations - iterations) + .await + { + ShrinkRound::Progress { + ops, + report, + runs_used, + } => { + current_ops = ops; + current_report = report; + iterations += runs_used; + } + ShrinkRound::Exhausted { runs_used } => { + iterations += runs_used; + break; + } + } + } + + ShrinkOutcome { + ops: current_ops, + report: current_report, + iterations, + } +} + +enum ShrinkRound { + Progress { + ops: OpStream, + report: GauntletReport, + runs_used: usize, + }, + Exhausted { + runs_used: usize, + }, +} + +async fn 
try_one_shrink_round( + config: &GauntletConfig, + current_ops: &OpStream, + target: &BTreeSet<&'static str>, + budget: usize, +) -> ShrinkRound { + let mut runs_used = 0usize; + for candidate in current_ops.shrink_candidates() { + if candidate.is_empty() || candidate.len() >= current_ops.len() { + continue; + } + if runs_used >= budget { + return ShrinkRound::Exhausted { runs_used }; + } + runs_used += 1; + let gauntlet = match Gauntlet::new(config.clone()) { + Ok(g) => g, + Err(_) => continue, + }; + let report = gauntlet.run_with_ops(candidate.clone()).await; + let got: BTreeSet<&'static str> = report.violation_invariants(); + if !got.is_disjoint(target) { + return ShrinkRound::Progress { + ops: candidate, + report, + runs_used, + }; + } + } + ShrinkRound::Exhausted { runs_used } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blockstore::GroupCommitConfig; + use crate::gauntlet::invariants::{InvariantSet, InvariantViolation}; + use crate::gauntlet::op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; + use crate::gauntlet::runner::{ + GauntletConfig, IoBackend, MaxFileSize, OpErrorCount, OpsExecuted, RestartCount, + RestartPolicy, RunLimits, ShardCount, StoreConfig, WriterConcurrency, + }; + use crate::gauntlet::workload::{ + DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, + ValueBytes, WorkloadModel, + }; + use crate::sim::FaultConfig; + + fn dummy_config() -> GauntletConfig { + GauntletConfig { + seed: Seed(1), + io: IoBackend::Simulated { + fault: FaultConfig::none(), + }, + workload: WorkloadModel { + weights: OpWeights::default(), + size_distribution: SizeDistribution::Fixed(ValueBytes(16)), + collections: vec![CollectionName("c".into())], + key_space: KeySpaceSize(4), + did_space: DidSpaceSize(1), + retention_max_secs: RetentionMaxSecs(60), + }, + op_count: OpCount(4), + invariants: InvariantSet::EMPTY, + limits: RunLimits { max_wall_ms: None }, + restart_policy: RestartPolicy::Never, + store: 
StoreConfig { + max_file_size: MaxFileSize(4096), + group_commit: GroupCommitConfig::default(), + shard_count: ShardCount(1), + }, + eventlog: None, + writer_concurrency: WriterConcurrency(1), + } + } + + fn fake_report(seed: u64, names: &[&'static str]) -> GauntletReport { + GauntletReport { + seed: Seed(seed), + ops_executed: OpsExecuted(0), + op_errors: OpErrorCount(0), + restarts: RestartCount(0), + violations: names + .iter() + .copied() + .map(|n| InvariantViolation { + invariant: n, + detail: "x".to_string(), + }) + .collect(), + ops: OpStream::empty(), + } + } + + fn sample_stream() -> OpStream { + OpStream::from_vec(vec![ + Op::AddRecord { + collection: CollectionName("c".into()), + rkey: RecordKey("a".into()), + value_seed: ValueSeed(1), + }, + Op::Compact, + ]) + } + + #[test] + fn clean_report_returns_input_unchanged() { + let cfg = dummy_config(); + let ops = sample_stream(); + let clean = fake_report(1, &[]); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let before_len = ops.len(); + let out = rt.block_on(shrink_failure(cfg, ops, clean, 8)); + assert_eq!(out.iterations, 0); + assert_eq!(out.ops.len(), before_len); + } +} diff --git a/crates/tranquil-store/src/sim.rs b/crates/tranquil-store/src/sim.rs index 9fbf01f..49e830d 100644 --- a/crates/tranquil-store/src/sim.rs +++ b/crates/tranquil-store/src/sim.rs @@ -1,53 +1,134 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::io; use std::path::{Path, PathBuf}; use std::sync::Mutex; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; use crate::io::{FileId, OpenOptions, StorageIO}; +pub const TORN_PAGE_BYTES: usize = 4096; +pub const SECTOR_BYTES: usize = 512; + +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct Probability(f64); + +impl Probability { + pub const ZERO: Self = Self(0.0); + + pub fn new(p: f64) -> Self { + assert!( + p.is_finite() && (0.0..=1.0).contains(&p), 
+ "probability out of range: {p}" + ); + Self(p) + } + + pub fn raw(self) -> f64 { + self.0 + } + + pub fn is_nonzero(self) -> bool { + self.0 > 0.0 + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct SyncReorderWindow(pub u32); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct LatencyNs(pub u64); + #[derive(Debug, Clone, Copy)] pub struct FaultConfig { - pub partial_write_probability: f64, - pub bit_flip_on_read_probability: f64, - pub sync_failure_probability: f64, - pub dir_sync_failure_probability: f64, - pub misdirected_write_probability: f64, - pub io_error_probability: f64, + pub partial_write_probability: Probability, + pub bit_flip_on_read_probability: Probability, + pub sync_failure_probability: Probability, + pub dir_sync_failure_probability: Probability, + pub misdirected_write_probability: Probability, + pub io_error_probability: Probability, + pub torn_page_probability: Probability, + pub misdirected_read_probability: Probability, + pub delayed_io_error_probability: Probability, + pub sync_reorder_window: SyncReorderWindow, + pub latency_distribution_ns: LatencyNs, } impl FaultConfig { pub fn none() -> Self { Self { - partial_write_probability: 0.0, - bit_flip_on_read_probability: 0.0, - sync_failure_probability: 0.0, - dir_sync_failure_probability: 0.0, - misdirected_write_probability: 0.0, - io_error_probability: 0.0, + partial_write_probability: Probability::ZERO, + bit_flip_on_read_probability: Probability::ZERO, + sync_failure_probability: Probability::ZERO, + dir_sync_failure_probability: Probability::ZERO, + misdirected_write_probability: Probability::ZERO, + io_error_probability: Probability::ZERO, + torn_page_probability: Probability::ZERO, + misdirected_read_probability: Probability::ZERO, + delayed_io_error_probability: Probability::ZERO, + sync_reorder_window: SyncReorderWindow(0), + latency_distribution_ns: LatencyNs(0), } } pub fn moderate() -> Self { Self { - partial_write_probability: 0.05, - 
bit_flip_on_read_probability: 0.01, - sync_failure_probability: 0.03, - dir_sync_failure_probability: 0.02, - misdirected_write_probability: 0.01, - io_error_probability: 0.02, + partial_write_probability: Probability::new(0.05), + bit_flip_on_read_probability: Probability::new(0.01), + sync_failure_probability: Probability::new(0.03), + dir_sync_failure_probability: Probability::new(0.02), + misdirected_write_probability: Probability::new(0.01), + io_error_probability: Probability::new(0.02), + torn_page_probability: Probability::new(0.01), + misdirected_read_probability: Probability::new(0.005), + delayed_io_error_probability: Probability::new(0.01), + sync_reorder_window: SyncReorderWindow(4), + latency_distribution_ns: LatencyNs(50_000), } } pub fn aggressive() -> Self { Self { - partial_write_probability: 0.15, - bit_flip_on_read_probability: 0.05, - sync_failure_probability: 0.10, - dir_sync_failure_probability: 0.05, - misdirected_write_probability: 0.05, - io_error_probability: 0.08, + partial_write_probability: Probability::new(0.15), + bit_flip_on_read_probability: Probability::new(0.05), + sync_failure_probability: Probability::new(0.10), + dir_sync_failure_probability: Probability::new(0.05), + misdirected_write_probability: Probability::new(0.05), + io_error_probability: Probability::new(0.08), + torn_page_probability: Probability::new(0.05), + misdirected_read_probability: Probability::new(0.02), + delayed_io_error_probability: Probability::new(0.05), + sync_reorder_window: SyncReorderWindow(8), + latency_distribution_ns: LatencyNs(250_000), } } + + pub fn torn_pages_only() -> Self { + Self { + torn_page_probability: Probability::new(0.25), + ..Self::none() + } + } + + pub fn fsyncgate_only() -> Self { + Self { + delayed_io_error_probability: Probability::new(0.05), + ..Self::none() + } + } + + pub fn injects_errors(&self) -> bool { + self.partial_write_probability.is_nonzero() + || self.bit_flip_on_read_probability.is_nonzero() + || 
self.sync_failure_probability.is_nonzero() + || self.dir_sync_failure_probability.is_nonzero() + || self.misdirected_write_probability.is_nonzero() + || self.io_error_probability.is_nonzero() + || self.torn_page_probability.is_nonzero() + || self.misdirected_read_probability.is_nonzero() + || self.delayed_io_error_probability.is_nonzero() + || self.sync_reorder_window.0 > 0 + } } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -57,6 +138,7 @@ struct SimStorage { buffered: Vec, durable: Vec, dir_entry_durable: bool, + io_poisoned: bool, } struct SimFd { @@ -108,6 +190,17 @@ pub enum OpRecord { }, } +struct PendingSync { + storage_id: StorageId, + snapshot: Vec, +} + +struct PendingDelete { + path: PathBuf, + storage_id: StorageId, + was_dir_durable: bool, +} + struct SimState { storage: HashMap, paths: HashMap, @@ -117,6 +210,8 @@ struct SimState { rng_counter: u64, next_fd_id: u64, next_storage_id: u64, + pending_syncs: VecDeque, + pending_deletes: Vec, } impl SimState { @@ -137,8 +232,8 @@ impl SimState { (mixed as usize) % max } - fn should_fault(&mut self, seed: u64, probability: f64) -> bool { - probability > 0.0 && self.next_random(seed) < probability + fn should_fault(&mut self, seed: u64, probability: Probability) -> bool { + probability.is_nonzero() && self.next_random(seed) < probability.raw() } fn alloc_fd_id(&mut self) -> FileId { @@ -194,6 +289,7 @@ pub struct SimulatedIO { state: Mutex, fault_config: FaultConfig, rng_seed: u64, + latency_counter: AtomicU64, } impl SimulatedIO { @@ -208,12 +304,26 @@ impl SimulatedIO { rng_counter: 0, next_fd_id: 1, next_storage_id: 1, + pending_syncs: VecDeque::new(), + pending_deletes: Vec::new(), }), fault_config, rng_seed: seed, + latency_counter: AtomicU64::new(0), } } + fn jitter(&self) { + let max_ns = self.fault_config.latency_distribution_ns.0; + if max_ns == 0 { + return; + } + let c = self.latency_counter.fetch_add(1, Ordering::Relaxed); + let r = splitmix64(self.rng_seed.wrapping_add(c)); + let ns = r 
% max_ns; + std::thread::sleep(Duration::from_nanos(ns)); + } + pub fn pristine(seed: u64) -> Self { Self::new(seed, FaultConfig::none()) } @@ -222,6 +332,14 @@ impl SimulatedIO { let mut state = self.state.lock().unwrap(); state.fds.clear(); + state.pending_syncs.clear(); + + let pending = std::mem::take(&mut state.pending_deletes); + pending.into_iter().for_each(|pd| { + if pd.was_dir_durable && state.storage.contains_key(&pd.storage_id) { + state.paths.insert(pd.path, pd.storage_id); + } + }); let orphaned: Vec = state .storage @@ -237,10 +355,10 @@ impl SimulatedIO { let live_sids: HashSet = state.storage.keys().copied().collect(); state.paths.retain(|_, sid| live_sids.contains(sid)); - state - .storage - .values_mut() - .for_each(|s| s.buffered = s.durable.clone()); + state.storage.values_mut().for_each(|s| { + s.buffered = s.durable.clone(); + s.io_poisoned = false; + }); } pub fn op_log(&self) -> Vec { @@ -314,6 +432,7 @@ impl StorageIO for SimulatedIO { buffered: Vec::new(), durable: Vec::new(), dir_entry_durable: false, + io_poisoned: false, }, ); state.paths.insert(path_buf.clone(), sid); @@ -345,8 +464,9 @@ impl StorageIO for SimulatedIO { let sid = fd_info.storage_id; let unlinked = !state.paths.values().any(|s| *s == sid); let no_remaining_fds = !state.fds.values().any(|f| f.storage_id == sid); + let pending_deleted = state.pending_deletes.iter().any(|pd| pd.storage_id == sid); - if unlinked && no_remaining_fds { + if unlinked && no_remaining_fds && !pending_deleted { state.storage.remove(&sid); } @@ -355,17 +475,35 @@ impl StorageIO for SimulatedIO { } fn read_at(&self, id: FileId, offset: u64, buf: &mut [u8]) -> io::Result { + self.jitter(); let mut state = self.state.lock().unwrap(); let sid = state.require_readable(id)?; let seed = self.rng_seed; + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { + return Err(io::Error::other("simulated EIO after delayed sync fault")); + } + if state.should_fault(seed, 
self.fault_config.io_error_probability) { return Err(io::Error::other("simulated EIO on read")); } + let read_offset = + if state.should_fault(seed, self.fault_config.misdirected_read_probability) { + let drift_sectors = state.next_random_usize(seed, 8) + 1; + let drift = (drift_sectors * SECTOR_BYTES) as u64; + if state.next_random(seed) < 0.5 { + offset.saturating_sub(drift) + } else { + offset.saturating_add(drift) + } + } else { + offset + }; + let storage = state.storage.get(&sid).unwrap(); - let off = usize::try_from(offset) + let off = usize::try_from(read_offset) .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "offset exceeds usize"))?; if off >= storage.buffered.len() { state.op_log.push(OpRecord::ReadAt { @@ -395,26 +533,48 @@ impl StorageIO for SimulatedIO { } fn write_at(&self, id: FileId, offset: u64, buf: &[u8]) -> io::Result { + self.jitter(); let mut state = self.state.lock().unwrap(); let sid = state.require_writable(id)?; let seed = self.rng_seed; + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { + return Err(io::Error::other("simulated EIO after delayed sync fault")); + } + if state.should_fault(seed, self.fault_config.io_error_probability) { return Err(io::Error::other("simulated EIO on write")); } - let actual_len = if buf.len() > 1 - && state.should_fault(seed, self.fault_config.partial_write_probability) - { - let partial = state.next_random_usize(seed, buf.len()); - partial.max(1) - } else { - buf.len() + let torn_len = + if buf.len() > 1 && state.should_fault(seed, self.fault_config.torn_page_probability) { + let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); + let page_end = page_base + TORN_PAGE_BYTES; + let cap = page_end.saturating_sub(offset as usize).min(buf.len()); + let max_sectors = cap / SECTOR_BYTES; + (max_sectors >= 2).then(|| { + let n = state.next_random_usize(seed, max_sectors - 1) + 1; + n * SECTOR_BYTES + }) + } else { + None + }; + + let actual_len = match torn_len { + Some(n) 
=> n, + None if buf.len() > 1 + && state.should_fault(seed, self.fault_config.partial_write_probability) => + { + let partial = state.next_random_usize(seed, buf.len()); + partial.max(1) + } + None => buf.len(), }; let misdirected = state.should_fault(seed, self.fault_config.misdirected_write_probability); let write_offset = if misdirected { - let drift = state.next_random_usize(seed, 64) as u64; + let drift_sectors = state.next_random_usize(seed, 8) + 1; + let drift = (drift_sectors * SECTOR_BYTES) as u64; if state.next_random(seed) < 0.5 { offset.saturating_sub(drift) } else { @@ -444,21 +604,56 @@ impl StorageIO for SimulatedIO { } fn sync(&self, id: FileId) -> io::Result<()> { + self.jitter(); let mut state = self.state.lock().unwrap(); let sid = state.require_open(id)?; let seed = self.rng_seed; + if state.storage.get(&sid).is_some_and(|s| s.io_poisoned) { + return Err(io::Error::other("simulated EIO after delayed sync fault")); + } + if state.should_fault(seed, self.fault_config.io_error_probability) { return Err(io::Error::other("simulated EIO on sync")); } let sync_succeeded = !state.should_fault(seed, self.fault_config.sync_failure_probability); + let poison_after = sync_succeeded + && state.should_fault(seed, self.fault_config.delayed_io_error_probability); + let reorder_window = self.fault_config.sync_reorder_window.0 as usize; + + let evicted = if sync_succeeded && reorder_window > 0 { + let snapshot = state.storage.get(&sid).unwrap().buffered.clone(); + state.pending_syncs.push_back(PendingSync { + storage_id: sid, + snapshot, + }); + if state.pending_syncs.len() > reorder_window { + state.pending_syncs.pop_front() + } else { + None + } + } else { + None + }; + + if let Some(PendingSync { + storage_id: old_sid, + snapshot, + }) = evicted + && let Some(old) = state.storage.get_mut(&old_sid) + { + old.durable = snapshot; + } let storage = state.storage.get_mut(&sid).unwrap(); - if sync_succeeded { + if sync_succeeded && reorder_window == 0 { 
storage.durable = storage.buffered.clone(); } + if poison_after { + storage.io_poisoned = true; + } state.op_log.push(OpRecord::Sync { fd: id, @@ -517,11 +712,17 @@ impl StorageIO for SimulatedIO { .remove(&path_buf) .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "file not found"))?; - let has_open_fds = state.fds.values().any(|fd_info| fd_info.storage_id == sid); + let was_dir_durable = state + .storage + .get(&sid) + .map(|s| s.dir_entry_durable) + .unwrap_or(false); - if !has_open_fds { - state.storage.remove(&sid); - } + state.pending_deletes.push(PendingDelete { + path: path_buf.clone(), + storage_id: sid, + was_dir_durable, + }); state.op_log.push(OpRecord::Delete { path: path_buf }); Ok(()) @@ -562,6 +763,18 @@ impl StorageIO for SimulatedIO { storage.dir_entry_durable = true; } }); + + let drained = std::mem::take(&mut state.pending_deletes); + let (committed, remaining): (Vec<_>, Vec<_>) = drained + .into_iter() + .partition(|pd| pd.path.parent() == Some(path)); + state.pending_deletes = remaining; + committed.into_iter().for_each(|pd| { + let has_fds = state.fds.values().any(|f| f.storage_id == pd.storage_id); + if !has_fds { + state.storage.remove(&pd.storage_id); + } + }); } state.op_log.push(OpRecord::SyncDir { path: dir_path }); @@ -684,6 +897,67 @@ mod tests { assert!(result.is_err()); } + #[test] + fn delete_without_dir_sync_reverts_on_crash() { + let sim = SimulatedIO::pristine(42); + let dir = Path::new("/test"); + sim.mkdir(dir).unwrap(); + sim.sync_dir(dir).unwrap(); + + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + sim.write_at(fd, 0, b"durable data").unwrap(); + sim.sync(fd).unwrap(); + sim.sync_dir(dir).unwrap(); + sim.close(fd).unwrap(); + + sim.delete(path).unwrap(); + sim.crash(); + + let fd = sim.open(path, OpenOptions::read()).unwrap(); + let mut buf = vec![0u8; 12]; + sim.read_at(fd, 0, &mut buf).unwrap(); + assert_eq!(&buf, b"durable data"); + } + + #[test] + fn 
delete_commits_after_dir_sync() { + let sim = SimulatedIO::pristine(42); + let dir = Path::new("/test"); + sim.mkdir(dir).unwrap(); + sim.sync_dir(dir).unwrap(); + + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + sim.write_at(fd, 0, b"data").unwrap(); + sim.sync(fd).unwrap(); + sim.sync_dir(dir).unwrap(); + sim.close(fd).unwrap(); + + sim.delete(path).unwrap(); + sim.sync_dir(dir).unwrap(); + sim.crash(); + + let result = sim.open(path, OpenOptions::read()); + assert!(result.is_err()); + } + + #[test] + fn delete_of_never_durable_file_stays_gone_on_crash() { + let sim = SimulatedIO::pristine(42); + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + sim.write_at(fd, 0, b"volatile").unwrap(); + sim.sync(fd).unwrap(); + sim.close(fd).unwrap(); + + sim.delete(path).unwrap(); + sim.crash(); + + let result = sim.open(path, OpenOptions::read()); + assert!(result.is_err()); + } + #[test] fn dir_sync_makes_file_durable() { let sim = SimulatedIO::pristine(42); @@ -850,6 +1124,155 @@ mod tests { assert_eq!(result.unwrap_err().kind(), io::ErrorKind::PermissionDenied); } + #[test] + fn torn_page_truncates_within_page() { + let fc = FaultConfig { + torn_page_probability: Probability::new(1.0), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(123, fc); + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + let data = vec![0xAAu8; TORN_PAGE_BYTES + 1024]; + let written = sim.write_at(fd, 0, &data).unwrap(); + assert!(written >= 1); + assert!(written <= TORN_PAGE_BYTES); + } + + #[test] + fn delayed_io_error_poisons_storage_after_sync() { + let fc = FaultConfig { + delayed_io_error_probability: Probability::new(1.0), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(7, fc); + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + sim.write_at(fd, 0, 
b"hello").unwrap(); + sim.sync(fd).unwrap(); + + let err = sim.write_at(fd, 5, b"world").unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::Other); + let err2 = sim.sync(fd).unwrap_err(); + assert_eq!(err2.kind(), io::ErrorKind::Other); + let mut buf = [0u8; 5]; + let err3 = sim.read_at(fd, 0, &mut buf).unwrap_err(); + assert_eq!(err3.kind(), io::ErrorKind::Other); + } + + #[test] + fn misdirected_read_reads_wrong_offset() { + let fc = FaultConfig { + misdirected_read_probability: Probability::new(1.0), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(1, fc); + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + let data: Vec = (0..2048u32).flat_map(|n| n.to_le_bytes()).collect(); + sim.write_at(fd, 0, &data).unwrap(); + sim.sync(fd).unwrap(); + + let mut drifted_hit = false; + for _ in 0..32 { + let mut buf = [0u8; 16]; + let target_off = 4096u64; + let expected = &data[target_off as usize..target_off as usize + 16]; + if sim.read_at(fd, target_off, &mut buf).unwrap() == 16 && buf != expected { + drifted_hit = true; + break; + } + } + assert!( + drifted_hit, + "misdirected read never drifted away from target" + ); + } + + #[test] + fn sync_reorder_window_defers_durability() { + let fc = FaultConfig { + sync_reorder_window: SyncReorderWindow(2), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(42, fc); + let dir = Path::new("/test"); + sim.mkdir(dir).unwrap(); + sim.sync_dir(dir).unwrap(); + + let a = Path::new("/test/a.dat"); + let fd_a = sim.open(a, OpenOptions::read_write()).unwrap(); + sim.write_at(fd_a, 0, b"A").unwrap(); + sim.sync(fd_a).unwrap(); + assert!(sim.durable_contents(fd_a).unwrap().is_empty()); + + let b = Path::new("/test/b.dat"); + let fd_b = sim.open(b, OpenOptions::read_write()).unwrap(); + sim.write_at(fd_b, 0, b"B").unwrap(); + sim.sync(fd_b).unwrap(); + assert!(sim.durable_contents(fd_a).unwrap().is_empty()); + assert!(sim.durable_contents(fd_b).unwrap().is_empty()); + 
+ let c = Path::new("/test/c.dat"); + let fd_c = sim.open(c, OpenOptions::read_write()).unwrap(); + sim.write_at(fd_c, 0, b"C").unwrap(); + sim.sync(fd_c).unwrap(); + assert_eq!(sim.durable_contents(fd_a).unwrap(), b"A"); + assert!(sim.durable_contents(fd_b).unwrap().is_empty()); + assert!(sim.durable_contents(fd_c).unwrap().is_empty()); + } + + #[test] + fn sync_reorder_commits_at_sync_time_snapshot_not_current_buffer() { + let fc = FaultConfig { + sync_reorder_window: SyncReorderWindow(1), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(42, fc); + let dir = Path::new("/test"); + sim.mkdir(dir).unwrap(); + sim.sync_dir(dir).unwrap(); + + let a = Path::new("/test/a.dat"); + let fd_a = sim.open(a, OpenOptions::read_write()).unwrap(); + sim.write_at(fd_a, 0, b"first").unwrap(); + sim.sync(fd_a).unwrap(); + sim.write_at(fd_a, 0, b"second").unwrap(); + + let b = Path::new("/test/b.dat"); + let fd_b = sim.open(b, OpenOptions::read_write()).unwrap(); + sim.write_at(fd_b, 0, b"b").unwrap(); + sim.sync(fd_b).unwrap(); + + assert_eq!( + sim.durable_contents(fd_a).unwrap(), + b"first", + "reordered sync must commit buffered-at-sync-call, not current buffered" + ); + } + + #[test] + fn crash_drops_pending_reordered_syncs() { + let fc = FaultConfig { + sync_reorder_window: SyncReorderWindow(4), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(42, fc); + let dir = Path::new("/test"); + sim.mkdir(dir).unwrap(); + sim.sync_dir(dir).unwrap(); + + let path = Path::new("/test/file.dat"); + let fd = sim.open(path, OpenOptions::read_write()).unwrap(); + sim.sync_dir(dir).unwrap(); + sim.write_at(fd, 0, b"pending").unwrap(); + sim.sync(fd).unwrap(); + sim.crash(); + + let fd2 = sim.open(path, OpenOptions::read()).unwrap(); + assert_eq!(sim.file_size(fd2).unwrap(), 0); + } + #[test] fn last_sync_persisted_tracks_truth() { let sim = SimulatedIO::pristine(42);