From 80cee7e6f8b6c4dc13d169da3532e771ba23b059 Mon Sep 17 00:00:00 2001 From: lewis Date: Tue, 9 Dec 2025 21:50:07 +0200 Subject: [PATCH] More endpoints baybee --- TODO.md | 6 +- src/lib.rs | 12 + src/sync/mod.rs | 575 +++++++++++++++++++++++++++++++++++++++++++- tests/common/mod.rs | 10 +- tests/identity.rs | 27 ++- tests/lifecycle.rs | 570 +++++++++++++++++++++++++++++++++++++++++++ tests/proxy.rs | 66 +++++ tests/sync.rs | 203 ++++++++++++++++ 8 files changed, 1460 insertions(+), 9 deletions(-) diff --git a/TODO.md b/TODO.md index b67686a..d56b37c 100644 --- a/TODO.md +++ b/TODO.md @@ -68,10 +68,10 @@ Lewis' corrected big boy todofile - [ ] Broadcast real-time commit events. - [ ] Handle cursor replay (backfill). - [ ] Bulk Export - - [ ] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo). - - [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs). + - [x] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo). + - [x] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs). - [x] Implement `com.atproto.sync.getLatestCommit`. - - [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord). + - [x] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord). - [x] Implement `com.atproto.sync.getRepoStatus`. - [x] Implement `com.atproto.sync.listRepos`. - [x] Implement `com.atproto.sync.notifyOfUpdate`. diff --git a/src/lib.rs b/src/lib.rs index 7da84ed..e4ee112 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,6 +118,18 @@ pub fn app(state: AppState) -> Router { "/xrpc/com.atproto.sync.requestCrawl", post(sync::request_crawl), ) + .route( + "/xrpc/com.atproto.sync.getBlocks", + get(sync::get_blocks), + ) + .route( + "/xrpc/com.atproto.sync.getRepo", + get(sync::get_repo), + ) + .route( + "/xrpc/com.atproto.sync.getRecord", + get(sync::get_record), + ) .route( "/xrpc/com.atproto.moderation.createReport", post(api::moderation::create_report), diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 891036d..f88ba36 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -7,10 +7,45 @@ use axum::{ http::header, response::{IntoResponse, Response}, }; +use bytes::Bytes; +use cid::Cid; +use jacquard_repo::{commit::Commit, storage::BlockStore}; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::HashSet; +use std::io::Write; use tracing::{error, info}; +fn write_varint(mut writer: W, mut value: u64) -> std::io::Result<()> { + loop { + let mut byte = (value & 0x7F) as u8; + value >>= 7; + if value != 0 { + byte |= 0x80; + } + writer.write_all(&[byte])?; + if value == 0 { + break; + } + } + Ok(()) +} + +fn ld_write(mut writer: W, data: &[u8]) -> std::io::Result<()> { + write_varint(&mut writer, data.len() as u64)?; + writer.write_all(data)?; + Ok(()) +} + +fn encode_car_header(root_cid: &Cid) -> Vec { + let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({ + "version": 1u64, + "roots": [root_cid.to_bytes()] + })) + .unwrap_or_default(); + header +} + #[derive(Deserialize)] pub struct GetLatestCommitParams { pub did: String, @@ -471,8 +506,546 @@ pub async fn request_crawl( Json(input): Json, ) -> Response { info!("Received requestCrawl for hostname: {}", input.hostname); - // TODO: Queue job for crawling info!("TODO: Queue job for requestCrawl (not implemented)"); (StatusCode::OK, Json(json!({}))).into_response() } + +#[derive(Deserialize)] +pub struct GetBlocksParams { + pub did: String, + pub cids: String, +} + +pub async fn get_blocks( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect(); + + if cid_strings.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "cids is required"})), + ) + .into_response(); + } + + let repo_result = sqlx::query!( + r#" + SELECT r.repo_root_cid + FROM repos r + JOIN users u ON r.user_id = u.id + WHERE u.did = $1 + "#, + did + ) + .fetch_optional(&state.db) + .await; + + let repo_root_cid_str = match repo_result { + Ok(Some(row)) => row.repo_root_cid, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in get_blocks: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let root_cid = match repo_root_cid_str.parse::() { + Ok(c) => c, + Err(e) => { + error!("Failed to parse root CID: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let mut requested_cids: Vec = Vec::new(); + for cid_str in &cid_strings { + match cid_str.parse::() { + Ok(c) => requested_cids.push(c), + Err(e) => { + error!("Failed to parse CID '{}': {:?}", cid_str, e); + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})), + ) + .into_response(); + } + } + } + + let mut buf = Vec::new(); + let header = encode_car_header(&root_cid); + if let Err(e) = ld_write(&mut buf, &header) { + error!("Failed to write CAR header: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + + for cid in &requested_cids { + let cid_bytes = cid.to_bytes(); + let block_result = sqlx::query!( + "SELECT data FROM blocks WHERE cid = $1", + &cid_bytes + ) + .fetch_optional(&state.db) + .await; + + match block_result { + Ok(Some(row)) => { + let mut block_data = Vec::new(); + block_data.extend_from_slice(&cid_bytes); + block_data.extend_from_slice(&row.data); + if let Err(e) = ld_write(&mut buf, &block_data) { + error!("Failed to write block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + } + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})), + ) + .into_response(); + } + Err(e) => { + error!("DB error fetching block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + } + } + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/vnd.ipld.car") + .body(Body::from(buf)) + .unwrap() +} + +#[derive(Deserialize)] +pub struct GetRepoParams { + pub did: String, + pub since: Option, +} + +pub async fn get_repo( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) + .fetch_optional(&state.db) + .await; + + let user_id = match user_result { + Ok(Some(row)) => row.id, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in get_repo: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) + .fetch_optional(&state.db) + .await; + + let repo_root_cid_str = match repo_result { + Ok(Some(row)) => row.repo_root_cid, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in get_repo: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let root_cid = match repo_root_cid_str.parse::() { + Ok(c) => c, + Err(e) => { + error!("Failed to parse root CID: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let commit_bytes = match state.block_store.get(&root_cid).await { + Ok(Some(b)) => b, + Ok(None) => { + error!("Commit block not found: {}", root_cid); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + Err(e) => { + error!("Failed to load commit block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let commit = match Commit::from_cbor(&commit_bytes) { + Ok(c) => c, + Err(e) => { + error!("Failed to parse commit: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); + let mut visited: HashSet> = HashSet::new(); + + collected_blocks.push((root_cid, commit_bytes.clone())); + visited.insert(root_cid.to_bytes()); + + let mst_root_cid = commit.data; + if !visited.contains(&mst_root_cid.to_bytes()) { + visited.insert(mst_root_cid.to_bytes()); + if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { + collected_blocks.push((mst_root_cid, data)); + } + } + + let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id) + .fetch_all(&state.db) + .await + .unwrap_or_default(); + + for record in records { + if let Ok(cid) = record.record_cid.parse::() { + if !visited.contains(&cid.to_bytes()) { + visited.insert(cid.to_bytes()); + if let Ok(Some(data)) = state.block_store.get(&cid).await { + collected_blocks.push((cid, data)); + } + } + } + } + + let mut buf = Vec::new(); + let header = encode_car_header(&root_cid); + if let Err(e) = ld_write(&mut buf, &header) { + error!("Failed to write CAR header: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + + for (cid, data) in &collected_blocks { + let mut block_data = Vec::new(); + block_data.extend_from_slice(&cid.to_bytes()); + block_data.extend_from_slice(data); + if let Err(e) = ld_write(&mut buf, &block_data) { + error!("Failed to write block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + } + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/vnd.ipld.car") + .body(Body::from(buf)) + .unwrap() +} + +#[derive(Deserialize)] +pub struct GetRecordParams { + pub did: String, + pub collection: String, + pub rkey: String, +} + +pub async fn get_record( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + let collection = params.collection.trim(); + let rkey = params.rkey.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + if collection.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "collection is required"})), + ) + .into_response(); + } + + if rkey.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "rkey is required"})), + ) + .into_response(); + } + + let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did) + .fetch_optional(&state.db) + .await; + + let user_id = match user_result { + Ok(Some(row)) => row.id, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in sync get_record: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let record_result = sqlx::query!( + "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3", + user_id, + collection, + rkey + ) + .fetch_optional(&state.db) + .await; + + let record_cid_str = match record_result { + Ok(Some(row)) => row.record_cid, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RecordNotFound", "message": "Record not found"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in sync get_record: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let record_cid = match record_cid_str.parse::() { + Ok(c) => c, + Err(e) => { + error!("Failed to parse record CID: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id) + .fetch_optional(&state.db) + .await; + + let repo_root_cid_str = match repo_result { + Ok(Some(row)) => row.repo_root_cid, + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in sync get_record: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let root_cid = match repo_root_cid_str.parse::() { + Ok(c) => c, + Err(e) => { + error!("Failed to parse root CID: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new(); + + let commit_bytes = match state.block_store.get(&root_cid).await { + Ok(Some(b)) => b, + Ok(None) => { + error!("Commit block not found: {}", root_cid); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + Err(e) => { + error!("Failed to load commit block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + collected_blocks.push((root_cid, commit_bytes.clone())); + + let commit = match Commit::from_cbor(&commit_bytes) { + Ok(c) => c, + Err(e) => { + error!("Failed to parse commit: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let mst_root_cid = commit.data; + if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await { + collected_blocks.push((mst_root_cid, data)); + } + + if let Ok(Some(data)) = state.block_store.get(&record_cid).await { + collected_blocks.push((record_cid, data)); + } else { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RecordNotFound", "message": "Record block not found"})), + ) + .into_response(); + } + + let mut buf = Vec::new(); + let header = encode_car_header(&root_cid); + if let Err(e) = ld_write(&mut buf, &header) { + error!("Failed to write CAR header: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + + for (cid, data) in &collected_blocks { + let mut block_data = Vec::new(); + block_data.extend_from_slice(&cid.to_bytes()); + block_data.extend_from_slice(data); + if let Err(e) = ld_write(&mut buf, &block_data) { + error!("Failed to write block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + } + + Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, "application/vnd.ipld.car") + .body(Body::from(buf)) + .unwrap() +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 0b935fb..912ad4a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -184,12 +184,16 @@ async fn spawn_app(database_url: String) -> String { .await .expect("Failed to run migrations"); - let state = AppState::new(pool).await; - let app = bspds::app(state); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); + unsafe { + std::env::set_var("PDS_HOSTNAME", addr.to_string()); + } + + let state = AppState::new(pool).await; + let app = bspds::app(state); + tokio::spawn(async move { axum::serve(listener, app).await.unwrap(); }); diff --git a/tests/identity.rs b/tests/identity.rs index afb2418..dfafa6a 100644 --- a/tests/identity.rs +++ b/tests/identity.rs @@ -106,7 +106,7 @@ async fn test_create_did_web_account_and_resolve() { let handle = format!("webuser_{}", uuid::Uuid::new_v4()); - let pds_endpoint = "https://localhost"; + let pds_endpoint = base_url().await.replace("http://", "https://"); let did_doc = json!({ "@context": ["https://www.w3.org/ns/did/v1"], @@ -211,10 +211,33 @@ async fn test_create_account_duplicate_handle() { #[tokio::test] async fn test_did_web_lifecycle() { let client = client(); + + let mock_server = MockServer::start().await; + let mock_uri = mock_server.uri(); + let mock_addr = mock_uri.trim_start_matches("http://"); + let handle = format!("lifecycle_{}", uuid::Uuid::new_v4()); - let did = format!("did:web:localhost:u:{}", handle); + let did = format!("did:web:{}:u:{}", mock_addr.replace(":", "%3A"), handle); let email = format!("{}@test.com", handle); + let pds_endpoint = base_url().await.replace("http://", "https://"); + + let did_doc = json!({ + "@context": ["https://www.w3.org/ns/did/v1"], + "id": did, + "service": [{ + "id": "#atproto_pds", + "type": "AtprotoPersonalDataServer", + "serviceEndpoint": pds_endpoint + }] + }); + + Mock::given(method("GET")) + .and(path(format!("/u/{}/did.json", handle))) + .respond_with(ResponseTemplate::new(200).set_body_json(did_doc)) + .mount(&mock_server) + .await; + let create_payload = json!({ "handle": handle, "email": email, diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index e5854e0..d8878ed 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -1,6 +1,7 @@ mod common; use common::*; +use base64::Engine; use chrono::Utc; use reqwest::{self, StatusCode, header}; use serde_json::{Value, json}; @@ -1842,3 +1843,572 @@ async fn test_account_deactivation_lifecycle() { let (new_post_uri, _) = create_post(&client, &did, &jwt, "Post after reactivation").await; assert!(!new_post_uri.is_empty(), "Should be able to post after reactivation"); } + +#[tokio::test] +async fn test_sync_record_lifecycle() { + let client = client(); + let (did, jwt) = setup_new_user("sync-record-lifecycle").await; + + let (post_uri, _post_cid) = + create_post(&client, &did, &jwt, "Post for sync record test").await; + let post_rkey = post_uri.split('/').last().unwrap(); + + let sync_record_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRecord", + base_url().await + )) + .query(&[ + ("did", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", post_rkey), + ]) + .send() + .await + .expect("Failed to get sync record"); + + assert_eq!(sync_record_res.status(), StatusCode::OK); + assert_eq!( + sync_record_res + .headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); + let car_bytes = sync_record_res.bytes().await.unwrap(); + assert!(!car_bytes.is_empty(), "CAR data should not be empty"); + + let latest_before = client + .get(format!( + "{}/xrpc/com.atproto.sync.getLatestCommit", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to get latest commit"); + let latest_before_body: Value = latest_before.json().await.unwrap(); + let rev_before = latest_before_body["rev"].as_str().unwrap().to_string(); + + let (post2_uri, _) = create_post(&client, &did, &jwt, "Second post for sync test").await; + + let latest_after = client + .get(format!( + "{}/xrpc/com.atproto.sync.getLatestCommit", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to get latest commit after"); + let latest_after_body: Value = latest_after.json().await.unwrap(); + let rev_after = latest_after_body["rev"].as_str().unwrap().to_string(); + assert_ne!(rev_before, rev_after, "Revision should change after new record"); + + let delete_payload = json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "rkey": post_rkey + }); + let delete_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.deleteRecord", + base_url().await + )) + .bearer_auth(&jwt) + .json(&delete_payload) + .send() + .await + .expect("Failed to delete record"); + assert_eq!(delete_res.status(), StatusCode::OK); + + let sync_deleted_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRecord", + base_url().await + )) + .query(&[ + ("did", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", post_rkey), + ]) + .send() + .await + .expect("Failed to check deleted record via sync"); + assert_eq!( + sync_deleted_res.status(), + StatusCode::NOT_FOUND, + "Deleted record should return 404 via sync.getRecord" + ); + + let post2_rkey = post2_uri.split('/').last().unwrap(); + let sync_post2_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRecord", + base_url().await + )) + .query(&[ + ("did", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", post2_rkey), + ]) + .send() + .await + .expect("Failed to get second post via sync"); + assert_eq!( + sync_post2_res.status(), + StatusCode::OK, + "Second post should still be accessible" + ); +} + +#[tokio::test] +async fn test_sync_repo_export_lifecycle() { + let client = client(); + let (did, jwt) = setup_new_user("sync-repo-export").await; + + let profile_payload = json!({ + "repo": did, + "collection": "app.bsky.actor.profile", + "rkey": "self", + "record": { + "$type": "app.bsky.actor.profile", + "displayName": "Sync Export User" + } + }); + let profile_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.putRecord", + base_url().await + )) + .bearer_auth(&jwt) + .json(&profile_payload) + .send() + .await + .expect("Failed to create profile"); + assert_eq!(profile_res.status(), StatusCode::OK); + + for i in 0..3 { + tokio::time::sleep(Duration::from_millis(50)).await; + create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; + } + + let blob_data = b"blob data for sync export test"; + let upload_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.uploadBlob", + base_url().await + )) + .header(header::CONTENT_TYPE, "application/octet-stream") + .bearer_auth(&jwt) + .body(blob_data.to_vec()) + .send() + .await + .expect("Failed to upload blob"); + assert_eq!(upload_res.status(), StatusCode::OK); + let blob_body: Value = upload_res.json().await.unwrap(); + let blob_cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap().to_string(); + + let repo_status_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepoStatus", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to get repo status"); + assert_eq!(repo_status_res.status(), StatusCode::OK); + let status_body: Value = repo_status_res.json().await.unwrap(); + assert_eq!(status_body["did"], did); + assert_eq!(status_body["active"], true); + + let get_repo_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepo", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to get full repo"); + assert_eq!(get_repo_res.status(), StatusCode::OK); + assert_eq!( + get_repo_res + .headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); + let repo_car = get_repo_res.bytes().await.unwrap(); + assert!(repo_car.len() > 100, "Repo CAR should have substantial data"); + + let list_blobs_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.listBlobs", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to list blobs"); + assert_eq!(list_blobs_res.status(), StatusCode::OK); + let blobs_body: Value = list_blobs_res.json().await.unwrap(); + let cids = blobs_body["cids"].as_array().unwrap(); + assert!(!cids.is_empty(), "Should have at least one blob"); + + let get_blob_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getBlob", + base_url().await + )) + .query(&[("did", did.as_str()), ("cid", &blob_cid)]) + .send() + .await + .expect("Failed to get blob"); + assert_eq!(get_blob_res.status(), StatusCode::OK); + let retrieved_blob = get_blob_res.bytes().await.unwrap(); + assert_eq!( + retrieved_blob.as_ref(), + blob_data, + "Retrieved blob should match uploaded data" + ); + + let latest_commit_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getLatestCommit", + base_url().await + )) + .query(&[("did", did.as_str())]) + .send() + .await + .expect("Failed to get latest commit"); + assert_eq!(latest_commit_res.status(), StatusCode::OK); + let commit_body: Value = latest_commit_res.json().await.unwrap(); + let root_cid = commit_body["cid"].as_str().unwrap(); + + let get_blocks_url = format!( + "{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}", + base_url().await, + did, + root_cid + ); + let get_blocks_res = client + .get(&get_blocks_url) + .send() + .await + .expect("Failed to get blocks"); + assert_eq!(get_blocks_res.status(), StatusCode::OK); + assert_eq!( + get_blocks_res + .headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); +} + +#[tokio::test] +async fn test_apply_writes_batch_lifecycle() { + let client = client(); + let (did, jwt) = setup_new_user("apply-writes-batch").await; + + let now = Utc::now().to_rfc3339(); + let writes_payload = json!({ + "repo": did, + "writes": [ + { + "$type": "com.atproto.repo.applyWrites#create", + "collection": "app.bsky.feed.post", + "rkey": "batch-post-1", + "value": { + "$type": "app.bsky.feed.post", + "text": "First batch post", + "createdAt": now + } + }, + { + "$type": "com.atproto.repo.applyWrites#create", + "collection": "app.bsky.feed.post", + "rkey": "batch-post-2", + "value": { + "$type": "app.bsky.feed.post", + "text": "Second batch post", + "createdAt": now + } + }, + { + "$type": "com.atproto.repo.applyWrites#create", + "collection": "app.bsky.actor.profile", + "rkey": "self", + "value": { + "$type": "app.bsky.actor.profile", + "displayName": "Batch User" + } + } + ] + }); + + let apply_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.applyWrites", + base_url().await + )) + .bearer_auth(&jwt) + .json(&writes_payload) + .send() + .await + .expect("Failed to apply writes"); + + assert_eq!(apply_res.status(), StatusCode::OK); + + let get_post1 = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", "batch-post-1"), + ]) + .send() + .await + .expect("Failed to get post 1"); + assert_eq!(get_post1.status(), StatusCode::OK); + let post1_body: Value = get_post1.json().await.unwrap(); + assert_eq!(post1_body["value"]["text"], "First batch post"); + + let get_post2 = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", "batch-post-2"), + ]) + .send() + .await + .expect("Failed to get post 2"); + assert_eq!(get_post2.status(), StatusCode::OK); + + let get_profile = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.actor.profile"), + ("rkey", "self"), + ]) + .send() + .await + .expect("Failed to get profile"); + assert_eq!(get_profile.status(), StatusCode::OK); + let profile_body: Value = get_profile.json().await.unwrap(); + assert_eq!(profile_body["value"]["displayName"], "Batch User"); + + let update_writes = json!({ + "repo": did, + "writes": [ + { + "$type": "com.atproto.repo.applyWrites#update", + "collection": "app.bsky.actor.profile", + "rkey": "self", + "value": { + "$type": "app.bsky.actor.profile", + "displayName": "Updated Batch User" + } + }, + { + "$type": "com.atproto.repo.applyWrites#delete", + "collection": "app.bsky.feed.post", + "rkey": "batch-post-1" + } + ] + }); + + let update_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.applyWrites", + base_url().await + )) + .bearer_auth(&jwt) + .json(&update_writes) + .send() + .await + .expect("Failed to apply update writes"); + assert_eq!(update_res.status(), StatusCode::OK); + + let get_updated_profile = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.actor.profile"), + ("rkey", "self"), + ]) + .send() + .await + .expect("Failed to get updated profile"); + let updated_profile: Value = get_updated_profile.json().await.unwrap(); + assert_eq!(updated_profile["value"]["displayName"], "Updated Batch User"); + + let get_deleted_post = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", "batch-post-1"), + ]) + .send() + .await + .expect("Failed to check deleted post"); + assert_eq!( + get_deleted_post.status(), + StatusCode::NOT_FOUND, + "Batch-deleted post should be gone" + ); +} + +#[tokio::test] +async fn test_resolve_handle_lifecycle() { + let client = client(); + let ts = Utc::now().timestamp_millis(); + let handle = format!("resolve-test-{}.test", ts); + let email = format!("resolve-test-{}@test.com", ts); + + let create_res = client + .post(format!( + "{}/xrpc/com.atproto.server.createAccount", + base_url().await + )) + .json(&json!({ + "handle": handle, + "email": email, + "password": "resolve-test-pw" + })) + .send() + .await + .expect("Failed to create account"); + assert_eq!(create_res.status(), StatusCode::OK); + let account: Value = create_res.json().await.unwrap(); + let did = account["did"].as_str().unwrap(); + + let resolve_res = client + .get(format!( + "{}/xrpc/com.atproto.identity.resolveHandle", + base_url().await + )) + .query(&[("handle", handle.as_str())]) + .send() + .await + .expect("Failed to resolve handle"); + + assert_eq!(resolve_res.status(), StatusCode::OK); + let resolve_body: Value = resolve_res.json().await.unwrap(); + assert_eq!(resolve_body["did"], did); +} + +#[tokio::test] +async fn test_service_auth_lifecycle() { + let client = client(); + let (did, jwt) = setup_new_user("service-auth-test").await; + + let service_auth_res = client + .get(format!( + "{}/xrpc/com.atproto.server.getServiceAuth", + base_url().await + )) + .query(&[ + ("aud", "did:web:api.bsky.app"), + ("lxm", "com.atproto.repo.uploadBlob"), + ]) + .bearer_auth(&jwt) + .send() + .await + .expect("Failed to get service auth"); + + assert_eq!(service_auth_res.status(), StatusCode::OK); + let auth_body: Value = service_auth_res.json().await.unwrap(); + let service_token = auth_body["token"].as_str().expect("No token in response"); + + let parts: Vec<&str> = service_token.split('.').collect(); + assert_eq!(parts.len(), 3, "Service token should be a valid JWT"); + + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(parts[1]) + .expect("Failed to decode JWT payload"); + let claims: Value = serde_json::from_slice(&payload_bytes).expect("Invalid JWT payload"); + + assert_eq!(claims["iss"], did); + assert_eq!(claims["aud"], "did:web:api.bsky.app"); + assert_eq!(claims["lxm"], "com.atproto.repo.uploadBlob"); +} + +#[tokio::test] +async fn test_moderation_report_lifecycle() { + let client = client(); + let (alice_did, alice_jwt) = setup_new_user("alice-report").await; + let (bob_did, bob_jwt) = setup_new_user("bob-report").await; + + let (post_uri, post_cid) = + create_post(&client, &bob_did, &bob_jwt, "This is a reportable post").await; + + let report_payload = json!({ + "reasonType": "com.atproto.moderation.defs#reasonSpam", + "reason": "This looks like spam to me", + "subject": { + "$type": "com.atproto.repo.strongRef", + "uri": post_uri, + "cid": post_cid + } + }); + + let report_res = client + .post(format!( + "{}/xrpc/com.atproto.moderation.createReport", + base_url().await + )) + .bearer_auth(&alice_jwt) + .json(&report_payload) + .send() + .await + .expect("Failed to create report"); + + assert_eq!(report_res.status(), StatusCode::OK); + let report_body: Value = report_res.json().await.unwrap(); + assert!(report_body["id"].is_number(), "Report should have an ID"); + assert_eq!(report_body["reasonType"], "com.atproto.moderation.defs#reasonSpam"); + assert_eq!(report_body["reportedBy"], alice_did); + + let account_report_payload = json!({ + "reasonType": "com.atproto.moderation.defs#reasonOther", + "reason": "Suspicious account activity", + "subject": { + "$type": "com.atproto.admin.defs#repoRef", + "did": bob_did + } + }); + + let account_report_res = client + .post(format!( + "{}/xrpc/com.atproto.moderation.createReport", + base_url().await + )) + .bearer_auth(&alice_jwt) + .json(&account_report_payload) + .send() + .await + .expect("Failed to create account report"); + + assert_eq!(account_report_res.status(), StatusCode::OK); +} diff --git a/tests/proxy.rs b/tests/proxy.rs index f90f687..6846440 100644 --- a/tests/proxy.rs +++ b/tests/proxy.rs @@ -98,3 +98,69 @@ async fn test_proxy_auth_signing() { assert_eq!(claims["aud"], upstream_url); assert_eq!(claims["lxm"], "com.example.signed"); } + +#[tokio::test] +async fn test_proxy_post_with_body() { + let app_url = common::base_url().await; + let (upstream_url, mut rx) = spawn_mock_upstream().await; + let client = Client::new(); + + let payload = serde_json::json!({ + "text": "Hello from proxy test", + "createdAt": "2024-01-01T00:00:00Z" + }); + + let res = client + .post(format!("{}/xrpc/com.example.postMethod", app_url)) + .header("atproto-proxy", &upstream_url) + .header("Authorization", "Bearer test-token") + .json(&payload) + .send() + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + + let (method, uri, auth) = rx.recv().await.expect("Upstream should receive request"); + assert_eq!(method, "POST"); + assert_eq!(uri, "/xrpc/com.example.postMethod"); + assert_eq!(auth, Some("Bearer test-token".to_string())); +} + +#[tokio::test] +async fn test_proxy_with_query_params() { + let app_url = common::base_url().await; + let (upstream_url, mut rx) = spawn_mock_upstream().await; + let client = Client::new(); + + let res = client + .get(format!( + "{}/xrpc/com.example.query?repo=did:plc:test&collection=app.bsky.feed.post&limit=50", + app_url + )) + .header("atproto-proxy", &upstream_url) + .header("Authorization", "Bearer test-token") + .send() + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + + let (method, uri, _auth) = rx.recv().await.expect("Upstream should receive request"); + assert_eq!(method, "GET"); + assert!( + uri.contains("repo=did") || uri.contains("repo=did%3Aplc%3Atest"), + "URI should contain repo param, got: {}", + uri + ); + assert!( + uri.contains("collection=app.bsky.feed.post") || uri.contains("collection=app.bsky"), + "URI should contain collection param, got: {}", + uri + ); + assert!( + uri.contains("limit=50"), + "URI should contain limit param, got: {}", + uri + ); +} diff --git a/tests/sync.rs b/tests/sync.rs index da2d975..ff9353f 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -3,6 +3,7 @@ use common::*; use reqwest::StatusCode; use reqwest::header; use serde_json::Value; +use chrono; #[tokio::test] async fn test_get_latest_commit_success() { @@ -352,3 +353,205 @@ async fn test_request_crawl() { assert_eq!(res.status(), StatusCode::OK); } + +#[tokio::test] +async fn test_get_repo_success() { + let client = client(); + let (access_jwt, did) = create_account_and_login(&client).await; + + let post_payload = serde_json::json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "record": { + "$type": "app.bsky.feed.post", + "text": "Test post for getRepo", + "createdAt": chrono::Utc::now().to_rfc3339() + } + }); + let _ = client + .post(format!( + "{}/xrpc/com.atproto.repo.createRecord", + base_url().await + )) + .bearer_auth(&access_jwt) + .json(&post_payload) + .send() + .await + .expect("Failed to create record"); + + let params = [("did", did.as_str())]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepo", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); + let body = res.bytes().await.expect("Failed to get body"); + assert!(!body.is_empty()); +} + +#[tokio::test] +async fn test_get_repo_not_found() { + let client = client(); + let params = [("did", "did:plc:nonexistent12345")]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepo", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "RepoNotFound"); +} + +#[tokio::test] +async fn test_get_record_sync_success() { + let client = client(); + let (access_jwt, did) = create_account_and_login(&client).await; + + let post_payload = serde_json::json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "record": { + "$type": "app.bsky.feed.post", + "text": "Test post for sync getRecord", + "createdAt": chrono::Utc::now().to_rfc3339() + } + }); + let create_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.createRecord", + base_url().await + )) + .bearer_auth(&access_jwt) + .json(&post_payload) + .send() + .await + .expect("Failed to create record"); + + let create_body: Value = create_res.json().await.expect("Invalid JSON"); + let uri = create_body["uri"].as_str().expect("No URI"); + let rkey = uri.split('/').last().expect("Invalid URI"); + + let params = [ + ("did", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", rkey), + ]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRecord", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); + let body = res.bytes().await.expect("Failed to get body"); + assert!(!body.is_empty()); +} + +#[tokio::test] +async fn test_get_record_sync_not_found() { + let client = client(); + let (_, did) = create_account_and_login(&client).await; + + let params = [ + ("did", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", "nonexistent12345"), + ]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRecord", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "RecordNotFound"); +} + +#[tokio::test] +async fn test_get_blocks_success() { + let client = client(); + let (_, did) = create_account_and_login(&client).await; + + let params = [("did", did.as_str())]; + let latest_res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getLatestCommit", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to get latest commit"); + + let latest_body: Value = latest_res.json().await.expect("Invalid JSON"); + let root_cid = latest_body["cid"].as_str().expect("No CID"); + + let url = format!( + "{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}", + base_url().await, + did, + root_cid + ); + let res = client + .get(&url) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("application/vnd.ipld.car") + ); +} + +#[tokio::test] +async fn test_get_blocks_not_found() { + let client = client(); + let url = format!( + "{}/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent12345&cids=bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku", + base_url().await + ); + let res = client + .get(&url) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); +}