From 2afd0754960741843600f97dfb56e3da79d27543 Mon Sep 17 00:00:00 2001 From: Lewis Date: Mon, 20 Apr 2026 21:00:54 +0300 Subject: [PATCH] fix(tranquil-store): atomic record commits, hint-as-truth recovery Lewis: May this revision serve well! --- crates/tranquil-lexicon/src/dynamic.rs | 13 +- crates/tranquil-lexicon/src/registry.rs | 2 +- .../src/blockstore/group_commit.rs | 153 +++++++++++- .../tranquil-store/src/blockstore/manager.rs | 11 +- .../tranquil-store/src/blockstore/reader.rs | 47 ++-- crates/tranquil-store/src/blockstore/store.rs | 66 +++--- .../tranquil-store/src/gauntlet/overrides.rs | 6 + crates/tranquil-store/src/gauntlet/runner.rs | 222 ++++++++---------- .../tranquil-store/src/gauntlet/scenarios.rs | 5 +- .../tests/verify_rollback_orphan.rs | 128 ++++++++++ 10 files changed, 460 insertions(+), 193 deletions(-) create mode 100644 crates/tranquil-store/tests/verify_rollback_orphan.rs diff --git a/crates/tranquil-lexicon/src/dynamic.rs b/crates/tranquil-lexicon/src/dynamic.rs index 036226c..48691e4 100644 --- a/crates/tranquil-lexicon/src/dynamic.rs +++ b/crates/tranquil-lexicon/src/dynamic.rs @@ -61,8 +61,6 @@ impl Drop for InFlightGuard<'_> { impl DynamicRegistry { pub fn new() -> Self { - let network_disabled = - std::env::var("TRANQUIL_LEXICON_OFFLINE").is_ok_and(|v| v == "1" || v == "true"); Self { store: RwLock::new(SchemaStore { schemas: HashMap::new(), @@ -70,11 +68,18 @@ impl DynamicRegistry { }), negative_cache: RwLock::new(HashMap::new()), in_flight: RwLock::new(HashMap::new()), - network_disabled: AtomicBool::new(network_disabled), + network_disabled: AtomicBool::new(false), } } - #[allow(dead_code)] + pub fn from_env() -> Self { + let registry = Self::new(); + let disabled = + std::env::var("TRANQUIL_LEXICON_OFFLINE").is_ok_and(|v| v == "1" || v == "true"); + registry.set_network_disabled(disabled); + registry + } + pub fn set_network_disabled(&self, disabled: bool) { self.network_disabled.store(disabled, Ordering::Relaxed); } diff --git a/crates/tranquil-lexicon/src/registry.rs b/crates/tranquil-lexicon/src/registry.rs index 083084d..2aba6ea 100644 --- a/crates/tranquil-lexicon/src/registry.rs +++ b/crates/tranquil-lexicon/src/registry.rs @@ -25,7 +25,7 @@ impl LexiconRegistry { Self { schemas: HashMap::new(), #[cfg(feature = "resolve")] - dynamic: crate::dynamic::DynamicRegistry::new(), + dynamic: crate::dynamic::DynamicRegistry::from_env(), } } diff --git a/crates/tranquil-store/src/blockstore/group_commit.rs b/crates/tranquil-store/src/blockstore/group_commit.rs index 2e7b54d..407fefc 100644 --- a/crates/tranquil-store/src/blockstore/group_commit.rs +++ b/crates/tranquil-store/src/blockstore/group_commit.rs @@ -11,7 +11,7 @@ use crate::fsync_order::PostBlockstoreHook; use super::BlocksSynced; use crate::io::{FileId, OpenOptions, StorageIO}; -use super::data_file::{CID_SIZE, DataFileWriter}; +use super::data_file::{CID_SIZE, DataFileWriter, ReadBlockRecord, decode_block_record}; use super::hash_index::{BlockIndex, BlockIndexError, CheckpointPositions}; use super::hint::{HintFileWriter, hint_file_path}; use super::manager::DataFileManager; @@ -106,6 +106,10 @@ pub enum CommitError { Io(Arc), Index(String), ChannelClosed, + VerifyFailed { + file_id: DataFileId, + offset: BlockOffset, + }, } impl std::fmt::Display for CommitError { @@ -114,6 +118,11 @@ impl std::fmt::Display for CommitError { Self::Io(e) => write!(f, "io: {}", e.as_ref()), Self::Index(e) => write!(f, "index: {e}"), Self::ChannelClosed => write!(f, "commit channel closed"), + Self::VerifyFailed { file_id, offset } => write!( + f, + "post-sync verify failed at {file_id}:{} (misdirected write or durable corruption)", + offset.raw() + ), } } } @@ -122,7 +131,7 @@ impl std::error::Error for CommitError { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Self::Io(e) => Some(e.as_ref()), - Self::Index(_) | Self::ChannelClosed => None, + Self::Index(_) | Self::ChannelClosed | Self::VerifyFailed { .. } => None, } } } @@ -181,6 +190,7 @@ pub struct GroupCommitConfig { pub channel_capacity: usize, pub checkpoint_interval_ms: u64, pub checkpoint_write_threshold: u64, + pub verify_persisted_blocks: bool, } impl Default for GroupCommitConfig { @@ -190,6 +200,7 @@ impl Default for GroupCommitConfig { channel_capacity: 4096, checkpoint_interval_ms: 60_000, checkpoint_write_threshold: 100_000, + verify_persisted_blocks: false, } } } @@ -200,6 +211,7 @@ struct ShardContext { file_ids: Arc, active_files: Arc, hint_positions: Arc, + verify_persisted_blocks: bool, } struct ActiveState { @@ -403,6 +415,7 @@ impl GroupCommitWriter { file_ids: Arc::clone(&file_ids), active_files: Arc::clone(&active_files), hint_positions: Arc::clone(&hint_positions), + verify_persisted_blocks: config.verify_persisted_blocks, }; SingleShardWriter::spawn( ctx, @@ -1057,6 +1070,119 @@ struct RotationState { hint_fd: FileId, } +fn verify_persisted_blocks( + manager: &DataFileManager, + entries: &[([u8; CID_SIZE], BlockLocation)], +) -> Result<(), CommitError> { + use std::collections::BTreeMap; + let by_file: BTreeMap> = entries + .iter() + .fold(BTreeMap::new(), |mut acc, (cid, loc)| { + acc.entry(loc.file_id).or_default().push((cid, *loc)); + acc + }); + + by_file.into_iter().try_for_each(|(file_id, locations)| { + let path = manager.data_file_path(file_id); + let fd = match manager.io().open(&path, OpenOptions::read_only_existing()) { + Ok(fd) => fd, + Err(_) => return Ok(()), + }; + let file_size = match manager.io().file_size(fd) { + Ok(s) => s, + Err(_) => { + let _ = manager.io().close(fd); + return Ok(()); + } + }; + let result = locations.into_iter().try_for_each(|(expected_cid, loc)| { + verify_block_at(manager, fd, file_size, expected_cid, loc) + }); + let _ = manager.io().close(fd); + result + }) +} + +#[derive(Debug)] +enum VerifyOutcome { + NoFaultDetected, + Faulted, +} + +fn verify_block_at( + manager: &DataFileManager, + fd: FileId, + file_size: u64, + expected_cid: &[u8; CID_SIZE], + loc: BlockLocation, +) -> Result<(), CommitError> { + let passed = (0..VERIFY_RETRY_ATTEMPTS).any(|_| { + matches!( + verify_once(manager, fd, file_size, expected_cid, loc), + VerifyOutcome::NoFaultDetected + ) + }); + match passed { + true => Ok(()), + false => Err(CommitError::VerifyFailed { + file_id: loc.file_id, + offset: loc.offset, + }), + } +} + +fn verify_once( + manager: &DataFileManager, + fd: FileId, + file_size: u64, + expected_cid: &[u8; CID_SIZE], + loc: BlockLocation, +) -> VerifyOutcome { + match decode_block_record(manager.io(), fd, loc.offset, file_size) { + Ok(Some(ReadBlockRecord::Valid { cid_bytes, .. })) if cid_bytes == *expected_cid => { + VerifyOutcome::NoFaultDetected + } + Ok(Some(ReadBlockRecord::Valid { .. })) => { + tracing::warn!( + file_id = %loc.file_id, + offset = loc.offset.raw(), + "verify: stored CID mismatch (misdirected write)" + ); + VerifyOutcome::Faulted + } + Ok(Some(ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. })) + | Ok(None) => { + tracing::warn!( + file_id = %loc.file_id, + offset = loc.offset.raw(), + "verify: block undecodable at location" + ); + VerifyOutcome::Faulted + } + Err(_) => VerifyOutcome::NoFaultDetected, + } +} + +const VERIFY_RETRY_ATTEMPTS: u32 = 4; + +fn rollback_batch( + manager: &DataFileManager, + state: &ActiveState, + rotations: &[RotationState], +) { + let _ = manager + .io() + .truncate(state.hint_fd, state.hint_position.raw()); + let _ = manager.io().sync(state.hint_fd); + rotations.iter().for_each(|rot| { + manager.rollback_rotation(rot.file_id, rot.fd); + let _ = manager.io().close(rot.hint_fd); + let _ = manager + .io() + .delete(&hint_file_path(manager.data_dir(), rot.file_id)); + }); +} + fn process_batch( manager: &DataFileManager, index: &BlockIndex, @@ -1154,13 +1280,7 @@ fn process_batch( }); if let Err(e) = write_result { - rotations.into_iter().for_each(|rot| { - manager.rollback_rotation(rot.file_id, rot.fd); - let _ = manager.io().close(rot.hint_fd); - let _ = manager - .io() - .delete(&hint_file_path(manager.data_dir(), rot.file_id)); - }); + rollback_batch(manager, state, &rotations); return Err(e); } @@ -1169,13 +1289,22 @@ fn process_batch( let current_epoch = epoch.current(); let now = crate::wall_clock_ms(); + let rollback_on_err = |e: CommitError| -> CommitError { + rollback_batch(manager, state, &rotations); + e + }; + all_decrements .iter() - .try_for_each(|cid| hint_writer.append_decrement(cid, current_epoch, now))?; + .try_for_each(|cid| hint_writer.append_decrement(cid, current_epoch, now)) + .map_err(|e| rollback_on_err(CommitError::from(e)))?; let t = std::time::Instant::now(); - data_writer.sync()?; - hint_writer.sync()?; + data_writer.sync().map_err(|e| rollback_on_err(e.into()))?; + if ctx.verify_persisted_blocks { + verify_persisted_blocks(manager, &index_entries).map_err(rollback_on_err)?; + } + hint_writer.sync().map_err(|e| rollback_on_err(e.into()))?; let sync_nanos = t.elapsed().as_nanos() as u64; if !rotations.is_empty() { diff --git a/crates/tranquil-store/src/blockstore/manager.rs b/crates/tranquil-store/src/blockstore/manager.rs index 3e1e9e0..77b9b93 100644 --- a/crates/tranquil-store/src/blockstore/manager.rs +++ b/crates/tranquil-store/src/blockstore/manager.rs @@ -127,6 +127,7 @@ impl DataFileManager { pub fn rollback_rotation(&self, file_id: DataFileId, fd: FileId) { let _ = self.io.close(fd); self.handles.write().remove(&file_id); + let _ = self.io.delete(&self.data_file_path(file_id)); } pub fn should_rotate(&self, position: BlockOffset) -> bool { @@ -216,7 +217,7 @@ mod tests { } #[test] - fn rotation_rollback_cleans_handle() { + fn rotation_rollback_cleans_handle_and_deletes_file() { let mgr = setup_manager(1024); let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap(); let (next_id, next_fd) = mgr.prepare_rotation(DataFileId::new(0)).unwrap(); @@ -225,10 +226,10 @@ mod tests { assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd); mgr.rollback_rotation(next_id, next_fd); - let reopened_fd = mgr.open_for_read(next_id).unwrap(); - assert_ne!( - reopened_fd, next_fd, - "rollback should have closed the cached fd" + let reopen = mgr.open_for_read(next_id); + assert!( + reopen.is_err_and(|e| e.kind() == io::ErrorKind::NotFound), + "rollback must delete the data file so recovery cannot resurrect uncommitted bytes" ); } diff --git a/crates/tranquil-store/src/blockstore/reader.rs b/crates/tranquil-store/src/blockstore/reader.rs index de33d58..f26f499 100644 --- a/crates/tranquil-store/src/blockstore/reader.rs +++ b/crates/tranquil-store/src/blockstore/reader.rs @@ -127,26 +127,37 @@ impl BlockStoreReader { file_size: u64, location: BlockLocation, ) -> Result { - match decode_block_record(self.manager.io(), fd, location.offset, file_size)? { - Some(ReadBlockRecord::Valid { data, .. }) - if data.len() == location.length.raw() as usize => - { - Ok(Bytes::from(data)) - } - Some(ReadBlockRecord::Valid { .. }) => Err(ReadError::Corrupted { - file_id: location.file_id, - offset: location.offset, - }), - Some(ReadBlockRecord::Corrupted { offset } | ReadBlockRecord::Truncated { offset }) => { - Err(ReadError::Corrupted { + let attempt_once = || -> Result { + match decode_block_record(self.manager.io(), fd, location.offset, file_size)? { + Some(ReadBlockRecord::Valid { data, .. }) + if data.len() == location.length.raw() as usize => + { + Ok(Bytes::from(data)) + } + Some(ReadBlockRecord::Valid { .. }) => Err(ReadError::Corrupted { + file_id: location.file_id, + offset: location.offset, + }), + Some( + ReadBlockRecord::Corrupted { offset } | ReadBlockRecord::Truncated { offset }, + ) => Err(ReadError::Corrupted { file_id: location.file_id, offset, - }) + }), + None => Err(ReadError::Corrupted { + file_id: location.file_id, + offset: location.offset, + }), } - None => Err(ReadError::Corrupted { - file_id: location.file_id, - offset: location.offset, - }), - } + }; + (0..READ_RETRY_ATTEMPTS.saturating_sub(1)) + .find_map(|_| match attempt_once() { + Ok(bytes) => Some(Ok(bytes)), + Err(ReadError::Corrupted { .. }) => None, + Err(e) => Some(Err(e)), + }) + .unwrap_or_else(attempt_once) } } + +const READ_RETRY_ATTEMPTS: u32 = 4; diff --git a/crates/tranquil-store/src/blockstore/store.rs b/crates/tranquil-store/src/blockstore/store.rs index 6e88d77..8928822 100644 --- a/crates/tranquil-store/src/blockstore/store.rs +++ b/crates/tranquil-store/src/blockstore/store.rs @@ -20,8 +20,8 @@ use super::hash_index::BlockIndex; use super::manager::DataFileManager; use super::reader::{BlockStoreReader, ReadError}; use super::types::{ - BlockLength, BlockLocation, BlockOffset, CollectionResult, CompactionResult, DataFileId, - EpochCounter, LivenessInfo, WallClockMs, WriteCursor, + BlockLocation, BlockOffset, CollectionResult, CompactionResult, DataFileId, EpochCounter, + LivenessInfo, WallClockMs, }; fn cid_to_bytes(cid: &Cid) -> Result<[u8; CID_SIZE], RepoError> { @@ -37,10 +37,6 @@ fn cid_to_bytes(cid: &Cid) -> Result<[u8; CID_SIZE], RepoError> { }) } -fn block_index_err_to_repo(e: super::hash_index::BlockIndexError) -> RepoError { - RepoError::storage(io::Error::other(e.to_string())) -} - fn commit_error_to_repo(e: CommitError) -> RepoError { match e { CommitError::Io(io_err) => { @@ -51,6 +47,13 @@ fn commit_error_to_repo(e: CommitError) -> RepoError { io::ErrorKind::BrokenPipe, "blockstore commit channel closed", )), + CommitError::VerifyFailed { file_id, offset } => RepoError::storage(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "post-sync verify failed at {file_id}:{}", + offset.raw() + ), + )), } } @@ -299,7 +302,15 @@ impl TranquilBlockStore { Err(e) => return Err(RepoError::storage(e)), }; - let result = Self::scan_and_index(io, index, fd, file_id, start_offset); + let hint_path = super::hint::hint_file_path(data_dir, file_id); + let hint_exists = io + .open(&hint_path, OpenOptions::read_only_existing()) + .map(|fd| { + let _ = io.close(fd); + }) + .is_ok(); + + let result = Self::scan_and_index(io, index, fd, file_id, start_offset, hint_exists); let _ = io.close(fd); @@ -312,6 +323,7 @@ impl TranquilBlockStore { fd: crate::io::FileId, file_id: DataFileId, start_offset: BlockOffset, + hint_exists: bool, ) -> Result<(), RepoError> { let file_size = io.file_size(fd).map_err(RepoError::storage)?; @@ -320,7 +332,7 @@ impl TranquilBlockStore { } let scan_pos = &mut { start_offset }; - let (recovered_entries, last_valid_end) = std::iter::from_fn(|| { + let (scanned_entries, last_valid_end) = std::iter::from_fn(|| { match super::data_file::decode_block_record(io, fd, *scan_pos, file_size) { Err(e) => { tracing::warn!( @@ -341,9 +353,10 @@ impl TranquilBlockStore { Ok(n) if n <= super::types::MAX_BLOCK_SIZE => n, _ => return None, }; - let length = BlockLength::new(raw_len); + let length = super::types::BlockLength::new(raw_len); let record_size = BLOCK_RECORD_OVERHEAD as u64 + u64::from(raw_len); - *scan_pos = scan_pos.advance(record_size); + let new_end = offset.advance(record_size); + *scan_pos = new_end; Some(( cid_bytes, BlockLocation { @@ -351,6 +364,7 @@ impl TranquilBlockStore { offset, length, }, + new_end, )) } Ok(Some(ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. })) => { @@ -360,11 +374,8 @@ impl TranquilBlockStore { }) .fold( (Vec::new(), start_offset), - |(mut entries, _), (cid_bytes, location)| { - let new_end = location - .offset - .advance(BLOCK_RECORD_OVERHEAD as u64 + location.length.as_u64()); - entries.push((cid_bytes, location)); + |(mut entries, _), (cid, loc, new_end)| { + entries.push((cid, loc)); (entries, new_end) }, ); @@ -374,28 +385,27 @@ impl TranquilBlockStore { file_id = %file_id, truncating_from = last_valid_end.raw(), file_size, - "truncating partial/corrupted tail" + scanned_count = scanned_entries.len(), + "truncating partial/unacked tail" ); io.truncate(fd, last_valid_end.raw()) .map_err(RepoError::storage)?; io.sync(fd).map_err(RepoError::storage)?; } - if !recovered_entries.is_empty() { - let new_cursor = WriteCursor { + if !hint_exists && !scanned_entries.is_empty() { + tracing::info!( + file_id = %file_id, + scanned = scanned_entries.len(), + "rebuilding index from data file (no hint file, treating as restored backup)" + ); + let cursor = super::types::WriteCursor { file_id, offset: last_valid_end, }; - let inserted = index - .batch_put_if_absent(&recovered_entries, new_cursor) - .map_err(block_index_err_to_repo)?; - tracing::info!( - file_id = %file_id, - scanned = recovered_entries.len(), - inserted, - new_cursor_offset = last_valid_end.raw(), - "recovery data file scan" - ); + index + .batch_put_if_absent(&scanned_entries, cursor) + .map_err(|e| RepoError::storage(io::Error::other(e.to_string())))?; } Ok(()) diff --git a/crates/tranquil-store/src/gauntlet/overrides.rs b/crates/tranquil-store/src/gauntlet/overrides.rs index e37475c..97bd118 100644 --- a/crates/tranquil-store/src/gauntlet/overrides.rs +++ b/crates/tranquil-store/src/gauntlet/overrides.rs @@ -42,6 +42,8 @@ pub struct GroupCommitOverrides { pub checkpoint_interval_ms: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub checkpoint_write_threshold: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub verify_persisted_blocks: Option, } impl GroupCommitOverrides { @@ -50,6 +52,7 @@ impl GroupCommitOverrides { && self.channel_capacity.is_none() && self.checkpoint_interval_ms.is_none() && self.checkpoint_write_threshold.is_none() + && self.verify_persisted_blocks.is_none() } } @@ -82,6 +85,9 @@ impl ConfigOverrides { if let Some(n) = gc.checkpoint_write_threshold { cfg.store.group_commit.checkpoint_write_threshold = n; } + if let Some(b) = gc.verify_persisted_blocks { + cfg.store.group_commit.verify_persisted_blocks = b; + } } } diff --git a/crates/tranquil-store/src/gauntlet/runner.rs b/crates/tranquil-store/src/gauntlet/runner.rs index 7e8e026..24d1ff6 100644 --- a/crates/tranquil-store/src/gauntlet/runner.rs +++ b/crates/tranquil-store/src/gauntlet/runner.rs @@ -6,7 +6,6 @@ use std::time::Duration; use cid::Cid; use jacquard_repo::mst::Mst; -use jacquard_repo::storage::BlockStore; use super::invariants::{ EventLogSnapshot, InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, @@ -16,7 +15,7 @@ use super::oracle::{CidFormatError, EventExpectation, Oracle, hex_short, try_cid use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel}; use crate::blockstore::{ BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore, - hash_to_cid_bytes, + hash_to_cid, hash_to_cid_bytes, }; use crate::eventlog::{ DEFAULT_INDEX_INTERVAL, DidHash, EventLogWriter, EventTypeTag, MAX_EVENT_PAYLOAD, SegmentId, @@ -128,8 +127,6 @@ impl GauntletReport { #[derive(Debug, thiserror::Error)] enum OpError { - #[error("put record: {0}")] - PutRecord(String), #[error("mst add: {0}")] MstAdd(String), #[error("mst delete: {0}")] @@ -970,60 +967,32 @@ async fn apply_op( value_seed, } => { let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); - let record_cid = harness - .store - .put(&record_bytes) - .await - .map_err(|e| OpError::PutRecord(e.to_string()))?; + let record_cid = hash_to_cid(&record_bytes); let record_cid_bytes = try_cid_to_fixed(&record_cid)?; - let outcome = add_record_inner( + let (new_root, applied) = add_record_atomic( &harness.store, *root, collection, rkey, record_cid, record_cid_bytes, + record_bytes, ) - .await; - match outcome { - Ok((new_root, applied)) => { - *root = Some(new_root); - if applied { - oracle.add(collection.clone(), rkey.clone(), record_cid_bytes); - } - Ok(()) - } - Err(e) => { - if let Err(cleanup_err) = - decrement_obsolete(&harness.store, vec![record_cid_bytes]).await - { - tracing::warn!( - op_error = %e, - cleanup_error = %cleanup_err, - "AddRecord cleanup decrement failed", - ); - } - Err(e) - } + .await?; + *root = Some(new_root); + if applied { + oracle.add(collection.clone(), rkey.clone(), record_cid_bytes); } + Ok(()) } Op::DeleteRecord { collection, rkey } => { let Some(old_root) = *root else { return Ok(()) }; if !oracle.contains_record(collection, rkey) { return Ok(()); } - let key = format!("{}/{}", collection.0, rkey.0); - let loaded = Mst::load(harness.store.clone(), old_root, None); - let updated = loaded - .delete(&key) - .await - .map_err(|e| OpError::MstDelete(e.to_string()))?; - let new_root = updated - .persist() - .await - .map_err(|e| OpError::MstPersist(e.to_string()))?; - apply_mst_diff(&harness.store, old_root, new_root).await?; + let new_root = + delete_record_atomic(&harness.store, old_root, collection, rkey).await?; oracle.delete(collection, rkey); *root = Some(new_root); Ok(()) @@ -1145,13 +1114,14 @@ fn segment_last_timestamp( Ok(events.last().map(|e: &ValidEvent| e.timestamp.raw())) } -async fn add_record_inner( +async fn add_record_atomic( store: &Arc>, root: Option, collection: &super::op::CollectionName, rkey: &super::op::RecordKey, record_cid: Cid, record_cid_bytes: CidBytes, + record_bytes: Vec, ) -> Result<(Cid, bool), OpError> { let key = format!("{}/{}", collection.0, rkey.0); let loaded = match root { @@ -1162,58 +1132,90 @@ async fn add_record_inner( .add(&key, record_cid) .await .map_err(|e| OpError::MstAdd(e.to_string()))?; + let diff = loaded + .diff(&updated) + .await + .map_err(|e| OpError::MstDiff(e.to_string()))?; let new_root = updated - .persist() + .get_pointer() .await .map_err(|e| OpError::MstPersist(e.to_string()))?; - match root { - Some(old_root) if old_root == new_root => { - decrement_obsolete(store, vec![record_cid_bytes]).await?; - Ok((new_root, false)) - } - Some(old_root) => { - apply_mst_diff(store, old_root, new_root).await?; - Ok((new_root, true)) - } - None => Ok((new_root, true)), + if matches!(root, Some(r) if r == new_root) { + return Ok((new_root, false)); } + + let blocks = diff_blocks_plus_record(diff.new_mst_blocks, record_cid_bytes, record_bytes)?; + let obsolete = diff_obsolete(diff.removed_mst_blocks, diff.removed_cids)?; + commit_atomic(store, blocks, obsolete).await?; + Ok((new_root, true)) } -async fn decrement_obsolete( +async fn delete_record_atomic( store: &Arc>, + old_root: Cid, + collection: &super::op::CollectionName, + rkey: &super::op::RecordKey, +) -> Result { + let key = format!("{}/{}", collection.0, rkey.0); + let loaded = Mst::load(store.clone(), old_root, None); + let updated = loaded + .delete(&key) + .await + .map_err(|e| OpError::MstDelete(e.to_string()))?; + let diff = loaded + .diff(&updated) + .await + .map_err(|e| OpError::MstDiff(e.to_string()))?; + let new_root = updated + .get_pointer() + .await + .map_err(|e| OpError::MstPersist(e.to_string()))?; + let blocks: Vec<(CidBytes, Vec)> = diff + .new_mst_blocks + .into_iter() + .map(|(c, b)| Ok::<_, OpError>((try_cid_to_fixed(&c)?, b.to_vec()))) + .collect::>()?; + let obsolete = diff_obsolete(diff.removed_mst_blocks, diff.removed_cids)?; + commit_atomic(store, blocks, obsolete).await?; + Ok(new_root) +} + +fn diff_blocks_plus_record( + new_mst_blocks: std::collections::BTreeMap, + record_cid_bytes: CidBytes, + record_bytes: Vec, +) -> Result)>, OpError> { + let mut blocks: Vec<(CidBytes, Vec)> = Vec::with_capacity(new_mst_blocks.len() + 1); + blocks.push((record_cid_bytes, record_bytes)); + new_mst_blocks.into_iter().try_for_each(|(c, b)| { + let cb = try_cid_to_fixed(&c)?; + blocks.push((cb, b.to_vec())); + Ok::<_, OpError>(()) + })?; + Ok(blocks) +} + +fn diff_obsolete( + removed_mst_blocks: Vec, + removed_cids: Vec, +) -> Result, OpError> { + removed_mst_blocks + .into_iter() + .chain(removed_cids.into_iter()) + .map(|c| try_cid_to_fixed(&c)) + .collect::>() + .map_err(OpError::from) +} + +async fn commit_atomic( + store: &Arc>, + blocks: Vec<(CidBytes, Vec)>, obsolete: Vec, ) -> Result<(), OpError> { let s = store.clone(); tokio::task::spawn_blocking(move || { - s.apply_commit_blocking(vec![], obsolete) - .map_err(|e| e.to_string()) - }) - .await - .map_err(|e| OpError::Join(e.to_string()))? - .map_err(OpError::ApplyCommit) -} - -async fn apply_mst_diff( - store: &Arc>, - old_root: Cid, - new_root: Cid, -) -> Result<(), OpError> { - let old_m = Mst::load(store.clone(), old_root, None); - let new_m = Mst::load(store.clone(), new_root, None); - let diff = old_m - .diff(&new_m) - .await - .map_err(|e| OpError::MstDiff(e.to_string()))?; - let obsolete: Vec = diff - .removed_mst_blocks - .into_iter() - .chain(diff.removed_cids.into_iter()) - .map(|c| try_cid_to_fixed(&c)) - .collect::>()?; - let s = store.clone(); - tokio::task::spawn_blocking(move || { - s.apply_commit_blocking(vec![], obsolete) + s.apply_commit_blocking(blocks, obsolete) .map_err(|e| e.to_string()) }) .await @@ -1239,6 +1241,7 @@ fn compact_by_liveness( .try_for_each(|fid| match store.compact_file(fid, 0) { Ok(_) => Ok(()), Err(CompactionError::ActiveFileCannotBeCompacted) => Ok(()), + Err(CompactionError::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => Err(OpError::CompactFile(format!("{fid}: {e}"))), }) } @@ -1255,47 +1258,27 @@ async fn apply_op_concurrent( value_seed, } => { let record_bytes = make_record_bytes(*value_seed, workload.size_distribution); - let record_cid = shared - .store - .put(&record_bytes) - .await - .map_err(|e| OpError::PutRecord(e.to_string()))?; + let record_cid = hash_to_cid(&record_bytes); let record_cid_bytes = try_cid_to_fixed(&record_cid)?; let mut state = shared.write.lock().await; - let outcome = add_record_inner( + let (new_root, applied) = add_record_atomic( &shared.store, state.root, collection, rkey, record_cid, record_cid_bytes, + record_bytes, ) - .await; - match outcome { - Ok((new_root, applied)) => { - state.root = Some(new_root); - if applied { - state - .oracle - .add(collection.clone(), rkey.clone(), record_cid_bytes); - } - Ok(()) - } - Err(e) => { - drop(state); - if let Err(cleanup) = - decrement_obsolete(&shared.store, vec![record_cid_bytes]).await - { - tracing::warn!( - op_error = %e, - cleanup_error = %cleanup, - "AddRecord concurrent cleanup decrement failed", - ); - } - Err(e) - } + .await?; + state.root = Some(new_root); + if applied { + state + .oracle + .add(collection.clone(), rkey.clone(), record_cid_bytes); } + Ok(()) } Op::DeleteRecord { collection, rkey } => { let mut state = shared.write.lock().await; @@ -1305,17 +1288,8 @@ async fn apply_op_concurrent( if !state.oracle.contains_record(collection, rkey) { return Ok(()); } - let key = format!("{}/{}", collection.0, rkey.0); - let loaded = Mst::load(shared.store.clone(), old_root, None); - let updated = loaded - .delete(&key) - .await - .map_err(|e| OpError::MstDelete(e.to_string()))?; - let new_root = updated - .persist() - .await - .map_err(|e| OpError::MstPersist(e.to_string()))?; - apply_mst_diff(&shared.store, old_root, new_root).await?; + let new_root = + delete_record_atomic(&shared.store, old_root, collection, rkey).await?; state.oracle.delete(collection, rkey); state.root = Some(new_root); Ok(()) diff --git a/crates/tranquil-store/src/gauntlet/scenarios.rs b/crates/tranquil-store/src/gauntlet/scenarios.rs index 9ab9152..59cb743 100644 --- a/crates/tranquil-store/src/gauntlet/scenarios.rs +++ b/crates/tranquil-store/src/gauntlet/scenarios.rs @@ -523,7 +523,10 @@ fn sim_microbench_workload() -> WorkloadModel { fn sim_store() -> StoreConfig { StoreConfig { max_file_size: MaxFileSize(16 * 1024), - group_commit: GroupCommitConfig::default(), + group_commit: GroupCommitConfig { + verify_persisted_blocks: true, + ..GroupCommitConfig::default() + }, shard_count: ShardCount(1), } } diff --git a/crates/tranquil-store/tests/verify_rollback_orphan.rs b/crates/tranquil-store/tests/verify_rollback_orphan.rs new file mode 100644 index 0000000..c6903dd --- /dev/null +++ b/crates/tranquil-store/tests/verify_rollback_orphan.rs @@ -0,0 +1,128 @@ +mod common; + +use std::sync::Arc; + +use tranquil_store::OpenOptions; +use tranquil_store::RealIO; +use tranquil_store::StorageIO; +use tranquil_store::blockstore::BlockLength; +use tranquil_store::blockstore::{ + BlockLocation, BlockOffset, BlockStoreConfig, DataFileId, DataFileManager, DataFileWriter, + GroupCommitConfig, HintFileWriter, HintOffset, TranquilBlockStore, hint_file_path, +}; + +use common::{test_cid, with_runtime}; + +fn fresh_store_dir() -> (tempfile::TempDir, BlockStoreConfig) { + let dir = tempfile::TempDir::new().unwrap(); + let data_dir = dir.path().join("data"); + let index_dir = dir.path().join("index"); + std::fs::create_dir_all(&data_dir).unwrap(); + std::fs::create_dir_all(&index_dir).unwrap(); + let config = BlockStoreConfig { + data_dir, + index_dir, + max_file_size: 8192, + group_commit: GroupCommitConfig::default(), + shard_count: 1, + }; + (dir, config) +} + +fn hint_file_size(path: &std::path::Path) -> u64 { + let io = RealIO::new(); + let fd = io.open(path, OpenOptions::read_write()).unwrap(); + let size = io.file_size(fd).unwrap(); + let _ = io.close(fd); + size +} + +#[test] +fn rollback_rotation_does_not_leave_orphan_data_file() { + with_runtime(|| { + let (_dir, config) = fresh_store_dir(); + let data_dir = config.data_dir.clone(); + + { + let store = TranquilBlockStore::open(config.clone()).unwrap(); + store + .put_blocks_blocking(vec![(test_cid(1), vec![0x11; 64])]) + .unwrap(); + drop(store); + } + + let orphan_cid = test_cid(99_999); + { + let io: Arc = Arc::new(RealIO::new()); + let manager = DataFileManager::new(Arc::clone(&io), data_dir.clone(), 4096); + let (next_id, next_fd) = manager.prepare_rotation(DataFileId::new(0)).unwrap(); + manager.commit_rotation(next_id, next_fd); + + let mut writer = DataFileWriter::new(&*io, next_fd, next_id).unwrap(); + let _ = writer.append_block(&orphan_cid, &vec![0xAB; 256]).unwrap(); + writer.sync().unwrap(); + io.sync_dir(&data_dir).unwrap(); + + let _ = io.delete(&hint_file_path(&data_dir, next_id)); + manager.rollback_rotation(next_id, next_fd); + } + + let store = TranquilBlockStore::open(config).unwrap(); + assert!( + store.get_block_sync(&orphan_cid).unwrap().is_none(), + "rollback_rotation must delete the uncommitted data file; otherwise recovery's \ + backup-restore branch resurrects rejected blocks" + ); + }); +} + +#[test] +fn truncated_old_hint_drops_rejected_entry_on_reopen() { + with_runtime(|| { + let (_dir, config) = fresh_store_dir(); + let data_dir = config.data_dir.clone(); + let old_file_id = DataFileId::new(0); + let old_hint_path = hint_file_path(&data_dir, old_file_id); + + let keep_cid = test_cid(1); + { + let store = TranquilBlockStore::open(config.clone()).unwrap(); + store + .put_blocks_blocking(vec![(keep_cid, vec![0x11; 64])]) + .unwrap(); + drop(store); + } + + let hint_len_before = hint_file_size(&old_hint_path); + let rejected_cid = test_cid(42_424); + { + let io: Arc = Arc::new(RealIO::new()); + let fd = io.open(&old_hint_path, OpenOptions::read_write()).unwrap(); + let mut writer = HintFileWriter::resume(&*io, fd, HintOffset::new(hint_len_before)); + writer + .append_hint( + &rejected_cid, + &BlockLocation { + file_id: old_file_id, + offset: BlockOffset::new(4096), + length: BlockLength::new(64), + }, + ) + .unwrap(); + writer.sync().unwrap(); + io.truncate(fd, hint_len_before).unwrap(); + io.sync(fd).unwrap(); + let _ = io.close(fd); + } + + let store = TranquilBlockStore::open(config).unwrap(); + assert!( + store.get_block_sync(&rejected_cid).unwrap().is_none(), + "after rollback_batch truncates state.hint_fd, the rejected hint is gone and reopen is clean" + ); + assert!( + store.get_block_sync(&keep_cid).unwrap().is_some(), + "legitimate pre-batch block remains readable after rollback" + ); + }); +}