feat(pds): phantom-file self-heal goes in scheduled compaction + reachability walk

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-05-16 23:24:55 +03:00
parent 1815ddba9f
commit deb2502112
4 changed files with 159 additions and 23 deletions

View File

@@ -55,6 +55,7 @@ metrics-exporter-prometheus = { workspace = true }
multibase = { workspace = true }
multihash = { workspace = true }
p256 = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
redis = { workspace = true, optional = true }
regex = { workspace = true }

View File

@@ -666,9 +666,7 @@ impl From<crate::auth::extractor::AuthError> for ApiError {
Self::OAuthExpiredToken(Some(msg))
}
crate::auth::extractor::AuthError::UseDpopNonce(nonce) => Self::UseDpopNonce(nonce),
crate::auth::extractor::AuthError::InvalidDpopProof(msg) => {
Self::InvalidDpopProof(msg)
}
crate::auth::extractor::AuthError::InvalidDpopProof(msg) => Self::InvalidDpopProof(msg),
}
}
}

View File

@@ -399,6 +399,10 @@ pub async fn start_scheduled_tasks(
let mut compaction_ticker = interval(compaction_interval);
compaction_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let compaction_blocklist = Arc::new(parking_lot::Mutex::new(CompactionBlocklist::new(
Duration::from_secs(300),
)));
let mut reachability_ticker = interval(reachability_interval);
reachability_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
@@ -464,8 +468,9 @@ pub async fn start_scheduled_tasks(
let store = store.clone();
let threshold = cfg.scheduled.compaction_liveness_threshold;
let grace_ms = cfg.scheduled.compaction_grace_period_ms;
let blocklist = Arc::clone(&compaction_blocklist);
if let Err(e) = tokio::task::spawn_blocking(move || {
run_compaction_pass(&store, threshold, grace_ms)
run_compaction_pass(&store, threshold, grace_ms, &blocklist)
}).await.unwrap_or_else(|e| Err(anyhow::anyhow!("compaction task panicked: {e}"))) {
error!("Compaction error: {e}");
}
@@ -485,6 +490,8 @@ pub async fn start_scheduled_tasks(
live_refcounted = result.live_refcounted,
leaked_blocks = result.leaked_blocks,
repaired_blocks = result.repaired_blocks,
phantom_files_purged = result.phantom_files_purged,
phantom_blocks_purged = result.phantom_blocks_purged,
bloom_heap_mb = result.bloom_heap_bytes / (1024 * 1024),
"reachability walk complete"
);
@@ -536,11 +543,44 @@ pub async fn start_scheduled_tasks(
}
}
/// Remembers data files whose compaction recently failed, so callers can skip
/// them until a cool-off period has passed.
///
/// Each entry maps a data file id to the instant of its most recent recorded
/// failure; a file counts as blocked while that instant is younger than
/// `cool_off`.
pub struct CompactionBlocklist {
    /// Instant of the latest recorded failure, keyed by data file id.
    entries: std::collections::HashMap<tranquil_store::blockstore::DataFileId, std::time::Instant>,
    /// How long a file stays blocked after a recorded failure.
    cool_off: Duration,
}

impl CompactionBlocklist {
    /// Creates an empty blocklist that blocks files for `cool_off` after a failure.
    pub fn new(cool_off: Duration) -> Self {
        let entries = std::collections::HashMap::new();
        Self { entries, cool_off }
    }

    /// Records a failure for `file_id` at the current instant, replacing any
    /// earlier record for the same file.
    pub fn record_failure(&mut self, file_id: tranquil_store::blockstore::DataFileId) {
        let failed_at = std::time::Instant::now();
        self.entries.insert(file_id, failed_at);
    }

    /// Returns `true` when `file_id` has a recorded failure that is strictly
    /// younger than the cool-off; files with no record are never blocked.
    pub fn is_blocked(&self, file_id: tranquil_store::blockstore::DataFileId) -> bool {
        match self.entries.get(&file_id) {
            Some(failed_at) => failed_at.elapsed() < self.cool_off,
            None => false,
        }
    }

    /// Removes every entry whose cool-off has already elapsed.
    pub fn prune_expired(&mut self) {
        let Self { entries, cool_off } = self;
        entries.retain(|_, failed_at| failed_at.elapsed() < *cool_off);
    }
}
fn run_compaction_pass(
store: &tranquil_store::blockstore::TranquilBlockStore,
liveness_threshold: f64,
grace_period_ms: u64,
blocklist: &parking_lot::Mutex<CompactionBlocklist>,
) -> anyhow::Result<()> {
blocklist.lock().prune_expired();
match store.cleanup_gc_meta() {
Ok(0) => {}
Ok(n) => info!(count = n, "cleaned up stale gc_meta entries"),
@@ -553,7 +593,11 @@ fn run_compaction_pass(
let candidate = liveness_map
.iter()
.filter(|(_, info)| info.total_blocks > 0 && info.ratio() < liveness_threshold)
.filter(|(fid, info)| {
info.total_blocks > 0
&& info.ratio() < liveness_threshold
&& !blocklist.lock().is_blocked(**fid)
})
.min_by(|(_, a), (_, b)| {
a.ratio()
.partial_cmp(&b.ratio())
@@ -574,21 +618,35 @@ fn run_compaction_pass(
"compacting data file"
);
match store.compact_file(file_id, grace_period_ms) {
Ok(result) => {
Ok(tranquil_store::blockstore::CompactionResult::Compacted(stats)) => {
info!(
file_id = %result.file_id,
reclaimed_bytes = result.reclaimed_bytes,
live_blocks = result.live_blocks,
dead_blocks = result.dead_blocks,
file_id = %stats.file_id,
reclaimed_bytes = stats.reclaimed_bytes,
live_blocks = stats.live_blocks,
dead_blocks = stats.dead_blocks,
"compaction complete"
);
Ok(())
}
Ok(tranquil_store::blockstore::CompactionResult::Purged {
file_id,
phantom_blocks,
}) => {
warn!(
file_id = %file_id,
phantom_blocks,
"compaction target missing on disk, purged phantom index entries"
);
Ok(())
}
Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {
debug!(file_id = %file_id, "skipped active file");
Ok(())
}
Err(e) => Err(anyhow::anyhow!("compaction failed: {e}")),
Err(e) => {
blocklist.lock().record_failure(file_id);
Err(anyhow::anyhow!("compaction failed: {e}"))
}
}
}
}
@@ -693,6 +751,24 @@ pub async fn generate_repo_car(
.await
.context("Failed to fetch blocks")?;
let missing: Vec<Cid> = chunk
.iter()
.zip(blocks.iter())
.filter_map(|(cid, block_opt)| block_opt.is_none().then_some(*cid))
.collect();
if !missing.is_empty() {
anyhow::bail!(
"repo CAR is incomplete: {} block(s) referenced by the MST are missing from storage. First 5: {}",
missing.len(),
missing
.iter()
.take(5)
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", ")
);
}
chunk
.iter()
.zip(blocks.iter())
@@ -746,6 +822,8 @@ pub struct ReachabilityResult {
pub leaked_blocks: u64,
pub repaired_blocks: u64,
pub bloom_heap_bytes: usize,
pub phantom_files_purged: u64,
pub phantom_blocks_purged: u64,
}
const REPO_PAGE_SIZE: i64 = 500;
@@ -761,6 +839,7 @@ fn walk_repo_dag_sync(
store: &tranquil_store::blockstore::TranquilBlockStore,
head_cid: &Cid,
reachable: &mut std::collections::HashSet<CidBytes>,
phantom_files: &mut std::collections::HashSet<tranquil_store::blockstore::DataFileId>,
) -> anyhow::Result<()> {
let mut to_visit = vec![cid_to_bytes(head_cid)?];
@@ -769,15 +848,56 @@ fn walk_repo_dag_sync(
continue;
}
let block = match store.get_block_sync(&cid_bytes)? {
Some(b) => b,
None => {
let block = match store.get_block_sync(&cid_bytes) {
Ok(Some(b)) => b,
Ok(None) => {
tracing::warn!(
?cid_bytes,
"referenced block missing during reachability walk"
);
continue;
}
Err(e) => {
let Some(entry) = store.block_index().get(&cid_bytes) else {
tracing::warn!(
?cid_bytes,
error = %e,
"reachability walk: index entry vanished between read attempt and re-check"
);
continue;
};
let file_path = store.data_file_path(entry.location.file_id);
match file_path.try_exists() {
Ok(false) => {
tracing::warn!(
?cid_bytes,
file_id = %entry.location.file_id,
error = %e,
"indexed block points at missing data file, scheduling phantom purge"
);
phantom_files.insert(entry.location.file_id);
continue;
}
Ok(true) => {
return Err(anyhow::anyhow!(
"reachability walk read error on present data file {}: {e}",
entry.location.file_id
));
}
Err(probe_err) => {
tracing::warn!(
?cid_bytes,
file_id = %entry.location.file_id,
existence_probe_error = %probe_err,
"could not probe data file existence after read error"
);
return Err(anyhow::anyhow!(
"reachability walk read error on file {}: {e}",
entry.location.file_id
));
}
}
}
};
if let Ok(commit) = Commit::from_cbor(&block) {
@@ -858,13 +978,15 @@ pub fn run_reachability_walk(
let mut repos_walked: u64 = 0;
let mut seen_heads: std::collections::HashMap<Did, CidLink> = std::collections::HashMap::new();
let mut phantom_files: std::collections::HashSet<tranquil_store::blockstore::DataFileId> =
std::collections::HashSet::new();
paginate_repos(&rt, repo_repo, |page| {
page.iter().try_for_each(|repo| -> anyhow::Result<()> {
let cid =
Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?;
seen_heads.insert(repo.did.clone(), repo.repo_root_cid.clone());
walk_repo_dag_sync(store, &cid, &mut visited)?;
walk_repo_dag_sync(store, &cid, &mut visited, &mut phantom_files)?;
repos_walked = repos_walked.saturating_add(1);
if repos_walked.is_multiple_of(1000) {
info!(
@@ -894,7 +1016,7 @@ pub fn run_reachability_walk(
let cid =
Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?;
let mut extra = std::collections::HashSet::new();
walk_repo_dag_sync(store, &cid, &mut extra)?;
walk_repo_dag_sync(store, &cid, &mut extra, &mut phantom_files)?;
extra.iter().for_each(|c| reachable.insert(c));
seen_heads.insert(repo.did.clone(), repo.repo_root_cid.clone());
stale_repos = stale_repos.saturating_add(1);
@@ -922,7 +1044,7 @@ pub fn run_reachability_walk(
let cid =
Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?;
let mut extra = std::collections::HashSet::new();
walk_repo_dag_sync(store, &cid, &mut extra)?;
walk_repo_dag_sync(store, &cid, &mut extra, &mut phantom_files)?;
extra.iter().for_each(|c| reachable.insert(c));
quiesced_stale = quiesced_stale.saturating_add(1);
Ok(())
@@ -958,6 +1080,19 @@ pub fn run_reachability_walk(
}
};
let phantom_files_purged = u64::try_from(phantom_files.len()).unwrap_or(u64::MAX);
let phantom_blocks_purged = phantom_files
.iter()
.map(|fid| store.block_index().purge_by_file_id(*fid))
.sum::<u64>();
if phantom_files_purged > 0 {
warn!(
phantom_files_purged,
phantom_blocks_purged, "purged phantom index entries from unreadable data files"
);
}
Ok(ReachabilityResult {
repos_walked,
blocks_visited,
@@ -965,5 +1100,7 @@ pub fn run_reachability_walk(
leaked_blocks,
repaired_blocks,
bloom_heap_bytes,
phantom_files_purged,
phantom_blocks_purged,
})
}

View File

@@ -25,7 +25,10 @@ async fn create_account_and_get_did(handle: &str, email: &str, password: &str) -
.expect("createAccount request failed");
assert_eq!(res.status(), StatusCode::OK, "createAccount failed");
let body: Value = res.json().await.expect("invalid createAccount JSON");
let did_str = body["did"].as_str().expect("no did in response").to_string();
let did_str = body["did"]
.as_str()
.expect("no did in response")
.to_string();
let _ = verify_new_account(&client, &did_str).await;
Did::new(did_str).expect("invalid DID format")
}
@@ -69,9 +72,7 @@ async fn delete_oldest_tokens_evicts_lowest_created_at() {
let repos = get_test_repos().await;
let base = Utc::now();
let token_ids: Vec<String> = (0..5)
.map(|i| format!("tok-{}-{}", ts, i))
.collect();
let token_ids: Vec<String> = (0..5).map(|i| format!("tok-{}-{}", ts, i)).collect();
for (i, tid) in token_ids.iter().enumerate() {
let created = base + Duration::seconds(i as i64);
@@ -101,8 +102,7 @@ async fn delete_oldest_tokens_evicts_lowest_created_at() {
let remaining_ids: std::collections::HashSet<String> =
remaining.iter().map(|t| t.token_id.0.clone()).collect();
let expected_ids: std::collections::HashSet<String> =
token_ids[2..].iter().cloned().collect();
let expected_ids: std::collections::HashSet<String> = token_ids[2..].iter().cloned().collect();
assert_eq!(
remaining_ids, expected_ids,
"surviving tokens must be the three newest by created_at"