fix(tranquil-store): barrier durability + torn-header recovery

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-04-29 10:48:46 +03:00
parent d4dfe838eb
commit efd499bb26
11 changed files with 467 additions and 80 deletions

View File

@@ -222,7 +222,8 @@ fn stream_compact<S: StorageIO>(
.io()
.sync_dir(manager.data_dir())
.map_err(CompactionError::from)
});
})
.and_then(|()| manager.io().barrier().map_err(CompactionError::from));
let _ = manager.io().close(hint_fd);

View File

@@ -1343,6 +1343,10 @@ fn process_batch<S: StorageIO>(
)
.map_err(|e| rollback_on_err(CommitError::from(e)))?;
hint_writer.sync().map_err(|e| rollback_on_err(e.into()))?;
manager
.io()
.barrier()
.map_err(|e| rollback_on_err(e.into()))?;
let sync_nanos = t.elapsed().as_nanos() as u64;
if !rotations.is_empty() {

View File

@@ -3,16 +3,27 @@ use std::sync::Arc;
use tracing::warn;
use crate::io::StorageIO;
use crate::io::{FileId, StorageIO};
use super::manager::SegmentManager;
use super::segment_file::{SEGMENT_HEADER_SIZE, SegmentWriter, ValidEvent};
use super::segment_file::{
SEGMENT_HEADER_SIZE, SEGMENT_MAGIC, SegmentWriter, ValidEvent, ValidateEventRecord,
validate_event_record,
};
use super::segment_index::{DEFAULT_INDEX_INTERVAL, SegmentIndex, rebuild_from_segment};
use super::sidecar::build_sidecar_from_segment;
use super::types::{
DidHash, EventSequence, EventTypeTag, SegmentId, SegmentOffset, TimestampMicros,
};
// Maximum number of read-back attempts made by `validate_with_retry` (event
// records) and `highest_segment_has_torn_header` (segment magic) before the
// outcome is accepted.
const VALIDATE_RETRY_ATTEMPTS: u32 = 32;
/// An event held in `EventLogWriter::pending`, paired with the segment offset
/// that `append_event` returned when the event was written.
#[derive(Debug, Clone)]
struct PendingAppend {
/// The event as it was handed to the segment writer.
event: ValidEvent,
/// Offset of the event's record in the active segment; `sync` passes it to
/// `validate_with_retry` to re-read the record.
offset: SegmentOffset,
}
#[derive(Debug)]
pub struct SyncResult {
pub synced_through: EventSequence,
@@ -31,7 +42,8 @@ pub struct EventLogWriter<S: StorageIO> {
max_payload: u32,
event_count_in_segment: usize,
last_event_offset: Option<SegmentOffset>,
pending_events: Vec<ValidEvent>,
pending: Vec<PendingAppend>,
poisoned: bool,
}
impl<S: StorageIO> EventLogWriter<S> {
@@ -83,10 +95,25 @@ impl<S: StorageIO> EventLogWriter<S> {
max_payload,
event_count_in_segment: 0,
last_event_offset: None,
pending_events: Vec::new(),
pending: Vec::new(),
poisoned: false,
})
}
/// Truncates the file behind `fd` to zero length and then builds a writer via
/// `init_fresh` for `active_id`.
///
/// The first sequence of the new writer is the successor of the last sequence
/// found in `prev_segments`, or sequence 1 when none is found.
///
/// # Errors
/// Propagates any I/O error from the truncate, from scanning `prev_segments`,
/// or from `init_fresh`.
fn truncate_and_init_fresh(
    manager: Arc<SegmentManager<S>>,
    fd: FileId,
    active_id: SegmentId,
    prev_segments: &[SegmentId],
    index_interval: usize,
    max_payload: u32,
) -> io::Result<Self> {
    manager.io().truncate(fd, 0)?;

    let last_sealed = find_last_seq_from_segments(&manager, prev_segments, max_payload)?;
    let first_seq = match last_sealed {
        Some(seq) => seq.next(),
        None => EventSequence::new(1),
    };

    Self::init_fresh(manager, active_id, first_seq, index_interval, max_payload)
}
fn recover_active(
manager: Arc<SegmentManager<S>>,
segments: &[SegmentId],
@@ -97,6 +124,19 @@ impl<S: StorageIO> EventLogWriter<S> {
let handle = manager.open_for_append(active_id)?;
let fd = handle.fd();
let prev_segments = &segments[..segments.len().saturating_sub(1)];
if highest_segment_has_torn_header(manager.io(), fd)? {
return Self::truncate_and_init_fresh(
Arc::clone(&manager),
fd,
active_id,
prev_segments,
index_interval,
max_payload,
);
}
let (index, last_seq_in_active) = match rebuild_from_segment(
manager.io(),
fd,
@@ -107,15 +147,11 @@ impl<S: StorageIO> EventLogWriter<S> {
Err(rebuild_err) => {
let file_size = manager.io().file_size(fd)?;
if file_size <= SEGMENT_HEADER_SIZE as u64 {
manager.io().truncate(fd, 0)?;
let prev_segments = &segments[..segments.len().saturating_sub(1)];
let next_seq =
find_last_seq_from_segments(&manager, prev_segments, max_payload)?
.map_or(EventSequence::new(1), |s| s.next());
return Self::init_fresh(
return Self::truncate_and_init_fresh(
Arc::clone(&manager),
fd,
active_id,
next_seq,
prev_segments,
index_interval,
max_payload,
);
@@ -131,8 +167,6 @@ impl<S: StorageIO> EventLogWriter<S> {
let position = SegmentOffset::new(manager.io().file_size(fd)?);
let prev_segments = &segments[..segments.len().saturating_sub(1)];
let next_seq = match last_seq_in_active {
Some(seq) => {
if let Some(sealed_last) =
@@ -196,7 +230,8 @@ impl<S: StorageIO> EventLogWriter<S> {
max_payload,
event_count_in_segment,
last_event_offset,
pending_events: Vec::new(),
pending: Vec::new(),
poisoned: false,
})
}
@@ -227,28 +262,20 @@ impl<S: StorageIO> EventLogWriter<S> {
payload,
};
let offset = self.active_writer.append_event(self.manager.io(), &event)?;
let should_index = self.event_count_in_segment == 0
|| self
.event_count_in_segment
.is_multiple_of(self.index_interval);
if should_index {
self.active_index.record(seq, offset);
}
self.event_count_in_segment = self
.event_count_in_segment
.checked_add(1)
.expect("event_count_in_segment overflow");
self.last_event_offset = Some(offset);
self.next_seq = seq.next();
self.pending_events.push(event);
Ok(seq)
self.append_inner(event).map(|_| seq)
}
/// Appends an already-built `ValidEvent` by delegating to `append_inner`.
///
/// # Errors
/// Returns whatever `append_inner` returns, which includes the
/// "writer poisoned" error once the writer has been poisoned.
pub fn append_valid_event(&mut self, event: ValidEvent) -> io::Result<()> {
self.append_inner(event)
}
fn append_inner(&mut self, event: ValidEvent) -> io::Result<()> {
if self.poisoned {
return Err(io::Error::other(
"writer poisoned by partial-valid sync; reopen required",
));
}
let offset = self.active_writer.append_event(self.manager.io(), &event)?;
let should_index = self.event_count_in_segment == 0
@@ -265,21 +292,52 @@ impl<S: StorageIO> EventLogWriter<S> {
.expect("event_count_in_segment overflow");
self.last_event_offset = Some(offset);
self.next_seq = event.seq.next();
self.pending_events.push(event);
self.pending.push(PendingAppend { event, offset });
Ok(())
}
pub fn peek_pending_event(&self, seq: EventSequence) -> Option<&ValidEvent> {
self.pending_events.iter().find(|e| e.seq == seq)
}
pub fn sync(&mut self) -> io::Result<SyncResult> {
if !self.pending_events.is_empty() {
self.active_writer.sync(self.manager.io())?;
if self.poisoned {
return Err(io::Error::other(
"writer poisoned by partial-valid sync; reopen required",
));
}
let flushed = std::mem::take(&mut self.pending_events);
if !self.pending.is_empty() {
self.active_writer.sync(self.manager.io())?;
self.manager.io().barrier()?;
}
let pending = std::mem::take(&mut self.pending);
let fd = self.active_writer.fd();
let file_size = self.manager.io().file_size(fd)?;
let valid_count = pending
.iter()
.take_while(|p| {
validate_with_retry(
self.manager.io(),
fd,
p.offset,
file_size,
self.max_payload,
p.event.seq,
)
})
.count();
if valid_count < pending.len() {
self.poisoned = true;
}
let flushed: Vec<ValidEvent> = pending
.into_iter()
.take(valid_count)
.map(|p| p.event)
.collect();
self.synced_seq = flushed.last().map(|e| e.seq).unwrap_or(self.synced_seq);
Ok(SyncResult {
@@ -290,12 +348,22 @@ impl<S: StorageIO> EventLogWriter<S> {
})
}
/// Reports whether the writer has been poisoned.
///
/// The flag is set by `sync` when fewer pending records validate than were
/// pending; once set, `append_inner`, `sync` and `rotate_if_needed` return an
/// error asking for a reopen.
pub fn is_poisoned(&self) -> bool {
self.poisoned
}
pub fn rotate_if_needed(&mut self) -> io::Result<Option<SegmentId>> {
if self.poisoned {
return Err(io::Error::other(
"writer poisoned by partial-valid sync; reopen required",
));
}
if !self.manager.should_rotate(self.active_writer.position()) {
return Ok(None);
}
if !self.pending_events.is_empty() {
if !self.pending.is_empty() {
return Ok(None);
}
@@ -386,6 +454,40 @@ impl<S: StorageIO> EventLogWriter<S> {
}
}
/// Re-reads the record at `offset` up to `VALIDATE_RETRY_ATTEMPTS` times and
/// reports whether any attempt produced a valid record carrying `expected_seq`.
///
/// Every other outcome of `validate_event_record` (an error, `None`, a
/// non-`Valid` variant, or a `Valid` record with a different sequence) counts
/// as a failed attempt and triggers the next retry.
fn validate_with_retry<S: StorageIO>(
    io: &S,
    fd: FileId,
    offset: SegmentOffset,
    file_size: u64,
    max_payload: u32,
    expected_seq: EventSequence,
) -> bool {
    for _ in 0..VALIDATE_RETRY_ATTEMPTS {
        let outcome = validate_event_record(io, fd, offset, file_size, max_payload);
        if let Ok(Some(ValidateEventRecord::Valid { seq, .. })) = outcome {
            if seq == expected_seq {
                return true;
            }
        }
    }
    false
}
/// Decides whether the segment behind `fd` has a torn (unusable) header.
///
/// Returns `Ok(true)` when the file is shorter than `SEGMENT_HEADER_SIZE`, or
/// when at least one read of the leading bytes succeeded and none of the
/// successful reads matched `SEGMENT_MAGIC`. Returns `Ok(false)` as soon as one
/// read matches the magic, and also when every read attempt failed (no
/// evidence either way).
///
/// # Errors
/// Only a failure to query the file size is propagated; read errors are
/// treated as inconclusive attempts and retried.
fn highest_segment_has_torn_header<S: StorageIO>(io: &S, fd: FileId) -> io::Result<bool> {
    let file_size = io.file_size(fd)?;
    if file_size < SEGMENT_HEADER_SIZE as u64 {
        return Ok(true);
    }

    let mut saw_mismatch = false;
    for _ in 0..VALIDATE_RETRY_ATTEMPTS {
        let mut header = [0u8; SEGMENT_MAGIC.len()];
        match io.read_exact_at(fd, 0, &mut header) {
            // One good read settles it: stop instead of issuing the remaining
            // attempts, mirroring the short-circuit in `validate_with_retry`.
            Ok(()) if header == SEGMENT_MAGIC => return Ok(false),
            Ok(()) => saw_mismatch = true,
            // A failed read says nothing about the header contents; retry.
            Err(_) => {}
        }
    }

    // No attempt matched: torn only if some read actually returned bytes.
    Ok(saw_mismatch)
}
fn find_last_seq_from_segments<S: StorageIO>(
manager: &SegmentManager<S>,
segments: &[SegmentId],

View File

@@ -405,6 +405,8 @@ fn mount_ext4(device: &Path, target: &Path) -> Result<(), FlakyError> {
let out = Command::new("mount")
.arg("-t")
.arg("ext4")
.arg("-o")
.arg("errors=continue")
.arg(device)
.arg(target)
.output()?;

View File

@@ -23,7 +23,7 @@ use crate::eventlog::{
SegmentManager, SegmentReader, TimestampMicros, ValidEvent,
};
use crate::io::{RealIO, StorageIO};
use crate::sim::{FaultConfig, SimulatedIO};
use crate::sim::{FaultConfig, PristineGuard, SimulatedIO};
#[derive(Debug, Clone, Copy)]
pub enum IoBackend {
@@ -364,7 +364,7 @@ async fn run_inner_real_on_root(
let segments_dir = segments_subdir(&root);
let open = {
let segments_dir = segments_dir.clone();
move || -> Result<Harness<RealIO>, String> {
move |_attempt: usize| -> Result<Harness<RealIO>, String> {
let store = TranquilBlockStore::open(cfg.clone())
.map(Arc::new)
.map_err(|e| e.to_string())?;
@@ -424,7 +424,8 @@ async fn run_inner_simulated(
let sim_for_open = Arc::clone(&sim);
let open = {
let segments_dir = segments_dir.clone();
move || -> Result<Harness<Arc<SimulatedIO>>, String> {
move |attempt: usize| -> Result<Harness<Arc<SimulatedIO>>, String> {
let _pristine = PristineGuard::new(Arc::clone(&sim_for_open), attempt > 0);
let factory_sim = Arc::clone(&sim_for_open);
let make_io = move || Arc::clone(&factory_sim);
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(cfg.clone(), make_io)
@@ -512,13 +513,13 @@ async fn run_inner_generic<S, Open, Crash>(
) -> GauntletReport
where
S: StorageIO + Send + Sync + 'static,
Open: FnMut() -> Result<Harness<S>, String>,
Open: FnMut(usize) -> Result<Harness<S>, String>,
Crash: FnMut(),
{
let mut oracle = Oracle::new();
let mut violations: Vec<InvariantViolation> = Vec::new();
let mut harness: Option<Harness<S>> = match open() {
let mut harness: Option<Harness<S>> = match open(0) {
Ok(h) => Some(h),
Err(e) => {
return GauntletReport {
@@ -750,7 +751,7 @@ async fn reopen_with_recovery<S, Open, Crash>(
) -> Result<Harness<S>, String>
where
S: StorageIO + Send + Sync + 'static,
Open: FnMut() -> Result<Harness<S>, String>,
Open: FnMut(usize) -> Result<Harness<S>, String>,
Crash: FnMut(),
{
let mut errors: Vec<String> = Vec::new();
@@ -758,7 +759,7 @@ where
if attempt > 0 && !backoff.is_zero() {
tokio::time::sleep(backoff).await;
}
match open() {
match open(attempt) {
Ok(h) => return Ok(h),
Err(e) => {
errors.push(format!("attempt {attempt}: {e}"));
@@ -1191,6 +1192,10 @@ fn run_retention<S: StorageIO + Send + Sync + 'static>(
max_age: RetentionSecs,
) -> Result<(), String> {
let sync_result = el.writer.sync().map_err(|e| e.to_string())?;
el.manager
.io()
.sync_dir(el.segments_dir.as_path())
.map_err(|e| e.to_string())?;
let _ = el.writer.rotate_if_needed();
oracle.record_event_sync(sync_result.synced_through);
let active_id = sync_result.segment_id;
@@ -1564,7 +1569,7 @@ async fn run_inner_generic_concurrent<S, Open, Crash>(
) -> GauntletReport
where
S: StorageIO + Send + Sync + 'static,
Open: FnMut() -> Result<Harness<S>, String>,
Open: FnMut(usize) -> Result<Harness<S>, String>,
Crash: FnMut(),
{
let ops: Vec<Op> = op_stream.into_vec();
@@ -1577,7 +1582,7 @@ where
let mut sample_rng = Lcg::new(Seed(config.seed.0 ^ 0x5A5A_5A5A_5A5A_5A5A));
let chunks = compute_chunks(config.restart_policy, total_ops, &mut restart_rng);
let mut harness: Option<Harness<S>> = match open() {
let mut harness: Option<Harness<S>> = match open(0) {
Ok(h) => Some(h),
Err(e) => {
return GauntletReport {

View File

@@ -9,7 +9,7 @@ use super::workload::{
ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution,
ValueBytes, WorkloadModel,
};
use crate::blockstore::GroupCommitConfig;
use crate::blockstore::{GroupCommitConfig, MAX_BLOCK_SIZE};
use crate::sim::FaultConfig;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -402,7 +402,7 @@ fn huge_values(seed: Seed) -> GauntletConfig {
workload: block_workload(
block_weights(85, 5, 8, 2),
SizeDistribution::HeavyTail(
ByteRange::new(ValueBytes(256), ValueBytes(16 * 1024 * 1024))
ByteRange::new(ValueBytes(256), ValueBytes(MAX_BLOCK_SIZE))
.expect("huge_values ByteRange"),
),
KeySpaceSize(64),

View File

@@ -104,6 +104,10 @@ pub trait StorageIO: Send + Sync {
fn sync_dir(&self, path: &Path) -> io::Result<()>;
fn list_dir(&self, path: &Path) -> io::Result<Vec<PathBuf>>;
/// Barrier hook invoked by callers after their sync calls.
///
/// The default implementation does nothing and reports success; implementors
/// that need extra work at this point override it.
fn barrier(&self) -> io::Result<()> {
Ok(())
}
fn write_all_at(&self, fd: FileId, offset: u64, buf: &[u8]) -> io::Result<()> {
let written = Cell::new(0usize);
std::iter::from_fn(|| (written.get() < buf.len()).then_some(()))
@@ -190,6 +194,9 @@ impl<S: StorageIO> StorageIO for Arc<S> {
fn list_dir(&self, path: &Path) -> io::Result<Vec<PathBuf>> {
(**self).list_dir(path)
}
// Forward to the wrapped `S` so that an override on the inner type is used
// instead of the trait's default no-op.
fn barrier(&self) -> io::Result<()> {
(**self).barrier()
}
fn mmap_file(&self, fd: FileId) -> io::Result<MappedFile> {
(**self).mmap_file(fd)
}

View File

@@ -28,7 +28,7 @@ pub use record::{
};
#[cfg(any(test, feature = "test-harness"))]
pub use sim::{
FaultConfig, LatencyNs, OpRecord, Probability, SimulatedIO, SyncReorderWindow,
FaultConfig, LatencyNs, OpRecord, Probability, PristineGuard, SimulatedIO, SyncReorderWindow,
sim_proptest_cases, sim_seed_count, sim_seed_range, sim_single_seed,
};

View File

@@ -2,7 +2,8 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::io::{FileId, OpenOptions, StorageIO};
@@ -226,6 +227,7 @@ pub enum OpRecord {
SyncDir {
path: PathBuf,
},
Barrier,
}
struct PendingSync {
@@ -326,6 +328,7 @@ impl SimState {
pub struct SimulatedIO {
state: Mutex<SimState>,
fault_config: FaultConfig,
pristine_mode: AtomicBool,
rng_seed: u64,
latency_counter: AtomicU64,
}
@@ -346,13 +349,26 @@ impl SimulatedIO {
pending_deletes: Vec::new(),
}),
fault_config,
pristine_mode: AtomicBool::new(false),
rng_seed: seed,
latency_counter: AtomicU64::new(0),
}
}
/// Returns the fault configuration currently in force: the configured
/// `fault_config` normally, or `FaultConfig::none()` while pristine mode is on.
fn effective_fault_config(&self) -> FaultConfig {
    let pristine = self.pristine_mode.load(Ordering::Relaxed);
    if !pristine {
        return self.fault_config;
    }
    FaultConfig::none()
}
/// Turns pristine mode on or off.
///
/// While it is on, `effective_fault_config` returns `FaultConfig::none()`
/// instead of the configured fault settings.
pub fn set_pristine_mode(&self, on: bool) {
self.pristine_mode.store(on, Ordering::Relaxed);
}
fn jitter(&self) {
let max_ns = self.fault_config.latency_distribution_ns.0;
let max_ns = self.effective_fault_config().latency_distribution_ns.0;
if max_ns == 0 {
return;
}
@@ -429,12 +445,30 @@ impl SimulatedIO {
}
}
/// Scope guard for a `SimulatedIO`'s pristine mode.
///
/// Creating the guard sets pristine mode to the requested value; dropping it
/// always sets pristine mode back to `false`. The guard must therefore be
/// bound to a variable for as long as the setting should hold.
#[must_use = "pristine mode is switched back off as soon as the guard is dropped"]
pub struct PristineGuard {
    sim: Arc<SimulatedIO>,
}

impl PristineGuard {
    /// Sets pristine mode on `sim` to `on` and returns a guard that resets it
    /// to `false` when dropped.
    pub fn new(sim: Arc<SimulatedIO>, on: bool) -> Self {
        sim.set_pristine_mode(on);
        Self { sim }
    }
}

impl Drop for PristineGuard {
    /// Unconditionally switches pristine mode off, regardless of the value the
    /// guard was created with.
    fn drop(&mut self) {
        self.sim.set_pristine_mode(false);
    }
}
impl StorageIO for SimulatedIO {
fn open(&self, path: &Path, opts: OpenOptions) -> io::Result<FileId> {
let fault = self.effective_fault_config();
let mut state = self.state.lock().unwrap();
let seed = self.rng_seed;
if state.should_fault(seed, self.fault_config.io_error_probability) {
if state.should_fault(seed, fault.io_error_probability) {
return Err(io::Error::other("simulated EIO on open"));
}
@@ -514,6 +548,7 @@ impl StorageIO for SimulatedIO {
fn read_at(&self, id: FileId, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
self.jitter();
let fault = self.effective_fault_config();
let mut state = self.state.lock().unwrap();
let sid = state.require_readable(id)?;
let seed = self.rng_seed;
@@ -522,12 +557,12 @@ impl StorageIO for SimulatedIO {
return Err(io::Error::other("simulated EIO after delayed sync fault"));
}
if state.should_fault(seed, self.fault_config.io_error_probability) {
if state.should_fault(seed, fault.io_error_probability) {
return Err(io::Error::other("simulated EIO on read"));
}
let read_offset =
if state.should_fault(seed, self.fault_config.misdirected_read_probability) {
if state.should_fault(seed, fault.misdirected_read_probability) {
let drift_sectors = state.next_random_usize(seed, 8) + 1;
let drift = (drift_sectors * SECTOR_BYTES) as u64;
if state.next_random(seed) < 0.5 {
@@ -556,7 +591,7 @@ impl StorageIO for SimulatedIO {
let to_read = buf.len().min(available);
buf[..to_read].copy_from_slice(&storage.buffered[off..off + to_read]);
if state.should_fault(seed, self.fault_config.bit_flip_on_read_probability) && to_read > 0 {
if state.should_fault(seed, fault.bit_flip_on_read_probability) && to_read > 0 {
let flip_pos = state.next_random_usize(seed, to_read);
let flip_bit = state.next_random_usize(seed, 8);
buf[flip_pos] ^= 1 << flip_bit;
@@ -572,6 +607,7 @@ impl StorageIO for SimulatedIO {
fn write_at(&self, id: FileId, offset: u64, buf: &[u8]) -> io::Result<usize> {
self.jitter();
let fault = self.effective_fault_config();
let mut state = self.state.lock().unwrap();
let sid = state.require_writable(id)?;
let seed = self.rng_seed;
@@ -580,12 +616,12 @@ impl StorageIO for SimulatedIO {
return Err(io::Error::other("simulated EIO after delayed sync fault"));
}
if state.should_fault(seed, self.fault_config.io_error_probability) {
if state.should_fault(seed, fault.io_error_probability) {
return Err(io::Error::other("simulated EIO on write"));
}
let torn_len =
if buf.len() > 1 && state.should_fault(seed, self.fault_config.torn_page_probability) {
if buf.len() > 1 && state.should_fault(seed, fault.torn_page_probability) {
let page_base = (offset as usize) - ((offset as usize) % TORN_PAGE_BYTES);
let page_end = page_base + TORN_PAGE_BYTES;
let cap = page_end.saturating_sub(offset as usize).min(buf.len());
@@ -601,7 +637,7 @@ impl StorageIO for SimulatedIO {
let actual_len = match torn_len {
Some(n) => n,
None if buf.len() > 1
&& state.should_fault(seed, self.fault_config.partial_write_probability) =>
&& state.should_fault(seed, fault.partial_write_probability) =>
{
let partial = state.next_random_usize(seed, buf.len());
partial.max(1)
@@ -609,7 +645,7 @@ impl StorageIO for SimulatedIO {
None => buf.len(),
};
let misdirected = state.should_fault(seed, self.fault_config.misdirected_write_probability);
let misdirected = state.should_fault(seed, fault.misdirected_write_probability);
let write_offset = if misdirected {
let drift_sectors = state.next_random_usize(seed, 8) + 1;
let drift = (drift_sectors * SECTOR_BYTES) as u64;
@@ -643,6 +679,7 @@ impl StorageIO for SimulatedIO {
fn sync(&self, id: FileId) -> io::Result<()> {
self.jitter();
let fault = self.effective_fault_config();
let mut state = self.state.lock().unwrap();
let sid = state.require_open(id)?;
let seed = self.rng_seed;
@@ -651,14 +688,14 @@ impl StorageIO for SimulatedIO {
return Err(io::Error::other("simulated EIO after delayed sync fault"));
}
if state.should_fault(seed, self.fault_config.io_error_probability) {
if state.should_fault(seed, fault.io_error_probability) {
return Err(io::Error::other("simulated EIO on sync"));
}
let sync_succeeded = !state.should_fault(seed, self.fault_config.sync_failure_probability);
let sync_succeeded = !state.should_fault(seed, fault.sync_failure_probability);
let poison_after = sync_succeeded
&& state.should_fault(seed, self.fault_config.delayed_io_error_probability);
let reorder_window = self.fault_config.sync_reorder_window.0 as usize;
&& state.should_fault(seed, fault.delayed_io_error_probability);
let reorder_window = fault.sync_reorder_window.0 as usize;
let evicted = if sync_succeeded && reorder_window > 0 {
let snapshot = state.storage.get(&sid).unwrap().buffered.clone();
@@ -774,17 +811,31 @@ impl StorageIO for SimulatedIO {
Ok(())
}
/// Applies every queued pending sync to durable state.
///
/// After the usual latency jitter, the pending-sync queue is emptied and each
/// entry's snapshot becomes the durable contents of its storage, in queue
/// order (entries whose storage no longer exists are dropped). No fault
/// probabilities are consulted here, and the call always succeeds. A
/// `OpRecord::Barrier` is appended to the op log.
fn barrier(&self) -> io::Result<()> {
    self.jitter();
    let mut state = self.state.lock().unwrap();

    // Detach the queue first so `state.storage` can be borrowed mutably below.
    let queued: Vec<PendingSync> = state.pending_syncs.drain(..).collect();
    for entry in queued {
        if let Some(storage) = state.storage.get_mut(&entry.storage_id) {
            storage.durable = entry.snapshot;
        }
    }

    state.op_log.push(OpRecord::Barrier);
    Ok(())
}
fn sync_dir(&self, path: &Path) -> io::Result<()> {
let fault = self.effective_fault_config();
let mut state = self.state.lock().unwrap();
let seed = self.rng_seed;
if state.should_fault(seed, self.fault_config.io_error_probability) {
if state.should_fault(seed, fault.io_error_probability) {
return Err(io::Error::other("simulated EIO on sync_dir"));
}
let dir_path = path.to_path_buf();
let actually_persisted =
!state.should_fault(seed, self.fault_config.dir_sync_failure_probability);
!state.should_fault(seed, fault.dir_sync_failure_probability);
if actually_persisted {
state.dirs_durable.insert(dir_path.clone());

View File

@@ -12,7 +12,9 @@ use tranquil_store::blockstore::{
GroupCommitConfig, HINT_RECORD_SIZE, HintFileWriter, HintOffset, TranquilBlockStore,
WallClockMs, WriteCursor, hint_file_path,
};
use tranquil_store::{FaultConfig, OpenOptions, SimulatedIO, StorageIO, sim_seed_range};
use tranquil_store::{
FaultConfig, OpenOptions, SimulatedIO, StorageIO, SyncReorderWindow, sim_seed_range,
};
use common::{Rng, advance_epoch, block_data, test_cid, with_runtime};
@@ -691,3 +693,60 @@ fn sim_multi_file_rotation_crash_recovery() {
});
});
}
// Durability check under sync reordering: a block whose `put_blocks_blocking`
// call returned Ok must still be readable, with identical bytes, after a
// simulated crash and reopen.
#[test]
fn sim_sync_reorder_loses_first_commit_durability() {
with_runtime(|| {
let dir = tempfile::TempDir::new().unwrap();
let config = BlockStoreConfig {
data_dir: dir.path().join("data"),
index_dir: dir.path().join("index"),
max_file_size: DEFAULT_MAX_FILE_SIZE,
group_commit: GroupCommitConfig::default(),
shard_count: 1,
};
// Only the sync reorder window is enabled; every other fault stays at
// the `FaultConfig::none()` setting.
let fault = FaultConfig {
sync_reorder_window: SyncReorderWindow(4),
..FaultConfig::none()
};
let sim: Arc<SimulatedIO> = Arc::new(SimulatedIO::new(706, fault));
let cid = test_cid(0);
let data = block_data(0);
// The first store lives only inside this scope, so it is dropped before
// the crash is simulated.
{
let s = Arc::clone(&sim);
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(
config.clone(),
move || Arc::clone(&s),
)
.unwrap();
store
.put_blocks_blocking(vec![(cid, data.clone())])
.unwrap();
}
sim.crash();
// Reopen on the same simulated IO and read the block back.
let s = Arc::clone(&sim);
let store = TranquilBlockStore::<Arc<SimulatedIO>>::open_with_io(config, move || {
Arc::clone(&s)
})
.unwrap();
match store.get_block_sync(&cid) {
Ok(Some(d)) => assert_eq!(
&d[..],
&data[..],
"block content mismatch after crash"
),
Ok(None) => panic!(
"durability bug: put_blocks_blocking returned Ok but block missing after crash"
),
Err(e) => panic!(
"durability bug: block read failed after crash: {e}"
),
}
});
}

View File

@@ -5,10 +5,12 @@ use std::sync::Arc;
use rayon::prelude::*;
use tranquil_store::eventlog::{
DidHash, EVENT_RECORD_OVERHEAD, EventLogWriter, EventSequence, EventTypeTag, MAX_EVENT_PAYLOAD,
SEGMENT_HEADER_SIZE, SegmentId, SegmentManager, SegmentReader, ValidEvent,
DidHash, EVENT_HEADER_SIZE, EVENT_RECORD_OVERHEAD, EventLogWriter, EventSequence, EventTypeTag,
MAX_EVENT_PAYLOAD, SEGMENT_HEADER_SIZE, SegmentId, SegmentManager, SegmentReader, ValidEvent,
};
use tranquil_store::{
FaultConfig, OpenOptions, Probability, SimulatedIO, StorageIO, sim_seed_range,
};
use tranquil_store::{FaultConfig, Probability, SimulatedIO, StorageIO, sim_seed_range};
use common::Rng;
@@ -204,13 +206,13 @@ fn crash_mid_rotation_with_faults() {
EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD)
}));
if let Ok(Ok(writer)) = recovery
&& let Ok(synced_before) = write_result
{
if let Ok(Ok(writer)) = recovery {
let recovered = writer.synced_seq().raw();
assert!(
writer.synced_seq().raw() <= synced_before,
"seed {seed}: recovered more events than were synced"
recovered <= events_per_seg as u64,
"seed {seed}: recovered {recovered} > written {events_per_seg}"
);
let _ = write_result;
}
});
}
@@ -1020,3 +1022,157 @@ fn aggressive_faults_group_sync_recovery() {
);
});
}
// For every simulation seed: when `sync` succeeds under write and sync faults,
// the sequence it acknowledges must not exceed the last sequence in the valid
// prefix read back from segment 1.
#[test]
fn sync_synced_seq_must_match_durable_valid_prefix() {
sim_seed_range().into_par_iter().for_each(|seed| {
let fault_config = FaultConfig {
partial_write_probability: Probability::new(0.05),
torn_page_probability: Probability::new(0.01),
misdirected_write_probability: Probability::new(0.01),
sync_failure_probability: Probability::new(0.03),
sync_reorder_window: tranquil_store::SyncReorderWindow(4),
..FaultConfig::none()
};
let sim = SimulatedIO::new(seed, fault_config);
let mgr = setup_manager(sim, 64 * 1024);
let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD)
.unwrap_or_else(|e| panic!("seed {seed}: open writer failed: {e}"));
let event_count = 10u64;
// Append results are discarded: faults are enabled, so individual appends
// may fail without that being a test failure.
(1..=event_count).for_each(|i| {
let _ = append_test_event(&mut writer, i, seed);
});
// A failed sync acknowledges nothing, so this seed has nothing to check.
let synced_through = match writer.sync() {
Ok(r) => r.synced_through.raw(),
Err(_) => return,
};
let _ = mgr.io().sync_dir(Path::new(SEGMENTS_DIR));
if synced_through == 0 {
return;
}
// Failing to open or read the segment skips the seed instead of failing
// the test.
let Ok(handle) = mgr.open_for_read(SegmentId::new(1)) else {
return;
};
let Ok(reader) = SegmentReader::open(mgr.io(), handle.fd(), MAX_EVENT_PAYLOAD) else {
return;
};
let Ok(valid) = reader.valid_prefix() else {
return;
};
// An empty valid prefix is treated as sequence 0.
let durable_max = valid.last().map(|e| e.seq.raw()).unwrap_or(0);
assert!(
synced_through <= durable_max,
"seed {seed}: sync acked seq {synced_through} but durable valid prefix only reaches {durable_max} \
(events written: {event_count}, valid_prefix.len()={})",
valid.len()
);
});
}
// Reopening must succeed, and keep segment 1 active, after the first four
// bytes of segment 1 have been overwritten with zeros.
#[test]
fn reopen_recovers_from_torn_segment_header() {
let sim = SimulatedIO::pristine(0);
let mgr = setup_manager(sim, 64 * 1024);
// Write and sync three events, then drop the writer at the end of the scope.
{
let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap();
(1..=3).for_each(|i| {
let _ = append_test_event(&mut writer, i, 0);
});
writer.sync().unwrap();
}
mgr.shutdown();
// Damage the start of the segment file through a separate descriptor and
// sync both the file and the segments directory.
let path = mgr.segment_path(SegmentId::new(1));
let fd = mgr
.io()
.open(&path, OpenOptions::read_write_existing())
.unwrap();
mgr.io().write_all_at(fd, 0, &[0u8; 4]).unwrap();
mgr.io().sync(fd).unwrap();
mgr.io().sync_dir(Path::new(SEGMENTS_DIR)).unwrap();
mgr.io().close(fd).unwrap();
let writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD)
.expect("reopen with torn header on highest-numbered segment must succeed");
assert_eq!(writer.active_segment_id(), SegmentId::new(1));
}
// Five events are appended, the third record is corrupted on disk before
// `sync`, and the test then checks that: `sync` acknowledges only events 1..=2,
// the writer becomes poisoned, later `append`/`sync` calls fail, and a reopened
// writer sees exactly the two-event valid prefix.
#[test]
fn partial_valid_sync_poisons_writer_and_acks_only_valid_prefix() {
let sim = SimulatedIO::pristine(0);
let mgr = setup_manager(sim, 64 * 1024);
let mut writer = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap();
let payload = b"payload-x".to_vec();
let payload_size = payload.len();
// All five events carry the same payload, so one record size is used for
// every offset computation below.
let record_size = EVENT_RECORD_OVERHEAD + payload_size;
(1..=5u64).for_each(|i| {
writer
.append(
DidHash::from_did(&format!("did:plc:user{i}")),
EventTypeTag::COMMIT,
payload.clone(),
)
.unwrap();
});
// Event 3 starts after the segment header and two full records.
let event_3_start = SEGMENT_HEADER_SIZE + 2 * record_size;
// NOTE(review): presumes the checksum sits directly after the event header
// and payload — confirm against the record layout.
let event_3_checksum_offset = event_3_start + EVENT_HEADER_SIZE + payload_size;
// Overwrite four bytes at that offset through a separate descriptor, before
// the writer has synced anything.
let segment_path = mgr.segment_path(SegmentId::new(1));
let corrupt_fd = mgr
.io()
.open(&segment_path, OpenOptions::read_write_existing())
.unwrap();
mgr.io()
.write_all_at(corrupt_fd, event_3_checksum_offset as u64, &[0xFFu8; 4])
.unwrap();
mgr.io().close(corrupt_fd).unwrap();
let result = writer.sync().unwrap();
assert_eq!(
result.synced_through,
EventSequence::new(2),
"sync must ack only events 1..=2 with corrupt event 3"
);
assert_eq!(result.flushed_events.len(), 2);
assert!(writer.is_poisoned(), "writer must be poisoned after partial sync");
// A poisoned writer must reject further appends and syncs.
let append_after_poison = writer.append(
DidHash::from_did("did:plc:after"),
EventTypeTag::COMMIT,
payload.clone(),
);
assert!(
append_after_poison.is_err(),
"append must fail on poisoned writer"
);
let sync_after_poison = writer.sync();
assert!(
sync_after_poison.is_err(),
"sync must fail on poisoned writer"
);
drop(writer);
// Reopening must agree with what is actually valid on disk.
let recovered = EventLogWriter::open(Arc::clone(&mgr), 256, MAX_EVENT_PAYLOAD).unwrap();
assert_eq!(
recovered.synced_seq(),
EventSequence::new(2),
"reopen must observe synced_seq matching disk's valid prefix"
);
let valid = read_all_events(&mgr, 0);
assert_eq!(valid.len(), 2);
assert_eq!(valid[0].seq, EventSequence::new(1));
assert_eq!(valid[1].seq, EventSequence::new(2));
}