diff --git a/.config/nextest.toml b/.config/nextest.toml index beb0ea8..cfcfcee 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -72,6 +72,10 @@ test-group = "io-heavy-sim" filter = "test(/test_scale_/) | test(/full_backup_and_restore/)" slow-timeout = { period = "120s", terminate-after = 4 } +[[profile.default.overrides]] +filter = "binary(compaction_restart) | binary(mst_refcount_integrity) | binary(gc_compaction_restart)" +slow-timeout = { period = "120s", terminate-after = 4 } + [[profile.ci.overrides]] filter = "test(/import_with_verification/) | test(/plc_migration/)" test-group = "serial-env-tests" diff --git a/Cargo.lock b/Cargo.lock index 2e9d586..19d4db1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7405,7 +7405,7 @@ dependencies = [ [[package]] name = "tranquil-api" -version = "0.5.3" +version = "0.5.4" dependencies = [ "anyhow", "axum", @@ -7456,7 +7456,7 @@ dependencies = [ [[package]] name = "tranquil-auth" -version = "0.5.3" +version = "0.5.4" dependencies = [ "anyhow", "base32", @@ -7479,7 +7479,7 @@ dependencies = [ [[package]] name = "tranquil-cache" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "base64 0.22.1", @@ -7493,7 +7493,7 @@ dependencies = [ [[package]] name = "tranquil-comms" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "base64 0.22.1", @@ -7511,7 +7511,7 @@ dependencies = [ [[package]] name = "tranquil-config" -version = "0.5.3" +version = "0.5.4" dependencies = [ "confique", "serde", @@ -7519,7 +7519,7 @@ dependencies = [ [[package]] name = "tranquil-crypto" -version = "0.5.3" +version = "0.5.4" dependencies = [ "aes-gcm", "base64 0.22.1", @@ -7535,7 +7535,7 @@ dependencies = [ [[package]] name = "tranquil-db" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "chrono", @@ -7552,7 +7552,7 @@ dependencies = [ [[package]] name = "tranquil-db-traits" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "base64 0.22.1", @@ -7568,7 +7568,7 @@ dependencies = [ [[package]] name = "tranquil-infra" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "bytes", @@ -7579,7 +7579,7 @@ dependencies = [ [[package]] name = "tranquil-lexicon" -version = "0.5.3" +version = "0.5.4" dependencies = [ "chrono", "hickory-resolver", @@ -7597,7 +7597,7 @@ dependencies = [ [[package]] name = "tranquil-oauth" -version = "0.5.3" +version = "0.5.4" dependencies = [ "anyhow", "axum", @@ -7620,7 +7620,7 @@ dependencies = [ [[package]] name = "tranquil-oauth-server" -version = "0.5.3" +version = "0.5.4" dependencies = [ "axum", "base64 0.22.1", @@ -7653,7 +7653,7 @@ dependencies = [ [[package]] name = "tranquil-pds" -version = "0.5.3" +version = "0.5.4" dependencies = [ "aes-gcm", "anyhow", @@ -7745,7 +7745,7 @@ dependencies = [ [[package]] name = "tranquil-repo" -version = "0.5.3" +version = "0.5.4" dependencies = [ "bytes", "cid", @@ -7757,7 +7757,7 @@ dependencies = [ [[package]] name = "tranquil-ripple" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "backon", @@ -7782,7 +7782,7 @@ dependencies = [ [[package]] name = "tranquil-scopes" -version = "0.5.3" +version = "0.5.4" dependencies = [ "axum", "futures", @@ -7798,7 +7798,7 @@ dependencies = [ [[package]] name = "tranquil-server" -version = "0.5.3" +version = "0.5.4" dependencies = [ "axum", "clap", @@ -7819,7 +7819,7 @@ dependencies = [ [[package]] name = "tranquil-signal" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "chrono", @@ -7842,7 +7842,7 @@ dependencies = [ [[package]] name = "tranquil-storage" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "aws-config", @@ -7859,7 +7859,7 @@ dependencies = [ [[package]] name = "tranquil-store" -version = "0.5.3" +version = "0.5.4" dependencies = [ "async-trait", "bytes", @@ -7906,7 +7906,7 @@ dependencies = [ [[package]] name = "tranquil-sync" -version = "0.5.3" +version = "0.5.4" dependencies = [ "anyhow", "axum", @@ -7928,7 +7928,7 @@ dependencies = [ [[package]] name = "tranquil-types" -version = "0.5.3" +version = "0.5.4" dependencies = [ "chrono", "cid", diff --git a/Cargo.toml b/Cargo.toml index a8995d0..4b00bee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ members = [ ] [workspace.package] -version = "0.5.3" +version = "0.5.4" edition = "2024" license = "AGPL-3.0-or-later" diff --git a/crates/tranquil-pds/tests/gc_compaction_restart.rs b/crates/tranquil-pds/tests/gc_compaction_restart.rs new file mode 100644 index 0000000..aedef91 --- /dev/null +++ b/crates/tranquil-pds/tests/gc_compaction_restart.rs @@ -0,0 +1,175 @@ +mod common; + +use chrono::Utc; +use common::*; +use reqwest::StatusCode; +use serde_json::{Value, json}; + +fn run_compaction(store: &tranquil_store::blockstore::TranquilBlockStore) { + let liveness = store.compaction_liveness(0).unwrap(); + liveness + .iter() + .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < 0.95) + .map(|(&fid, _)| fid) + .collect::>() + .into_iter() + .for_each(|fid| { + match store.compact_file(fid, 0) { + Ok(_) => {} + Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {} + Err(e) => eprintln!("compaction: {e}"), + } + }); +} + +#[tokio::test] +async fn mst_blocks_survive_full_store_reopen() { + if !is_store_backend() { + eprintln!("skipping: only meaningful with tranquil-store backend"); + return; + } + + let client = client(); + let base = base_url().await; + let block_store = get_test_block_store().await; + + let store = block_store + .as_tranquil_store() + .expect("expected tranquil-store backend"); + + let (jwt, did) = create_account_and_login(&client).await; + + let mut posts = Vec::new(); + for i in 0..30 { + let res = client + .post(format!("{base}/xrpc/com.atproto.repo.createRecord")) + .bearer_auth(&jwt) + .json(&json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "record": { + "$type": "app.bsky.feed.post", + "text": format!("compaction test post {i}"), + "createdAt": Utc::now().to_rfc3339() + } + })) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.unwrap(); + posts.push(( + body["uri"].as_str().unwrap().to_string(), + body["cid"].as_str().unwrap().to_string(), + )); + } + + for (uri, cid) in &posts[..20] { + let res = client + .post(format!("{base}/xrpc/com.atproto.repo.createRecord")) + .bearer_auth(&jwt) + .json(&json!({ + "repo": did, + "collection": "app.bsky.feed.like", + "record": { + "$type": "app.bsky.feed.like", + "subject": { "uri": uri, "cid": cid }, + "createdAt": Utc::now().to_rfc3339() + } + })) + .send() + .await + .unwrap(); + assert_eq!(res.status(), StatusCode::OK, "like failed for {uri}"); + } + + let data_dir = store.data_dir().to_path_buf(); + let index_dir = data_dir + .parent() + .unwrap() + .join("index"); + + let store_clone = store.clone(); + tokio::task::spawn_blocking(move || { + (0..40).for_each(|_| run_compaction(&store_clone)); + }) + .await + .unwrap(); + + let repo_root_str: String = get_test_repos() + .await + .repo + .get_repo_root_by_did(&tranquil_types::Did::new(did.clone()).unwrap()) + .await + .expect("db error") + .expect("no repo root") + .to_string(); + + let head_cid = cid::Cid::try_from(repo_root_str.as_str()).expect("invalid cid"); + + let car_blocks = + tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid) + .await + .expect("collect blocks"); + + let block_count_before = car_blocks.len(); + + let max_file_size = store + .list_data_files() + .ok() + .and_then(|_| Some(4 * 1024 * 1024u64)) + .unwrap_or(4 * 1024 * 1024); + + let reopened_missing = tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Handle::current(); + let _guard = rt.enter(); + + let config = tranquil_store::blockstore::BlockStoreConfig { + data_dir: data_dir.clone(), + index_dir, + max_file_size, + group_commit: tranquil_store::blockstore::GroupCommitConfig::default(), + shard_count: 1, + }; + let fresh = tranquil_store::blockstore::TranquilBlockStore::open(config) + .expect("reopen failed"); + + let missing: Vec = car_blocks + .iter() + .filter_map(|cid_bytes| { + if cid_bytes.len() < 36 { + return None; + } + let mut arr = [0u8; 36]; + arr.copy_from_slice(&cid_bytes[..36]); + match fresh.get_block_sync(&arr) { + Ok(Some(_)) => None, + Ok(None) => Some(format!( + "missing {}", + cid::Cid::try_from(cid_bytes.as_slice()) + .map(|c| c.to_string()) + .unwrap_or_else(|_| hex::encode(cid_bytes)) + )), + Err(e) => Some(format!("error: {e}")), + } + }) + .collect(); + + drop(fresh); + missing + }) + .await + .unwrap(); + + assert!( + reopened_missing.is_empty(), + "{} of {block_count_before} blocks missing after blockstore reopen:\n{}", + reopened_missing.len(), + reopened_missing + .iter() + .take(20) + .map(|s| s.as_str()) + .collect::>() + .join("\n"), + ); +} diff --git a/crates/tranquil-store/src/blockstore/hash_index.rs b/crates/tranquil-store/src/blockstore/hash_index.rs index fece45d..e9668ec 100644 --- a/crates/tranquil-store/src/blockstore/hash_index.rs +++ b/crates/tranquil-store/src/blockstore/hash_index.rs @@ -215,6 +215,10 @@ impl HashTable { self.get(cid).is_some() } + pub fn contains_live(&self, cid: &[u8; CID_SIZE]) -> bool { + self.get(cid).is_some_and(|s| !s.refcount.is_zero()) + } + pub fn insert(&mut self, new_slot: Slot) -> Result, CapacityExhausted> { if is_empty(&new_slot.cid) { tracing::error!("attempted to insert all-zero CID into hash table"); @@ -1195,7 +1199,7 @@ impl BlockIndex { } pub fn has(&self, cid: &[u8; CID_SIZE]) -> bool { - self.table.read().contains(cid) + self.table.read().contains_live(cid) } pub fn batch_put( diff --git a/crates/tranquil-store/tests/common/mod.rs b/crates/tranquil-store/tests/common/mod.rs index e00af4e..92443b3 100644 --- a/crates/tranquil-store/tests/common/mod.rs +++ b/crates/tranquil-store/tests/common/mod.rs @@ -104,6 +104,70 @@ pub fn compact_all_sealed(store: &TranquilBlockStore) { }); } +pub fn tiny_blockstore_config(dir: &std::path::Path) -> BlockStoreConfig { + BlockStoreConfig { + data_dir: dir.join("data"), + index_dir: dir.join("index"), + max_file_size: 300, + group_commit: GroupCommitConfig { + checkpoint_interval_ms: 100, + checkpoint_write_threshold: 10, + ..GroupCommitConfig::default() + }, + shard_count: 1, + } +} + +pub fn compact_by_liveness(store: &TranquilBlockStore) { + let liveness = store.compaction_liveness(0).unwrap(); + liveness + .iter() + .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < 0.99) + .map(|(&fid, _)| fid) + .collect::>() + .into_iter() + .for_each(|fid| match store.compact_file(fid, 0) { + Ok(_) => {} + Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {} + Err(e) => eprintln!("compaction: {e}"), + }); +} + +pub fn compact_lowest_liveness(store: &TranquilBlockStore) { + let liveness = store.compaction_liveness(0).unwrap(); + let candidate = liveness + .iter() + .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < 0.99) + .min_by(|(_, a), (_, b)| { + a.ratio() + .partial_cmp(&b.ratio()) + .unwrap_or(std::cmp::Ordering::Equal) + }) + .map(|(&fid, _)| fid); + + if let Some(fid) = candidate { + match store.compact_file(fid, 0) { + Ok(_) => {} + Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {} + Err(e) => eprintln!("compaction: {e}"), + } + } +} + +pub fn collect_refcounts(store: &TranquilBlockStore, cids: &[CidBytes]) -> Vec<(u32, u32)> { + cids.iter() + .map(|cid| { + let seed = u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]]); + let rc = store + .block_index() + .get(cid) + .map(|e| e.refcount.raw()) + .unwrap_or(0); + (seed, rc) + }) + .collect() +} + pub struct TestStores { pub blockstore: TranquilBlockStore, pub eventlog: Arc>, diff --git a/crates/tranquil-store/tests/compaction_restart.rs b/crates/tranquil-store/tests/compaction_restart.rs new file mode 100644 index 0000000..08f38b5 --- /dev/null +++ b/crates/tranquil-store/tests/compaction_restart.rs @@ -0,0 +1,567 @@ +mod common; + +use std::path::Path; +use std::sync::Arc; + +use common::{ + collect_refcounts, compact_by_liveness, compact_lowest_liveness, test_cid, + tiny_blockstore_config, with_runtime, +}; +use tranquil_store::RealIO; +use tranquil_store::blockstore::{CidBytes, TranquilBlockStore}; +use tranquil_store::eventlog::{EventLog, EventLogBridge, EventLogConfig}; +use tranquil_store::metastore::handler::HandlerPool; +use tranquil_store::metastore::partitions::Partition; +use tranquil_store::metastore::{Metastore, MetastoreConfig}; + +struct FullStack { + blockstore: TranquilBlockStore, + _pool: Arc, + _event_log: Arc>, +} + +fn open_full_stack(base_dir: &Path) -> FullStack { + let metastore_dir = base_dir.join("metastore"); + let segments_dir = base_dir.join("eventlog").join("segments"); + let blockstore_data = base_dir.join("blockstore").join("data"); + let blockstore_index = base_dir.join("blockstore").join("index"); + + [&metastore_dir, &segments_dir, &blockstore_data, &blockstore_index] + .iter() + .for_each(|d| std::fs::create_dir_all(d).unwrap()); + + let metastore = Metastore::open(&metastore_dir, MetastoreConfig::default()).unwrap(); + + let blockstore = TranquilBlockStore::open(tranquil_store::blockstore::BlockStoreConfig { + data_dir: blockstore_data, + index_dir: blockstore_index, + max_file_size: 512, + group_commit: tranquil_store::blockstore::GroupCommitConfig::default(), + shard_count: 1, + }) + .unwrap(); + + let event_log = Arc::new( + EventLog::open( + EventLogConfig { + segments_dir, + ..EventLogConfig::default() + }, + RealIO::new(), + ) + .unwrap(), + ); + + let bridge = Arc::new(EventLogBridge::new(Arc::clone(&event_log))); + + let was_clean = tranquil_store::consistency::had_clean_shutdown(base_dir); + tranquil_store::consistency::remove_clean_shutdown_marker(base_dir).ok(); + + let indexes = metastore.partition(Partition::Indexes).clone(); + let event_ops = metastore.event_ops(Arc::clone(&bridge)); + let recovered = event_ops + .recover_metastore_mutations(&indexes) + .unwrap(); + if recovered > 0 { + eprintln!("replayed {recovered} metastore mutations from eventlog"); + } + + if !was_clean || recovered > 0 { + let report = tranquil_store::consistency::verify_store_consistency( + &blockstore, + &metastore, + &event_log, + ); + report.log_findings(); + if report.has_repairable_issues() { + let repair = tranquil_store::consistency::repair_known_issues(&blockstore, &report); + if repair.orphan_files_removed > 0 { + eprintln!("removed {} orphan files", repair.orphan_files_removed); + } + } + } + + let pool = Arc::new(HandlerPool::spawn::( + metastore, + bridge, + Some(blockstore.clone()), + None, + )); + + FullStack { + blockstore, + _pool: pool, + _event_log: event_log, + } +} + +fn close_full_stack(stack: FullStack, base_dir: &Path) { + let rt = tokio::runtime::Handle::current(); + rt.block_on(stack._pool.close()); + if let Err(e) = stack._event_log.shutdown() { + eprintln!("eventlog shutdown: {e}"); + } + tranquil_store::consistency::write_clean_shutdown_marker(base_dir).ok(); + drop(stack.blockstore); +} + +fn verify_blocks_and_refcounts( + store: &TranquilBlockStore, + live_cids: &[CidBytes], + expected_refcounts: Option<&[(u32, u32)]>, + label: &str, +) { + let missing: Vec = live_cids + .iter() + .filter(|cid| store.get_block_sync(cid).unwrap().is_none()) + .map(|cid| u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]])) + .collect(); + + assert!( + missing.is_empty(), + "{label}: live blocks missing after reopen: {missing:?}" + ); + + match expected_refcounts { + Some(expected) => { + let actual = collect_refcounts(store, live_cids); + let mismatches: Vec<_> = expected + .iter() + .zip(actual.iter()) + .filter(|((_, exp_rc), (_, act_rc))| exp_rc != act_rc) + .map(|((seed, exp), (_, act))| format!("seed {seed}: before={exp} after={act}")) + .collect(); + + assert!( + mismatches.is_empty(), + "{label}: refcounts changed across reopen:\n{}", + mismatches.join("\n"), + ); + } + None => { + live_cids.iter().for_each(|cid| { + let rc = store + .block_index() + .get(cid) + .map(|e| e.refcount.raw()) + .unwrap_or(0); + assert!( + rc > 0, + "{label}: refcount dropped to 0 for seed {}", + u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]]) + ); + }); + } + } +} + +#[test] +fn hundreds_of_compaction_cycles() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + + let live_cids: Vec = (0..15u32).map(test_cid).collect(); + + { + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + + live_cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + + (0..500u32).for_each(|round| { + let churn = test_cid(2000 + round); + store + .put_blocks_blocking(vec![(churn, vec![0xDD; 80])]) + .unwrap(); + store.apply_commit_blocking(vec![], vec![churn]).unwrap(); + + if round % 3 == 0 { + compact_lowest_liveness(&store); + } + }); + + (0..200).for_each(|_| compact_by_liveness(&store)); + + live_cids.iter().for_each(|cid| { + assert!( + store.get_block_sync(cid).unwrap().is_some(), + "sanity: block present before drop" + ); + }); + + drop(store); + } + + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + verify_blocks_and_refcounts(&store, &live_cids, None, "500 churn + 200 compact rounds"); + }); +} + +#[test] +fn commit_style_decrements() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + + let shared_nodes: Vec = (0..10u32).map(test_cid).collect(); + + let refcounts_before = { + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + + shared_nodes.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + + let mut prev_commit = test_cid(5000); + store + .put_blocks_blocking(vec![(prev_commit, vec![0xCC; 80])]) + .unwrap(); + + (0..500u32).for_each(|round| { + let new_commit = test_cid(5001 + round); + let new_mst_node = test_cid(6000 + round); + let old_mst_node = test_cid(7000 + round); + + store + .put_blocks_blocking(vec![ + (new_commit, vec![0xBB; 80]), + (new_mst_node, vec![0xCC; 60]), + (old_mst_node, vec![0xDD; 60]), + ]) + .unwrap(); + + store + .apply_commit_blocking(vec![], vec![prev_commit, old_mst_node]) + .unwrap(); + + if round > 0 { + let prev_mst = test_cid(6000 + round - 1); + store + .apply_commit_blocking(vec![], vec![prev_mst]) + .unwrap(); + } + + prev_commit = new_commit; + + if round % 2 == 0 { + compact_lowest_liveness(&store); + } + }); + + (0..300).for_each(|_| { + compact_by_liveness(&store); + std::thread::sleep(std::time::Duration::from_millis(1)); + }); + + shared_nodes.iter().for_each(|cid| { + assert!( + store.get_block_sync(cid).unwrap().is_some(), + "sanity: shared node present before drop" + ); + }); + + let rc = collect_refcounts(&store, &shared_nodes); + drop(store); + rc + }; + + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + verify_blocks_and_refcounts( + &store, + &shared_nodes, + Some(&refcounts_before), + "500 commits + 300 compact rounds", + ); + }); +} + +#[test] +fn extreme_file_churn_with_dedup_hits() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + + let live_cids: Vec = (0..8u32).map(test_cid).collect(); + let live_data: Vec = vec![0xAA; 80]; + + let refcounts_before = { + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + + live_cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, live_data.clone())]) + .unwrap(); + }); + + (0..300u32).for_each(|round| { + let churn = test_cid(3000 + round); + store + .put_blocks_blocking(vec![(churn, vec![0xEE; 80])]) + .unwrap(); + store.apply_commit_blocking(vec![], vec![churn]).unwrap(); + + if round % 50 == 0 { + live_cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, live_data.clone())]) + .unwrap(); + }); + } + + if round % 2 == 0 { + compact_lowest_liveness(&store); + } + }); + + (0..200).for_each(|_| compact_by_liveness(&store)); + + let rc = collect_refcounts(&store, &live_cids); + drop(store); + rc + }; + + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + verify_blocks_and_refcounts( + &store, + &live_cids, + Some(&refcounts_before), + "300 churn + dedup re-puts + 200 compacts", + ); + }); +} + +#[test] +fn long_idle_compaction_only_phase() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + + let live_cids: Vec = (0..20u32).map(test_cid).collect(); + + { + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + + live_cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + + (0..100u32).for_each(|round| { + let churn = test_cid(4000 + round); + store + .put_blocks_blocking(vec![(churn, vec![0xFF; 80])]) + .unwrap(); + store.apply_commit_blocking(vec![], vec![churn]).unwrap(); + }); + + (0..500).for_each(|_| { + compact_by_liveness(&store); + std::thread::sleep(std::time::Duration::from_millis(1)); + }); + + live_cids.iter().for_each(|cid| { + assert!( + store.get_block_sync(cid).unwrap().is_some(), + "sanity: block present before drop" + ); + }); + + drop(store); + } + + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + verify_blocks_and_refcounts( + &store, + &live_cids, + None, + "idle with 500 compaction-only rounds", + ); + }); +} + +#[test] +fn multiple_restart_cycles_blockstore() { + with_runtime(|| { + let dir = tempfile::TempDir::new().unwrap(); + + let live_cids: Vec = (0..10u32).map(test_cid).collect(); + + { + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + live_cids.iter().for_each(|cid| { + store + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + (0..50u32).for_each(|round| { + let churn = test_cid(8000 + round); + store + .put_blocks_blocking(vec![(churn, vec![0xBB; 80])]) + .unwrap(); + store.apply_commit_blocking(vec![], vec![churn]).unwrap(); + }); + drop(store); + } + + (0..10u32).for_each(|cycle| { + { + let store = + TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + + (0..50u32).for_each(|round| { + let churn = test_cid(9000 + cycle * 100 + round); + store + .put_blocks_blocking(vec![(churn, vec![0xCC; 80])]) + .unwrap(); + store.apply_commit_blocking(vec![], vec![churn]).unwrap(); + compact_lowest_liveness(&store); + }); + + (0..50).for_each(|_| compact_by_liveness(&store)); + + live_cids.iter().for_each(|cid| { + assert!( + store.get_block_sync(cid).unwrap().is_some(), + "cycle {cycle}: block missing before drop" + ); + }); + + drop(store); + } + + let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(); + verify_blocks_and_refcounts( + &store, + &live_cids, + None, + &format!("blockstore restart cycle {cycle}"), + ); + }); + }); +} + +#[test] +fn full_stack_compaction_restart_preserves_refcounts() { + with_runtime(|| { + let base = tempfile::TempDir::new().unwrap(); + let base_dir = base.path().to_path_buf(); + + let live_cids: Vec = (0..15u32).map(test_cid).collect(); + + let refcounts_before = { + let stack = open_full_stack(&base_dir); + + live_cids.iter().for_each(|cid| { + stack + .blockstore + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + + (0..500u32).for_each(|round| { + let churn = test_cid(2000 + round); + stack + .blockstore + .put_blocks_blocking(vec![(churn, vec![0xDD; 80])]) + .unwrap(); + stack + .blockstore + .apply_commit_blocking(vec![], vec![churn]) + .unwrap(); + + if round % 3 == 0 { + compact_lowest_liveness(&stack.blockstore); + } + }); + + (0..200).for_each(|_| compact_by_liveness(&stack.blockstore)); + + let rc = collect_refcounts(&stack.blockstore, &live_cids); + + live_cids.iter().for_each(|cid| { + assert!( + stack.blockstore.get_block_sync(cid).unwrap().is_some(), + "sanity: block present before shutdown" + ); + }); + + close_full_stack(stack, &base_dir); + rc + }; + + let stack = open_full_stack(&base_dir); + verify_blocks_and_refcounts( + &stack.blockstore, + &live_cids, + Some(&refcounts_before), + "full stack restart", + ); + close_full_stack(stack, &base_dir); + }); +} + +#[test] +fn full_stack_multiple_restart_cycles() { + with_runtime(|| { + let base = tempfile::TempDir::new().unwrap(); + let base_dir = base.path().to_path_buf(); + + let live_cids: Vec = (0..10u32).map(test_cid).collect(); + + { + let stack = open_full_stack(&base_dir); + live_cids.iter().for_each(|cid| { + stack + .blockstore + .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])]) + .unwrap(); + }); + close_full_stack(stack, &base_dir); + } + + (0..10u32).for_each(|cycle| { + let refcounts_before = { + let stack = open_full_stack(&base_dir); + + (0..50u32).for_each(|round| { + let churn = test_cid(5000 + cycle * 100 + round); + stack + .blockstore + .put_blocks_blocking(vec![(churn, vec![0xBB; 80])]) + .unwrap(); + stack + .blockstore + .apply_commit_blocking(vec![], vec![churn]) + .unwrap(); + compact_lowest_liveness(&stack.blockstore); + }); + + (0..30).for_each(|_| compact_by_liveness(&stack.blockstore)); + + let rc = collect_refcounts(&stack.blockstore, &live_cids); + + live_cids.iter().for_each(|cid| { + assert!( + stack.blockstore.get_block_sync(cid).unwrap().is_some(), + "cycle {cycle}: block missing before shutdown" + ); + }); + + close_full_stack(stack, &base_dir); + rc + }; + + let stack = open_full_stack(&base_dir); + verify_blocks_and_refcounts( + &stack.blockstore, + &live_cids, + Some(&refcounts_before), + &format!("full stack cycle {cycle}"), + ); + close_full_stack(stack, &base_dir); + }); + }); +} diff --git a/crates/tranquil-store/tests/mst_integration.rs b/crates/tranquil-store/tests/mst_integration.rs index 818ade0..fe4c907 100644 --- a/crates/tranquil-store/tests/mst_integration.rs +++ b/crates/tranquil-store/tests/mst_integration.rs @@ -268,12 +268,12 @@ async fn mst_create_update_delete_with_refcounts() { assert_eq!(&retrieved_a_v2.unwrap()[..], &record_a_v2); assert!( - store.has(&cid_a_v1).await.unwrap(), - "cid_a_v1 should still exist, tombstoned but not GC'd" + store.get(&cid_a_v1).await.unwrap().is_some(), + "cid_a_v1 data should still exist, tombstoned but not GC'd" ); assert!( - store.has(&cid_b).await.unwrap(), - "cid_b should still exist, tombstoned but not GC'd" + store.get(&cid_b).await.unwrap().is_some(), + "cid_b data should still exist, tombstoned but not GC'd" ); assert!( @@ -284,8 +284,8 @@ async fn mst_create_update_delete_with_refcounts() { assert_eq!(&retrieved_c[..], &record_c); assert!( - store.has(&cid_shared).await.unwrap(), - "shared-content block should still exist, tombstoned but not GC'd" + store.get(&cid_shared).await.unwrap().is_some(), + "shared-content block data should still exist, tombstoned but not GC'd" ); let loaded_mst = Mst::load(storage.clone(), mst_root_v2, None); diff --git a/crates/tranquil-store/tests/mst_refcount_integrity.rs b/crates/tranquil-store/tests/mst_refcount_integrity.rs new file mode 100644 index 0000000..8421ed7 --- /dev/null +++ b/crates/tranquil-store/tests/mst_refcount_integrity.rs @@ -0,0 +1,217 @@ +mod common; + +use std::sync::Arc; + +use cid::Cid; +use common::{compact_by_liveness, tiny_blockstore_config}; +use jacquard_repo::mst::Mst; +use jacquard_repo::storage::BlockStore; +use tranquil_store::blockstore::TranquilBlockStore; + +fn cid_to_fixed(cid: &Cid) -> [u8; 36] { + let bytes = cid.to_bytes(); + let mut arr = [0u8; 36]; + arr.copy_from_slice(&bytes[..36]); + arr +} + +fn make_record_bytes(seed: u32) -> Vec { + serde_ipld_dagcbor::to_vec(&serde_json::json!({ + "$type": "app.bsky.feed.post", + "text": format!("record {seed}"), + "createdAt": "2026-01-01T00:00:00Z" + })) + .unwrap() +} + +fn make_fake_commit_cid(counter: u32) -> Cid { + let data = format!("commit-{counter}"); + let mh = multihash::Multihash::wrap(0x12, &{ + use sha2::Digest; + sha2::Sha256::digest(data.as_bytes()) + }) + .unwrap(); + Cid::new_v1(0x71, mh) +} + +async fn compute_obsolete_from_diff( + old_mst: &Mst, + new_mst: &Mst, + old_commit_cid: Cid, +) -> Vec { + let diff = old_mst.diff(new_mst).await.unwrap(); + std::iter::once(old_commit_cid) + .chain(diff.removed_mst_blocks.into_iter()) + .chain(diff.removed_cids.into_iter()) + .collect() +} + +#[tokio::test] +async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() { + let dir = tempfile::TempDir::new().unwrap(); + + let mut commit_counter = 0u32; + let final_node_cids: Vec; + + { + let store = Arc::new(TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap()); + + let mut mst = Mst::new(store.clone()); + let mut root: Option = None; + let mut prev_commit = make_fake_commit_cid(commit_counter); + commit_counter += 1; + + for i in 0..30u32 { + let record_bytes = make_record_bytes(i); + let record_cid = store.put(&record_bytes).await.unwrap(); + let key = format!("app.bsky.feed.post/{i:06}"); + mst = match root { + None => mst.add(&key, record_cid).await.unwrap(), + Some(r) => { + let loaded = Mst::load(store.clone(), r, None); + loaded.add(&key, record_cid).await.unwrap() + } + }; + let new_root = mst.persist().await.unwrap(); + + if let Some(old_root) = root { + let old_settled = Mst::load(store.clone(), old_root, None); + let new_settled = Mst::load(store.clone(), new_root, None); + + let obsolete = + compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await; + let obsolete_fixed: Vec<[u8; 36]> = + obsolete.iter().map(|c| cid_to_fixed(c)).collect(); + let s = store.clone(); + tokio::task::spawn_blocking(move || { + s.apply_commit_blocking(vec![], obsolete_fixed).unwrap(); + }) + .await + .unwrap(); + } + + root = Some(new_root); + prev_commit = make_fake_commit_cid(commit_counter); + commit_counter += 1; + + if i % 5 == 0 { + let s = store.clone(); + tokio::task::spawn_blocking(move || compact_by_liveness(&s)) + .await + .unwrap(); + } + } + + for i in 0..15u32 { + let record_bytes = make_record_bytes(1000 + i); + let record_cid = store.put(&record_bytes).await.unwrap(); + let key = format!("app.bsky.feed.like/{i:06}"); + let loaded = Mst::load(store.clone(), root.unwrap(), None); + mst = loaded.add(&key, record_cid).await.unwrap(); + let new_root = mst.persist().await.unwrap(); + + let old_settled = Mst::load(store.clone(), root.unwrap(), None); + let new_settled = Mst::load(store.clone(), new_root, None); + + let obsolete = + compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await; + let obsolete_fixed: Vec<[u8; 36]> = + obsolete.iter().map(|c| cid_to_fixed(c)).collect(); + let s = store.clone(); + tokio::task::spawn_blocking(move || { + s.apply_commit_blocking(vec![], obsolete_fixed).unwrap(); + }) + .await + .unwrap(); + + root = Some(new_root); + prev_commit = make_fake_commit_cid(commit_counter); + commit_counter += 1; + + if i % 3 == 0 { + let s = store.clone(); + tokio::task::spawn_blocking(move || compact_by_liveness(&s)) + .await + .unwrap(); + } + } + + let final_settled = Mst::load(store.clone(), root.unwrap(), None); + final_node_cids = final_settled.collect_node_cids().await.unwrap(); + + final_node_cids.iter().for_each(|cid| { + let fixed = cid_to_fixed(cid); + let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw()); + assert!( + rc.is_some_and(|r| r > 0), + "MST node {cid} has refcount {rc:?} before shutdown" + ); + }); + + let s = store.clone(); + tokio::task::spawn_blocking(move || { + (0..100).for_each(|_| compact_by_liveness(&s)); + }) + .await + .unwrap(); + + final_node_cids.iter().for_each(|cid| { + let fixed = cid_to_fixed(cid); + let block = store.get_block_sync(&fixed).unwrap(); + assert!( + block.is_some(), + "MST node {cid} missing after compaction before shutdown" + ); + }); + + drop(store); + } + + { + let store = Arc::new( + TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(), + ); + + let missing: Vec = final_node_cids + .iter() + .filter_map(|cid| { + let fixed = cid_to_fixed(cid); + match store.get_block_sync(&fixed) { + Ok(Some(_)) => None, + Ok(None) => { + let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw()); + Some(format!("{cid} missing, index refcount {rc:?}")) + } + Err(e) => Some(format!("{cid} error: {e}")), + } + }) + .collect(); + + assert!( + missing.is_empty(), + "{} of {} MST nodes missing after reopen:\n{}", + missing.len(), + final_node_cids.len(), + missing.join("\n"), + ); + + let refcount_issues: Vec = final_node_cids + .iter() + .filter_map(|cid| { + let fixed = cid_to_fixed(cid); + let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw()); + match rc { + Some(0) => Some(format!("{cid} refcount dropped to 0")), + None => Some(format!("{cid} not in index")), + _ => None, + } + }) + .collect(); + + assert!( + refcount_issues.is_empty(), + "MST nodes with bad refcounts after reopen:\n{}", + refcount_issues.join("\n"), + ); + } +}