fix(tranquil-store): atomic record commits, hint-as-truth recovery

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-04-20 21:00:54 +03:00
parent c74bf967cf
commit 2afd075496
10 changed files with 460 additions and 193 deletions

View File

@@ -61,8 +61,6 @@ impl Drop for InFlightGuard<'_> {
impl DynamicRegistry {
pub fn new() -> Self {
let network_disabled =
std::env::var("TRANQUIL_LEXICON_OFFLINE").is_ok_and(|v| v == "1" || v == "true");
Self {
store: RwLock::new(SchemaStore {
schemas: HashMap::new(),
@@ -70,11 +68,18 @@ impl DynamicRegistry {
}),
negative_cache: RwLock::new(HashMap::new()),
in_flight: RwLock::new(HashMap::new()),
network_disabled: AtomicBool::new(network_disabled),
network_disabled: AtomicBool::new(false),
}
}
/// Builds a registry whose network flag is seeded from the
/// `TRANQUIL_LEXICON_OFFLINE` environment variable: a value of "1" or
/// "true" disables network lookups; anything else (or unset) leaves
/// them enabled.
#[allow(dead_code)]
pub fn from_env() -> Self {
    let registry = Self::new();
    let offline = matches!(
        std::env::var("TRANQUIL_LEXICON_OFFLINE").as_deref(),
        Ok("1") | Ok("true")
    );
    registry.set_network_disabled(offline);
    registry
}
/// Toggles dynamic schema resolution over the network at runtime.
/// Relaxed ordering suffices: the flag is an independent on/off switch
/// with no other data published alongside the store.
pub fn set_network_disabled(&self, disabled: bool) {
self.network_disabled.store(disabled, Ordering::Relaxed);
}

View File

@@ -25,7 +25,7 @@ impl LexiconRegistry {
Self {
schemas: HashMap::new(),
#[cfg(feature = "resolve")]
dynamic: crate::dynamic::DynamicRegistry::new(),
dynamic: crate::dynamic::DynamicRegistry::from_env(),
}
}

View File

@@ -11,7 +11,7 @@ use crate::fsync_order::PostBlockstoreHook;
use super::BlocksSynced;
use crate::io::{FileId, OpenOptions, StorageIO};
use super::data_file::{CID_SIZE, DataFileWriter};
use super::data_file::{CID_SIZE, DataFileWriter, ReadBlockRecord, decode_block_record};
use super::hash_index::{BlockIndex, BlockIndexError, CheckpointPositions};
use super::hint::{HintFileWriter, hint_file_path};
use super::manager::DataFileManager;
@@ -106,6 +106,10 @@ pub enum CommitError {
Io(Arc<io::Error>),
Index(String),
ChannelClosed,
VerifyFailed {
file_id: DataFileId,
offset: BlockOffset,
},
}
impl std::fmt::Display for CommitError {
@@ -114,6 +118,11 @@ impl std::fmt::Display for CommitError {
Self::Io(e) => write!(f, "io: {}", e.as_ref()),
Self::Index(e) => write!(f, "index: {e}"),
Self::ChannelClosed => write!(f, "commit channel closed"),
Self::VerifyFailed { file_id, offset } => write!(
f,
"post-sync verify failed at {file_id}:{} (misdirected write or durable corruption)",
offset.raw()
),
}
}
}
@@ -122,7 +131,7 @@ impl std::error::Error for CommitError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Io(e) => Some(e.as_ref()),
Self::Index(_) | Self::ChannelClosed => None,
Self::Index(_) | Self::ChannelClosed | Self::VerifyFailed { .. } => None,
}
}
}
@@ -181,6 +190,7 @@ pub struct GroupCommitConfig {
pub channel_capacity: usize,
pub checkpoint_interval_ms: u64,
pub checkpoint_write_threshold: u64,
pub verify_persisted_blocks: bool,
}
impl Default for GroupCommitConfig {
@@ -190,6 +200,7 @@ impl Default for GroupCommitConfig {
channel_capacity: 4096,
checkpoint_interval_ms: 60_000,
checkpoint_write_threshold: 100_000,
verify_persisted_blocks: false,
}
}
}
@@ -200,6 +211,7 @@ struct ShardContext {
file_ids: Arc<FileIdAllocator>,
active_files: Arc<ActiveFileSet>,
hint_positions: Arc<ShardHintPositions>,
verify_persisted_blocks: bool,
}
struct ActiveState {
@@ -403,6 +415,7 @@ impl GroupCommitWriter {
file_ids: Arc::clone(&file_ids),
active_files: Arc::clone(&active_files),
hint_positions: Arc::clone(&hint_positions),
verify_persisted_blocks: config.verify_persisted_blocks,
};
SingleShardWriter::spawn(
ctx,
@@ -1057,6 +1070,119 @@ struct RotationState {
hint_fd: FileId,
}
/// Re-reads every block synced in this batch and confirms the durable
/// record decodes to the CID we intended to write.
///
/// Entries are grouped by data file (BTreeMap keeps file order
/// deterministic) so each file is opened exactly once. A file that
/// cannot be opened or sized is skipped rather than failed: an
/// unreadable file here is not proof of a misdirected write, and a
/// transient open error must not reject the commit.
fn verify_persisted_blocks<S: StorageIO>(
    manager: &DataFileManager<S>,
    entries: &[([u8; CID_SIZE], BlockLocation)],
) -> Result<(), CommitError> {
    use std::collections::BTreeMap;
    let mut by_file: BTreeMap<DataFileId, Vec<(&[u8; CID_SIZE], BlockLocation)>> =
        BTreeMap::new();
    for (cid, loc) in entries {
        by_file.entry(loc.file_id).or_default().push((cid, *loc));
    }
    for (file_id, locations) in by_file {
        let path = manager.data_file_path(file_id);
        let Ok(fd) = manager.io().open(&path, OpenOptions::read_only_existing()) else {
            continue;
        };
        let Ok(file_size) = manager.io().file_size(fd) else {
            let _ = manager.io().close(fd);
            continue;
        };
        let result = locations.into_iter().try_for_each(|(expected_cid, loc)| {
            verify_block_at(manager, fd, file_size, expected_cid, loc)
        });
        // Close before propagating so the fd is released on both paths.
        let _ = manager.io().close(fd);
        result?;
    }
    Ok(())
}
/// Result of a single verification read. Deliberately phrased as
/// "no fault detected" rather than "valid": a read-level I/O error is
/// treated as inconclusive (see `verify_once`), not as a failure.
#[derive(Debug)]
enum VerifyOutcome {
NoFaultDetected,
Faulted,
}
/// Verifies the record at `loc` against `expected_cid`, retrying up to
/// `VERIFY_RETRY_ATTEMPTS` times before declaring the location faulted.
///
/// Retries guard against spurious single-read faults; the commit is
/// only failed when every attempt reports a fault.
///
/// Idiom fix: the original matched on a `bool` (`match passed { true
/// => …, false => … }`, clippy `match_bool`); a plain `if`/`else` says
/// the same thing directly.
fn verify_block_at<S: StorageIO>(
    manager: &DataFileManager<S>,
    fd: FileId,
    file_size: u64,
    expected_cid: &[u8; CID_SIZE],
    loc: BlockLocation,
) -> Result<(), CommitError> {
    // `any` short-circuits on the first clean read.
    let passed = (0..VERIFY_RETRY_ATTEMPTS).any(|_| {
        matches!(
            verify_once(manager, fd, file_size, expected_cid, loc),
            VerifyOutcome::NoFaultDetected
        )
    });
    if passed {
        Ok(())
    } else {
        Err(CommitError::VerifyFailed {
            file_id: loc.file_id,
            offset: loc.offset,
        })
    }
}
/// One read-and-decode attempt for the record at `loc`.
///
/// Outcomes:
/// * valid record whose stored CID matches `expected_cid` -> no fault;
/// * valid record with a different CID -> fault (misdirected write:
///   some other record's bytes are durable at this offset);
/// * corrupted / truncated / absent record -> fault;
/// * I/O error from the read itself -> NO fault: a failed read is not
///   evidence the write went wrong and must not reject the commit.
fn verify_once<S: StorageIO>(
manager: &DataFileManager<S>,
fd: FileId,
file_size: u64,
expected_cid: &[u8; CID_SIZE],
loc: BlockLocation,
) -> VerifyOutcome {
match decode_block_record(manager.io(), fd, loc.offset, file_size) {
Ok(Some(ReadBlockRecord::Valid { cid_bytes, .. })) if cid_bytes == *expected_cid => {
VerifyOutcome::NoFaultDetected
}
Ok(Some(ReadBlockRecord::Valid { .. })) => {
tracing::warn!(
file_id = %loc.file_id,
offset = loc.offset.raw(),
"verify: stored CID mismatch (misdirected write)"
);
VerifyOutcome::Faulted
}
Ok(Some(ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. }))
| Ok(None) => {
tracing::warn!(
file_id = %loc.file_id,
offset = loc.offset.raw(),
"verify: block undecodable at location"
);
VerifyOutcome::Faulted
}
// Read-level errors are inconclusive; retries live in verify_block_at.
Err(_) => VerifyOutcome::NoFaultDetected,
}
}
/// Read attempts per location before a verify failure becomes final.
const VERIFY_RETRY_ATTEMPTS: u32 = 4;
/// Undoes the on-disk effects of a failed batch so recovery cannot
/// observe partially committed state.
///
/// All failures are swallowed (`let _`): rollback is best-effort and
/// runs on an already-failing path.
fn rollback_batch<S: StorageIO>(
manager: &DataFileManager<S>,
state: &ActiveState,
rotations: &[RotationState],
) {
// Rewind the active hint file to its pre-batch length and make the
// truncation durable, discarding hints appended for rejected writes.
let _ = manager
.io()
.truncate(state.hint_fd, state.hint_position.raw());
let _ = manager.io().sync(state.hint_fd);
// Undo every rotation prepared during this batch: rollback_rotation
// closes and deletes the new data file; the rotation's hint file is
// closed and deleted here.
rotations.iter().for_each(|rot| {
manager.rollback_rotation(rot.file_id, rot.fd);
let _ = manager.io().close(rot.hint_fd);
let _ = manager
.io()
.delete(&hint_file_path(manager.data_dir(), rot.file_id));
});
}
fn process_batch<S: StorageIO>(
manager: &DataFileManager<S>,
index: &BlockIndex,
@@ -1154,13 +1280,7 @@ fn process_batch<S: StorageIO>(
});
if let Err(e) = write_result {
rotations.into_iter().for_each(|rot| {
manager.rollback_rotation(rot.file_id, rot.fd);
let _ = manager.io().close(rot.hint_fd);
let _ = manager
.io()
.delete(&hint_file_path(manager.data_dir(), rot.file_id));
});
rollback_batch(manager, state, &rotations);
return Err(e);
}
@@ -1169,13 +1289,22 @@ fn process_batch<S: StorageIO>(
let current_epoch = epoch.current();
let now = crate::wall_clock_ms();
let rollback_on_err = |e: CommitError| -> CommitError {
rollback_batch(manager, state, &rotations);
e
};
all_decrements
.iter()
.try_for_each(|cid| hint_writer.append_decrement(cid, current_epoch, now))?;
.try_for_each(|cid| hint_writer.append_decrement(cid, current_epoch, now))
.map_err(|e| rollback_on_err(CommitError::from(e)))?;
let t = std::time::Instant::now();
data_writer.sync()?;
hint_writer.sync()?;
data_writer.sync().map_err(|e| rollback_on_err(e.into()))?;
if ctx.verify_persisted_blocks {
verify_persisted_blocks(manager, &index_entries).map_err(rollback_on_err)?;
}
hint_writer.sync().map_err(|e| rollback_on_err(e.into()))?;
let sync_nanos = t.elapsed().as_nanos() as u64;
if !rotations.is_empty() {

View File

@@ -127,6 +127,7 @@ impl<S: StorageIO> DataFileManager<S> {
pub fn rollback_rotation(&self, file_id: DataFileId, fd: FileId) {
let _ = self.io.close(fd);
self.handles.write().remove(&file_id);
let _ = self.io.delete(&self.data_file_path(file_id));
}
pub fn should_rotate(&self, position: BlockOffset) -> bool {
@@ -216,7 +217,7 @@ mod tests {
}
#[test]
fn rotation_rollback_cleans_handle() {
fn rotation_rollback_cleans_handle_and_deletes_file() {
let mgr = setup_manager(1024);
let _fd0 = mgr.open_for_append(DataFileId::new(0)).unwrap();
let (next_id, next_fd) = mgr.prepare_rotation(DataFileId::new(0)).unwrap();
@@ -225,10 +226,10 @@ mod tests {
assert_eq!(mgr.open_for_read(next_id).unwrap(), next_fd);
mgr.rollback_rotation(next_id, next_fd);
let reopened_fd = mgr.open_for_read(next_id).unwrap();
assert_ne!(
reopened_fd, next_fd,
"rollback should have closed the cached fd"
let reopen = mgr.open_for_read(next_id);
assert!(
reopen.is_err_and(|e| e.kind() == io::ErrorKind::NotFound),
"rollback must delete the data file so recovery cannot resurrect uncommitted bytes"
);
}

View File

@@ -127,26 +127,37 @@ impl<S: StorageIO> BlockStoreReader<S> {
file_size: u64,
location: BlockLocation,
) -> Result<Bytes, ReadError> {
match decode_block_record(self.manager.io(), fd, location.offset, file_size)? {
Some(ReadBlockRecord::Valid { data, .. })
if data.len() == location.length.raw() as usize =>
{
Ok(Bytes::from(data))
}
Some(ReadBlockRecord::Valid { .. }) => Err(ReadError::Corrupted {
file_id: location.file_id,
offset: location.offset,
}),
Some(ReadBlockRecord::Corrupted { offset } | ReadBlockRecord::Truncated { offset }) => {
Err(ReadError::Corrupted {
let attempt_once = || -> Result<Bytes, ReadError> {
match decode_block_record(self.manager.io(), fd, location.offset, file_size)? {
Some(ReadBlockRecord::Valid { data, .. })
if data.len() == location.length.raw() as usize =>
{
Ok(Bytes::from(data))
}
Some(ReadBlockRecord::Valid { .. }) => Err(ReadError::Corrupted {
file_id: location.file_id,
offset: location.offset,
}),
Some(
ReadBlockRecord::Corrupted { offset } | ReadBlockRecord::Truncated { offset },
) => Err(ReadError::Corrupted {
file_id: location.file_id,
offset,
})
}),
None => Err(ReadError::Corrupted {
file_id: location.file_id,
offset: location.offset,
}),
}
None => Err(ReadError::Corrupted {
file_id: location.file_id,
offset: location.offset,
}),
}
};
(0..READ_RETRY_ATTEMPTS.saturating_sub(1))
.find_map(|_| match attempt_once() {
Ok(bytes) => Some(Ok(bytes)),
Err(ReadError::Corrupted { .. }) => None,
Err(e) => Some(Err(e)),
})
.unwrap_or_else(attempt_once)
}
}
const READ_RETRY_ATTEMPTS: u32 = 4;

View File

@@ -20,8 +20,8 @@ use super::hash_index::BlockIndex;
use super::manager::DataFileManager;
use super::reader::{BlockStoreReader, ReadError};
use super::types::{
BlockLength, BlockLocation, BlockOffset, CollectionResult, CompactionResult, DataFileId,
EpochCounter, LivenessInfo, WallClockMs, WriteCursor,
BlockLocation, BlockOffset, CollectionResult, CompactionResult, DataFileId, EpochCounter,
LivenessInfo, WallClockMs,
};
fn cid_to_bytes(cid: &Cid) -> Result<[u8; CID_SIZE], RepoError> {
@@ -37,10 +37,6 @@ fn cid_to_bytes(cid: &Cid) -> Result<[u8; CID_SIZE], RepoError> {
})
}
fn block_index_err_to_repo(e: super::hash_index::BlockIndexError) -> RepoError {
RepoError::storage(io::Error::other(e.to_string()))
}
fn commit_error_to_repo(e: CommitError) -> RepoError {
match e {
CommitError::Io(io_err) => {
@@ -51,6 +47,13 @@ fn commit_error_to_repo(e: CommitError) -> RepoError {
io::ErrorKind::BrokenPipe,
"blockstore commit channel closed",
)),
CommitError::VerifyFailed { file_id, offset } => RepoError::storage(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"post-sync verify failed at {file_id}:{}",
offset.raw()
),
)),
}
}
@@ -299,7 +302,15 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
Err(e) => return Err(RepoError::storage(e)),
};
let result = Self::scan_and_index(io, index, fd, file_id, start_offset);
let hint_path = super::hint::hint_file_path(data_dir, file_id);
let hint_exists = io
.open(&hint_path, OpenOptions::read_only_existing())
.map(|fd| {
let _ = io.close(fd);
})
.is_ok();
let result = Self::scan_and_index(io, index, fd, file_id, start_offset, hint_exists);
let _ = io.close(fd);
@@ -312,6 +323,7 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
fd: crate::io::FileId,
file_id: DataFileId,
start_offset: BlockOffset,
hint_exists: bool,
) -> Result<(), RepoError> {
let file_size = io.file_size(fd).map_err(RepoError::storage)?;
@@ -320,7 +332,7 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
}
let scan_pos = &mut { start_offset };
let (recovered_entries, last_valid_end) = std::iter::from_fn(|| {
let (scanned_entries, last_valid_end) = std::iter::from_fn(|| {
match super::data_file::decode_block_record(io, fd, *scan_pos, file_size) {
Err(e) => {
tracing::warn!(
@@ -341,9 +353,10 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
Ok(n) if n <= super::types::MAX_BLOCK_SIZE => n,
_ => return None,
};
let length = BlockLength::new(raw_len);
let length = super::types::BlockLength::new(raw_len);
let record_size = BLOCK_RECORD_OVERHEAD as u64 + u64::from(raw_len);
*scan_pos = scan_pos.advance(record_size);
let new_end = offset.advance(record_size);
*scan_pos = new_end;
Some((
cid_bytes,
BlockLocation {
@@ -351,6 +364,7 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
offset,
length,
},
new_end,
))
}
Ok(Some(ReadBlockRecord::Corrupted { .. } | ReadBlockRecord::Truncated { .. })) => {
@@ -360,11 +374,8 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
})
.fold(
(Vec::new(), start_offset),
|(mut entries, _), (cid_bytes, location)| {
let new_end = location
.offset
.advance(BLOCK_RECORD_OVERHEAD as u64 + location.length.as_u64());
entries.push((cid_bytes, location));
|(mut entries, _), (cid, loc, new_end)| {
entries.push((cid, loc));
(entries, new_end)
},
);
@@ -374,28 +385,27 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
file_id = %file_id,
truncating_from = last_valid_end.raw(),
file_size,
"truncating partial/corrupted tail"
scanned_count = scanned_entries.len(),
"truncating partial/unacked tail"
);
io.truncate(fd, last_valid_end.raw())
.map_err(RepoError::storage)?;
io.sync(fd).map_err(RepoError::storage)?;
}
if !recovered_entries.is_empty() {
let new_cursor = WriteCursor {
if !hint_exists && !scanned_entries.is_empty() {
tracing::info!(
file_id = %file_id,
scanned = scanned_entries.len(),
"rebuilding index from data file (no hint file, treating as restored backup)"
);
let cursor = super::types::WriteCursor {
file_id,
offset: last_valid_end,
};
let inserted = index
.batch_put_if_absent(&recovered_entries, new_cursor)
.map_err(block_index_err_to_repo)?;
tracing::info!(
file_id = %file_id,
scanned = recovered_entries.len(),
inserted,
new_cursor_offset = last_valid_end.raw(),
"recovery data file scan"
);
index
.batch_put_if_absent(&scanned_entries, cursor)
.map_err(|e| RepoError::storage(io::Error::other(e.to_string())))?;
}
Ok(())

View File

@@ -42,6 +42,8 @@ pub struct GroupCommitOverrides {
pub checkpoint_interval_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint_write_threshold: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub verify_persisted_blocks: Option<bool>,
}
impl GroupCommitOverrides {
@@ -50,6 +52,7 @@ impl GroupCommitOverrides {
&& self.channel_capacity.is_none()
&& self.checkpoint_interval_ms.is_none()
&& self.checkpoint_write_threshold.is_none()
&& self.verify_persisted_blocks.is_none()
}
}
@@ -82,6 +85,9 @@ impl ConfigOverrides {
if let Some(n) = gc.checkpoint_write_threshold {
cfg.store.group_commit.checkpoint_write_threshold = n;
}
if let Some(b) = gc.verify_persisted_blocks {
cfg.store.group_commit.verify_persisted_blocks = b;
}
}
}

View File

@@ -6,7 +6,6 @@ use std::time::Duration;
use cid::Cid;
use jacquard_repo::mst::Mst;
use jacquard_repo::storage::BlockStore;
use super::invariants::{
EventLogSnapshot, InvariantCtx, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for,
@@ -16,7 +15,7 @@ use super::oracle::{CidFormatError, EventExpectation, Oracle, hex_short, try_cid
use super::workload::{Lcg, OpCount, SizeDistribution, ValueBytes, WorkloadModel};
use crate::blockstore::{
BlockStoreConfig, CidBytes, CompactionError, GroupCommitConfig, TranquilBlockStore,
hash_to_cid_bytes,
hash_to_cid, hash_to_cid_bytes,
};
use crate::eventlog::{
DEFAULT_INDEX_INTERVAL, DidHash, EventLogWriter, EventTypeTag, MAX_EVENT_PAYLOAD, SegmentId,
@@ -128,8 +127,6 @@ impl GauntletReport {
#[derive(Debug, thiserror::Error)]
enum OpError {
#[error("put record: {0}")]
PutRecord(String),
#[error("mst add: {0}")]
MstAdd(String),
#[error("mst delete: {0}")]
@@ -970,60 +967,32 @@ async fn apply_op<S: StorageIO + Send + Sync + 'static>(
value_seed,
} => {
let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
let record_cid = harness
.store
.put(&record_bytes)
.await
.map_err(|e| OpError::PutRecord(e.to_string()))?;
let record_cid = hash_to_cid(&record_bytes);
let record_cid_bytes = try_cid_to_fixed(&record_cid)?;
let outcome = add_record_inner(
let (new_root, applied) = add_record_atomic(
&harness.store,
*root,
collection,
rkey,
record_cid,
record_cid_bytes,
record_bytes,
)
.await;
match outcome {
Ok((new_root, applied)) => {
*root = Some(new_root);
if applied {
oracle.add(collection.clone(), rkey.clone(), record_cid_bytes);
}
Ok(())
}
Err(e) => {
if let Err(cleanup_err) =
decrement_obsolete(&harness.store, vec![record_cid_bytes]).await
{
tracing::warn!(
op_error = %e,
cleanup_error = %cleanup_err,
"AddRecord cleanup decrement failed",
);
}
Err(e)
}
.await?;
*root = Some(new_root);
if applied {
oracle.add(collection.clone(), rkey.clone(), record_cid_bytes);
}
Ok(())
}
Op::DeleteRecord { collection, rkey } => {
let Some(old_root) = *root else { return Ok(()) };
if !oracle.contains_record(collection, rkey) {
return Ok(());
}
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = Mst::load(harness.store.clone(), old_root, None);
let updated = loaded
.delete(&key)
.await
.map_err(|e| OpError::MstDelete(e.to_string()))?;
let new_root = updated
.persist()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
apply_mst_diff(&harness.store, old_root, new_root).await?;
let new_root =
delete_record_atomic(&harness.store, old_root, collection, rkey).await?;
oracle.delete(collection, rkey);
*root = Some(new_root);
Ok(())
@@ -1145,13 +1114,14 @@ fn segment_last_timestamp<S: StorageIO + Send + Sync + 'static>(
Ok(events.last().map(|e: &ValidEvent| e.timestamp.raw()))
}
async fn add_record_inner<S: StorageIO + Send + Sync + 'static>(
async fn add_record_atomic<S: StorageIO + Send + Sync + 'static>(
store: &Arc<TranquilBlockStore<S>>,
root: Option<Cid>,
collection: &super::op::CollectionName,
rkey: &super::op::RecordKey,
record_cid: Cid,
record_cid_bytes: CidBytes,
record_bytes: Vec<u8>,
) -> Result<(Cid, bool), OpError> {
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = match root {
@@ -1162,58 +1132,90 @@ async fn add_record_inner<S: StorageIO + Send + Sync + 'static>(
.add(&key, record_cid)
.await
.map_err(|e| OpError::MstAdd(e.to_string()))?;
let diff = loaded
.diff(&updated)
.await
.map_err(|e| OpError::MstDiff(e.to_string()))?;
let new_root = updated
.persist()
.get_pointer()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
match root {
Some(old_root) if old_root == new_root => {
decrement_obsolete(store, vec![record_cid_bytes]).await?;
Ok((new_root, false))
}
Some(old_root) => {
apply_mst_diff(store, old_root, new_root).await?;
Ok((new_root, true))
}
None => Ok((new_root, true)),
if matches!(root, Some(r) if r == new_root) {
return Ok((new_root, false));
}
let blocks = diff_blocks_plus_record(diff.new_mst_blocks, record_cid_bytes, record_bytes)?;
let obsolete = diff_obsolete(diff.removed_mst_blocks, diff.removed_cids)?;
commit_atomic(store, blocks, obsolete).await?;
Ok((new_root, true))
}
async fn decrement_obsolete<S: StorageIO + Send + Sync + 'static>(
/// Deletes `collection/rkey` from the MST rooted at `old_root` and
/// commits the resulting block changes in one atomic blockstore call.
///
/// The diff between the old and updated tree yields both the new MST
/// blocks to persist and the blocks/record CIDs made obsolete; both go
/// through `commit_atomic` together so a failure leaves no
/// half-applied state. Returns the new root CID.
async fn delete_record_atomic<S: StorageIO + Send + Sync + 'static>(
store: &Arc<TranquilBlockStore<S>>,
old_root: Cid,
collection: &super::op::CollectionName,
rkey: &super::op::RecordKey,
) -> Result<Cid, OpError> {
// MST keys are "<collection>/<rkey>".
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = Mst::load(store.clone(), old_root, None);
let updated = loaded
.delete(&key)
.await
.map_err(|e| OpError::MstDelete(e.to_string()))?;
let diff = loaded
.diff(&updated)
.await
.map_err(|e| OpError::MstDiff(e.to_string()))?;
// NOTE(review): get_pointer presumably computes the new root without
// persisting through the Mst's own storage path — persistence happens
// via commit_atomic below. Confirm against the Mst API.
let new_root = updated
.get_pointer()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
let blocks: Vec<(CidBytes, Vec<u8>)> = diff
.new_mst_blocks
.into_iter()
.map(|(c, b)| Ok::<_, OpError>((try_cid_to_fixed(&c)?, b.to_vec())))
.collect::<Result<_, _>>()?;
let obsolete = diff_obsolete(diff.removed_mst_blocks, diff.removed_cids)?;
commit_atomic(store, blocks, obsolete).await?;
Ok(new_root)
}
/// Flattens the record itself plus the new MST blocks from a diff into
/// the `(cid, bytes)` pairs handed to the blockstore commit.
/// The record comes first; MST blocks follow in BTreeMap (CID) order.
fn diff_blocks_plus_record(
    new_mst_blocks: std::collections::BTreeMap<Cid, bytes::Bytes>,
    record_cid_bytes: CidBytes,
    record_bytes: Vec<u8>,
) -> Result<Vec<(CidBytes, Vec<u8>)>, OpError> {
    let record = std::iter::once(Ok((record_cid_bytes, record_bytes)));
    let mst = new_mst_blocks
        .into_iter()
        .map(|(cid, bytes)| -> Result<(CidBytes, Vec<u8>), OpError> {
            Ok((try_cid_to_fixed(&cid)?, bytes.to_vec()))
        });
    // collect::<Result<Vec<_>, _>> short-circuits on the first bad CID.
    record.chain(mst).collect()
}
/// Collects every CID made obsolete by a diff — removed MST blocks and
/// removed record CIDs — converted to fixed-width byte form.
fn diff_obsolete(
    removed_mst_blocks: Vec<Cid>,
    removed_cids: Vec<Cid>,
) -> Result<Vec<CidBytes>, OpError> {
    removed_mst_blocks
        .into_iter()
        .chain(removed_cids)
        .map(|cid| try_cid_to_fixed(&cid).map_err(OpError::from))
        .collect()
}
/// Persists `blocks` and decrements `obsolete` in a single blockstore
/// commit. Runs on the blocking thread pool because
/// `apply_commit_blocking` performs synchronous I/O.
async fn commit_atomic<S: StorageIO + Send + Sync + 'static>(
    store: &Arc<TranquilBlockStore<S>>,
    blocks: Vec<(CidBytes, Vec<u8>)>,
    obsolete: Vec<CidBytes>,
) -> Result<(), OpError> {
    let s = store.clone();
    tokio::task::spawn_blocking(move || {
        // Fix: the body previously committed `vec![]` and silently
        // dropped the `blocks` parameter, so new blocks never reached
        // the store.
        s.apply_commit_blocking(blocks, obsolete)
            .map_err(|e| e.to_string())
    })
    .await
    .map_err(|e| OpError::Join(e.to_string()))?
    .map_err(OpError::ApplyCommit)
}
async fn apply_mst_diff<S: StorageIO + Send + Sync + 'static>(
store: &Arc<TranquilBlockStore<S>>,
old_root: Cid,
new_root: Cid,
) -> Result<(), OpError> {
let old_m = Mst::load(store.clone(), old_root, None);
let new_m = Mst::load(store.clone(), new_root, None);
let diff = old_m
.diff(&new_m)
.await
.map_err(|e| OpError::MstDiff(e.to_string()))?;
let obsolete: Vec<CidBytes> = diff
.removed_mst_blocks
.into_iter()
.chain(diff.removed_cids.into_iter())
.map(|c| try_cid_to_fixed(&c))
.collect::<Result<_, _>>()?;
let s = store.clone();
tokio::task::spawn_blocking(move || {
s.apply_commit_blocking(vec![], obsolete)
s.apply_commit_blocking(blocks, obsolete)
.map_err(|e| e.to_string())
})
.await
@@ -1239,6 +1241,7 @@ fn compact_by_liveness<S: StorageIO + Send + Sync + 'static>(
.try_for_each(|fid| match store.compact_file(fid, 0) {
Ok(_) => Ok(()),
Err(CompactionError::ActiveFileCannotBeCompacted) => Ok(()),
Err(CompactionError::Io(e)) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(OpError::CompactFile(format!("{fid}: {e}"))),
})
}
@@ -1255,47 +1258,27 @@ async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
value_seed,
} => {
let record_bytes = make_record_bytes(*value_seed, workload.size_distribution);
let record_cid = shared
.store
.put(&record_bytes)
.await
.map_err(|e| OpError::PutRecord(e.to_string()))?;
let record_cid = hash_to_cid(&record_bytes);
let record_cid_bytes = try_cid_to_fixed(&record_cid)?;
let mut state = shared.write.lock().await;
let outcome = add_record_inner(
let (new_root, applied) = add_record_atomic(
&shared.store,
state.root,
collection,
rkey,
record_cid,
record_cid_bytes,
record_bytes,
)
.await;
match outcome {
Ok((new_root, applied)) => {
state.root = Some(new_root);
if applied {
state
.oracle
.add(collection.clone(), rkey.clone(), record_cid_bytes);
}
Ok(())
}
Err(e) => {
drop(state);
if let Err(cleanup) =
decrement_obsolete(&shared.store, vec![record_cid_bytes]).await
{
tracing::warn!(
op_error = %e,
cleanup_error = %cleanup,
"AddRecord concurrent cleanup decrement failed",
);
}
Err(e)
}
.await?;
state.root = Some(new_root);
if applied {
state
.oracle
.add(collection.clone(), rkey.clone(), record_cid_bytes);
}
Ok(())
}
Op::DeleteRecord { collection, rkey } => {
let mut state = shared.write.lock().await;
@@ -1305,17 +1288,8 @@ async fn apply_op_concurrent<S: StorageIO + Send + Sync + 'static>(
if !state.oracle.contains_record(collection, rkey) {
return Ok(());
}
let key = format!("{}/{}", collection.0, rkey.0);
let loaded = Mst::load(shared.store.clone(), old_root, None);
let updated = loaded
.delete(&key)
.await
.map_err(|e| OpError::MstDelete(e.to_string()))?;
let new_root = updated
.persist()
.await
.map_err(|e| OpError::MstPersist(e.to_string()))?;
apply_mst_diff(&shared.store, old_root, new_root).await?;
let new_root =
delete_record_atomic(&shared.store, old_root, collection, rkey).await?;
state.oracle.delete(collection, rkey);
state.root = Some(new_root);
Ok(())

View File

@@ -523,7 +523,10 @@ fn sim_microbench_workload() -> WorkloadModel {
fn sim_store() -> StoreConfig {
StoreConfig {
max_file_size: MaxFileSize(16 * 1024),
group_commit: GroupCommitConfig::default(),
group_commit: GroupCommitConfig {
verify_persisted_blocks: true,
..GroupCommitConfig::default()
},
shard_count: ShardCount(1),
}
}

View File

@@ -0,0 +1,128 @@
mod common;
use std::sync::Arc;
use tranquil_store::OpenOptions;
use tranquil_store::RealIO;
use tranquil_store::StorageIO;
use tranquil_store::blockstore::BlockLength;
use tranquil_store::blockstore::{
BlockLocation, BlockOffset, BlockStoreConfig, DataFileId, DataFileManager, DataFileWriter,
GroupCommitConfig, HintFileWriter, HintOffset, TranquilBlockStore, hint_file_path,
};
use common::{test_cid, with_runtime};
/// Creates a temp directory tree (data/ + index/) and a single-shard,
/// 8 KiB-max-file store config pointing at it. The returned TempDir
/// guard must stay alive for the duration of the test.
fn fresh_store_dir() -> (tempfile::TempDir, BlockStoreConfig) {
    let dir = tempfile::TempDir::new().unwrap();
    let data_dir = dir.path().join("data");
    let index_dir = dir.path().join("index");
    for d in [&data_dir, &index_dir] {
        std::fs::create_dir_all(d).unwrap();
    }
    let config = BlockStoreConfig {
        data_dir,
        index_dir,
        max_file_size: 8192,
        group_commit: GroupCommitConfig::default(),
        shard_count: 1,
    };
    (dir, config)
}
/// Returns the current length in bytes of the file at `path`.
///
/// Fix: opened read-only (`read_only_existing`, as the production code
/// does for verification reads) instead of `read_write` — a pure size
/// probe must not be able to create or modify the file under test.
fn hint_file_size(path: &std::path::Path) -> u64 {
    let io = RealIO::new();
    let fd = io.open(path, OpenOptions::read_only_existing()).unwrap();
    let size = io.file_size(fd).unwrap();
    let _ = io.close(fd);
    size
}
#[test]
fn rollback_rotation_does_not_leave_orphan_data_file() {
with_runtime(|| {
let (_dir, config) = fresh_store_dir();
let data_dir = config.data_dir.clone();
// Seed the store with one committed block so data file 0 exists.
{
let store = TranquilBlockStore::open(config.clone()).unwrap();
store
.put_blocks_blocking(vec![(test_cid(1), vec![0x11; 64])])
.unwrap();
drop(store);
}
let orphan_cid = test_cid(99_999);
// Simulate a failed batch: a rotation to a new data file is
// prepared and committed, a block is written and synced into it,
// then the rotation is rolled back — so the block was never
// acknowledged to any caller.
{
let io: Arc<RealIO> = Arc::new(RealIO::new());
let manager = DataFileManager::new(Arc::clone(&io), data_dir.clone(), 4096);
let (next_id, next_fd) = manager.prepare_rotation(DataFileId::new(0)).unwrap();
manager.commit_rotation(next_id, next_fd);
let mut writer = DataFileWriter::new(&*io, next_fd, next_id).unwrap();
let _ = writer.append_block(&orphan_cid, &vec![0xAB; 256]).unwrap();
writer.sync().unwrap();
io.sync_dir(&data_dir).unwrap();
// Remove the hint file: recovery treats a hint-less data file
// as a restored backup and rebuilds the index from its bytes,
// which is exactly the resurrection path under test.
let _ = io.delete(&hint_file_path(&data_dir, next_id));
manager.rollback_rotation(next_id, next_fd);
}
// Reopen: rollback must have deleted the data file itself, so the
// unacked block cannot be resurrected by recovery.
let store = TranquilBlockStore::open(config).unwrap();
assert!(
store.get_block_sync(&orphan_cid).unwrap().is_none(),
"rollback_rotation must delete the uncommitted data file; otherwise recovery's \
backup-restore branch resurrects rejected blocks"
);
});
}
#[test]
fn truncated_old_hint_drops_rejected_entry_on_reopen() {
with_runtime(|| {
let (_dir, config) = fresh_store_dir();
let data_dir = config.data_dir.clone();
let old_file_id = DataFileId::new(0);
let old_hint_path = hint_file_path(&data_dir, old_file_id);
let keep_cid = test_cid(1);
// Seed the store with one committed block; its hint file for data
// file 0 is the one we tamper with below.
{
let store = TranquilBlockStore::open(config.clone()).unwrap();
store
.put_blocks_blocking(vec![(keep_cid, vec![0x11; 64])])
.unwrap();
drop(store);
}
let hint_len_before = hint_file_size(&old_hint_path);
let rejected_cid = test_cid(42_424);
// Simulate rollback_batch's hint handling: append a hint record
// for a block that was later rejected, sync it, then truncate the
// hint file back to its pre-batch length — the same sequence the
// writer performs when a batch fails after hints were appended.
{
let io: Arc<RealIO> = Arc::new(RealIO::new());
let fd = io.open(&old_hint_path, OpenOptions::read_write()).unwrap();
let mut writer = HintFileWriter::resume(&*io, fd, HintOffset::new(hint_len_before));
writer
.append_hint(
&rejected_cid,
&BlockLocation {
file_id: old_file_id,
// Offset past the real data: this hint never had a
// corresponding durable block.
offset: BlockOffset::new(4096),
length: BlockLength::new(64),
},
)
.unwrap();
writer.sync().unwrap();
io.truncate(fd, hint_len_before).unwrap();
io.sync(fd).unwrap();
let _ = io.close(fd);
}
// Reopen: the truncated hint must not surface the rejected entry,
// while the legitimately committed block stays readable.
let store = TranquilBlockStore::open(config).unwrap();
assert!(
store.get_block_sync(&rejected_cid).unwrap().is_none(),
"after rollback_batch truncates state.hint_fd, the rejected hint is gone and reopen is clean"
);
assert!(
store.get_block_sync(&keep_cid).unwrap().is_some(),
"legitimate pre-batch block remains readable after rollback"
);
});
}