diff --git a/crates/tranquil-pds/Cargo.toml b/crates/tranquil-pds/Cargo.toml index d564537..912ec53 100644 --- a/crates/tranquil-pds/Cargo.toml +++ b/crates/tranquil-pds/Cargo.toml @@ -55,6 +55,7 @@ metrics-exporter-prometheus = { workspace = true } multibase = { workspace = true } multihash = { workspace = true } p256 = { workspace = true } +parking_lot = { workspace = true } rand = { workspace = true } redis = { workspace = true, optional = true } regex = { workspace = true } diff --git a/crates/tranquil-pds/src/api/error.rs b/crates/tranquil-pds/src/api/error.rs index 1ec5029..dc1f284 100644 --- a/crates/tranquil-pds/src/api/error.rs +++ b/crates/tranquil-pds/src/api/error.rs @@ -666,9 +666,7 @@ impl From for ApiError { Self::OAuthExpiredToken(Some(msg)) } crate::auth::extractor::AuthError::UseDpopNonce(nonce) => Self::UseDpopNonce(nonce), - crate::auth::extractor::AuthError::InvalidDpopProof(msg) => { - Self::InvalidDpopProof(msg) - } + crate::auth::extractor::AuthError::InvalidDpopProof(msg) => Self::InvalidDpopProof(msg), } } } diff --git a/crates/tranquil-pds/src/scheduled.rs b/crates/tranquil-pds/src/scheduled.rs index bd1b46a..31f17d5 100644 --- a/crates/tranquil-pds/src/scheduled.rs +++ b/crates/tranquil-pds/src/scheduled.rs @@ -399,6 +399,10 @@ pub async fn start_scheduled_tasks( let mut compaction_ticker = interval(compaction_interval); compaction_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let compaction_blocklist = Arc::new(parking_lot::Mutex::new(CompactionBlocklist::new( + Duration::from_secs(300), + ))); + let mut reachability_ticker = interval(reachability_interval); reachability_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); @@ -464,8 +468,9 @@ pub async fn start_scheduled_tasks( let store = store.clone(); let threshold = cfg.scheduled.compaction_liveness_threshold; let grace_ms = cfg.scheduled.compaction_grace_period_ms; + let blocklist = Arc::clone(&compaction_blocklist); if let Err(e) = tokio::task::spawn_blocking(move || { - run_compaction_pass(&store, threshold, grace_ms) + run_compaction_pass(&store, threshold, grace_ms, &blocklist) }).await.unwrap_or_else(|e| Err(anyhow::anyhow!("compaction task panicked: {e}"))) { error!("Compaction error: {e}"); } @@ -485,6 +490,8 @@ pub async fn start_scheduled_tasks( live_refcounted = result.live_refcounted, leaked_blocks = result.leaked_blocks, repaired_blocks = result.repaired_blocks, + phantom_files_purged = result.phantom_files_purged, + phantom_blocks_purged = result.phantom_blocks_purged, bloom_heap_mb = result.bloom_heap_bytes / (1024 * 1024), "reachability walk complete" ); @@ -536,11 +543,44 @@ pub async fn start_scheduled_tasks( } } +pub struct CompactionBlocklist { + entries: std::collections::HashMap, + cool_off: Duration, +} + +impl CompactionBlocklist { + pub fn new(cool_off: Duration) -> Self { + Self { + entries: std::collections::HashMap::new(), + cool_off, + } + } + + pub fn record_failure(&mut self, file_id: tranquil_store::blockstore::DataFileId) { + self.entries.insert(file_id, std::time::Instant::now()); + } + + pub fn is_blocked(&self, file_id: tranquil_store::blockstore::DataFileId) -> bool { + self.entries + .get(&file_id) + .is_some_and(|recorded| recorded.elapsed() < self.cool_off) + } + + pub fn prune_expired(&mut self) { + let cool_off = self.cool_off; + self.entries + .retain(|_, recorded| recorded.elapsed() < cool_off); + } +} + fn run_compaction_pass( store: &tranquil_store::blockstore::TranquilBlockStore, liveness_threshold: f64, grace_period_ms: u64, + blocklist: &parking_lot::Mutex, ) -> anyhow::Result<()> { + blocklist.lock().prune_expired(); + match store.cleanup_gc_meta() { Ok(0) => {} Ok(n) => info!(count = n, "cleaned up stale gc_meta entries"), @@ -553,7 +593,11 @@ fn run_compaction_pass( let candidate = liveness_map .iter() - .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < liveness_threshold) + .filter(|(fid, info)| { + info.total_blocks > 0 + && info.ratio() < liveness_threshold + && !blocklist.lock().is_blocked(**fid) + }) .min_by(|(_, a), (_, b)| { a.ratio() .partial_cmp(&b.ratio()) @@ -574,21 +618,35 @@ fn run_compaction_pass( "compacting data file" ); match store.compact_file(file_id, grace_period_ms) { - Ok(result) => { + Ok(tranquil_store::blockstore::CompactionResult::Compacted(stats)) => { info!( - file_id = %result.file_id, - reclaimed_bytes = result.reclaimed_bytes, - live_blocks = result.live_blocks, - dead_blocks = result.dead_blocks, + file_id = %stats.file_id, + reclaimed_bytes = stats.reclaimed_bytes, + live_blocks = stats.live_blocks, + dead_blocks = stats.dead_blocks, "compaction complete" ); Ok(()) } + Ok(tranquil_store::blockstore::CompactionResult::Purged { + file_id, + phantom_blocks, + }) => { + warn!( + file_id = %file_id, + phantom_blocks, + "compaction target missing on disk, purged phantom index entries" + ); + Ok(()) + } Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => { debug!(file_id = %file_id, "skipped active file"); Ok(()) } - Err(e) => Err(anyhow::anyhow!("compaction failed: {e}")), + Err(e) => { + blocklist.lock().record_failure(file_id); + Err(anyhow::anyhow!("compaction failed: {e}")) + } } } } @@ -693,6 +751,24 @@ pub async fn generate_repo_car( .await .context("Failed to fetch blocks")?; + let missing: Vec = chunk + .iter() + .zip(blocks.iter()) + .filter_map(|(cid, block_opt)| block_opt.is_none().then_some(*cid)) + .collect(); + if !missing.is_empty() { + anyhow::bail!( + "repo CAR is incomplete: {} block(s) referenced by the MST are missing from storage. First 5: {}", + missing.len(), + missing + .iter() + .take(5) + .map(|c| c.to_string()) + .collect::>() + .join(", ") + ); + } + chunk .iter() .zip(blocks.iter()) @@ -746,6 +822,8 @@ pub struct ReachabilityResult { pub leaked_blocks: u64, pub repaired_blocks: u64, pub bloom_heap_bytes: usize, + pub phantom_files_purged: u64, + pub phantom_blocks_purged: u64, } const REPO_PAGE_SIZE: i64 = 500; @@ -761,6 +839,7 @@ fn walk_repo_dag_sync( store: &tranquil_store::blockstore::TranquilBlockStore, head_cid: &Cid, reachable: &mut std::collections::HashSet, + phantom_files: &mut std::collections::HashSet, ) -> anyhow::Result<()> { let mut to_visit = vec![cid_to_bytes(head_cid)?]; @@ -769,15 +848,56 @@ fn walk_repo_dag_sync( continue; } - let block = match store.get_block_sync(&cid_bytes)? { - Some(b) => b, - None => { + let block = match store.get_block_sync(&cid_bytes) { + Ok(Some(b)) => b, + Ok(None) => { tracing::warn!( ?cid_bytes, "referenced block missing during reachability walk" ); continue; } + Err(e) => { + let Some(entry) = store.block_index().get(&cid_bytes) else { + tracing::warn!( + ?cid_bytes, + error = %e, + "reachability walk: index entry vanished between read attempt and re-check" + ); + continue; + }; + let file_path = store.data_file_path(entry.location.file_id); + match file_path.try_exists() { + Ok(false) => { + tracing::warn!( + ?cid_bytes, + file_id = %entry.location.file_id, + error = %e, + "indexed block points at missing data file, scheduling phantom purge" + ); + phantom_files.insert(entry.location.file_id); + continue; + } + Ok(true) => { + return Err(anyhow::anyhow!( + "reachability walk read error on present data file {}: {e}", + entry.location.file_id + )); + } + Err(probe_err) => { + tracing::warn!( + ?cid_bytes, + file_id = %entry.location.file_id, + existence_probe_error = %probe_err, + "could not probe data file existence after read error" + ); + return Err(anyhow::anyhow!( + "reachability walk read error on file {}: {e}", + entry.location.file_id + )); + } + } + } }; if let Ok(commit) = Commit::from_cbor(&block) { @@ -858,13 +978,15 @@ pub fn run_reachability_walk( let mut repos_walked: u64 = 0; let mut seen_heads: std::collections::HashMap = std::collections::HashMap::new(); + let mut phantom_files: std::collections::HashSet = + std::collections::HashSet::new(); paginate_repos(&rt, repo_repo, |page| { page.iter().try_for_each(|repo| -> anyhow::Result<()> { let cid = Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?; seen_heads.insert(repo.did.clone(), repo.repo_root_cid.clone()); - walk_repo_dag_sync(store, &cid, &mut visited)?; + walk_repo_dag_sync(store, &cid, &mut visited, &mut phantom_files)?; repos_walked = repos_walked.saturating_add(1); if repos_walked.is_multiple_of(1000) { info!( @@ -894,7 +1016,7 @@ pub fn run_reachability_walk( let cid = Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?; let mut extra = std::collections::HashSet::new(); - walk_repo_dag_sync(store, &cid, &mut extra)?; + walk_repo_dag_sync(store, &cid, &mut extra, &mut phantom_files)?; extra.iter().for_each(|c| reachable.insert(c)); seen_heads.insert(repo.did.clone(), repo.repo_root_cid.clone()); stale_repos = stale_repos.saturating_add(1); @@ -922,7 +1044,7 @@ pub fn run_reachability_walk( let cid = Cid::from_str(repo.repo_root_cid.as_str()).context("invalid repo_root_cid")?; let mut extra = std::collections::HashSet::new(); - walk_repo_dag_sync(store, &cid, &mut extra)?; + walk_repo_dag_sync(store, &cid, &mut extra, &mut phantom_files)?; extra.iter().for_each(|c| reachable.insert(c)); quiesced_stale = quiesced_stale.saturating_add(1); Ok(()) @@ -958,6 +1080,19 @@ pub fn run_reachability_walk( } }; + let phantom_files_purged = u64::try_from(phantom_files.len()).unwrap_or(u64::MAX); + let phantom_blocks_purged = phantom_files + .iter() + .map(|fid| store.block_index().purge_by_file_id(*fid)) + .sum::(); + + if phantom_files_purged > 0 { + warn!( + phantom_files_purged, + phantom_blocks_purged, "purged phantom index entries from unreadable data files" + ); + } + Ok(ReachabilityResult { repos_walked, blocks_visited, @@ -965,5 +1100,7 @@ pub fn run_reachability_walk( leaked_blocks, repaired_blocks, bloom_heap_bytes, + phantom_files_purged, + phantom_blocks_purged, }) } diff --git a/crates/tranquil-pds/tests/oauth_token_eviction.rs b/crates/tranquil-pds/tests/oauth_token_eviction.rs index 3b69c3c..00bb644 100644 --- a/crates/tranquil-pds/tests/oauth_token_eviction.rs +++ b/crates/tranquil-pds/tests/oauth_token_eviction.rs @@ -25,7 +25,10 @@ async fn create_account_and_get_did(handle: &str, email: &str, password: &str) - .expect("createAccount request failed"); assert_eq!(res.status(), StatusCode::OK, "createAccount failed"); let body: Value = res.json().await.expect("invalid createAccount JSON"); - let did_str = body["did"].as_str().expect("no did in response").to_string(); + let did_str = body["did"] + .as_str() + .expect("no did in response") + .to_string(); let _ = verify_new_account(&client, &did_str).await; Did::new(did_str).expect("invalid DID format") } @@ -69,9 +72,7 @@ async fn delete_oldest_tokens_evicts_lowest_created_at() { let repos = get_test_repos().await; let base = Utc::now(); - let token_ids: Vec = (0..5) - .map(|i| format!("tok-{}-{}", ts, i)) - .collect(); + let token_ids: Vec = (0..5).map(|i| format!("tok-{}-{}", ts, i)).collect(); for (i, tid) in token_ids.iter().enumerate() { let created = base + Duration::seconds(i as i64); @@ -101,8 +102,7 @@ async fn delete_oldest_tokens_evicts_lowest_created_at() { let remaining_ids: std::collections::HashSet = remaining.iter().map(|t| t.token_id.0.clone()).collect(); - let expected_ids: std::collections::HashSet = - token_ids[2..].iter().cloned().collect(); + let expected_ids: std::collections::HashSet = token_ids[2..].iter().cloned().collect(); assert_eq!( remaining_ids, expected_ids, "surviving tokens must be the three newest by created_at"