diff --git a/crates/tranquil-pds/tests/mst_integrity.rs b/crates/tranquil-pds/tests/mst_integrity.rs new file mode 100644 index 0000000..d4edd7a --- /dev/null +++ b/crates/tranquil-pds/tests/mst_integrity.rs @@ -0,0 +1,334 @@ +mod common; +mod firehose; +mod helpers; + +use std::collections::BTreeMap; +use std::io::Cursor; +use std::sync::Arc; +use std::time::Duration; + +use bytes::Bytes; +use cid::Cid; +use common::*; +use firehose::FirehoseConsumer; +use helpers::build_car_with_signature; +use iroh_car::CarReader; +use jacquard_repo::commit::Commit; +use jacquard_repo::mst::Mst; +use jacquard_repo::storage::{BlockStore, MemoryBlockStore}; +use k256::ecdsa::SigningKey; +use reqwest::StatusCode; +use serde_json::{Value, json}; +use tranquil_db_traits::{EventBlocks, RepoEventType, SequenceNumber, SequencedEvent}; +use tranquil_scopes::RepoAction; +use tranquil_types::Did; + +async fn car_to_blocks(car_bytes: &[u8]) -> (Vec, BTreeMap) { + let mut reader = CarReader::new(Cursor::new(car_bytes)) + .await + .expect("parse CAR"); + let roots = reader.header().roots().to_vec(); + let mut blocks = BTreeMap::new(); + while let Ok(Some((cid, data))) = reader.next_block().await { + blocks.insert(cid, Bytes::from(data)); + } + (roots, blocks) +} + +async fn create_post(client: &reqwest::Client, token: &str, did: &str, rkey: &str, text: &str) { + let now = chrono::Utc::now().to_rfc3339(); + let res = client + .post(format!( + "{}/xrpc/com.atproto.repo.createRecord", + base_url().await + )) + .bearer_auth(token) + .json(&json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "rkey": rkey, + "record": { + "$type": "app.bsky.feed.post", + "text": text, + "createdAt": now, + } + })) + .send() + .await + .expect("createRecord"); + assert_eq!(res.status(), StatusCode::OK); +} + +#[tokio::test] +async fn getrepo_car_roundtrips_mst_structure_and_records() { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + + let expected_records: Vec<(String, String)> = (0..20) + .map(|i| { + let rkey = format!("3krtp{:08}", i); + let text = format!("roundtrip record {i}"); + (rkey, text) + }) + .collect(); + for (rkey, text) in &expected_records { + create_post(&client, &token, &did, rkey, text).await; + } + + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepo", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("getRepo"); + assert_eq!(res.status(), StatusCode::OK); + let car_bytes = res.bytes().await.unwrap(); + + let (roots, block_map) = car_to_blocks(&car_bytes).await; + assert_eq!(roots.len(), 1, "CAR must have exactly one root"); + let commit_cid = roots[0]; + let storage = Arc::new(MemoryBlockStore::new_from_blocks(block_map)); + + let commit_bytes = storage + .get(&commit_cid) + .await + .unwrap() + .expect("CAR contains commit block"); + let commit = Commit::from_cbor(&commit_bytes).expect("parse commit"); + let data_cid = *commit.data(); + + let mst = Mst::load(storage.clone(), data_cid, None); + let loaded_root = mst.get_pointer().await.expect("load root"); + assert_eq!(loaded_root, data_cid, "loaded MST pointer == commit.data()"); + + for (rkey, _) in &expected_records { + let path = format!("app.bsky.feed.post/{rkey}"); + let leaf = mst + .get(&path) + .await + .expect("mst.get") + .unwrap_or_else(|| panic!("record {path} missing from exported MST")); + let leaf_bytes = storage + .get(&leaf) + .await + .unwrap() + .unwrap_or_else(|| panic!("record block {leaf} missing from CAR")); + assert!(!leaf_bytes.is_empty(), "record bytes empty"); + } +} + +#[tokio::test] +async fn concurrent_swap_commit_writes_serialize() { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + + create_post(&client, &token, &did, "3kswap00000001", "anchor").await; + + let latest_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getLatestCommit", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("getLatestCommit"); + assert_eq!(latest_res.status(), StatusCode::OK); + let latest: Value = latest_res.json().await.unwrap(); + let swap_cid = latest["cid"].as_str().unwrap().to_string(); + + let now = chrono::Utc::now().to_rfc3339(); + let payload_a = json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "rkey": "3kswap00000002", + "record": { + "$type": "app.bsky.feed.post", + "text": "writer A", + "createdAt": now, + }, + "swapCommit": swap_cid, + }); + let payload_b = json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "rkey": "3kswap00000003", + "record": { + "$type": "app.bsky.feed.post", + "text": "writer B", + "createdAt": now, + }, + "swapCommit": swap_cid, + }); + + let base = base_url().await; + let (res_a, res_b) = tokio::join!( + client + .post(format!("{base}/xrpc/com.atproto.repo.putRecord")) + .bearer_auth(&token) + .json(&payload_a) + .send(), + client + .post(format!("{base}/xrpc/com.atproto.repo.putRecord")) + .bearer_auth(&token) + .json(&payload_b) + .send(), + ); + let status_a = res_a.expect("A send").status(); + let status_b = res_b.expect("B send").status(); + + let ok_a = status_a == StatusCode::OK; + let ok_b = status_b == StatusCode::OK; + assert!( + ok_a ^ ok_b, + "exactly one swap_commit write must succeed: status_a={status_a}, status_b={status_b}" + ); +} + +#[tokio::test] +async fn imported_repo_emits_commit_event_with_valid_car() { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + + let signing_key = SigningKey::random(&mut rand::thread_rng()); + let (car_bytes, _car_root_cid) = build_car_with_signature(&did, &signing_key); + + let import_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.importRepo", + base_url().await + )) + .bearer_auth(&token) + .header("Content-Type", "application/vnd.ipld.car") + .body(car_bytes) + .send() + .await + .expect("importRepo"); + assert_eq!( + import_res.status(), + StatusCode::OK, + "import failed: {:?}", + import_res.text().await.unwrap_or_default() + ); + + let repos = get_test_repos().await; + let typed_did = Did::new(did.clone()).unwrap(); + let events = repos + .repo + .get_events_since_seq(SequenceNumber::ZERO, None) + .await + .expect("events"); + let our: Vec<&SequencedEvent> = events + .iter() + .filter(|e| e.did == typed_did && e.event_type == RepoEventType::Commit) + .collect(); + let last = our.last().expect("at least one commit event after import"); + + let inline = match last.blocks.as_ref().expect("blocks present") { + EventBlocks::Inline(v) => v, + _ => panic!("expected inline blocks"), + }; + assert!( + !inline.is_empty(), + "import event inline blocks must not be empty" + ); + + let have_commit = inline.iter().any(|b| { + let cid = Cid::read_bytes(b.cid_bytes.as_slice()).unwrap(); + last.commit_cid + .as_ref() + .and_then(|c| c.to_cid()) + .map(|commit_cid| cid == commit_cid) + .unwrap_or(false) + }); + assert!(have_commit, "import event CAR must include commit block"); +} + +#[tokio::test] +async fn firehose_commit_block_bytes_roundtrip_to_same_cid() { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + + let repos = get_test_repos().await; + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + create_post(&client, &token, &did, "3krt001", "round-trip me").await; + let frames = consumer + .wait_for_commits(&did, 1, Duration::from_secs(10)) + .await; + let frame = frames.last().expect("frame"); + + let (_, block_map) = car_to_blocks(&frame.blocks).await; + use sha2::{Digest, Sha256}; + for (cid, bytes) in &block_map { + let mut hasher = Sha256::new(); + hasher.update(bytes); + let hash = hasher.finalize(); + let mh = multihash::Multihash::wrap(0x12, hash.as_slice()).expect("wrap"); + let recomputed = Cid::new_v1(cid.codec(), mh); + assert_eq!( + recomputed, *cid, + "CAR block {cid} bytes do not hash back to same CID" + ); + } +} + +#[tokio::test] +async fn firehose_commit_car_contains_new_record_bytes_for_every_create() { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + + let repos = get_test_repos().await; + let cursor = repos.repo.get_max_seq().await.unwrap().as_i64(); + let consumer = FirehoseConsumer::connect_with_cursor(app_port(), cursor).await; + tokio::time::sleep(Duration::from_millis(100)).await; + + let now = chrono::Utc::now().to_rfc3339(); + let writes: Vec = (0..8) + .map(|i| { + json!({ + "$type": "com.atproto.repo.applyWrites#create", + "collection": "app.bsky.feed.post", + "rkey": format!("3krec{:08}", i), + "value": { + "$type": "app.bsky.feed.post", + "text": format!("rec {i}"), + "createdAt": now, + } + }) + }) + .collect(); + let res = client + .post(format!( + "{}/xrpc/com.atproto.repo.applyWrites", + base_url().await + )) + .bearer_auth(&token) + .json(&json!({ "repo": did, "writes": writes })) + .send() + .await + .expect("applyWrites"); + assert_eq!(res.status(), StatusCode::OK); + + let frames = consumer + .wait_for_commits(&did, 1, Duration::from_secs(10)) + .await; + let frame = frames.last().expect("frame"); + + let (_, block_map) = car_to_blocks(&frame.blocks).await; + for op in &frame.ops { + if op.action == RepoAction::Create { + let cid = op.cid.expect("create cid"); + assert!( + block_map.contains_key(&cid), + "record CID {cid} for path {} missing from CAR", + op.path + ); + } + } +} diff --git a/crates/tranquil-pds/tests/mst_property_fuzz.rs b/crates/tranquil-pds/tests/mst_property_fuzz.rs new file mode 100644 index 0000000..cc80d49 --- /dev/null +++ b/crates/tranquil-pds/tests/mst_property_fuzz.rs @@ -0,0 +1,348 @@ +mod common; +mod mst_verify; + +use std::collections::HashMap; +use std::str::FromStr; + +use cid::Cid; +use common::*; +use jacquard_common::smol_str::SmolStr; +use jacquard_repo::commit::Commit; +use jacquard_repo::mst::{Mst, VerifiedWriteOp}; +use jacquard_repo::storage::BlockStore; +use mst_verify::{extract_event_blocks, inline_to_store}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use reqwest::StatusCode; +use serde_json::{Value, json}; +use tranquil_db_traits::{RepoEventType, SequenceNumber, SequencedEvent}; +use tranquil_types::Did; + +const COLLECTIONS: &[&str] = &[ + "app.bsky.feed.post", + "app.bsky.feed.like", + "app.bsky.graph.follow", + "app.bsky.feed.repost", +]; + +#[derive(Copy, Clone, Debug)] +enum FuzzOp { + Create, + Update, + Delete, +} + +fn pick_op(rng: &mut StdRng, have_keys: bool) -> FuzzOp { + match (have_keys, rng.gen_range(0..10)) { + (false, _) => FuzzOp::Create, + (_, 0..=5) => FuzzOp::Create, + (_, 6..=7) => FuzzOp::Update, + _ => FuzzOp::Delete, + } +} + +fn random_rkey(rng: &mut StdRng) -> String { + let tid_char_pool = b"234567abcdefghijklmnopqrstuvwxyz"; + let mut out = Vec::with_capacity(13); + (0..13).for_each(|_| { + let c = tid_char_pool[rng.gen_range(0..tid_char_pool.len())]; + out.push(c); + }); + String::from_utf8(out).unwrap() +} + +fn random_collection(rng: &mut StdRng) -> &'static str { + COLLECTIONS[rng.gen_range(0..COLLECTIONS.len())] +} + +fn record_for_collection(col: &str, text: &str, now: &str) -> Value { + match col { + "app.bsky.feed.post" | "app.bsky.feed.repost" | "app.bsky.feed.like" => json!({ + "$type": col, + "text": text, + "createdAt": now, + }), + _ => json!({ + "$type": col, + "subject": format!("did:plc:synthetic{text}"), + "createdAt": now, + }), + } +} + +async fn verify_commit_forward_and_inverse(event: &SequencedEvent) -> Result<(), String> { + let prev_data = event + .prev_data_cid + .as_ref() + .and_then(|c| c.to_cid()) + .ok_or("no prev_data_cid")?; + let commit_cid = event + .commit_cid + .as_ref() + .and_then(|c| c.to_cid()) + .ok_or("no commit_cid")?; + let inline = extract_event_blocks(event)?; + let ops = event + .ops + .as_ref() + .and_then(|v| v.as_array()) + .ok_or("ops not array")?; + + let storage = inline_to_store(inline); + let commit_bytes = storage + .get(&commit_cid) + .await + .map_err(|e| format!("get commit: {e:?}"))? + .ok_or("missing commit block")?; + let commit = Commit::from_cbor(&commit_bytes).map_err(|e| format!("parse commit: {e:?}"))?; + let new_data = *commit.data(); + + let mut forward = Mst::load(storage.clone(), prev_data, None); + for op in ops { + let action = op["action"].as_str().ok_or("op.action")?; + let path = op["path"].as_str().ok_or("op.path")?; + match action { + "create" | "update" => { + let cid = Cid::from_str(op["cid"].as_str().ok_or("op.cid")?) + .map_err(|e| format!("{e:?}"))?; + forward = forward + .add(path, cid) + .await + .map_err(|e| format!("fwd add {path}: {e:?}"))?; + } + "delete" => { + forward = forward + .delete(path) + .await + .map_err(|e| format!("fwd delete {path}: {e:?}"))?; + } + other => return Err(format!("unknown action {other}")), + } + } + let got = forward + .persist() + .await + .map_err(|e| format!("persist: {e:?}"))?; + if got != new_data { + return Err(format!("forward root mismatch exp={new_data} got={got}")); + } + + let mut inverse = Mst::load(storage, new_data, None); + for op in ops { + let action = op["action"].as_str().ok_or("op.action")?; + let path = op["path"].as_str().ok_or("op.path")?; + let key = SmolStr::new(path); + let verified = match action { + "create" => { + let cid = Cid::from_str(op["cid"].as_str().ok_or("op.cid")?) + .map_err(|e| format!("{e:?}"))?; + VerifiedWriteOp::Create { key, cid } + } + "update" => { + let cid = Cid::from_str(op["cid"].as_str().ok_or("op.cid")?) + .map_err(|e| format!("{e:?}"))?; + let prev = Cid::from_str(op["prev"].as_str().ok_or("op.prev")?) + .map_err(|e| format!("{e:?}"))?; + VerifiedWriteOp::Update { key, cid, prev } + } + "delete" => { + let prev = Cid::from_str(op["prev"].as_str().ok_or("op.prev")?) + .map_err(|e| format!("{e:?}"))?; + VerifiedWriteOp::Delete { key, prev } + } + other => return Err(format!("unknown action {other}")), + }; + let inverted = inverse + .invert_op(verified.clone()) + .await + .map_err(|e| format!("invert {verified:?}: {e:?}"))?; + if !inverted { + return Err(format!("op not invertible: {verified:?}")); + } + } + let got_prev = inverse + .get_pointer() + .await + .map_err(|e| format!("get_pointer: {e:?}"))?; + if got_prev != prev_data { + return Err(format!( + "inverse root mismatch exp={prev_data} got={got_prev}" + )); + } + Ok(()) +} + +async fn fuzz_run_with_seed(seed: u64, steps: usize) -> Vec { + let client = client(); + let (token, did) = create_account_and_login(&client).await; + let mut rng = StdRng::seed_from_u64(seed); + + let mut live_keys: HashMap = HashMap::new(); + + for step in 0..steps { + let now = chrono::Utc::now().to_rfc3339(); + let op = pick_op(&mut rng, !live_keys.is_empty()); + match op { + FuzzOp::Create => { + let col = random_collection(&mut rng); + let rkey = random_rkey(&mut rng); + let path = format!("{col}/{rkey}"); + if live_keys.contains_key(&path) { + continue; + } + let record = record_for_collection(col, &format!("s{seed}-n{step}"), &now); + let res = client + .post(format!( + "{}/xrpc/com.atproto.repo.createRecord", + base_url().await + )) + .bearer_auth(&token) + .json(&json!({ + "repo": did, + "collection": col, + "rkey": rkey, + "record": record, + })) + .send() + .await + .expect("createRecord"); + if res.status() == StatusCode::OK { + live_keys.insert(path, col.to_string()); + } + } + FuzzOp::Update => { + let keys: Vec<&String> = live_keys.keys().collect(); + if keys.is_empty() { + continue; + } + let path = keys[rng.gen_range(0..keys.len())].clone(); + let col = live_keys.get(&path).unwrap().clone(); + let rkey = path.split('/').nth(1).unwrap().to_string(); + let record = record_for_collection(&col, &format!("s{seed}-u{step}"), &now); + let res = client + .post(format!( + "{}/xrpc/com.atproto.repo.putRecord", + base_url().await + )) + .bearer_auth(&token) + .json(&json!({ + "repo": did, + "collection": col, + "rkey": rkey, + "record": record, + })) + .send() + .await + .expect("putRecord"); + assert_eq!(res.status(), StatusCode::OK, "putRecord failed"); + } + FuzzOp::Delete => { + let keys: Vec = live_keys.keys().cloned().collect(); + if keys.is_empty() { + continue; + } + let path = keys[rng.gen_range(0..keys.len())].clone(); + let col = live_keys.get(&path).unwrap().clone(); + let rkey = path.split('/').nth(1).unwrap().to_string(); + let res = client + .post(format!( + "{}/xrpc/com.atproto.repo.deleteRecord", + base_url().await + )) + .bearer_auth(&token) + .json(&json!({ + "repo": did, + "collection": col, + "rkey": rkey, + })) + .send() + .await + .expect("deleteRecord"); + if res.status() == StatusCode::OK { + live_keys.remove(&path); + } + } + } + } + + let repos = get_test_repos().await; + let typed_did = Did::new(did.clone()).unwrap(); + let events = repos + .repo + .get_events_since_seq(SequenceNumber::ZERO, None) + .await + .expect("get_events_since_seq"); + + let our: Vec = events + .into_iter() + .filter(|e| { + e.did == typed_did + && e.event_type == RepoEventType::Commit + && e.prev_data_cid.is_some() + && e.ops + .as_ref() + .and_then(|v| v.as_array()) + .is_some_and(|a| !a.is_empty()) + }) + .collect(); + + let mut failures = Vec::new(); + for event in &our { + if let Err(msg) = verify_commit_forward_and_inverse(event).await { + failures.push(format!( + "seed={seed} seq={} ops={:?}: {msg}", + event.seq.as_i64(), + event + .ops + .as_ref() + .and_then(|v| v.as_array()) + .map(|a| a.len()) + )); + } + } + failures +} + +#[tokio::test] +async fn mst_property_fuzz_seed_1() { + let failures = fuzz_run_with_seed(1, 150).await; + assert!( + failures.is_empty(), + "fuzz seed=1 found {} invalid commits:\n - {}", + failures.len(), + failures.join("\n - ") + ); +} + +#[tokio::test] +async fn mst_property_fuzz_seed_42() { + let failures = fuzz_run_with_seed(42, 150).await; + assert!( + failures.is_empty(), + "fuzz seed=42 found {} invalid commits:\n - {}", + failures.len(), + failures.join("\n - ") + ); +} + +#[tokio::test] +async fn mst_property_fuzz_seed_9001() { + let failures = fuzz_run_with_seed(9001, 150).await; + assert!( + failures.is_empty(), + "fuzz seed=9001 found {} invalid commits:\n - {}", + failures.len(), + failures.join("\n - ") + ); +} + +#[tokio::test] +async fn mst_property_fuzz_deep_tree_seed_7() { + let failures = fuzz_run_with_seed(7, 400).await; + assert!( + failures.is_empty(), + "fuzz deep seed=7 found {} invalid commits:\n - {}", + failures.len(), + failures.join("\n - ") + ); +}