feat(store): try to self-heal phantom index entries on compaction

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-05-16 23:24:55 +03:00
parent 1901b0a630
commit d07d702dd4
8 changed files with 161 additions and 25 deletions

View File

@@ -7,7 +7,9 @@ use super::group_commit::{ActiveFileSet, FileIdAllocator};
use super::hash_index::{BlockIndex, BlockIndexError};
use super::hint::{HintFileWriter, hint_file_path};
use super::manager::DataFileManager;
use super::types::{BlockLocation, CidBytes, CommitEpoch, CompactionResult, DataFileId};
use super::types::{
BlockLocation, CidBytes, CommitEpoch, CompactionResult, CompactionStats, DataFileId,
};
#[derive(Debug)]
pub enum CompactionError {
@@ -68,7 +70,13 @@ pub(super) fn compact_on_writer_thread<S: StorageIO>(
return Err(CompactionError::ActiveFileCannotBeCompacted);
}
let source_handle = manager.open_for_read(source_file_id)?;
let source_handle = match manager.open_for_read(source_file_id) {
Ok(handle) => handle,
Err(e) if e.kind() == io::ErrorKind::NotFound => {
return purge_phantom_file(manager, index, hint_positions, epoch, source_file_id);
}
Err(e) => return Err(CompactionError::Io(e)),
};
let source_size = manager.io().file_size(source_handle.fd())?;
let new_file_id = file_ids.allocate();
@@ -92,10 +100,16 @@ pub(super) fn compact_on_writer_thread<S: StorageIO>(
.ok();
Err(e)
}
Ok((new_size, live_count, dead_count)) => {
if let Err(e) = index.write_checkpoint(epoch.current(), hint_positions) {
tracing::warn!(error = %e, "pre-delete checkpoint failed during compaction");
Ok((new_size, live_count, dead_count, new_hint_offset)) => {
match live_count {
0 => hint_positions.forget_extra(new_file_id),
_ => hint_positions.record_extra(new_file_id, new_hint_offset),
}
hint_positions.forget_extra(source_file_id);
index
.write_checkpoint(epoch.current(), hint_positions)
.map_err(CompactionError::Io)?;
manager.delete_data_file(source_file_id)?;
manager
@@ -124,18 +138,51 @@ pub(super) fn compact_on_writer_thread<S: StorageIO>(
"compaction complete"
);
Ok(CompactionResult {
Ok(CompactionResult::Compacted(CompactionStats {
file_id: source_file_id,
old_size: source_size,
new_size,
live_blocks: live_count,
dead_blocks: dead_count,
reclaimed_bytes,
})
}))
}
}
}
/// Self-heals the index when the data file selected for compaction is gone.
///
/// `compact_on_writer_thread` calls this when opening `source_file_id` for
/// reading fails with `NotFound`. The steps, in order:
///
/// 1. purge every index entry that still points at the missing file,
/// 2. drop any extra hint position recorded for that file,
/// 3. best-effort removal of the file's hint file, followed by a directory sync,
/// 4. write a checkpoint that reflects the purged state.
///
/// # Errors
///
/// Returns `CompactionError::Io` if writing the checkpoint fails. Failures of
/// the hint-file deletion and of the directory sync are deliberately ignored.
fn purge_phantom_file<S: StorageIO>(
    manager: &DataFileManager<S>,
    index: &BlockIndex,
    hint_positions: &super::group_commit::ShardHintPositions,
    epoch: &super::types::EpochCounter,
    source_file_id: DataFileId,
) -> Result<CompactionResult, CompactionError> {
    let purged = index.purge_by_file_id(source_file_id);
    tracing::warn!(
        file_id = %source_file_id,
        phantom_blocks = purged,
        "source data file missing on disk, purged phantom index entries"
    );

    hint_positions.forget_extra(source_file_id);

    // Cleanup of the leftover hint file is best-effort: both results are
    // discarded on purpose so a failure here never aborts the purge.
    let io = manager.io();
    let _ = io.delete(&hint_file_path(manager.data_dir(), source_file_id));
    let _ = io.sync_dir(manager.data_dir());

    // Persist the purged state; unlike the cleanup above, a failure here is
    // reported to the caller.
    match index.write_checkpoint(epoch.current(), hint_positions) {
        Ok(_) => Ok(CompactionResult::Purged {
            file_id: source_file_id,
            phantom_blocks: purged,
        }),
        Err(e) => Err(CompactionError::Io(e)),
    }
}
fn stream_compact<S: StorageIO>(
manager: &DataFileManager<S>,
index: &BlockIndex,
@@ -144,7 +191,7 @@ fn stream_compact<S: StorageIO>(
new_file_id: DataFileId,
current_epoch: CommitEpoch,
grace_period_ms: u64,
) -> Result<(u64, u64, u64), CompactionError> {
) -> Result<(u64, u64, u64, super::types::HintOffset), CompactionError> {
let mut reader = DataFileReader::open(manager.io(), source_fd)?;
let now = crate::wall_clock_ms();
@@ -225,6 +272,7 @@ fn stream_compact<S: StorageIO>(
})
.and_then(|()| manager.io().barrier().map_err(CompactionError::from));
let final_hint_offset = hint_writer.position();
let _ = manager.io().close(hint_fd);
finalize_result?;
@@ -233,5 +281,5 @@ fn stream_compact<S: StorageIO>(
index.apply_compaction(&relocations, &dead_cids);
Ok((new_size, live_count, dead_count))
Ok((new_size, live_count, dead_count, final_hint_offset))
}

View File

@@ -74,30 +74,49 @@ impl ActiveFileSet {
}
pub struct ShardHintPositions {
positions: RwLock<Vec<(DataFileId, HintOffset)>>,
shard_positions: RwLock<Vec<(DataFileId, HintOffset)>>,
extra_positions: RwLock<HashMap<DataFileId, HintOffset>>,
}
impl ShardHintPositions {
pub fn new(shard_count: u8) -> Self {
Self {
positions: RwLock::new(
shard_positions: RwLock::new(
(0..shard_count as usize)
.map(|_| (DataFileId::new(0), HintOffset::new(0)))
.collect(),
),
extra_positions: RwLock::new(HashMap::new()),
}
}
pub fn update(&self, shard_id: ShardId, file_id: DataFileId, offset: HintOffset) {
let mut positions = self.positions.write();
let mut positions = self.shard_positions.write();
let idx = shard_id.as_usize();
if idx < positions.len() {
positions[idx] = (file_id, offset);
}
}
/// Records `offset` as the hint position of `file_id` in the extra-positions
/// map, replacing any entry already stored under the same file id.
pub fn record_extra(&self, file_id: DataFileId, offset: HintOffset) {
    let mut extras = self.extra_positions.write();
    extras.insert(file_id, offset);
}
/// Drops the extra hint position stored for `file_id`, if there is one.
/// Calling this for a file id that was never recorded is a no-op.
pub fn forget_extra(&self, file_id: DataFileId) {
    let mut extras = self.extra_positions.write();
    extras.remove(&file_id);
}
pub fn snapshot(&self) -> CheckpointPositions {
CheckpointPositions(self.positions.read().clone())
let shard = self.shard_positions.read().clone();
let extra = self.extra_positions.read().clone();
debug_assert!(
shard
.iter()
.filter(|(fid, _)| fid.raw() != 0)
.all(|(fid, _)| !extra.contains_key(fid)),
"shard_positions and extra_positions must not overlap on the same DataFileId"
);
CheckpointPositions(shard.into_iter().chain(extra).collect())
}
}

View File

@@ -606,6 +606,27 @@ impl HashTable {
});
}
/// Removes every entry whose slot references `file_id` and returns how many
/// entries were actually removed.
///
/// Entries with a non-zero refcount are removed as well; when any such
/// entries exist, a single warning is logged with their count before the
/// removal takes place.
pub fn purge_by_file_id(&mut self, file_id: DataFileId) -> u64 {
    // Collect the matching entries up front so the table is not being
    // iterated while entries are removed from it.
    let mut doomed: Vec<(CidBytes, RefCount)> = Vec::new();
    for slot in self.iter() {
        if slot.file_id == file_id {
            doomed.push((slot.cid, slot.refcount));
        }
    }

    let mut live_discarded = 0usize;
    for (_, refcount) in &doomed {
        if !refcount.is_zero() {
            live_discarded += 1;
        }
    }
    if live_discarded > 0 {
        tracing::warn!(
            file_id = %file_id,
            live_discarded,
            total_purged = doomed.len(),
            "discarding live index entries for missing data file"
        );
    }

    // Only count the removals that `remove` reports as successful.
    let mut removed = 0usize;
    for (cid, _) in &doomed {
        if self.remove(cid) {
            removed += 1;
        }
    }
    u64::try_from(removed).unwrap_or(u64::MAX)
}
pub fn cleanup_stale_gc(&mut self) -> u64 {
self.slots
.iter_mut()
@@ -1503,6 +1524,10 @@ impl BlockIndex {
})
}
/// Takes the table's write lock and purges every entry that references
/// `file_id`, returning the number of entries removed.
pub fn purge_by_file_id(&self, file_id: DataFileId) -> u64 {
    let mut table = self.table.write();
    table.purge_by_file_id(file_id)
}
pub fn read_write_cursor(&self) -> Option<WriteCursor> {
self.table.read().write_cursor()
}

