More endpoints baybee

This commit is contained in:
lewis
2025-12-09 21:50:07 +02:00
parent 48477d8ea3
commit 80cee7e6f8
8 changed files with 1460 additions and 9 deletions

View File

@@ -68,10 +68,10 @@ Lewis' corrected big boy todofile
- [ ] Broadcast real-time commit events.
- [ ] Handle cursor replay (backfill).
- [ ] Bulk Export
- [ ] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo).
- [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
- [x] Implement `com.atproto.sync.getRepo` (Return full CAR file of repo).
- [x] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
- [x] Implement `com.atproto.sync.getLatestCommit`.
- [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
- [x] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
- [x] Implement `com.atproto.sync.getRepoStatus`.
- [x] Implement `com.atproto.sync.listRepos`.
- [x] Implement `com.atproto.sync.notifyOfUpdate`.

View File

@@ -118,6 +118,18 @@ pub fn app(state: AppState) -> Router {
"/xrpc/com.atproto.sync.requestCrawl",
post(sync::request_crawl),
)
.route(
"/xrpc/com.atproto.sync.getBlocks",
get(sync::get_blocks),
)
.route(
"/xrpc/com.atproto.sync.getRepo",
get(sync::get_repo),
)
.route(
"/xrpc/com.atproto.sync.getRecord",
get(sync::get_record),
)
.route(
"/xrpc/com.atproto.moderation.createReport",
post(api::moderation::create_report),

View File

@@ -7,10 +7,45 @@ use axum::{
http::header,
response::{IntoResponse, Response},
};
use bytes::Bytes;
use cid::Cid;
use jacquard_repo::{commit::Commit, storage::BlockStore};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashSet;
use std::io::Write;
use tracing::{error, info};
fn write_varint<W: Write>(mut writer: W, mut value: u64) -> std::io::Result<()> {
loop {
let mut byte = (value & 0x7F) as u8;
value >>= 7;
if value != 0 {
byte |= 0x80;
}
writer.write_all(&[byte])?;
if value == 0 {
break;
}
}
Ok(())
}
fn ld_write<W: Write>(mut writer: W, data: &[u8]) -> std::io::Result<()> {
write_varint(&mut writer, data.len() as u64)?;
writer.write_all(data)?;
Ok(())
}
fn encode_car_header(root_cid: &Cid) -> Vec<u8> {
let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({
"version": 1u64,
"roots": [root_cid.to_bytes()]
}))
.unwrap_or_default();
header
}
#[derive(Deserialize)]
pub struct GetLatestCommitParams {
pub did: String,
@@ -471,8 +506,546 @@ pub async fn request_crawl(
Json(input): Json<RequestCrawlInput>,
) -> Response {
info!("Received requestCrawl for hostname: {}", input.hostname);
// TODO: Queue job for crawling
info!("TODO: Queue job for requestCrawl (not implemented)");
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct GetBlocksParams {
pub did: String,
pub cids: String,
}
pub async fn get_blocks(
State(state): State<AppState>,
Query(params): Query<GetBlocksParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
if cid_strings.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "cids is required"})),
)
.into_response();
}
let repo_result = sqlx::query!(
r#"
SELECT r.repo_root_cid
FROM repos r
JOIN users u ON r.user_id = u.id
WHERE u.did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut requested_cids: Vec<Cid> = Vec::new();
for cid_str in &cid_strings {
match cid_str.parse::<Cid>() {
Ok(c) => requested_cids.push(c),
Err(e) => {
error!("Failed to parse CID '{}': {:?}", cid_str, e);
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})),
)
.into_response();
}
}
}
let mut buf = Vec::new();
let header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for cid in &requested_cids {
let cid_bytes = cid.to_bytes();
let block_result = sqlx::query!(
"SELECT data FROM blocks WHERE cid = $1",
&cid_bytes
)
.fetch_optional(&state.db)
.await;
match block_result {
Ok(Some(row)) => {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid_bytes);
block_data.extend_from_slice(&row.data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})),
)
.into_response();
}
Err(e) => {
error!("DB error fetching block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}
#[derive(Deserialize)]
pub struct GetRepoParams {
pub did: String,
pub since: Option<String>,
}
pub async fn get_repo(
State(state): State<AppState>,
Query(params): Query<GetRepoParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user_result {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_repo: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_repo: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let commit_bytes = match state.block_store.get(&root_cid).await {
Ok(Some(b)) => b,
Ok(None) => {
error!("Commit block not found: {}", root_cid);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Err(e) => {
error!("Failed to load commit block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => {
error!("Failed to parse commit: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
let mut visited: HashSet<Vec<u8>> = HashSet::new();
collected_blocks.push((root_cid, commit_bytes.clone()));
visited.insert(root_cid.to_bytes());
let mst_root_cid = commit.data;
if !visited.contains(&mst_root_cid.to_bytes()) {
visited.insert(mst_root_cid.to_bytes());
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
collected_blocks.push((mst_root_cid, data));
}
}
let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id)
.fetch_all(&state.db)
.await
.unwrap_or_default();
for record in records {
if let Ok(cid) = record.record_cid.parse::<Cid>() {
if !visited.contains(&cid.to_bytes()) {
visited.insert(cid.to_bytes());
if let Ok(Some(data)) = state.block_store.get(&cid).await {
collected_blocks.push((cid, data));
}
}
}
}
let mut buf = Vec::new();
let header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for (cid, data) in &collected_blocks {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid.to_bytes());
block_data.extend_from_slice(data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}
#[derive(Deserialize)]
pub struct GetRecordParams {
pub did: String,
pub collection: String,
pub rkey: String,
}
pub async fn get_record(
State(state): State<AppState>,
Query(params): Query<GetRecordParams>,
) -> Response {
let did = params.did.trim();
let collection = params.collection.trim();
let rkey = params.rkey.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
if collection.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "collection is required"})),
)
.into_response();
}
if rkey.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "rkey is required"})),
)
.into_response();
}
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user_result {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let record_result = sqlx::query!(
"SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
user_id,
collection,
rkey
)
.fetch_optional(&state.db)
.await;
let record_cid_str = match record_result {
Ok(Some(row)) => row.record_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let record_cid = match record_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse record CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
let commit_bytes = match state.block_store.get(&root_cid).await {
Ok(Some(b)) => b,
Ok(None) => {
error!("Commit block not found: {}", root_cid);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Err(e) => {
error!("Failed to load commit block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
collected_blocks.push((root_cid, commit_bytes.clone()));
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => {
error!("Failed to parse commit: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mst_root_cid = commit.data;
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
collected_blocks.push((mst_root_cid, data));
}
if let Ok(Some(data)) = state.block_store.get(&record_cid).await {
collected_blocks.push((record_cid, data));
} else {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
)
.into_response();
}
let mut buf = Vec::new();
let header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for (cid, data) in &collected_blocks {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid.to_bytes());
block_data.extend_from_slice(data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}

View File

@@ -184,12 +184,16 @@ async fn spawn_app(database_url: String) -> String {
.await
.expect("Failed to run migrations");
let state = AppState::new(pool).await;
let app = bspds::app(state);
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
unsafe {
std::env::set_var("PDS_HOSTNAME", addr.to_string());
}
let state = AppState::new(pool).await;
let app = bspds::app(state);
tokio::spawn(async move {
axum::serve(listener, app).await.unwrap();
});

View File

@@ -106,7 +106,7 @@ async fn test_create_did_web_account_and_resolve() {
let handle = format!("webuser_{}", uuid::Uuid::new_v4());
let pds_endpoint = "https://localhost";
let pds_endpoint = base_url().await.replace("http://", "https://");
let did_doc = json!({
"@context": ["https://www.w3.org/ns/did/v1"],
@@ -211,10 +211,33 @@ async fn test_create_account_duplicate_handle() {
#[tokio::test]
async fn test_did_web_lifecycle() {
let client = client();
let mock_server = MockServer::start().await;
let mock_uri = mock_server.uri();
let mock_addr = mock_uri.trim_start_matches("http://");
let handle = format!("lifecycle_{}", uuid::Uuid::new_v4());
let did = format!("did:web:localhost:u:{}", handle);
let did = format!("did:web:{}:u:{}", mock_addr.replace(":", "%3A"), handle);
let email = format!("{}@test.com", handle);
let pds_endpoint = base_url().await.replace("http://", "https://");
let did_doc = json!({
"@context": ["https://www.w3.org/ns/did/v1"],
"id": did,
"service": [{
"id": "#atproto_pds",
"type": "AtprotoPersonalDataServer",
"serviceEndpoint": pds_endpoint
}]
});
Mock::given(method("GET"))
.and(path(format!("/u/{}/did.json", handle)))
.respond_with(ResponseTemplate::new(200).set_body_json(did_doc))
.mount(&mock_server)
.await;
let create_payload = json!({
"handle": handle,
"email": email,

View File

@@ -1,6 +1,7 @@
mod common;
use common::*;
use base64::Engine;
use chrono::Utc;
use reqwest::{self, StatusCode, header};
use serde_json::{Value, json};
@@ -1842,3 +1843,572 @@ async fn test_account_deactivation_lifecycle() {
let (new_post_uri, _) = create_post(&client, &did, &jwt, "Post after reactivation").await;
assert!(!new_post_uri.is_empty(), "Should be able to post after reactivation");
}
#[tokio::test]
async fn test_sync_record_lifecycle() {
let client = client();
let (did, jwt) = setup_new_user("sync-record-lifecycle").await;
let (post_uri, _post_cid) =
create_post(&client, &did, &jwt, "Post for sync record test").await;
let post_rkey = post_uri.split('/').last().unwrap();
let sync_record_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRecord",
base_url().await
))
.query(&[
("did", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", post_rkey),
])
.send()
.await
.expect("Failed to get sync record");
assert_eq!(sync_record_res.status(), StatusCode::OK);
assert_eq!(
sync_record_res
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
let car_bytes = sync_record_res.bytes().await.unwrap();
assert!(!car_bytes.is_empty(), "CAR data should not be empty");
let latest_before = client
.get(format!(
"{}/xrpc/com.atproto.sync.getLatestCommit",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to get latest commit");
let latest_before_body: Value = latest_before.json().await.unwrap();
let rev_before = latest_before_body["rev"].as_str().unwrap().to_string();
let (post2_uri, _) = create_post(&client, &did, &jwt, "Second post for sync test").await;
let latest_after = client
.get(format!(
"{}/xrpc/com.atproto.sync.getLatestCommit",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to get latest commit after");
let latest_after_body: Value = latest_after.json().await.unwrap();
let rev_after = latest_after_body["rev"].as_str().unwrap().to_string();
assert_ne!(rev_before, rev_after, "Revision should change after new record");
let delete_payload = json!({
"repo": did,
"collection": "app.bsky.feed.post",
"rkey": post_rkey
});
let delete_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.deleteRecord",
base_url().await
))
.bearer_auth(&jwt)
.json(&delete_payload)
.send()
.await
.expect("Failed to delete record");
assert_eq!(delete_res.status(), StatusCode::OK);
let sync_deleted_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRecord",
base_url().await
))
.query(&[
("did", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", post_rkey),
])
.send()
.await
.expect("Failed to check deleted record via sync");
assert_eq!(
sync_deleted_res.status(),
StatusCode::NOT_FOUND,
"Deleted record should return 404 via sync.getRecord"
);
let post2_rkey = post2_uri.split('/').last().unwrap();
let sync_post2_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRecord",
base_url().await
))
.query(&[
("did", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", post2_rkey),
])
.send()
.await
.expect("Failed to get second post via sync");
assert_eq!(
sync_post2_res.status(),
StatusCode::OK,
"Second post should still be accessible"
);
}
#[tokio::test]
async fn test_sync_repo_export_lifecycle() {
let client = client();
let (did, jwt) = setup_new_user("sync-repo-export").await;
let profile_payload = json!({
"repo": did,
"collection": "app.bsky.actor.profile",
"rkey": "self",
"record": {
"$type": "app.bsky.actor.profile",
"displayName": "Sync Export User"
}
});
let profile_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.putRecord",
base_url().await
))
.bearer_auth(&jwt)
.json(&profile_payload)
.send()
.await
.expect("Failed to create profile");
assert_eq!(profile_res.status(), StatusCode::OK);
for i in 0..3 {
tokio::time::sleep(Duration::from_millis(50)).await;
create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await;
}
let blob_data = b"blob data for sync export test";
let upload_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.uploadBlob",
base_url().await
))
.header(header::CONTENT_TYPE, "application/octet-stream")
.bearer_auth(&jwt)
.body(blob_data.to_vec())
.send()
.await
.expect("Failed to upload blob");
assert_eq!(upload_res.status(), StatusCode::OK);
let blob_body: Value = upload_res.json().await.unwrap();
let blob_cid = blob_body["blob"]["ref"]["$link"].as_str().unwrap().to_string();
let repo_status_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepoStatus",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to get repo status");
assert_eq!(repo_status_res.status(), StatusCode::OK);
let status_body: Value = repo_status_res.json().await.unwrap();
assert_eq!(status_body["did"], did);
assert_eq!(status_body["active"], true);
let get_repo_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepo",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to get full repo");
assert_eq!(get_repo_res.status(), StatusCode::OK);
assert_eq!(
get_repo_res
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
let repo_car = get_repo_res.bytes().await.unwrap();
assert!(repo_car.len() > 100, "Repo CAR should have substantial data");
let list_blobs_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.listBlobs",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to list blobs");
assert_eq!(list_blobs_res.status(), StatusCode::OK);
let blobs_body: Value = list_blobs_res.json().await.unwrap();
let cids = blobs_body["cids"].as_array().unwrap();
assert!(!cids.is_empty(), "Should have at least one blob");
let get_blob_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getBlob",
base_url().await
))
.query(&[("did", did.as_str()), ("cid", &blob_cid)])
.send()
.await
.expect("Failed to get blob");
assert_eq!(get_blob_res.status(), StatusCode::OK);
let retrieved_blob = get_blob_res.bytes().await.unwrap();
assert_eq!(
retrieved_blob.as_ref(),
blob_data,
"Retrieved blob should match uploaded data"
);
let latest_commit_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getLatestCommit",
base_url().await
))
.query(&[("did", did.as_str())])
.send()
.await
.expect("Failed to get latest commit");
assert_eq!(latest_commit_res.status(), StatusCode::OK);
let commit_body: Value = latest_commit_res.json().await.unwrap();
let root_cid = commit_body["cid"].as_str().unwrap();
let get_blocks_url = format!(
"{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}",
base_url().await,
did,
root_cid
);
let get_blocks_res = client
.get(&get_blocks_url)
.send()
.await
.expect("Failed to get blocks");
assert_eq!(get_blocks_res.status(), StatusCode::OK);
assert_eq!(
get_blocks_res
.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
}
#[tokio::test]
async fn test_apply_writes_batch_lifecycle() {
let client = client();
let (did, jwt) = setup_new_user("apply-writes-batch").await;
let now = Utc::now().to_rfc3339();
let writes_payload = json!({
"repo": did,
"writes": [
{
"$type": "com.atproto.repo.applyWrites#create",
"collection": "app.bsky.feed.post",
"rkey": "batch-post-1",
"value": {
"$type": "app.bsky.feed.post",
"text": "First batch post",
"createdAt": now
}
},
{
"$type": "com.atproto.repo.applyWrites#create",
"collection": "app.bsky.feed.post",
"rkey": "batch-post-2",
"value": {
"$type": "app.bsky.feed.post",
"text": "Second batch post",
"createdAt": now
}
},
{
"$type": "com.atproto.repo.applyWrites#create",
"collection": "app.bsky.actor.profile",
"rkey": "self",
"value": {
"$type": "app.bsky.actor.profile",
"displayName": "Batch User"
}
}
]
});
let apply_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.applyWrites",
base_url().await
))
.bearer_auth(&jwt)
.json(&writes_payload)
.send()
.await
.expect("Failed to apply writes");
assert_eq!(apply_res.status(), StatusCode::OK);
let get_post1 = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", "batch-post-1"),
])
.send()
.await
.expect("Failed to get post 1");
assert_eq!(get_post1.status(), StatusCode::OK);
let post1_body: Value = get_post1.json().await.unwrap();
assert_eq!(post1_body["value"]["text"], "First batch post");
let get_post2 = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", "batch-post-2"),
])
.send()
.await
.expect("Failed to get post 2");
assert_eq!(get_post2.status(), StatusCode::OK);
let get_profile = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.actor.profile"),
("rkey", "self"),
])
.send()
.await
.expect("Failed to get profile");
assert_eq!(get_profile.status(), StatusCode::OK);
let profile_body: Value = get_profile.json().await.unwrap();
assert_eq!(profile_body["value"]["displayName"], "Batch User");
let update_writes = json!({
"repo": did,
"writes": [
{
"$type": "com.atproto.repo.applyWrites#update",
"collection": "app.bsky.actor.profile",
"rkey": "self",
"value": {
"$type": "app.bsky.actor.profile",
"displayName": "Updated Batch User"
}
},
{
"$type": "com.atproto.repo.applyWrites#delete",
"collection": "app.bsky.feed.post",
"rkey": "batch-post-1"
}
]
});
let update_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.applyWrites",
base_url().await
))
.bearer_auth(&jwt)
.json(&update_writes)
.send()
.await
.expect("Failed to apply update writes");
assert_eq!(update_res.status(), StatusCode::OK);
let get_updated_profile = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.actor.profile"),
("rkey", "self"),
])
.send()
.await
.expect("Failed to get updated profile");
let updated_profile: Value = get_updated_profile.json().await.unwrap();
assert_eq!(updated_profile["value"]["displayName"], "Updated Batch User");
let get_deleted_post = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", "batch-post-1"),
])
.send()
.await
.expect("Failed to check deleted post");
assert_eq!(
get_deleted_post.status(),
StatusCode::NOT_FOUND,
"Batch-deleted post should be gone"
);
}
#[tokio::test]
async fn test_resolve_handle_lifecycle() {
let client = client();
let ts = Utc::now().timestamp_millis();
let handle = format!("resolve-test-{}.test", ts);
let email = format!("resolve-test-{}@test.com", ts);
let create_res = client
.post(format!(
"{}/xrpc/com.atproto.server.createAccount",
base_url().await
))
.json(&json!({
"handle": handle,
"email": email,
"password": "resolve-test-pw"
}))
.send()
.await
.expect("Failed to create account");
assert_eq!(create_res.status(), StatusCode::OK);
let account: Value = create_res.json().await.unwrap();
let did = account["did"].as_str().unwrap();
let resolve_res = client
.get(format!(
"{}/xrpc/com.atproto.identity.resolveHandle",
base_url().await
))
.query(&[("handle", handle.as_str())])
.send()
.await
.expect("Failed to resolve handle");
assert_eq!(resolve_res.status(), StatusCode::OK);
let resolve_body: Value = resolve_res.json().await.unwrap();
assert_eq!(resolve_body["did"], did);
}
#[tokio::test]
async fn test_service_auth_lifecycle() {
let client = client();
let (did, jwt) = setup_new_user("service-auth-test").await;
let service_auth_res = client
.get(format!(
"{}/xrpc/com.atproto.server.getServiceAuth",
base_url().await
))
.query(&[
("aud", "did:web:api.bsky.app"),
("lxm", "com.atproto.repo.uploadBlob"),
])
.bearer_auth(&jwt)
.send()
.await
.expect("Failed to get service auth");
assert_eq!(service_auth_res.status(), StatusCode::OK);
let auth_body: Value = service_auth_res.json().await.unwrap();
let service_token = auth_body["token"].as_str().expect("No token in response");
let parts: Vec<&str> = service_token.split('.').collect();
assert_eq!(parts.len(), 3, "Service token should be a valid JWT");
let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(parts[1])
.expect("Failed to decode JWT payload");
let claims: Value = serde_json::from_slice(&payload_bytes).expect("Invalid JWT payload");
assert_eq!(claims["iss"], did);
assert_eq!(claims["aud"], "did:web:api.bsky.app");
assert_eq!(claims["lxm"], "com.atproto.repo.uploadBlob");
}
#[tokio::test]
async fn test_moderation_report_lifecycle() {
let client = client();
let (alice_did, alice_jwt) = setup_new_user("alice-report").await;
let (bob_did, bob_jwt) = setup_new_user("bob-report").await;
let (post_uri, post_cid) =
create_post(&client, &bob_did, &bob_jwt, "This is a reportable post").await;
let report_payload = json!({
"reasonType": "com.atproto.moderation.defs#reasonSpam",
"reason": "This looks like spam to me",
"subject": {
"$type": "com.atproto.repo.strongRef",
"uri": post_uri,
"cid": post_cid
}
});
let report_res = client
.post(format!(
"{}/xrpc/com.atproto.moderation.createReport",
base_url().await
))
.bearer_auth(&alice_jwt)
.json(&report_payload)
.send()
.await
.expect("Failed to create report");
assert_eq!(report_res.status(), StatusCode::OK);
let report_body: Value = report_res.json().await.unwrap();
assert!(report_body["id"].is_number(), "Report should have an ID");
assert_eq!(report_body["reasonType"], "com.atproto.moderation.defs#reasonSpam");
assert_eq!(report_body["reportedBy"], alice_did);
let account_report_payload = json!({
"reasonType": "com.atproto.moderation.defs#reasonOther",
"reason": "Suspicious account activity",
"subject": {
"$type": "com.atproto.admin.defs#repoRef",
"did": bob_did
}
});
let account_report_res = client
.post(format!(
"{}/xrpc/com.atproto.moderation.createReport",
base_url().await
))
.bearer_auth(&alice_jwt)
.json(&account_report_payload)
.send()
.await
.expect("Failed to create account report");
assert_eq!(account_report_res.status(), StatusCode::OK);
}

