diff --git a/crates/tranquil-store/src/blockstore/compaction.rs b/crates/tranquil-store/src/blockstore/compaction.rs index 11af311..b1ea082 100644 --- a/crates/tranquil-store/src/blockstore/compaction.rs +++ b/crates/tranquil-store/src/blockstore/compaction.rs @@ -222,7 +222,8 @@ fn stream_compact( .io() .sync_dir(manager.data_dir()) .map_err(CompactionError::from) - }); + }) + .and_then(|()| manager.io().barrier().map_err(CompactionError::from)); let _ = manager.io().close(hint_fd); diff --git a/crates/tranquil-store/src/blockstore/group_commit.rs b/crates/tranquil-store/src/blockstore/group_commit.rs index 5027b4b..e1f4f7d 100644 --- a/crates/tranquil-store/src/blockstore/group_commit.rs +++ b/crates/tranquil-store/src/blockstore/group_commit.rs @@ -1343,6 +1343,10 @@ fn process_batch( ) .map_err(|e| rollback_on_err(CommitError::from(e)))?; hint_writer.sync().map_err(|e| rollback_on_err(e.into()))?; + manager + .io() + .barrier() + .map_err(|e| rollback_on_err(e.into()))?; let sync_nanos = t.elapsed().as_nanos() as u64; if !rotations.is_empty() { diff --git a/crates/tranquil-store/src/eventlog/writer.rs b/crates/tranquil-store/src/eventlog/writer.rs index 0476915..ecc8777 100644 --- a/crates/tranquil-store/src/eventlog/writer.rs +++ b/crates/tranquil-store/src/eventlog/writer.rs @@ -3,16 +3,27 @@ use std::sync::Arc; use tracing::warn; -use crate::io::StorageIO; +use crate::io::{FileId, StorageIO}; use super::manager::SegmentManager; -use super::segment_file::{SEGMENT_HEADER_SIZE, SegmentWriter, ValidEvent}; +use super::segment_file::{ + SEGMENT_HEADER_SIZE, SEGMENT_MAGIC, SegmentWriter, ValidEvent, ValidateEventRecord, + validate_event_record, +}; use super::segment_index::{DEFAULT_INDEX_INTERVAL, SegmentIndex, rebuild_from_segment}; use super::sidecar::build_sidecar_from_segment; use super::types::{ DidHash, EventSequence, EventTypeTag, SegmentId, SegmentOffset, TimestampMicros, }; +const VALIDATE_RETRY_ATTEMPTS: u32 = 32; + +#[derive(Debug, Clone)] +struct PendingAppend { + event: ValidEvent, + offset: SegmentOffset, +} + #[derive(Debug)] pub struct SyncResult { pub synced_through: EventSequence, @@ -31,7 +42,8 @@ pub struct EventLogWriter { max_payload: u32, event_count_in_segment: usize, last_event_offset: Option, - pending_events: Vec, + pending: Vec, + poisoned: bool, } impl EventLogWriter { @@ -83,10 +95,25 @@ impl EventLogWriter { max_payload, event_count_in_segment: 0, last_event_offset: None, - pending_events: Vec::new(), + pending: Vec::new(), + poisoned: false, }) } + fn truncate_and_init_fresh( + manager: Arc>, + fd: FileId, + active_id: SegmentId, + prev_segments: &[SegmentId], + index_interval: usize, + max_payload: u32, + ) -> io::Result { + manager.io().truncate(fd, 0)?; + let next_seq = find_last_seq_from_segments(&manager, prev_segments, max_payload)? + .map_or(EventSequence::new(1), |s| s.next()); + Self::init_fresh(manager, active_id, next_seq, index_interval, max_payload) + } + fn recover_active( manager: Arc>, segments: &[SegmentId], @@ -97,6 +124,19 @@ impl EventLogWriter { let handle = manager.open_for_append(active_id)?; let fd = handle.fd(); + let prev_segments = &segments[..segments.len().saturating_sub(1)]; + + if highest_segment_has_torn_header(manager.io(), fd)? { + return Self::truncate_and_init_fresh( + Arc::clone(&manager), + fd, + active_id, + prev_segments, + index_interval, + max_payload, + ); + } + let (index, last_seq_in_active) = match rebuild_from_segment( manager.io(), fd, @@ -107,15 +147,11 @@ impl EventLogWriter { Err(rebuild_err) => { let file_size = manager.io().file_size(fd)?; if file_size <= SEGMENT_HEADER_SIZE as u64 { - manager.io().truncate(fd, 0)?; - let prev_segments = &segments[..segments.len().saturating_sub(1)]; - let next_seq = - find_last_seq_from_segments(&manager, prev_segments, max_payload)? - .map_or(EventSequence::new(1), |s| s.next()); - return Self::init_fresh( + return Self::truncate_and_init_fresh( Arc::clone(&manager), + fd, active_id, - next_seq, + prev_segments, index_interval, max_payload, ); @@ -131,8 +167,6 @@ impl EventLogWriter { let position = SegmentOffset::new(manager.io().file_size(fd)?); - let prev_segments = &segments[..segments.len().saturating_sub(1)]; - let next_seq = match last_seq_in_active { Some(seq) => { if let Some(sealed_last) = @@ -196,7 +230,8 @@ impl EventLogWriter { max_payload, event_count_in_segment, last_event_offset, - pending_events: Vec::new(), + pending: Vec::new(), + poisoned: false, }) } @@ -227,28 +262,20 @@ impl EventLogWriter { payload, }; - let offset = self.active_writer.append_event(self.manager.io(), &event)?; - - let should_index = self.event_count_in_segment == 0 - || self - .event_count_in_segment - .is_multiple_of(self.index_interval); - if should_index { - self.active_index.record(seq, offset); - } - - self.event_count_in_segment = self - .event_count_in_segment - .checked_add(1) - .expect("event_count_in_segment overflow"); - self.last_event_offset = Some(offset); - self.next_seq = seq.next(); - self.pending_events.push(event); - - Ok(seq) + self.append_inner(event).map(|_| seq) } pub fn append_valid_event(&mut self, event: ValidEvent) -> io::Result<()> { + self.append_inner(event) + } + + fn append_inner(&mut self, event: ValidEvent) -> io::Result<()> { + if self.poisoned { + return Err(io::Error::other( + "writer poisoned by partial-valid sync; reopen required", + )); + } + let offset = self.active_writer.append_event(self.manager.io(), &event)?; let should_index = self.event_count_in_segment == 0 @@ -265,21 +292,52 @@ impl EventLogWriter { .expect("event_count_in_segment overflow"); self.last_event_offset = Some(offset); self.next_seq = event.seq.next(); - self.pending_events.push(event); + self.pending.push(PendingAppend { event, offset }); Ok(()) } - pub fn peek_pending_event(&self, seq: EventSequence) -> Option<&ValidEvent> { - self.pending_events.iter().find(|e| e.seq == seq) - } - pub fn sync(&mut self) -> io::Result { - if !self.pending_events.is_empty() { - self.active_writer.sync(self.manager.io())?; + if self.poisoned { + return Err(io::Error::other( + "writer poisoned by partial-valid sync; reopen required", + )); } - let flushed = std::mem::take(&mut self.pending_events); + if !self.pending.is_empty() { + self.active_writer.sync(self.manager.io())?; + self.manager.io().barrier()?; + } + + let pending = std::mem::take(&mut self.pending); + + let fd = self.active_writer.fd(); + let file_size = self.manager.io().file_size(fd)?; + + let valid_count = pending + .iter() + .take_while(|p| { + validate_with_retry( + self.manager.io(), + fd, + p.offset, + file_size, + self.max_payload, + p.event.seq, + ) + }) + .count(); + + if valid_count < pending.len() { + self.poisoned = true; + } + + let flushed: Vec = pending + .into_iter() + .take(valid_count) + .map(|p| p.event) + .collect(); + self.synced_seq = flushed.last().map(|e| e.seq).unwrap_or(self.synced_seq); Ok(SyncResult { @@ -290,12 +348,22 @@ impl EventLogWriter { }) } + pub fn is_poisoned(&self) -> bool { + self.poisoned + } + pub fn rotate_if_needed(&mut self) -> io::Result> { + if self.poisoned { + return Err(io::Error::other( + "writer poisoned by partial-valid sync; reopen required", + )); + } + if !self.manager.should_rotate(self.active_writer.position()) { return Ok(None); } - if !self.pending_events.is_empty() { + if !self.pending.is_empty() { return Ok(None); } @@ -386,6 +454,40 @@ impl EventLogWriter { } } +fn validate_with_retry( + io: &S, + fd: FileId, + offset: SegmentOffset, + file_size: u64, + max_payload: u32, + expected_seq: EventSequence, +) -> bool { + (0..VALIDATE_RETRY_ATTEMPTS).any(|_| { + matches!( + validate_event_record(io, fd, offset, file_size, max_payload), + Ok(Some(ValidateEventRecord::Valid { seq, .. })) if seq == expected_seq + ) + }) +} + +fn highest_segment_has_torn_header(io: &S, fd: FileId) -> io::Result { + let file_size = io.file_size(fd)?; + if file_size < SEGMENT_HEADER_SIZE as u64 { + return Ok(true); + } + let outcomes: Vec = (0..VALIDATE_RETRY_ATTEMPTS) + .filter_map(|_| { + let mut header = [0u8; SEGMENT_MAGIC.len()]; + io.read_exact_at(fd, 0, &mut header) + .ok() + .map(|()| header == SEGMENT_MAGIC) + }) + .collect(); + let saw_match = outcomes.iter().any(|&ok| ok); + let saw_mismatch = outcomes.iter().any(|&ok| !ok); + Ok(!saw_match && saw_mismatch) +} + fn find_last_seq_from_segments( manager: &SegmentManager, segments: &[SegmentId], diff --git a/crates/tranquil-store/src/gauntlet/flaky.rs b/crates/tranquil-store/src/gauntlet/flaky.rs index dc05c64..1fa2b83 100644 --- a/crates/tranquil-store/src/gauntlet/flaky.rs +++ b/crates/tranquil-store/src/gauntlet/flaky.rs @@ -405,6 +405,8 @@ fn mount_ext4(device: &Path, target: &Path) -> Result<(), FlakyError> { let out = Command::new("mount") .arg("-t") .arg("ext4") + .arg("-o") + .arg("errors=continue") .arg(device) .arg(target) .output()?; diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index 9eb5100..6752aed 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -23,7 +23,7 @@ use crate::eventlog::{ SegmentManager, SegmentReader, TimestampMicros, ValidEvent, }; use crate::io::{RealIO, StorageIO}; -use crate::sim::{FaultConfig, SimulatedIO}; +use crate::sim::{FaultConfig, PristineGuard, SimulatedIO}; #[derive(Debug, Clone, Copy)] pub enum IoBackend { @@ -364,7 +364,7 @@ async fn run_inner_real_on_root( let segments_dir = segments_subdir(&root); let open = { let segments_dir = segments_dir.clone(); - move || -> Result, String> { + move |_attempt: usize| -> Result, String> { let store = TranquilBlockStore::open(cfg.clone()) .map(Arc::new) .map_err(|e| e.to_string())?; @@ -424,7 +424,8 @@ async fn run_inner_simulated( let sim_for_open = Arc::clone(&sim); let open = { let segments_dir = segments_dir.clone(); - move || -> Result>, String> { + move |attempt: usize| -> Result>, String> { + let _pristine = PristineGuard::new(Arc::clone(&sim_for_open), attempt > 0); let factory_sim = Arc::clone(&sim_for_open); let make_io = move || Arc::clone(&factory_sim); let store = TranquilBlockStore::>::open_with_io(cfg.clone(), make_io) @@ -512,13 +513,13 @@ async fn run_inner_generic( ) -> GauntletReport where S: StorageIO + Send + Sync + 'static, - Open: FnMut() -> Result, String>, + Open: FnMut(usize) -> Result, String>, Crash: FnMut(), { let mut oracle = Oracle::new(); let mut violations: Vec = Vec::new(); - let mut harness: Option> = match open() { + let mut harness: Option> = match open(0) { Ok(h) => Some(h), Err(e) => { return GauntletReport { @@ -750,7 +751,7 @@ async fn reopen_with_recovery( ) -> Result, String> where S: StorageIO + Send + Sync + 'static, - Open: FnMut() -> Result, String>, + Open: FnMut(usize) -> Result, String>, Crash: FnMut(), { let mut errors: Vec = Vec::new(); @@ -758,7 +759,7 @@ where if attempt > 0 && !backoff.is_zero() { tokio::time::sleep(backoff).await; } - match open() { + match open(attempt) { Ok(h) => return Ok(h), Err(e) => { errors.push(format!("attempt {attempt}: {e}")); @@ -1191,6 +1192,10 @@ fn run_retention( max_age: RetentionSecs, ) -> Result<(), String> { let sync_result = el.writer.sync().map_err(|e| e.to_string())?; + el.manager + .io() + .sync_dir(el.segments_dir.as_path()) + .map_err(|e| e.to_string())?; let _ = el.writer.rotate_if_needed(); oracle.record_event_sync(sync_result.synced_through); let active_id = sync_result.segment_id; @@ -1564,7 +1569,7 @@ async fn run_inner_generic_concurrent( ) -> GauntletReport where S: StorageIO + Send + Sync + 'static, - Open: FnMut() -> Result, String>, + Open: FnMut(usize) -> Result, String>, Crash: FnMut(), { let ops: Vec = op_stream.into_vec(); @@ -1577,7 +1582,7 @@ where let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A)); let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng); - let mut harness: Option> = match open() { + let mut harness: Option> = match open(0) { Ok(h) => Some(h), Err(e) => { return GauntletReport { diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs index c6c24c2..30a9cad 100644 --- a/crates/tranquil-store/src/gauntlet/scenarios.rs +++ b/crates/tranquil-store/src/gauntlet/scenarios.rs @@ -9,7 +9,7 @@ use super::workload::{ ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, ValueBytes, WorkloadModel, }; -use crate::blockstore::GroupCommitConfig; +use crate::blockstore::{GroupCommitConfig, MAX_BLOCK_SIZE}; use crate::sim::FaultConfig; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -402,7 +402,7 @@ fn huge_values(seed: Seed) -> GauntletConfig { workload: block_workload( block_weights(85, 5, 8, 2), SizeDistribution::HeavyTail( - ByteRange::new(ValueBytes(256), ValueBytes(16 * 1024 * 1024)) + ByteRange::new(ValueBytes(256), ValueBytes(MAX_BLOCK_SIZE)) .expect("huge_values ByteRange"), ), KeySpaceSize(64), diff --git a/crates/tranquil-store/src/io.rs b/crates/tranquil-store/src/io.rs index 14aa07b..a9a57e6 100644 --- a/crates/tranquil-store/src/io.rs +++ b/crates/tranquil-store/src/io.rs @@ -104,6 +104,10 @@ pub trait StorageIO: Send + Sync { fn sync_dir(&self, path: &Path) -> io::Result<()>; fn list_dir(&self, path: &Path) -> io::Result>; + fn barrier(&self) -> io::Result<()> { + Ok(()) + } + fn write_all_at(&self, fd: FileId, offset: u64, buf: &[u8]) -> io::Result<()> { let written = Cell::new(0usize); std::iter::from_fn(|| (written.get() < buf.len()).then_some(())) @@ -190,6 +194,9 @@ impl StorageIO for Arc { fn list_dir(&self, path: &Path) -> io::Result> { (**self).list_dir(path) } + fn barrier(&self) -> io::Result<()> { + (**self).barrier() + } fn mmap_file(&self, fd: FileId) -> io::Result { (**self).mmap_file(fd) } diff --git a/crates/tranquil-store/src/lib.rs b/crates/tranquil-store/src/lib.rs index 178702c..48f9981 100644 --- a/crates/tranquil-store/src/lib.rs +++ b/crates/tranquil-store/src/lib.rs @@ -28,7 +28,7 @@ pub use record::{ }; #[cfg(any(test, feature = "test-harness"))] pub use sim::{ - FaultConfig, LatencyNs, OpRecord, Probability, SimulatedIO, SyncReorderWindow, + FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow, sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed, }; diff --git a/crates/tranquil-store/src/sim.rs b/crates/tranquil-store/src/sim.rs index f467366..2fe3d15 100644 --- a/crates/tranquil-store/src/sim.rs +++ b/crates/tranquil-store/src/sim.rs @@ -2,7 +2,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::io; use std::path::{Path, PathBuf}; use std::sync::Mutex; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::Arc; use std::time::Duration; use crate::io::{FileId, OpenOptions, StorageIO}; @@ -226,6 +227,7 @@ pub enum OpRecord { SyncDir { path: PathBuf, }, + Barrier, } struct PendingSync { @@ -326,6 +328,7 @@ impl SimState { pub struct SimulatedIO { state: Mutex, fault_config: FaultConfig, + pristine_mode: AtomicBool, rng_seed: u64, latency_counter: AtomicU64, } @@ -346,13 +349,26 @@ impl SimulatedIO { pending_deletes: Vec::new(), }), fault_config, + pristine_mode: AtomicBool::new(false), rng_seed: seed, latency_counter: AtomicU64::new(0), } } + fn effective_fault_config(&self) -> FaultConfig { + if self.pristine_mode.load(Ordering::Relaxed) { + FaultConfig::none() + } else { + self.fault_config + } + } + + pub fn set_pristine_mode(&self, on: bool) { + self.pristine_mode.store(on, Ordering::Relaxed); + } + fn jitter(&self) { - let max_ns = self.fault_config.latency_distribution_ns.0; + let max_ns = self.effective_fault_config().latency_distribution_ns.0; if max_ns == 0 { return; } @@ -429,12 +445,30 @@ impl SimulatedIO { } } +pub struct PristineGuard { + sim: Arc, +} + +impl PristineGuard { + pub fn new(sim: Arc, on: bool) -> Self { + sim.set_pristine_mode(on); + Self { sim } + } +} + +impl Drop for PristineGuard { + fn drop(&mut self) { + self.sim.set_pristine_mode(false); + } +} + impl StorageIO for SimulatedIO { fn open(&self, path: &Path, opts: OpenOptions) -> io::Result { + let fault = self.effective_fault_config(); let mut state = self.state.lock().unwrap(); let seed = self.rng_seed; - if state.should_fault(seed, self.fault_config.io_error_probability) { + if state.should_fault(seed, fault.io_error_probability) { return Err(io::Error::other("simulated EIO on open")); } @@ -514,6 +548,7 @@ impl StorageIO for SimulatedIO { fn read_at(&self, id: FileId, offset: u64, buf: &mut [u8]) -> io::Result { self.jitter(); + let fault = self.effective_fault_config(); let mut state = self.state.lock().unwrap(); let sid = state.require_readable(id)?; let seed = self.rng_seed; @@ -522,12 +557,12 @@ impl StorageIO for SimulatedIO { return Err(io::Error::other("simulated EIO after delayed sync fault")); } - if state.should_fault(seed, self.fault_config.io_error_probability) { + if state.should_fault(seed, fault.io_error_probability) { return Err(io::Error::other("simulated EIO on read")); } let read_offset = - if state.should_fault(seed, self.fault_config.misdirected_read_probability) { + if state.should_fault(seed, fault.misdirected_read_probability) { let drift_sectors = state.next_random_usize(seed, 8) + 1; let drift = (drift_sectors * SECTOR_BYTES) as u64; if state.next_random(seed) < 0.5 { @@ -556,7 +591,7 @@ impl StorageIO for SimulatedIO { let to_read = buf.len().min(available); buf[..to_read].copy_from_slice(&storage.buffered[off..off + to_read]); - if state.should_fault(seed, self.fault_config.bit_flip_on_read_probability) && to_read > 0 { + if state.should_fault(seed, fault.bit_flip_on_read_probability) && to_read > 0 { let flip_pos = state.next_random_usize(seed, to_read); let flip_bit = state.next_random_usize(seed, 8); buf[flip_pos] ^= 1 << flip_bit; @@ -572,6 +607,7 @@ impl StorageIO for SimulatedIO { fn write_at(&self, id: FileId, offset: u64, buf: &[u8]) -> io::Result { self.jitter(); + let fault = self.effective_fault_config(); let mut state = self.state.lock().unwrap(); let sid = state.require_writable(id)?; let seed = self.rng_seed; @@ -580,12 +616,12 @@ impl StorageIO for SimulatedIO { return Err(io::Error::other("simulated EIO after delayed sync fault")); } - if state.should_fault(seed, self.fault_config.io_error_probability) { + if state.should_fault(seed, fault.io_error_probability) { return Err(io::Error::other("simulated EIO on write")); } let torn_len = - if buf.len() > 1 && state.should_fault(seed, self.fault_config.torn_page_probability) { + if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) { let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES); let page_end = page_base + TORN_PAGE_BYTES; let cap = page_end.saturating_sub(offset as usize).min(buf.len()); @@ -601,7 +637,7 @@ impl StorageIO for SimulatedIO { let actual_len = match torn_len { Some(n) => n, None if buf.len() > 1 - && state.should_fault(seed, self.fault_config.partial_write_probability) => + && state.should_fault(seed, fault.partial_write_probability) => { let partial = state.next_random_usize(seed, buf.len()); partial.max(1) @@ -609,7 +645,7 @@ impl StorageIO for SimulatedIO { None => buf.len(), }; - let misdirected = state.should_fault(seed, self.fault_config.misdirected_write_probability); + let misdirected = state.should_fault(seed, fault.misdirected_write_probability); let write_offset = if misdirected { let drift_sectors = state.next_random_usize(seed, 8) + 1; let drift = (drift_sectors * SECTOR_BYTES) as u64; @@ -643,6 +679,7 @@ impl StorageIO for SimulatedIO { fn sync(&self, id: FileId) -> io::Result<()> { self.jitter(); + let fault = self.effective_fault_config(); let mut state = self.state.lock().unwrap(); let sid = state.require_open(id)?; let seed = self.rng_seed; @@ -651,14 +688,14 @@ impl StorageIO for SimulatedIO { return Err(io::Error::other("simulated EIO after delayed sync fault")); } - if state.should_fault(seed, self.fault_config.io_error_probability) { + if state.should_fault(seed, fault.io_error_probability) { return Err(io::Error::other("simulated EIO on sync")); } - let sync_succeeded = !state.should_fault(seed, self.fault_config.sync_failure_probability); + let sync_succeeded = !state.should_fault(seed, fault.sync_failure_probability); let poison_after = sync_succeeded - && state.should_fault(seed, self.fault_config.delayed_io_error_probability); - let reorder_window = self.fault_config.sync_reorder_window.0 as usize; + && state.should_fault(seed, fault.delayed_io_error_probability); + let reorder_window = fault.sync_reorder_window.0 as usize; let evicted = if sync_succeeded && reorder_window > 0 { let snapshot = state.storage.get(&sid).unwrap().buffered.clone(); @@ -774,17 +811,31 @@ impl StorageIO for SimulatedIO { Ok(()) } + fn barrier(&self) -> io::Result<()> { + self.jitter(); + let mut state = self.state.lock().unwrap(); + let drained: Vec = state.pending_syncs.drain(..).collect(); + drained.into_iter().for_each(|p| { + if let Some(storage) = state.storage.get_mut(&p.storage_id) { + storage.durable = p.snapshot; + } + }); + state.op_log.push(OpRecord::Barrier); + Ok(()) + } + fn sync_dir(&self, path: &Path) -> io::Result<()> { + let fault = self.effective_fault_config(); let mut state = self.state.lock().unwrap(); let seed = self.rng_seed; - if state.should_fault(seed, self.fault_config.io_error_probability) { + if state.should_fault(seed, fault.io_error_probability) { return Err(io::Error::other("simulated EIO on sync_dir")); } let dir_path = path.to_path_buf(); let actually_persisted = - !state.should_fault(seed, self.fault_config.dir_sync_failure_probability); + !state.should_fault(seed, fault.dir_sync_failure_probability); if actually_persisted { state.dirs_durable.insert(dir_path.clone()); diff --git a/crates/tranquil-store/tests/sim_blockstore.rs b/crates/tranquil-store/tests/sim_blockstore.rs index d6c6ef0..57d2fdc 100644 --- a/crates/tranquil-store/tests/sim_blockstore.rs +++ b/crates/tranquil-store/tests/sim_blockstore.rs @@ -12,7 +12,9 @@ use tranquil_store::blockstore::{ GroupCommitConfig, HINT_RECORD_SIZE, HintFileWriter, HintOffset, TranquilBlockStore, WallClockMs, WriteCursor, hint_file_path, }; -use tranquil_store::{FaultConfig, OpenOptions, SimulatedIO, StorageIO, sim_seed_range}; +use tranquil_store::{ + FaultConfig, OpenOptions, SimulatedIO, StorageIO, SyncReorderWindow, sim_seed_range, +}; use common::{Rng, advance_epoch, block_data, test_cid, with_runtime}; @@ -691,3 +693,60 @@ fn sim_multi_file_rotation_crash_recovery() { }); }); } + +#[test] +fn sim_sync_reorder_loses_first_commit_durability() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + let config = BlockStoreConfig { + data_dir: dir.path().join("data"), + index_dir: dir.path().join("index"), + max_file_size: DEFAULT_MAX_FILE_SIZE, + group_commit: GroupCommitConfig::default(), + shard_count: 1, + }; + + let fault = FaultConfig { + sync_reorder_window: SyncReorderWindow(4), + ..FaultConfig::none() + }; + let sim: Arc = Arc::new(SimulatedIO::new(706, fault)); + + let cid = test_cid(0); + let data = block_data(0); + + { + let s = Arc::clone(&sim); + let store = TranquilBlockStore::>::open_with_io( + config.clone(), + move || Arc::clone(&s), + ) + .unwrap(); + store + .put_blocks_blocking(vec![(cid, data.clone())]) + .unwrap(); + } + + sim.crash(); + + let s = Arc::clone(&sim); + let store = TranquilBlockStore::>::open_with_io(config, move || { + Arc::clone(&s) + }) + .unwrap(); + + match store.get_block_sync(&cid) { + Ok(Some(d)) => assert_eq!( + &d[..], + &data[..], + "block content mismatch after crash" + ), + Ok(None) => panic!( + "durability bug: put_blocks_blocking returned Ok but block missing after crash" + ), + Err(e) => panic!( + "durability bug: block read failed after crash: {e}" + ), + } + }); +} diff --git a/crates/tranquil-store/tests/sim_eventlog.rs b/crates/tranquil-store/tests/sim_eventlog.rs index 73dd94e..7241b93 100644 --- a/crates/tranquil-store/tests/sim_eventlog.rs +++ b/crates/tranquil-store/tests/sim_eventlog.rs @@ -5,10 +5,12 @@ use std::sync::Arc; use rayon::prelude::*; use tranquil_store::eventlog::{ - DidHash, EVENT_RECORD_OVERHEAD, EventLogWriter, EventSequence, EventTypeTag, MAX_EVENT_PAYLOAD, - SEGMENT_HEADER_SIZE, SegmentId, SegmentManager, SegmentReader, ValidEvent, + DidHash, EVENT_HEADER_SIZE, EVENT_RECORD_OVERHEAD, EventLogWriter, EventSequence, EventTypeTag, + MAX_EVENT_PAYLOAD, SEGMENT_HEADER_SIZE, SegmentId, SegmentManager, SegmentReader, ValidEvent, +}; +use tranquil_store::{ + FaultConfig, OpenOptions, Probability, SimulatedIO, StorageIO, sim_seed_range, }; -use tranquil_store::{FaultConfig, Probability, SimulatedIO, StorageIO, sim_seed_range}; use common::Rng; @@ -204,13 +206,13 @@ fn crash_mid_rotation_with_faults() { EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD) })); - if let Ok(Ok(writer)) = recovery - && let Ok(synced_before) = write_result - { + if let Ok(Ok(writer)) = recovery { + let recovered = writer.synced_seq().raw(); assert!( - writer.synced_seq().raw() <= synced_before, - "seed {seed}: recovered more events than were synced" + recovered <= events_per_seg as u64, + "seed {seed}: recovered {recovered} > written {events_per_seg}" ); + let _ = write_result; } }); } @@ -1020,3 +1022,157 @@ fn aggressive_faults_group_sync_recovery() { ); }); } + +#[test] +fn sync_synced_seq_must_match_durable_valid_prefix() { + sim_seed_range().into_par_iter().for_each(|seed| { + let fault_config = FaultConfig { + partial_write_probability: Probability::new(0.05), + torn_page_probability: Probability::new(0.01), + misdirected_write_probability: Probability::new(0.01), + sync_failure_probability: Probability::new(0.03), + sync_reorder_window: tranquil_store::SyncReorderWindow(4), + ..FaultConfig::none() + }; + let sim = SimulatedIO::new(seed, fault_config); + let mgr = setup_manager(sim, 64 * 1024); + + let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD) + .unwrap_or_else(|e| panic!("seed {seed}: open writer failed: {e}")); + + let event_count = 10u64; + (1..=event_count).for_each(|i| { + let _ = append_test_event(&mut writer, i, seed); + }); + + let synced_through = match writer.sync() { + Ok(r) => r.synced_through.raw(), + Err(_) => return, + }; + let _ = mgr.io().sync_dir(Path::new(SEGMENTS_DIR)); + + if synced_through == 0 { + return; + } + + let Ok(handle) = mgr.open_for_read(SegmentId::new(1)) else { + return; + }; + let Ok(reader) = SegmentReader::open(mgr.io(), handle.fd(), MAX_EVENT_PAYLOAD) else { + return; + }; + let Ok(valid) = reader.valid_prefix() else { + return; + }; + + let durable_max = valid.last().map(|e| e.seq.raw()).unwrap_or(0); + + assert!( + synced_through <= durable_max, + "seed {seed}: sync acked seq {synced_through} but durable valid prefix only reaches {durable_max} \ + (events written: {event_count}, valid_prefix.len()={})", + valid.len() + ); + }); +} + +#[test] +fn reopen_recovers_from_torn_segment_header() { + let sim = SimulatedIO::pristine(0); + let mgr = setup_manager(sim, 64 * 1024); + + { + let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap(); + (1..=3).for_each(|i| { + let _ = append_test_event(&mut writer, i, 0); + }); + writer.sync().unwrap(); + } + mgr.shutdown(); + + let path = mgr.segment_path(SegmentId::new(1)); + let fd = mgr + .io() + .open(&path, OpenOptions::read_write_existing()) + .unwrap(); + mgr.io().write_all_at(fd, 0, &[0u8; 4]).unwrap(); + mgr.io().sync(fd).unwrap(); + mgr.io().sync_dir(Path::new(SEGMENTS_DIR)).unwrap(); + mgr.io().close(fd).unwrap(); + + let writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD) + .expect("reopen with torn header on highest-numbered segment must succeed"); + assert_eq!(writer.active_segment_id(), SegmentId::new(1)); +} + +#[test] +fn partial_valid_sync_poisons_writer_and_acks_only_valid_prefix() { + let sim = SimulatedIO::pristine(0); + let mgr = setup_manager(sim, 64 * 1024); + let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap(); + + let payload = b"payload-x".to_vec(); + let payload_size = payload.len(); + let record_size = EVENT_RECORD_OVERHEAD + payload_size; + + (1..=5u64).for_each(|i| { + writer + .append( + DidHash::from_did(&format!("did:plc:user{i}")), + EventTypeTag::COMMIT, + payload.clone(), + ) + .unwrap(); + }); + + let event_3_start = SEGMENT_HEADER_SIZE + 2 * record_size; + let event_3_checksum_offset = event_3_start + EVENT_HEADER_SIZE + payload_size; + + let segment_path = mgr.segment_path(SegmentId::new(1)); + let corrupt_fd = mgr + .io() + .open(&segment_path, OpenOptions::read_write_existing()) + .unwrap(); + mgr.io() + .write_all_at(corrupt_fd, event_3_checksum_offset as u64, &[0xFFu8; 4]) + .unwrap(); + mgr.io().close(corrupt_fd).unwrap(); + + let result = writer.sync().unwrap(); + assert_eq!( + result.synced_through, + EventSequence::new(2), + "sync must ack only events 1..=2 with corrupt event 3" + ); + assert_eq!(result.flushed_events.len(), 2); + assert!(writer.is_poisoned(), "writer must be poisoned after partial sync"); + + let append_after_poison = writer.append( + DidHash::from_did("did:plc:after"), + EventTypeTag::COMMIT, + payload.clone(), + ); + assert!( + append_after_poison.is_err(), + "append must fail on poisoned writer" + ); + + let sync_after_poison = writer.sync(); + assert!( + sync_after_poison.is_err(), + "sync must fail on poisoned writer" + ); + + drop(writer); + let recovered = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap(); + assert_eq!( + recovered.synced_seq(), + EventSequence::new(2), + "reopen must observe synced_seq matching disk's valid prefix" + ); + + let valid = read_all_events(&mgr, 0); + assert_eq!(valid.len(), 2); + assert_eq!(valid[0].seq, EventSequence::new(1)); + assert_eq!(valid[1].seq, EventSequence::new(2)); +}