View File

@@ -30,8 +30,8 @@ pub use store::QuiesceGuard;
pub use store::{BlockStoreConfig, DEFAULT_SHARD_COUNT, OpenRetryPolicy, TranquilBlockStore};
pub use types::{
BlockLength, BlockLocation, BlockOffset, BlockstoreSnapshot, CidBytes, CollectionResult,
CommitEpoch, CompactionResult, DataFileId, EpochCounter, HintOffset, IndexEntry, LivenessInfo,
MAX_BLOCK_SIZE, RefCount, ShardId, WallClockMs, WriteCursor,
CommitEpoch, CompactionResult, CompactionStats, DataFileId, EpochCounter, HintOffset,
IndexEntry, LivenessInfo, MAX_BLOCK_SIZE, RefCount, ShardId, WallClockMs, WriteCursor,
};
use std::io;

View File

@@ -599,6 +599,16 @@ impl<S: StorageIO + Send + Sync + 'static> TranquilBlockStore<S> {
.map_err(RepoError::storage)
}
/// Lists the ids of the files in this store's data directory that
/// `list_files_by_extension` finds for the hint-file extension.
///
/// # Errors
///
/// Any error from the directory listing is converted with
/// `RepoError::storage`.
pub fn list_hint_files(&self) -> Result<Vec<DataFileId>, RepoError> {
    let listing = super::list_files_by_extension(
        self.reader.manager().io(),
        &self.data_dir,
        super::hint::HINT_FILE_EXTENSION,
    );
    match listing {
        Ok(ids) => Ok(ids),
        Err(e) => Err(RepoError::storage(e)),
    }
}
/// Returns the hint-file path for `file_id`, as computed by
/// `hint::hint_file_path` against this store's data directory.
pub fn hint_file_path(&self, file_id: DataFileId) -> std::path::PathBuf {
    let data_dir = &self.data_dir;
    super::hint::hint_file_path(data_dir, file_id)
}
pub fn put_blocks_blocking(
&self,
blocks: Vec<([u8; CID_SIZE], Vec<u8>)>,

View File

@@ -68,7 +68,8 @@ pub struct CollectionResult {
pub total_bytes: u64,
}
pub struct CompactionResult {
#[derive(Debug)]
pub struct CompactionStats {
pub file_id: DataFileId,
pub old_size: u64,
pub new_size: u64,
@@ -77,6 +78,24 @@ pub struct CompactionResult {
pub reclaimed_bytes: u64,
}
#[derive(Debug)]
pub enum CompactionResult {
Compacted(CompactionStats),
Purged {
file_id: DataFileId,
phantom_blocks: u64,
},
}
impl CompactionResult {
pub fn file_id(&self) -> DataFileId {
match self {
Self::Compacted(stats) => stats.file_id,
Self::Purged { file_id, .. } => *file_id,
}
}
}
pub struct LivenessInfo {
pub live_bytes: u64,
pub total_bytes: u64,

View File

@@ -193,10 +193,15 @@ fn compact_data_file_preserves_live_removes_dead() {
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(5));
let result = store.compact_file(first_file, 0).unwrap();
assert!(result.dead_blocks > 0, "should have removed dead blocks");
assert!(result.live_blocks > 0, "should have preserved live blocks");
assert!(result.reclaimed_bytes > 0, "should have reclaimed space");
let stats = match store.compact_file(first_file, 0).unwrap() {
tranquil_store::blockstore::CompactionResult::Compacted(s) => s,
tranquil_store::blockstore::CompactionResult::Purged { .. } => {
panic!("expected compaction, got phantom purge")
}
};
assert!(stats.dead_blocks > 0, "should have removed dead blocks");
assert!(stats.live_blocks > 0, "should have preserved live blocks");
assert!(stats.reclaimed_bytes > 0, "should have reclaimed space");
[1u8, 3].iter().for_each(|&seed| {
let data = store.get_block_sync(&test_cid(seed)).unwrap();

View File

@@ -579,9 +579,14 @@ fn all_dead_file_compaction() {
advance_epoch(&store);
std::thread::sleep(std::time::Duration::from_millis(5));
let result = store.compact_file(first_file, 0).unwrap();
assert_eq!(result.live_blocks, 0);
assert!(result.dead_blocks > 0);
let stats = match store.compact_file(first_file, 0).unwrap() {
tranquil_store::blockstore::CompactionResult::Compacted(s) => s,
tranquil_store::blockstore::CompactionResult::Purged { .. } => {
panic!("expected compaction, got phantom purge")
}
};
assert_eq!(stats.live_blocks, 0);
assert!(stats.dead_blocks > 0);
(0u32..5).for_each(|seed| {
let data = store.get_block_sync(&test_cid_u32(seed)).unwrap();
@@ -838,10 +843,15 @@ fn grace_period_prevents_collection_during_active_write() {
let files = store.list_data_files().unwrap();
let first_file = files[0];
let result = store.compact_file(first_file, 600_000).unwrap();
let stats = match store.compact_file(first_file, 600_000).unwrap() {
tranquil_store::blockstore::CompactionResult::Compacted(s) => s,
tranquil_store::blockstore::CompactionResult::Purged { .. } => {
panic!("expected compaction, got phantom purge")
}
};
assert_eq!(
result.dead_blocks, 0,
stats.dead_blocks, 0,
"grace period should prevent any collection"
);