fix(tranquil-store): exclude 0 refcount blocks from has()

Lewis: May this revision serve well! <lu5a@proton.me>
This commit is contained in:
Lewis
2026-04-13 20:22:18 +03:00
parent 8ccdd30cb3
commit d51bfd59da
9 changed files with 1061 additions and 30 deletions

View File

@@ -72,6 +72,10 @@ test-group = "io-heavy-sim"
filter = "test(/test_scale_/) | test(/full_backup_and_restore/)"
slow-timeout = { period = "120s", terminate-after = 4 }
[[profile.default.overrides]]
filter = "binary(compaction_restart) | binary(mst_refcount_integrity) | binary(gc_compaction_restart)"
slow-timeout = { period = "120s", terminate-after = 4 }
[[profile.ci.overrides]]
filter = "test(/import_with_verification/) | test(/plc_migration/)"
test-group = "serial-env-tests"

44
Cargo.lock generated
View File

@@ -7405,7 +7405,7 @@ dependencies = [
[[package]]
name = "tranquil-api"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"anyhow",
"axum",
@@ -7456,7 +7456,7 @@ dependencies = [
[[package]]
name = "tranquil-auth"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"anyhow",
"base32",
@@ -7479,7 +7479,7 @@ dependencies = [
[[package]]
name = "tranquil-cache"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -7493,7 +7493,7 @@ dependencies = [
[[package]]
name = "tranquil-comms"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -7511,7 +7511,7 @@ dependencies = [
[[package]]
name = "tranquil-config"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"confique",
"serde",
@@ -7519,7 +7519,7 @@ dependencies = [
[[package]]
name = "tranquil-crypto"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"aes-gcm",
"base64 0.22.1",
@@ -7535,7 +7535,7 @@ dependencies = [
[[package]]
name = "tranquil-db"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"chrono",
@@ -7552,7 +7552,7 @@ dependencies = [
[[package]]
name = "tranquil-db-traits"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -7568,7 +7568,7 @@ dependencies = [
[[package]]
name = "tranquil-infra"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"bytes",
@@ -7579,7 +7579,7 @@ dependencies = [
[[package]]
name = "tranquil-lexicon"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"chrono",
"hickory-resolver",
@@ -7597,7 +7597,7 @@ dependencies = [
[[package]]
name = "tranquil-oauth"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"anyhow",
"axum",
@@ -7620,7 +7620,7 @@ dependencies = [
[[package]]
name = "tranquil-oauth-server"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"axum",
"base64 0.22.1",
@@ -7653,7 +7653,7 @@ dependencies = [
[[package]]
name = "tranquil-pds"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"aes-gcm",
"anyhow",
@@ -7745,7 +7745,7 @@ dependencies = [
[[package]]
name = "tranquil-repo"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"bytes",
"cid",
@@ -7757,7 +7757,7 @@ dependencies = [
[[package]]
name = "tranquil-ripple"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"backon",
@@ -7782,7 +7782,7 @@ dependencies = [
[[package]]
name = "tranquil-scopes"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"axum",
"futures",
@@ -7798,7 +7798,7 @@ dependencies = [
[[package]]
name = "tranquil-server"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"axum",
"clap",
@@ -7819,7 +7819,7 @@ dependencies = [
[[package]]
name = "tranquil-signal"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"chrono",
@@ -7842,7 +7842,7 @@ dependencies = [
[[package]]
name = "tranquil-storage"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"aws-config",
@@ -7859,7 +7859,7 @@ dependencies = [
[[package]]
name = "tranquil-store"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"async-trait",
"bytes",
@@ -7906,7 +7906,7 @@ dependencies = [
[[package]]
name = "tranquil-sync"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"anyhow",
"axum",
@@ -7928,7 +7928,7 @@ dependencies = [
[[package]]
name = "tranquil-types"
version = "0.5.3"
version = "0.5.4"
dependencies = [
"chrono",
"cid",

View File

@@ -26,7 +26,7 @@ members = [
]
[workspace.package]
version = "0.5.3"
version = "0.5.4"
edition = "2024"
license = "AGPL-3.0-or-later"

View File

@@ -0,0 +1,175 @@
mod common;
use chrono::Utc;
use common::*;
use reqwest::StatusCode;
use serde_json::{Value, json};
/// Compact every sealed data file whose live-block ratio has fallen below
/// 95%, mirroring one pass of the background compaction task.
///
/// `ActiveFileCannotBeCompacted` is expected (the currently-open file is
/// skipped); any other error is logged but does not fail the test.
fn run_compaction(store: &tranquil_store::blockstore::TranquilBlockStore) {
    let liveness = store.compaction_liveness(0).unwrap();
    // `liveness` is owned, so we can iterate it directly while calling back
    // into `store` — no need to collect the candidate ids into a Vec first.
    for (&fid, info) in liveness.iter() {
        if info.total_blocks == 0 || info.ratio() >= 0.95 {
            continue;
        }
        match store.compact_file(fid, 0) {
            Ok(_) => {}
            Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {}
            Err(e) => eprintln!("compaction: {e}"),
        }
    }
}
/// End-to-end check that every block reachable from the current repo head
/// survives a full close-and-reopen of the blockstore.
///
/// Creates 30 posts plus 20 likes through the XRPC API, hammers compaction,
/// collects the CAR block set for the repo head, then opens a fresh store
/// handle over the same on-disk state and verifies none of those blocks
/// went missing.
#[tokio::test]
async fn mst_blocks_survive_full_store_reopen() {
    if !is_store_backend() {
        eprintln!("skipping: only meaningful with tranquil-store backend");
        return;
    }
    let client = client();
    let base = base_url().await;
    let block_store = get_test_block_store().await;
    let store = block_store
        .as_tranquil_store()
        .expect("expected tranquil-store backend");
    let (jwt, did) = create_account_and_login(&client).await;

    // Phase 1: 30 posts, so the MST grows through several node splits.
    let mut posts = Vec::new();
    for i in 0..30 {
        let res = client
            .post(format!("{base}/xrpc/com.atproto.repo.createRecord"))
            .bearer_auth(&jwt)
            .json(&json!({
                "repo": did,
                "collection": "app.bsky.feed.post",
                "record": {
                    "$type": "app.bsky.feed.post",
                    "text": format!("compaction test post {i}"),
                    "createdAt": Utc::now().to_rfc3339()
                }
            }))
            .send()
            .await
            .unwrap();
        assert_eq!(res.status(), StatusCode::OK);
        let body: Value = res.json().await.unwrap();
        posts.push((
            body["uri"].as_str().unwrap().to_string(),
            body["cid"].as_str().unwrap().to_string(),
        ));
    }

    // Phase 2: like the first 20 posts — more commits, more MST churn.
    for (uri, cid) in &posts[..20] {
        let res = client
            .post(format!("{base}/xrpc/com.atproto.repo.createRecord"))
            .bearer_auth(&jwt)
            .json(&json!({
                "repo": did,
                "collection": "app.bsky.feed.like",
                "record": {
                    "$type": "app.bsky.feed.like",
                    "subject": { "uri": uri, "cid": cid },
                    "createdAt": Utc::now().to_rfc3339()
                }
            }))
            .send()
            .await
            .unwrap();
        assert_eq!(res.status(), StatusCode::OK, "like failed for {uri}");
    }

    let data_dir = store.data_dir().to_path_buf();
    let index_dir = data_dir.parent().unwrap().join("index");

    // Phase 3: run many compaction passes from a blocking thread.
    let store_clone = store.clone();
    tokio::task::spawn_blocking(move || {
        (0..40).for_each(|_| run_compaction(&store_clone));
    })
    .await
    .unwrap();

    // Resolve the current repo head and collect every block in its CAR.
    let repo_root_str: String = get_test_repos()
        .await
        .repo
        .get_repo_root_by_did(&tranquil_types::Did::new(did.clone()).unwrap())
        .await
        .expect("db error")
        .expect("no repo root")
        .to_string();
    let head_cid = cid::Cid::try_from(repo_root_str.as_str()).expect("invalid cid");
    let car_blocks =
        tranquil_pds::scheduled::collect_current_repo_blocks(block_store, &head_cid)
            .await
            .expect("collect blocks");
    let block_count_before = car_blocks.len();

    // Reopen with the same 4 MiB max file size the test store uses. (The
    // previous expression funneled `list_data_files()` into this constant
    // no matter what the call returned, so it is spelled directly now.)
    let max_file_size = 4 * 1024 * 1024u64;

    // Phase 4: open a second handle over the same directories and probe
    // every collected block through it.
    let reopened_missing = tokio::task::spawn_blocking(move || {
        // Keep a Tokio context available inside the blocking thread —
        // presumably `open` spawns background tasks; confirm if changed.
        let rt = tokio::runtime::Handle::current();
        let _guard = rt.enter();
        let config = tranquil_store::blockstore::BlockStoreConfig {
            data_dir: data_dir.clone(),
            index_dir,
            max_file_size,
            group_commit: tranquil_store::blockstore::GroupCommitConfig::default(),
            shard_count: 1,
        };
        let fresh = tranquil_store::blockstore::TranquilBlockStore::open(config)
            .expect("reopen failed");
        let missing: Vec<String> = car_blocks
            .iter()
            .filter_map(|cid_bytes| {
                // Blockstore keys are fixed 36-byte CID prefixes; skip
                // anything shorter than that.
                if cid_bytes.len() < 36 {
                    return None;
                }
                let mut arr = [0u8; 36];
                arr.copy_from_slice(&cid_bytes[..36]);
                match fresh.get_block_sync(&arr) {
                    Ok(Some(_)) => None,
                    Ok(None) => Some(format!(
                        "missing {}",
                        cid::Cid::try_from(cid_bytes.as_slice())
                            .map(|c| c.to_string())
                            .unwrap_or_else(|_| hex::encode(cid_bytes))
                    )),
                    Err(e) => Some(format!("error: {e}")),
                }
            })
            .collect();
        drop(fresh);
        missing
    })
    .await
    .unwrap();

    assert!(
        reopened_missing.is_empty(),
        "{} of {block_count_before} blocks missing after blockstore reopen:\n{}",
        reopened_missing.len(),
        reopened_missing
            .iter()
            .take(20)
            .map(|s| s.as_str())
            .collect::<Vec<_>>()
            .join("\n"),
    );
}

View File

@@ -215,6 +215,10 @@ impl HashTable {
self.get(cid).is_some()
}
/// Like `contains`, but a slot whose refcount has dropped to zero
/// (tombstoned, awaiting GC) is reported as absent.
pub fn contains_live(&self, cid: &[u8; CID_SIZE]) -> bool {
    self.get(cid).is_some_and(|s| !s.refcount.is_zero())
}
pub fn insert(&mut self, new_slot: Slot) -> Result<Option<Slot>, CapacityExhausted> {
if is_empty(&new_slot.cid) {
tracing::error!("attempted to insert all-zero CID into hash table");
@@ -1195,7 +1199,7 @@ impl BlockIndex {
}
pub fn has(&self, cid: &[u8; CID_SIZE]) -> bool {
self.table.read().contains(cid)
self.table.read().contains_live(cid)
}
pub fn batch_put(

View File

@@ -104,6 +104,70 @@ pub fn compact_all_sealed(store: &TranquilBlockStore) {
});
}
/// Build a deliberately tiny blockstore config rooted at `dir`: a 300-byte
/// max file size forces constant data-file rollover, and the short group
/// commit interval/threshold make checkpoints fire quickly, so compaction
/// and recovery paths are exercised heavily even in small tests.
pub fn tiny_blockstore_config(dir: &std::path::Path) -> BlockStoreConfig {
    BlockStoreConfig {
        data_dir: dir.join("data"),
        index_dir: dir.join("index"),
        // Tiny, so nearly every block lands in its own sealed file.
        max_file_size: 300,
        group_commit: GroupCommitConfig {
            checkpoint_interval_ms: 100,
            checkpoint_write_threshold: 10,
            ..GroupCommitConfig::default()
        },
        shard_count: 1,
    }
}
/// Compact every sealed file whose live-block ratio is below 99%.
///
/// `ActiveFileCannotBeCompacted` is expected for the open file and ignored;
/// other errors are logged but do not abort the sweep.
pub fn compact_by_liveness(store: &TranquilBlockStore) {
    let liveness = store.compaction_liveness(0).unwrap();
    // `liveness` is owned — iterate it directly instead of collecting the
    // candidate ids into an intermediate Vec.
    for (&fid, info) in liveness.iter() {
        if info.total_blocks == 0 || info.ratio() >= 0.99 {
            continue;
        }
        match store.compact_file(fid, 0) {
            Ok(_) => {}
            Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {}
            Err(e) => eprintln!("compaction: {e}"),
        }
    }
}
/// Compact at most one file per call: the sealed file with the lowest
/// live-block ratio, provided that ratio is under 99%.
///
/// `ActiveFileCannotBeCompacted` is expected and ignored; other errors are
/// logged without aborting.
pub fn compact_lowest_liveness(store: &TranquilBlockStore) {
    let liveness = store.compaction_liveness(0).unwrap();
    // Pick the worst (lowest-ratio) compactable file, if any.
    let target = liveness
        .iter()
        .filter(|(_, info)| info.total_blocks > 0 && info.ratio() < 0.99)
        .min_by(|(_, a), (_, b)| {
            a.ratio()
                .partial_cmp(&b.ratio())
                .unwrap_or(std::cmp::Ordering::Equal)
        })
        .map(|(&fid, _)| fid);
    let Some(fid) = target else { return };
    match store.compact_file(fid, 0) {
        Ok(_) => {}
        Err(tranquil_store::blockstore::CompactionError::ActiveFileCannotBeCompacted) => {}
        Err(e) => eprintln!("compaction: {e}"),
    }
}
/// Snapshot the refcount of each CID, paired with the 4-byte seed embedded
/// at offset 4 of the CID (as written by `test_cid`). CIDs absent from the
/// index report a refcount of 0.
pub fn collect_refcounts(store: &TranquilBlockStore, cids: &[CidBytes]) -> Vec<(u32, u32)> {
    let mut snapshot = Vec::with_capacity(cids.len());
    for cid in cids {
        let seed = u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]]);
        let refcount = match store.block_index().get(cid) {
            Some(entry) => entry.refcount.raw(),
            None => 0,
        };
        snapshot.push((seed, refcount));
    }
    snapshot
}
pub struct TestStores {
pub blockstore: TranquilBlockStore,
pub eventlog: Arc<EventLog<RealIO>>,

View File

@@ -0,0 +1,567 @@
mod common;
use std::path::Path;
use std::sync::Arc;
use common::{
collect_refcounts, compact_by_liveness, compact_lowest_liveness, test_cid,
tiny_blockstore_config, with_runtime,
};
use tranquil_store::RealIO;
use tranquil_store::blockstore::{CidBytes, TranquilBlockStore};
use tranquil_store::eventlog::{EventLog, EventLogBridge, EventLogConfig};
use tranquil_store::metastore::handler::HandlerPool;
use tranquil_store::metastore::partitions::Partition;
use tranquil_store::metastore::{Metastore, MetastoreConfig};
/// Handles needed to exercise — and cleanly shut down — the whole storage
/// stack (blockstore + handler pool + eventlog).
struct FullStack {
    blockstore: TranquilBlockStore,
    // Kept alive so background handlers keep running; drained explicitly in
    // `close_full_stack`.
    _pool: Arc<HandlerPool>,
    _event_log: Arc<EventLog<RealIO>>,
}
/// Open the full storage stack (metastore, eventlog, blockstore, handler
/// pool) rooted at `base_dir`, running the same recovery and consistency
/// steps performed at real startup.
fn open_full_stack(base_dir: &Path) -> FullStack {
    let metastore_dir = base_dir.join("metastore");
    let segments_dir = base_dir.join("eventlog").join("segments");
    let blockstore_data = base_dir.join("blockstore").join("data");
    let blockstore_index = base_dir.join("blockstore").join("index");
    [&metastore_dir, &segments_dir, &blockstore_data, &blockstore_index]
        .iter()
        .for_each(|d| std::fs::create_dir_all(d).unwrap());
    let metastore = Metastore::open(&metastore_dir, MetastoreConfig::default()).unwrap();
    // Tiny max_file_size (512 bytes) so data files roll over constantly.
    let blockstore = TranquilBlockStore::open(tranquil_store::blockstore::BlockStoreConfig {
        data_dir: blockstore_data,
        index_dir: blockstore_index,
        max_file_size: 512,
        group_commit: tranquil_store::blockstore::GroupCommitConfig::default(),
        shard_count: 1,
    })
    .unwrap();
    let event_log = Arc::new(
        EventLog::open(
            EventLogConfig {
                segments_dir,
                ..EventLogConfig::default()
            },
            RealIO::new(),
        )
        .unwrap(),
    );
    let bridge = Arc::new(EventLogBridge::new(Arc::clone(&event_log)));
    // Record whether the previous run shut down cleanly, then clear the
    // marker so a crash during this run is detectable on the next open.
    let was_clean = tranquil_store::consistency::had_clean_shutdown(base_dir);
    tranquil_store::consistency::remove_clean_shutdown_marker(base_dir).ok();
    let indexes = metastore.partition(Partition::Indexes).clone();
    let event_ops = metastore.event_ops(Arc::clone(&bridge));
    // Replay any metastore mutations the eventlog holds that the metastore
    // itself has not yet applied.
    let recovered = event_ops
        .recover_metastore_mutations(&indexes)
        .unwrap();
    if recovered > 0 {
        eprintln!("replayed {recovered} metastore mutations from eventlog");
    }
    // Only run the consistency sweep after an unclean shutdown or when
    // replay actually recovered something.
    if !was_clean || recovered > 0 {
        let report = tranquil_store::consistency::verify_store_consistency(
            &blockstore,
            &metastore,
            &event_log,
        );
        report.log_findings();
        if report.has_repairable_issues() {
            let repair = tranquil_store::consistency::repair_known_issues(&blockstore, &report);
            if repair.orphan_files_removed > 0 {
                eprintln!("removed {} orphan files", repair.orphan_files_removed);
            }
        }
    }
    let pool = Arc::new(HandlerPool::spawn::<RealIO>(
        metastore,
        bridge,
        Some(blockstore.clone()),
        None,
    ));
    FullStack {
        blockstore,
        _pool: pool,
        _event_log: event_log,
    }
}
/// Shut the stack down the way the server would: drain the handler pool,
/// flush the eventlog, then write the clean-shutdown marker.
// NOTE(review): `Handle::current().block_on` panics when called from a
// runtime worker thread — this assumes `with_runtime` drives the closure on
// a thread where blocking is permitted; confirm against `common::with_runtime`.
fn close_full_stack(stack: FullStack, base_dir: &Path) {
    let rt = tokio::runtime::Handle::current();
    rt.block_on(stack._pool.close());
    if let Err(e) = stack._event_log.shutdown() {
        eprintln!("eventlog shutdown: {e}");
    }
    tranquil_store::consistency::write_clean_shutdown_marker(base_dir).ok();
    drop(stack.blockstore);
}
/// Assert that every CID in `live_cids` is readable from `store` and that
/// its refcount is healthy: equal to the snapshot in `expected_refcounts`
/// when one is supplied, otherwise simply non-zero. `label` prefixes every
/// failure message.
fn verify_blocks_and_refcounts(
    store: &TranquilBlockStore,
    live_cids: &[CidBytes],
    expected_refcounts: Option<&[(u32, u32)]>,
    label: &str,
) {
    // First pass: every live block must be readable.
    let mut missing: Vec<u32> = Vec::new();
    for cid in live_cids {
        if store.get_block_sync(cid).unwrap().is_none() {
            missing.push(u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]]));
        }
    }
    assert!(
        missing.is_empty(),
        "{label}: live blocks missing after reopen: {missing:?}"
    );
    // Second pass: refcounts, either exact-match against a snapshot or a
    // plain non-zero check.
    if let Some(expected) = expected_refcounts {
        let actual = collect_refcounts(store, live_cids);
        let mut mismatches: Vec<String> = Vec::new();
        for ((seed, exp), (_, act)) in expected.iter().zip(actual.iter()) {
            if exp != act {
                mismatches.push(format!("seed {seed}: before={exp} after={act}"));
            }
        }
        assert!(
            mismatches.is_empty(),
            "{label}: refcounts changed across reopen:\n{}",
            mismatches.join("\n"),
        );
    } else {
        for cid in live_cids {
            let rc = store
                .block_index()
                .get(cid)
                .map(|e| e.refcount.raw())
                .unwrap_or(0);
            assert!(
                rc > 0,
                "{label}: refcount dropped to 0 for seed {}",
                u32::from_le_bytes([cid[4], cid[5], cid[6], cid[7]])
            );
        }
    }
}
/// 15 long-lived blocks must survive 500 rounds of churn (write one block,
/// tombstone it in the same round) interleaved with compaction, plus 200
/// extra compaction-only sweeps, and a full store reopen.
#[test]
fn hundreds_of_compaction_cycles() {
    with_runtime(|| {
        let dir = tempfile::TempDir::new().unwrap();
        let live_cids: Vec<CidBytes> = (0..15u32).map(test_cid).collect();
        {
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            live_cids.iter().for_each(|cid| {
                store
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            (0..500u32).for_each(|round| {
                // Churn block: written and immediately tombstoned.
                let churn = test_cid(2000 + round);
                store
                    .put_blocks_blocking(vec![(churn, vec![0xDD; 80])])
                    .unwrap();
                store.apply_commit_blocking(vec![], vec![churn]).unwrap();
                if round % 3 == 0 {
                    compact_lowest_liveness(&store);
                }
            });
            (0..200).for_each(|_| compact_by_liveness(&store));
            // Sanity before shutdown, so a reopen failure is unambiguous.
            live_cids.iter().for_each(|cid| {
                assert!(
                    store.get_block_sync(cid).unwrap().is_some(),
                    "sanity: block present before drop"
                );
            });
            drop(store);
        }
        // Reopen from disk; blocks must still read and refcounts stay > 0.
        let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
        verify_blocks_and_refcounts(&store, &live_cids, None, "500 churn + 200 compact rounds");
    });
}
/// Simulates 500 repo commits the way the PDS issues them: each round adds a
/// new commit + new MST node, tombstones the previous commit and an obsolete
/// MST node, and one round later tombstones the superseded MST node. Shared
/// nodes must keep their exact refcounts across compaction and a reopen.
#[test]
fn commit_style_decrements() {
    with_runtime(|| {
        let dir = tempfile::TempDir::new().unwrap();
        let shared_nodes: Vec<CidBytes> = (0..10u32).map(test_cid).collect();
        let refcounts_before = {
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            shared_nodes.iter().for_each(|cid| {
                store
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            let mut prev_commit = test_cid(5000);
            store
                .put_blocks_blocking(vec![(prev_commit, vec![0xCC; 80])])
                .unwrap();
            (0..500u32).for_each(|round| {
                // Disjoint seed ranges keep commit / new-MST / old-MST CIDs
                // from colliding across rounds.
                let new_commit = test_cid(5001 + round);
                let new_mst_node = test_cid(6000 + round);
                let old_mst_node = test_cid(7000 + round);
                store
                    .put_blocks_blocking(vec![
                        (new_commit, vec![0xBB; 80]),
                        (new_mst_node, vec![0xCC; 60]),
                        (old_mst_node, vec![0xDD; 60]),
                    ])
                    .unwrap();
                store
                    .apply_commit_blocking(vec![], vec![prev_commit, old_mst_node])
                    .unwrap();
                // The previous round's "new" MST node becomes obsolete one
                // round later, like a superseded tree node.
                if round > 0 {
                    let prev_mst = test_cid(6000 + round - 1);
                    store
                        .apply_commit_blocking(vec![], vec![prev_mst])
                        .unwrap();
                }
                prev_commit = new_commit;
                if round % 2 == 0 {
                    compact_lowest_liveness(&store);
                }
            });
            // Compaction-only phase; the tiny sleep lets background work run.
            (0..300).for_each(|_| {
                compact_by_liveness(&store);
                std::thread::sleep(std::time::Duration::from_millis(1));
            });
            shared_nodes.iter().for_each(|cid| {
                assert!(
                    store.get_block_sync(cid).unwrap().is_some(),
                    "sanity: shared node present before drop"
                );
            });
            // Snapshot refcounts so the reopen can be compared exactly.
            let rc = collect_refcounts(&store, &shared_nodes);
            drop(store);
            rc
        };
        let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
        verify_blocks_and_refcounts(
            &store,
            &shared_nodes,
            Some(&refcounts_before),
            "500 commits + 300 compact rounds",
        );
    });
}
/// Heavy file churn with periodic re-puts of identical live data: every 50
/// rounds the live blocks are written again byte-for-byte, exercising the
/// dedup path while churn blocks are created and tombstoned. Refcounts must
/// match exactly across compaction and a reopen.
#[test]
fn extreme_file_churn_with_dedup_hits() {
    with_runtime(|| {
        let dir = tempfile::TempDir::new().unwrap();
        let live_cids: Vec<CidBytes> = (0..8u32).map(test_cid).collect();
        let live_data: Vec<u8> = vec![0xAA; 80];
        let refcounts_before = {
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            live_cids.iter().for_each(|cid| {
                store
                    .put_blocks_blocking(vec![(*cid, live_data.clone())])
                    .unwrap();
            });
            (0..300u32).for_each(|round| {
                let churn = test_cid(3000 + round);
                store
                    .put_blocks_blocking(vec![(churn, vec![0xEE; 80])])
                    .unwrap();
                store.apply_commit_blocking(vec![], vec![churn]).unwrap();
                // Re-put identical live data — should hit dedup, not
                // duplicate storage.
                if round % 50 == 0 {
                    live_cids.iter().for_each(|cid| {
                        store
                            .put_blocks_blocking(vec![(*cid, live_data.clone())])
                            .unwrap();
                    });
                }
                if round % 2 == 0 {
                    compact_lowest_liveness(&store);
                }
            });
            (0..200).for_each(|_| compact_by_liveness(&store));
            // Snapshot refcounts for the exact-match check after reopen.
            let rc = collect_refcounts(&store, &live_cids);
            drop(store);
            rc
        };
        let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
        verify_blocks_and_refcounts(
            &store,
            &live_cids,
            Some(&refcounts_before),
            "300 churn + dedup re-puts + 200 compacts",
        );
    });
}
/// After a short burst of churn, runs 500 compaction-only rounds with no new
/// writes — the "idle server" scenario — and verifies the 20 live blocks and
/// their refcounts survive the sweeps and a reopen.
#[test]
fn long_idle_compaction_only_phase() {
    with_runtime(|| {
        let dir = tempfile::TempDir::new().unwrap();
        let live_cids: Vec<CidBytes> = (0..20u32).map(test_cid).collect();
        {
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            live_cids.iter().for_each(|cid| {
                store
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            // Brief churn phase to create garbage for compaction to chew on.
            (0..100u32).for_each(|round| {
                let churn = test_cid(4000 + round);
                store
                    .put_blocks_blocking(vec![(churn, vec![0xFF; 80])])
                    .unwrap();
                store.apply_commit_blocking(vec![], vec![churn]).unwrap();
            });
            // Idle phase: compaction only, no writes.
            (0..500).for_each(|_| {
                compact_by_liveness(&store);
                std::thread::sleep(std::time::Duration::from_millis(1));
            });
            live_cids.iter().for_each(|cid| {
                assert!(
                    store.get_block_sync(cid).unwrap().is_some(),
                    "sanity: block present before drop"
                );
            });
            drop(store);
        }
        let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
        verify_blocks_and_refcounts(
            &store,
            &live_cids,
            None,
            "idle with 500 compaction-only rounds",
        );
    });
}
/// Ten open→churn→compact→close cycles over the same on-disk state. After
/// every cycle the store is reopened yet again and the 10 live blocks plus
/// their refcounts are verified — catching state that only corrupts across a
/// restart boundary.
#[test]
fn multiple_restart_cycles_blockstore() {
    with_runtime(|| {
        let dir = tempfile::TempDir::new().unwrap();
        let live_cids: Vec<CidBytes> = (0..10u32).map(test_cid).collect();
        // Seed the store with the live blocks and some initial churn.
        {
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            live_cids.iter().for_each(|cid| {
                store
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            (0..50u32).for_each(|round| {
                let churn = test_cid(8000 + round);
                store
                    .put_blocks_blocking(vec![(churn, vec![0xBB; 80])])
                    .unwrap();
                store.apply_commit_blocking(vec![], vec![churn]).unwrap();
            });
            drop(store);
        }
        (0..10u32).for_each(|cycle| {
            // Work phase for this cycle: churn seeds are partitioned per
            // cycle (9000 + cycle * 100 + round) so CIDs never repeat.
            {
                let store =
                    TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
                (0..50u32).for_each(|round| {
                    let churn = test_cid(9000 + cycle * 100 + round);
                    store
                        .put_blocks_blocking(vec![(churn, vec![0xCC; 80])])
                        .unwrap();
                    store.apply_commit_blocking(vec![], vec![churn]).unwrap();
                    compact_lowest_liveness(&store);
                });
                (0..50).for_each(|_| compact_by_liveness(&store));
                live_cids.iter().for_each(|cid| {
                    assert!(
                        store.get_block_sync(cid).unwrap().is_some(),
                        "cycle {cycle}: block missing before drop"
                    );
                });
                drop(store);
            }
            // Verification reopen for this cycle.
            let store = TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap();
            verify_blocks_and_refcounts(
                &store,
                &live_cids,
                None,
                &format!("blockstore restart cycle {cycle}"),
            );
        });
    });
}
/// Same churn-and-compact workload as the blockstore-only tests, but run
/// through the full stack (metastore + eventlog + handler pool) with a real
/// clean shutdown in between — refcounts must match exactly across the
/// restart.
#[test]
fn full_stack_compaction_restart_preserves_refcounts() {
    with_runtime(|| {
        let base = tempfile::TempDir::new().unwrap();
        let base_dir = base.path().to_path_buf();
        let live_cids: Vec<CidBytes> = (0..15u32).map(test_cid).collect();
        let refcounts_before = {
            let stack = open_full_stack(&base_dir);
            live_cids.iter().for_each(|cid| {
                stack
                    .blockstore
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            (0..500u32).for_each(|round| {
                // Write-then-tombstone churn, compacting every third round.
                let churn = test_cid(2000 + round);
                stack
                    .blockstore
                    .put_blocks_blocking(vec![(churn, vec![0xDD; 80])])
                    .unwrap();
                stack
                    .blockstore
                    .apply_commit_blocking(vec![], vec![churn])
                    .unwrap();
                if round % 3 == 0 {
                    compact_lowest_liveness(&stack.blockstore);
                }
            });
            (0..200).for_each(|_| compact_by_liveness(&stack.blockstore));
            // Snapshot refcounts, sanity-check reads, then shut down cleanly.
            let rc = collect_refcounts(&stack.blockstore, &live_cids);
            live_cids.iter().for_each(|cid| {
                assert!(
                    stack.blockstore.get_block_sync(cid).unwrap().is_some(),
                    "sanity: block present before shutdown"
                );
            });
            close_full_stack(stack, &base_dir);
            rc
        };
        let stack = open_full_stack(&base_dir);
        verify_blocks_and_refcounts(
            &stack.blockstore,
            &live_cids,
            Some(&refcounts_before),
            "full stack restart",
        );
        close_full_stack(stack, &base_dir);
    });
}
/// Ten full-stack restart cycles: each one reopens the stack, churns and
/// compacts, snapshots refcounts, shuts down cleanly, then reopens again and
/// checks the snapshot matched — the refcount must be byte-stable across
/// every restart boundary.
#[test]
fn full_stack_multiple_restart_cycles() {
    with_runtime(|| {
        let base = tempfile::TempDir::new().unwrap();
        let base_dir = base.path().to_path_buf();
        let live_cids: Vec<CidBytes> = (0..10u32).map(test_cid).collect();
        // Seed the live blocks once, with a clean shutdown.
        {
            let stack = open_full_stack(&base_dir);
            live_cids.iter().for_each(|cid| {
                stack
                    .blockstore
                    .put_blocks_blocking(vec![(*cid, vec![0xAA; 80])])
                    .unwrap();
            });
            close_full_stack(stack, &base_dir);
        }
        (0..10u32).for_each(|cycle| {
            let refcounts_before = {
                let stack = open_full_stack(&base_dir);
                // Per-cycle churn seeds (5000 + cycle * 100 + round) never
                // collide across cycles.
                (0..50u32).for_each(|round| {
                    let churn = test_cid(5000 + cycle * 100 + round);
                    stack
                        .blockstore
                        .put_blocks_blocking(vec![(churn, vec![0xBB; 80])])
                        .unwrap();
                    stack
                        .blockstore
                        .apply_commit_blocking(vec![], vec![churn])
                        .unwrap();
                    compact_lowest_liveness(&stack.blockstore);
                });
                (0..30).for_each(|_| compact_by_liveness(&stack.blockstore));
                let rc = collect_refcounts(&stack.blockstore, &live_cids);
                live_cids.iter().for_each(|cid| {
                    assert!(
                        stack.blockstore.get_block_sync(cid).unwrap().is_some(),
                        "cycle {cycle}: block missing before shutdown"
                    );
                });
                close_full_stack(stack, &base_dir);
                rc
            };
            let stack = open_full_stack(&base_dir);
            verify_blocks_and_refcounts(
                &stack.blockstore,
                &live_cids,
                Some(&refcounts_before),
                &format!("full stack cycle {cycle}"),
            );
            close_full_stack(stack, &base_dir);
        });
    });
}

View File

@@ -268,12 +268,12 @@ async fn mst_create_update_delete_with_refcounts() {
assert_eq!(&retrieved_a_v2.unwrap()[..], &record_a_v2);
assert!(
store.has(&cid_a_v1).await.unwrap(),
"cid_a_v1 should still exist, tombstoned but not GC'd"
store.get(&cid_a_v1).await.unwrap().is_some(),
"cid_a_v1 data should still exist, tombstoned but not GC'd"
);
assert!(
store.has(&cid_b).await.unwrap(),
"cid_b should still exist, tombstoned but not GC'd"
store.get(&cid_b).await.unwrap().is_some(),
"cid_b data should still exist, tombstoned but not GC'd"
);
assert!(
@@ -284,8 +284,8 @@ async fn mst_create_update_delete_with_refcounts() {
assert_eq!(&retrieved_c[..], &record_c);
assert!(
store.has(&cid_shared).await.unwrap(),
"shared-content block should still exist, tombstoned but not GC'd"
store.get(&cid_shared).await.unwrap().is_some(),
"shared-content block data should still exist, tombstoned but not GC'd"
);
let loaded_mst = Mst::load(storage.clone(), mst_root_v2, None);

View File

@@ -0,0 +1,217 @@
mod common;
use std::sync::Arc;
use cid::Cid;
use common::{compact_by_liveness, tiny_blockstore_config};
use jacquard_repo::mst::Mst;
use jacquard_repo::storage::BlockStore;
use tranquil_store::blockstore::TranquilBlockStore;
/// Truncate a CID's binary encoding to the fixed 36-byte key form the
/// blockstore indexes by. Panics (like the original slice-copy) if the
/// encoding is shorter than 36 bytes.
fn cid_to_fixed(cid: &Cid) -> [u8; 36] {
    let bytes = cid.to_bytes();
    // The slice index panics first if `bytes` is too short, so the
    // conversion itself cannot fail.
    bytes[..36].try_into().expect("slice is exactly 36 bytes")
}
/// Build a deterministic DAG-CBOR post record whose body varies only by
/// `seed`.
fn make_record_bytes(seed: u32) -> Vec<u8> {
    let record = serde_json::json!({
        "$type": "app.bsky.feed.post",
        "text": format!("record {seed}"),
        "createdAt": "2026-01-01T00:00:00Z"
    });
    serde_ipld_dagcbor::to_vec(&record).unwrap()
}
/// Fabricate a deterministic commit CID from `counter`: CIDv1 with the
/// dag-cbor codec (0x71) over a sha2-256 multihash (code 0x12) of the
/// string "commit-{counter}".
fn make_fake_commit_cid(counter: u32) -> Cid {
    let data = format!("commit-{counter}");
    let mh = multihash::Multihash::wrap(0x12, &{
        use sha2::Digest;
        sha2::Sha256::digest(data.as_bytes())
    })
    .unwrap();
    Cid::new_v1(0x71, mh)
}
/// Diff two MST roots and return everything a commit should tombstone: the
/// previous commit CID, then the MST blocks the diff removed, then the
/// removed record CIDs (in that order).
async fn compute_obsolete_from_diff<S: BlockStore + Sync + Send + 'static>(
    old_mst: &Mst<S>,
    new_mst: &Mst<S>,
    old_commit_cid: Cid,
) -> Vec<Cid> {
    let diff = old_mst.diff(new_mst).await.unwrap();
    let mut obsolete = vec![old_commit_cid];
    obsolete.extend(diff.removed_mst_blocks);
    obsolete.extend(diff.removed_cids);
    obsolete
}
/// Builds a real MST incrementally (30 posts, then 15 likes), applying
/// commit-style tombstones computed from MST diffs after every write and
/// compacting periodically, then reopens the store and verifies every final
/// MST node is readable with a non-zero refcount. This is the scenario the
/// `has()` refcount fix protects: shared subtree nodes must not be treated
/// as dead just because an earlier tree version released them.
#[tokio::test]
async fn mst_shared_subtrees_survive_incremental_writes_compaction_restart() {
    let dir = tempfile::TempDir::new().unwrap();
    let mut commit_counter = 0u32;
    let final_node_cids: Vec<Cid>;
    {
        let store = Arc::new(TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap());
        let mut mst = Mst::new(store.clone());
        let mut root: Option<Cid> = None;
        let mut prev_commit = make_fake_commit_cid(commit_counter);
        commit_counter += 1;
        // Phase 1: 30 posts, each followed by a diff-derived tombstone set
        // against the previous settled root.
        for i in 0..30u32 {
            let record_bytes = make_record_bytes(i);
            let record_cid = store.put(&record_bytes).await.unwrap();
            let key = format!("app.bsky.feed.post/{i:06}");
            mst = match root {
                None => mst.add(&key, record_cid).await.unwrap(),
                Some(r) => {
                    // Reload from the settled root so the add starts from
                    // persisted state, as a real commit would.
                    let loaded = Mst::load(store.clone(), r, None);
                    loaded.add(&key, record_cid).await.unwrap()
                }
            };
            let new_root = mst.persist().await.unwrap();
            if let Some(old_root) = root {
                let old_settled = Mst::load(store.clone(), old_root, None);
                let new_settled = Mst::load(store.clone(), new_root, None);
                let obsolete =
                    compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await;
                let obsolete_fixed: Vec<[u8; 36]> =
                    obsolete.iter().map(|c| cid_to_fixed(c)).collect();
                // apply_commit_blocking must run off the async runtime.
                let s = store.clone();
                tokio::task::spawn_blocking(move || {
                    s.apply_commit_blocking(vec![], obsolete_fixed).unwrap();
                })
                .await
                .unwrap();
            }
            root = Some(new_root);
            prev_commit = make_fake_commit_cid(commit_counter);
            commit_counter += 1;
            if i % 5 == 0 {
                let s = store.clone();
                tokio::task::spawn_blocking(move || compact_by_liveness(&s))
                    .await
                    .unwrap();
            }
        }
        // Phase 2: 15 likes in a second collection, same tombstone dance,
        // compacting more often (every 3rd write).
        for i in 0..15u32 {
            let record_bytes = make_record_bytes(1000 + i);
            let record_cid = store.put(&record_bytes).await.unwrap();
            let key = format!("app.bsky.feed.like/{i:06}");
            let loaded = Mst::load(store.clone(), root.unwrap(), None);
            mst = loaded.add(&key, record_cid).await.unwrap();
            let new_root = mst.persist().await.unwrap();
            let old_settled = Mst::load(store.clone(), root.unwrap(), None);
            let new_settled = Mst::load(store.clone(), new_root, None);
            let obsolete =
                compute_obsolete_from_diff(&old_settled, &new_settled, prev_commit).await;
            let obsolete_fixed: Vec<[u8; 36]> =
                obsolete.iter().map(|c| cid_to_fixed(c)).collect();
            let s = store.clone();
            tokio::task::spawn_blocking(move || {
                s.apply_commit_blocking(vec![], obsolete_fixed).unwrap();
            })
            .await
            .unwrap();
            root = Some(new_root);
            prev_commit = make_fake_commit_cid(commit_counter);
            commit_counter += 1;
            if i % 3 == 0 {
                let s = store.clone();
                tokio::task::spawn_blocking(move || compact_by_liveness(&s))
                    .await
                    .unwrap();
            }
        }
        // Record the final tree's node set and check refcounts are live
        // before any shutdown happens.
        let final_settled = Mst::load(store.clone(), root.unwrap(), None);
        final_node_cids = final_settled.collect_node_cids().await.unwrap();
        final_node_cids.iter().for_each(|cid| {
            let fixed = cid_to_fixed(cid);
            let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw());
            assert!(
                rc.is_some_and(|r| r > 0),
                "MST node {cid} has refcount {rc:?} before shutdown"
            );
        });
        // Compact hard; live MST nodes must survive relocation.
        let s = store.clone();
        tokio::task::spawn_blocking(move || {
            (0..100).for_each(|_| compact_by_liveness(&s));
        })
        .await
        .unwrap();
        final_node_cids.iter().for_each(|cid| {
            let fixed = cid_to_fixed(cid);
            let block = store.get_block_sync(&fixed).unwrap();
            assert!(
                block.is_some(),
                "MST node {cid} missing after compaction before shutdown"
            );
        });
        drop(store);
    }
    // Phase 3: reopen from disk and verify presence + refcounts.
    {
        let store = Arc::new(
            TranquilBlockStore::open(tiny_blockstore_config(dir.path())).unwrap(),
        );
        let missing: Vec<String> = final_node_cids
            .iter()
            .filter_map(|cid| {
                let fixed = cid_to_fixed(cid);
                match store.get_block_sync(&fixed) {
                    Ok(Some(_)) => None,
                    Ok(None) => {
                        let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw());
                        Some(format!("{cid} missing, index refcount {rc:?}"))
                    }
                    Err(e) => Some(format!("{cid} error: {e}")),
                }
            })
            .collect();
        assert!(
            missing.is_empty(),
            "{} of {} MST nodes missing after reopen:\n{}",
            missing.len(),
            final_node_cids.len(),
            missing.join("\n"),
        );
        let refcount_issues: Vec<String> = final_node_cids
            .iter()
            .filter_map(|cid| {
                let fixed = cid_to_fixed(cid);
                let rc = store.block_index().get(&fixed).map(|e| e.refcount.raw());
                match rc {
                    Some(0) => Some(format!("{cid} refcount dropped to 0")),
                    None => Some(format!("{cid} not in index")),
                    _ => None,
                }
            })
            .collect();
        assert!(
            refcount_issues.is_empty(),
            "MST nodes with bad refcounts after reopen:\n{}",
            refcount_issues.join("\n"),
        );
    }
}