View File

@@ -98,3 +98,69 @@ async fn test_proxy_auth_signing() {
assert_eq!(claims["aud"], upstream_url);
assert_eq!(claims["lxm"], "com.example.signed");
}
#[tokio::test]
async fn test_proxy_post_with_body() {
let app_url = common::base_url().await;
let (upstream_url, mut rx) = spawn_mock_upstream().await;
let client = Client::new();
let payload = serde_json::json!({
"text": "Hello from proxy test",
"createdAt": "2024-01-01T00:00:00Z"
});
let res = client
.post(format!("{}/xrpc/com.example.postMethod", app_url))
.header("atproto-proxy", &upstream_url)
.header("Authorization", "Bearer test-token")
.json(&payload)
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let (method, uri, auth) = rx.recv().await.expect("Upstream should receive request");
assert_eq!(method, "POST");
assert_eq!(uri, "/xrpc/com.example.postMethod");
assert_eq!(auth, Some("Bearer test-token".to_string()));
}
#[tokio::test]
async fn test_proxy_with_query_params() {
let app_url = common::base_url().await;
let (upstream_url, mut rx) = spawn_mock_upstream().await;
let client = Client::new();
let res = client
.get(format!(
"{}/xrpc/com.example.query?repo=did:plc:test&collection=app.bsky.feed.post&limit=50",
app_url
))
.header("atproto-proxy", &upstream_url)
.header("Authorization", "Bearer test-token")
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
let (method, uri, _auth) = rx.recv().await.expect("Upstream should receive request");
assert_eq!(method, "GET");
assert!(
uri.contains("repo=did") || uri.contains("repo=did%3Aplc%3Atest"),
"URI should contain repo param, got: {}",
uri
);
assert!(
uri.contains("collection=app.bsky.feed.post") || uri.contains("collection=app.bsky"),
"URI should contain collection param, got: {}",
uri
);
assert!(
uri.contains("limit=50"),
"URI should contain limit param, got: {}",
uri
);
}

