diff --git a/crates/tranquil-store/src/blockstore/compaction.rs b/crates/tranquil-store/src/blockstore/compaction.rs index b1ea082..0ebbdbc 100644 --- a/crates/tranquil-store/src/blockstore/compaction.rs +++ b/crates/tranquil-store/src/blockstore/compaction.rs @@ -7,7 +7,9 @@ use super::group_commit::{ActiveFileSet, FileIdAllocator}; use super::hash_index::{BlockIndex, BlockIndexError}; use super::hint::{HintFileWriter, hint_file_path}; use super::manager::DataFileManager; -use super::types::{BlockLocation, CidBytes, CommitEpoch, CompactionResult, DataFileId}; +use super::types::{ + BlockLocation, CidBytes, CommitEpoch, CompactionResult, CompactionStats, DataFileId, +}; #[derive(Debug)] pub enum CompactionError { @@ -68,7 +70,13 @@ pub(super) fn compact_on_writer_thread( return Err(CompactionError::ActiveFileCannotBeCompacted); } - let source_handle = manager.open_for_read(source_file_id)?; + let source_handle = match manager.open_for_read(source_file_id) { + Ok(handle) => handle, + Err(e) if e.kind() == io::ErrorKind::NotFound => { + return purge_phantom_file(manager, index, hint_positions, epoch, source_file_id); + } + Err(e) => return Err(CompactionError::Io(e)), + }; let source_size = manager.io().file_size(source_handle.fd())?; let new_file_id = file_ids.allocate(); @@ -92,10 +100,16 @@ pub(super) fn compact_on_writer_thread( .ok(); Err(e) } - Ok((new_size, live_count, dead_count)) => { - if let Err(e) = index.write_checkpoint(epoch.current(), hint_positions) { - tracing::warn!(error = %e, "pre-delete checkpoint failed during compaction"); + Ok((new_size, live_count, dead_count, new_hint_offset)) => { + match live_count { + 0 => hint_positions.forget_extra(new_file_id), + _ => hint_positions.record_extra(new_file_id, new_hint_offset), } + hint_positions.forget_extra(source_file_id); + + index + .write_checkpoint(epoch.current(), hint_positions) + .map_err(CompactionError::Io)?; manager.delete_data_file(source_file_id)?; manager @@ -124,18 +138,51 @@ pub(super) fn compact_on_writer_thread( "compaction complete" ); - Ok(CompactionResult { + Ok(CompactionResult::Compacted(CompactionStats { file_id: source_file_id, old_size: source_size, new_size, live_blocks: live_count, dead_blocks: dead_count, reclaimed_bytes, - }) + })) } } } +fn purge_phantom_file( + manager: &DataFileManager, + index: &BlockIndex, + hint_positions: &super::group_commit::ShardHintPositions, + epoch: &super::types::EpochCounter, + source_file_id: DataFileId, +) -> Result { + let phantom_blocks = index.purge_by_file_id(source_file_id); + + tracing::warn!( + file_id = %source_file_id, + phantom_blocks, + "source data file missing on disk, purged phantom index entries" + ); + + hint_positions.forget_extra(source_file_id); + + manager + .io() + .delete(&hint_file_path(manager.data_dir(), source_file_id)) + .ok(); + manager.io().sync_dir(manager.data_dir()).ok(); + + index + .write_checkpoint(epoch.current(), hint_positions) + .map_err(CompactionError::Io)?; + + Ok(CompactionResult::Purged { + file_id: source_file_id, + phantom_blocks, + }) +} + fn stream_compact( manager: &DataFileManager, index: &BlockIndex, @@ -144,7 +191,7 @@ fn stream_compact( new_file_id: DataFileId, current_epoch: CommitEpoch, grace_period_ms: u64, -) -> Result<(u64, u64, u64), CompactionError> { +) -> Result<(u64, u64, u64, super::types::HintOffset), CompactionError> { let mut reader = DataFileReader::open(manager.io(), source_fd)?; let now = crate::wall_clock_ms(); @@ -225,6 +272,7 @@ fn stream_compact( }) .and_then(|()| manager.io().barrier().map_err(CompactionError::from)); + let final_hint_offset = hint_writer.position(); let _ = manager.io().close(hint_fd); finalize_result?; @@ -233,5 +281,5 @@ fn stream_compact( index.apply_compaction(&relocations, &dead_cids); - Ok((new_size, live_count, dead_count)) + Ok((new_size, live_count, dead_count, final_hint_offset)) } diff --git a/crates/tranquil-store/src/blockstore/group_commit.rs b/crates/tranquil-store/src/blockstore/group_commit.rs index e1f4f7d..7ade216 100644 --- a/crates/tranquil-store/src/blockstore/group_commit.rs +++ b/crates/tranquil-store/src/blockstore/group_commit.rs @@ -74,30 +74,49 @@ impl ActiveFileSet { } pub struct ShardHintPositions { - positions: RwLock>, + shard_positions: RwLock>, + extra_positions: RwLock>, } impl ShardHintPositions { pub fn new(shard_count: u8) -> Self { Self { - positions: RwLock::new( + shard_positions: RwLock::new( (0..shard_count as usize) .map(|_| (DataFileId::new(0), HintOffset::new(0))) .collect(), ), + extra_positions: RwLock::new(HashMap::new()), } } pub fn update(&self, shard_id: ShardId, file_id: DataFileId, offset: HintOffset) { - let mut positions = self.positions.write(); + let mut positions = self.shard_positions.write(); let idx = shard_id.as_usize(); if idx < positions.len() { positions[idx] = (file_id, offset); } } + pub fn record_extra(&self, file_id: DataFileId, offset: HintOffset) { + self.extra_positions.write().insert(file_id, offset); + } + + pub fn forget_extra(&self, file_id: DataFileId) { + self.extra_positions.write().remove(&file_id); + } + pub fn snapshot(&self) -> CheckpointPositions { - CheckpointPositions(self.positions.read().clone()) + let shard = self.shard_positions.read().clone(); + let extra = self.extra_positions.read().clone(); + debug_assert!( + shard + .iter() + .filter(|(fid, _)| fid.raw() != 0) + .all(|(fid, _)| !extra.contains_key(fid)), + "shard_positions and extra_positions must not overlap on the same DataFileId" + ); + CheckpointPositions(shard.into_iter().chain(extra).collect()) } } diff --git a/crates/tranquil-store/src/blockstore/hash_index.rs b/crates/tranquil-store/src/blockstore/hash_index.rs index 9f7e4c2..fe87211 100644 --- a/crates/tranquil-store/src/blockstore/hash_index.rs +++ b/crates/tranquil-store/src/blockstore/hash_index.rs @@ -606,6 +606,27 @@ impl HashTable { }); } + pub fn purge_by_file_id(&mut self, file_id: DataFileId) -> u64 { + let victims: Vec<(CidBytes, RefCount)> = self + .iter() + .filter(|s| s.file_id == file_id) + .map(|s| (s.cid, s.refcount)) + .collect(); + + let live_discarded = victims.iter().filter(|(_, rc)| !rc.is_zero()).count(); + if live_discarded > 0 { + tracing::warn!( + file_id = %file_id, + live_discarded, + total_purged = victims.len(), + "discarding live index entries for missing data file" + ); + } + + let removed = victims.iter().filter(|(cid, _)| self.remove(cid)).count(); + u64::try_from(removed).unwrap_or(u64::MAX) + } + pub fn cleanup_stale_gc(&mut self) -> u64 { self.slots .iter_mut() @@ -1503,6 +1524,10 @@ impl BlockIndex { }) } + pub fn purge_by_file_id(&self, file_id: DataFileId) -> u64 { + self.table.write().purge_by_file_id(file_id) + } + pub fn read_write_cursor(&self) -> Option { self.table.read().write_cursor() } diff --git a/crates/tranquil-store/src/blockstore/mod.rs b/crates/tranquil-store/src/blockstore/mod.rs index 6d9191f..18b72aa 100644 --- a/crates/tranquil-store/src/blockstore/mod.rs +++ b/crates/tranquil-store/src/blockstore/mod.rs @@ -30,8 +30,8 @@ pub use store::QuiesceGuard; pub use store::{BlockStoreConfig, DEFAULT_SHARD_COUNT, OpenRetryPolicy, TranquilBlockStore}; pub use types::{ BlockLength, BlockLocation, BlockOffset, BlockstoreSnapshot, CidBytes, CollectionResult, - CommitEpoch, CompactionResult, DataFileId, EpochCounter, HintOffset, IndexEntry, LivenessInfo, - MAX_BLOCK_SIZE, RefCount, ShardId, WallClockMs, WriteCursor, + CommitEpoch, CompactionResult, CompactionStats, DataFileId, EpochCounter, HintOffset, + IndexEntry, LivenessInfo, MAX_BLOCK_SIZE, RefCount, ShardId, WallClockMs, WriteCursor, }; use std::io; diff --git a/crates/tranquil-store/src/blockstore/store.rs b/crates/tranquil-store/src/blockstore/store.rs index 53f1695..4008f12 100644 --- a/crates/tranquil-store/src/blockstore/store.rs +++ b/crates/tranquil-store/src/blockstore/store.rs @@ -599,6 +599,16 @@ impl TranquilBlockStore { .map_err(RepoError::storage) } + pub fn list_hint_files(&self) -> Result, RepoError> { + let io = self.reader.manager().io(); + super::list_files_by_extension(io, &self.data_dir, super::hint::HINT_FILE_EXTENSION) + .map_err(RepoError::storage) + } + + pub fn hint_file_path(&self, file_id: DataFileId) -> std::path::PathBuf { + super::hint::hint_file_path(&self.data_dir, file_id) + } + pub fn put_blocks_blocking( &self, blocks: Vec<([u8; CID_SIZE], Vec)>, diff --git a/crates/tranquil-store/src/blockstore/types.rs b/crates/tranquil-store/src/blockstore/types.rs index 8e6178f..99cf957 100644 --- a/crates/tranquil-store/src/blockstore/types.rs +++ b/crates/tranquil-store/src/blockstore/types.rs @@ -68,7 +68,8 @@ pub struct CollectionResult { pub total_bytes: u64, } -pub struct CompactionResult { +#[derive(Debug)] +pub struct CompactionStats { pub file_id: DataFileId, pub old_size: u64, pub new_size: u64, @@ -77,6 +78,24 @@ pub struct CompactionResult { pub reclaimed_bytes: u64, } +#[derive(Debug)] +pub enum CompactionResult { + Compacted(CompactionStats), + Purged { + file_id: DataFileId, + phantom_blocks: u64, + }, +} + +impl CompactionResult { + pub fn file_id(&self) -> DataFileId { + match self { + Self::Compacted(stats) => stats.file_id, + Self::Purged { file_id, .. } => *file_id, + } + } +} + pub struct LivenessInfo { pub live_bytes: u64, pub total_bytes: u64, diff --git a/crates/tranquil-store/tests/gc.rs b/crates/tranquil-store/tests/gc.rs index 507c4c4..0a10a0b 100644 --- a/crates/tranquil-store/tests/gc.rs +++ b/crates/tranquil-store/tests/gc.rs @@ -193,10 +193,15 @@ fn compact_data_file_preserves_live_removes_dead() { .unwrap(); std::thread::sleep(std::time::Duration::from_millis(5)); - let result = store.compact_file(first_file, 0).unwrap(); - assert!(result.dead_blocks > 0, "should have removed dead blocks"); - assert!(result.live_blocks > 0, "should have preserved live blocks"); - assert!(result.reclaimed_bytes > 0, "should have reclaimed space"); + let stats = match store.compact_file(first_file, 0).unwrap() { + tranquil_store::blockstore::CompactionResult::Compacted(s) => s, + tranquil_store::blockstore::CompactionResult::Purged { .. } => { + panic!("expected compaction, got phantom purge") + } + }; + assert!(stats.dead_blocks > 0, "should have removed dead blocks"); + assert!(stats.live_blocks > 0, "should have preserved live blocks"); + assert!(stats.reclaimed_bytes > 0, "should have reclaimed space"); [1u8, 3].iter().for_each(|&seed| { let data = store.get_block_sync(&test_cid(seed)).unwrap(); diff --git a/crates/tranquil-store/tests/gc_stress.rs b/crates/tranquil-store/tests/gc_stress.rs index da9cadc..7825bbe 100644 --- a/crates/tranquil-store/tests/gc_stress.rs +++ b/crates/tranquil-store/tests/gc_stress.rs @@ -579,9 +579,14 @@ fn all_dead_file_compaction() { advance_epoch(&store); std::thread::sleep(std::time::Duration::from_millis(5)); - let result = store.compact_file(first_file, 0).unwrap(); - assert_eq!(result.live_blocks, 0); - assert!(result.dead_blocks > 0); + let stats = match store.compact_file(first_file, 0).unwrap() { + tranquil_store::blockstore::CompactionResult::Compacted(s) => s, + tranquil_store::blockstore::CompactionResult::Purged { .. } => { + panic!("expected compaction, got phantom purge") + } + }; + assert_eq!(stats.live_blocks, 0); + assert!(stats.dead_blocks > 0); (0u32..5).for_each(|seed| { let data = store.get_block_sync(&test_cid_u32(seed)).unwrap(); @@ -838,10 +843,15 @@ fn grace_period_prevents_collection_during_active_write() { let files = store.list_data_files().unwrap(); let first_file = files[0]; - let result = store.compact_file(first_file, 600_000).unwrap(); + let stats = match store.compact_file(first_file, 600_000).unwrap() { + tranquil_store::blockstore::CompactionResult::Compacted(s) => s, + tranquil_store::blockstore::CompactionResult::Purged { .. } => { + panic!("expected compaction, got phantom purge") + } + }; assert_eq!( - result.dead_blocks, 0, + stats.dead_blocks, 0, "grace period should prevent any collection" );