View File

@@ -3,6 +3,7 @@ use common::*;
use reqwest::StatusCode;
use reqwest::header;
use serde_json::Value;
use chrono;
#[tokio::test]
async fn test_get_latest_commit_success() {
@@ -352,3 +353,205 @@ async fn test_request_crawl() {
assert_eq!(res.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_get_repo_success() {
let client = client();
let (access_jwt, did) = create_account_and_login(&client).await;
let post_payload = serde_json::json!({
"repo": did,
"collection": "app.bsky.feed.post",
"record": {
"$type": "app.bsky.feed.post",
"text": "Test post for getRepo",
"createdAt": chrono::Utc::now().to_rfc3339()
}
});
let _ = client
.post(format!(
"{}/xrpc/com.atproto.repo.createRecord",
base_url().await
))
.bearer_auth(&access_jwt)
.json(&post_payload)
.send()
.await
.expect("Failed to create record");
let params = [("did", did.as_str())];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepo",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
let body = res.bytes().await.expect("Failed to get body");
assert!(!body.is_empty());
}
#[tokio::test]
async fn test_get_repo_not_found() {
let client = client();
let params = [("did", "did:plc:nonexistent12345")];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepo",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "RepoNotFound");
}
#[tokio::test]
async fn test_get_record_sync_success() {
let client = client();
let (access_jwt, did) = create_account_and_login(&client).await;
let post_payload = serde_json::json!({
"repo": did,
"collection": "app.bsky.feed.post",
"record": {
"$type": "app.bsky.feed.post",
"text": "Test post for sync getRecord",
"createdAt": chrono::Utc::now().to_rfc3339()
}
});
let create_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.createRecord",
base_url().await
))
.bearer_auth(&access_jwt)
.json(&post_payload)
.send()
.await
.expect("Failed to create record");
let create_body: Value = create_res.json().await.expect("Invalid JSON");
let uri = create_body["uri"].as_str().expect("No URI");
let rkey = uri.split('/').last().expect("Invalid URI");
let params = [
("did", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", rkey),
];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRecord",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
let body = res.bytes().await.expect("Failed to get body");
assert!(!body.is_empty());
}
#[tokio::test]
async fn test_get_record_sync_not_found() {
let client = client();
let (_, did) = create_account_and_login(&client).await;
let params = [
("did", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", "nonexistent12345"),
];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRecord",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "RecordNotFound");
}
#[tokio::test]
async fn test_get_blocks_success() {
let client = client();
let (_, did) = create_account_and_login(&client).await;
let params = [("did", did.as_str())];
let latest_res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getLatestCommit",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to get latest commit");
let latest_body: Value = latest_res.json().await.expect("Invalid JSON");
let root_cid = latest_body["cid"].as_str().expect("No CID");
let url = format!(
"{}/xrpc/com.atproto.sync.getBlocks?did={}&cids={}",
base_url().await,
did,
root_cid
);
let res = client
.get(&url)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("application/vnd.ipld.car")
);
}
#[tokio::test]
async fn test_get_blocks_not_found() {
let client = client();
let url = format!(
"{}/xrpc/com.atproto.sync.getBlocks?did=did:plc:nonexistent12345&cids=bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku",
base_url().await
);
let res = client
.get(&url)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
}