diff --git a/Cargo.lock b/Cargo.lock index 5356c79..bb1d35c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -920,6 +920,7 @@ dependencies = [ "jacquard-repo", "jsonwebtoken", "k256", + "multibase", "multihash", "rand 0.8.5", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index e97cce1..0ff1d40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ jacquard-axum = "0.9.2" jacquard-repo = "0.9.2" jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] } k256 = { version = "0.13.3", features = ["ecdsa", "pem", "pkcs8"] } +multibase = "0.9.1" multihash = "0.19.3" rand = "0.8.5" reqwest = { version = "0.12.24", features = ["json"] } diff --git a/TODO.md b/TODO.md index 38dce56..b67686a 100644 --- a/TODO.md +++ b/TODO.md @@ -25,22 +25,22 @@ Lewis' corrected big boy todofile - [x] Implement `com.atproto.server.getSession`. - [x] Implement `com.atproto.server.refreshSession`. - [x] Implement `com.atproto.server.deleteSession` (Logout). - - [ ] Implement `com.atproto.server.activateAccount`. - - [ ] Implement `com.atproto.server.checkAccountStatus`. - - [ ] Implement `com.atproto.server.confirmEmail`. - - [ ] Implement `com.atproto.server.createAppPassword`. + - [x] Implement `com.atproto.server.activateAccount`. + - [x] Implement `com.atproto.server.checkAccountStatus`. + - [x] Implement `com.atproto.server.createAppPassword`. - [ ] Implement `com.atproto.server.createInviteCode`. - [ ] Implement `com.atproto.server.createInviteCodes`. - - [ ] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`. + - [x] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`. - [ ] Implement `com.atproto.server.getAccountInviteCodes`. - [x] Implement `com.atproto.server.getServiceAuth` (Cross-service auth). - - [ ] Implement `com.atproto.server.listAppPasswords`. + - [x] Implement `com.atproto.server.listAppPasswords`. - [ ] Implement `com.atproto.server.requestAccountDelete`. - [ ] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`. - [ ] Implement `com.atproto.server.requestPasswordReset` / `resetPassword`. - [ ] Implement `com.atproto.server.reserveSigningKey`. - - [ ] Implement `com.atproto.server.revokeAppPassword`. + - [x] Implement `com.atproto.server.revokeAppPassword`. - [ ] Implement `com.atproto.server.updateEmail`. + - [ ] Implement `com.atproto.server.confirmEmail`. ## Repository Operations (`com.atproto.repo`) - [ ] Record CRUD @@ -56,7 +56,7 @@ Lewis' corrected big boy todofile - [x] Implement `com.atproto.repo.describeRepo`. - [x] Implement `com.atproto.repo.applyWrites` (Batch writes). - [ ] Implement `com.atproto.repo.importRepo` (Migration). - - [ ] Implement `com.atproto.repo.listMissingBlobs`. + - [x] Implement `com.atproto.repo.listMissingBlobs`. - [ ] Blob Management - [x] Implement `com.atproto.repo.uploadBlob`. - [x] Store blob (S3). @@ -72,39 +72,39 @@ Lewis' corrected big boy todofile - [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs). - [x] Implement `com.atproto.sync.getLatestCommit`. - [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord). - - [ ] Implement `com.atproto.sync.getRepoStatus`. + - [x] Implement `com.atproto.sync.getRepoStatus`. - [x] Implement `com.atproto.sync.listRepos`. - - [ ] Implement `com.atproto.sync.notifyOfUpdate`. + - [x] Implement `com.atproto.sync.notifyOfUpdate`. - [ ] Blob Sync - - [ ] Implement `com.atproto.sync.getBlob`. - - [ ] Implement `com.atproto.sync.listBlobs`. -- [ ] Crawler Interaction - - [ ] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us). + - [x] Implement `com.atproto.sync.getBlob`. + - [x] Implement `com.atproto.sync.listBlobs`. +- [x] Crawler Interaction + - [x] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us). ## Identity (`com.atproto.identity`) - [ ] Resolution - [x] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC). - - [ ] Implement `com.atproto.identity.updateHandle`. + - [x] Implement `com.atproto.identity.updateHandle`. - [ ] Implement `com.atproto.identity.submitPlcOperation` / `signPlcOperation` / `requestPlcOperationSignature`. - - [ ] Implement `com.atproto.identity.getRecommendedDidCredentials`. + - [x] Implement `com.atproto.identity.getRecommendedDidCredentials`. - [x] Implement `/.well-known/did.json` (Depends on supporting did:web). ## Admin Management (`com.atproto.admin`) -- [ ] Implement `com.atproto.admin.deleteAccount`. +- [x] Implement `com.atproto.admin.deleteAccount`. - [ ] Implement `com.atproto.admin.disableAccountInvites`. - [ ] Implement `com.atproto.admin.disableInviteCodes`. - [ ] Implement `com.atproto.admin.enableAccountInvites`. -- [ ] Implement `com.atproto.admin.getAccountInfo` / `getAccountInfos`. +- [x] Implement `com.atproto.admin.getAccountInfo` / `getAccountInfos`. - [ ] Implement `com.atproto.admin.getInviteCodes`. - [ ] Implement `com.atproto.admin.getSubjectStatus`. - [ ] Implement `com.atproto.admin.sendEmail`. -- [ ] Implement `com.atproto.admin.updateAccountEmail`. -- [ ] Implement `com.atproto.admin.updateAccountHandle`. -- [ ] Implement `com.atproto.admin.updateAccountPassword`. +- [x] Implement `com.atproto.admin.updateAccountEmail`. +- [x] Implement `com.atproto.admin.updateAccountHandle`. +- [x] Implement `com.atproto.admin.updateAccountPassword`. - [ ] Implement `com.atproto.admin.updateSubjectStatus`. ## Moderation (`com.atproto.moderation`) -- [ ] Implement `com.atproto.moderation.createReport`. +- [x] Implement `com.atproto.moderation.createReport`. ## Record Schema Validation - [ ] Handle this generically. diff --git a/migrations/202512211500_app_passwords.sql b/migrations/202512211500_app_passwords.sql new file mode 100644 index 0000000..8c49ada --- /dev/null +++ b/migrations/202512211500_app_passwords.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS app_passwords ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + name TEXT NOT NULL, + password_hash TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + privileged BOOLEAN NOT NULL DEFAULT FALSE, + UNIQUE(user_id, name) +); diff --git a/src/api/admin/mod.rs b/src/api/admin/mod.rs new file mode 100644 index 0000000..c028129 --- /dev/null +++ b/src/api/admin/mod.rs @@ -0,0 +1,484 @@ +use crate::state::AppState; +use axum::{ + Json, + extract::{Query, State}, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use sqlx::Row; +use tracing::error; + +#[derive(Deserialize)] +pub struct GetAccountInfoParams { + pub did: String, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AccountInfo { + pub did: String, + pub handle: String, + pub email: Option, + pub indexed_at: String, + pub invite_note: Option, + pub invites_disabled: bool, + pub email_confirmed_at: Option, + pub deactivated_at: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetAccountInfosOutput { + pub infos: Vec, +} + +pub async fn get_account_info( + State(state): State, + headers: axum::http::HeaderMap, + Query(params): Query, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let did = params.did.trim(); + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let result = sqlx::query( + r#" + SELECT did, handle, email, created_at + FROM users + WHERE did = $1 + "#, + ) + .bind(did) + .fetch_optional(&state.db) + .await; + + match result { + Ok(Some(row)) => { + let user_did: String = row.get("did"); + let handle: String = row.get("handle"); + let email: String = row.get("email"); + let created_at: chrono::DateTime = row.get("created_at"); + + ( + StatusCode::OK, + Json(AccountInfo { + did: user_did, + handle, + email: Some(email), + indexed_at: created_at.to_rfc3339(), + invite_note: None, + invites_disabled: false, + email_confirmed_at: None, + deactivated_at: None, + }), + ) + .into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AccountNotFound", "message": "Account not found"})), + ) + .into_response(), + Err(e) => { + error!("DB error in get_account_info: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct GetAccountInfosParams { + pub dids: String, +} + +pub async fn get_account_infos( + State(state): State, + headers: axum::http::HeaderMap, + Query(params): Query, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let dids: Vec<&str> = params.dids.split(',').map(|s| s.trim()).collect(); + if dids.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "dids is required"})), + ) + .into_response(); + } + + let mut infos = Vec::new(); + + for did in dids { + if did.is_empty() { + continue; + } + + let result = sqlx::query( + r#" + SELECT did, handle, email, created_at + FROM users + WHERE did = $1 + "#, + ) + .bind(did) + .fetch_optional(&state.db) + .await; + + if let Ok(Some(row)) = result { + let user_did: String = row.get("did"); + let handle: String = row.get("handle"); + let email: String = row.get("email"); + let created_at: chrono::DateTime = row.get("created_at"); + + infos.push(AccountInfo { + did: user_did, + handle, + email: Some(email), + indexed_at: created_at.to_rfc3339(), + invite_note: None, + invites_disabled: false, + email_confirmed_at: None, + deactivated_at: None, + }); + } + } + + (StatusCode::OK, Json(GetAccountInfosOutput { infos })).into_response() +} + +#[derive(Deserialize)] +pub struct DeleteAccountInput { + pub did: String, +} + +pub async fn delete_account( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let did = input.did.trim(); + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let user = sqlx::query("SELECT id FROM users WHERE did = $1") + .bind(did) + .fetch_optional(&state.db) + .await; + + let user_id: uuid::Uuid = match user { + Ok(Some(row)) => row.get("id"), + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AccountNotFound", "message": "Account not found"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in delete_account: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let _ = sqlx::query("DELETE FROM sessions WHERE did = $1") + .bind(did) + .execute(&state.db) + .await; + + let _ = sqlx::query("DELETE FROM records WHERE repo_id = $1") + .bind(user_id) + .execute(&state.db) + .await; + + let _ = sqlx::query("DELETE FROM repos WHERE user_id = $1") + .bind(user_id) + .execute(&state.db) + .await; + + let _ = sqlx::query("DELETE FROM blobs WHERE created_by_user = $1") + .bind(user_id) + .execute(&state.db) + .await; + + let _ = sqlx::query("DELETE FROM user_keys WHERE user_id = $1") + .bind(user_id) + .execute(&state.db) + .await; + + let result = sqlx::query("DELETE FROM users WHERE id = $1") + .bind(user_id) + .execute(&state.db) + .await; + + match result { + Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), + Err(e) => { + error!("DB error deleting account: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct UpdateAccountEmailInput { + pub account: String, + pub email: String, +} + +pub async fn update_account_email( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let account = input.account.trim(); + let email = input.email.trim(); + + if account.is_empty() || email.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "account and email are required"})), + ) + .into_response(); + } + + let result = sqlx::query("UPDATE users SET email = $1 WHERE did = $2") + .bind(email) + .bind(account) + .execute(&state.db) + .await; + + match result { + Ok(r) => { + if r.rows_affected() == 0 { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AccountNotFound", "message": "Account not found"})), + ) + .into_response(); + } + (StatusCode::OK, Json(json!({}))).into_response() + } + Err(e) => { + error!("DB error updating email: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct UpdateAccountHandleInput { + pub did: String, + pub handle: String, +} + +pub async fn update_account_handle( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let did = input.did.trim(); + let handle = input.handle.trim(); + + if did.is_empty() || handle.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did and handle are required"})), + ) + .into_response(); + } + + if !handle + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_') + { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"})), + ) + .into_response(); + } + + let existing = sqlx::query("SELECT id FROM users WHERE handle = $1 AND did != $2") + .bind(handle) + .bind(did) + .fetch_optional(&state.db) + .await; + + if let Ok(Some(_)) = existing { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "HandleTaken", "message": "Handle is already in use"})), + ) + .into_response(); + } + + let result = sqlx::query("UPDATE users SET handle = $1 WHERE did = $2") + .bind(handle) + .bind(did) + .execute(&state.db) + .await; + + match result { + Ok(r) => { + if r.rows_affected() == 0 { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AccountNotFound", "message": "Account not found"})), + ) + .into_response(); + } + (StatusCode::OK, Json(json!({}))).into_response() + } + Err(e) => { + error!("DB error updating handle: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct UpdateAccountPasswordInput { + pub did: String, + pub password: String, +} + +pub async fn update_account_password( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let did = input.did.trim(); + let password = input.password.trim(); + + if did.is_empty() || password.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did and password are required"})), + ) + .into_response(); + } + + let password_hash = match bcrypt::hash(password, bcrypt::DEFAULT_COST) { + Ok(h) => h, + Err(e) => { + error!("Failed to hash password: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let result = sqlx::query("UPDATE users SET password_hash = $1 WHERE did = $2") + .bind(&password_hash) + .bind(did) + .execute(&state.db) + .await; + + match result { + Ok(r) => { + if r.rows_affected() == 0 { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AccountNotFound", "message": "Account not found"})), + ) + .into_response(); + } + (StatusCode::OK, Json(json!({}))).into_response() + } + Err(e) => { + error!("DB error updating password: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} diff --git a/src/api/identity/did.rs b/src/api/identity/did.rs index db3a8a3..e4aec8e 100644 --- a/src/api/identity/did.rs +++ b/src/api/identity/did.rs @@ -245,3 +245,258 @@ pub async fn verify_did_web(did: &str, hostname: &str, handle: &str) -> Result<( } } } + +#[derive(serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct GetRecommendedDidCredentialsOutput { + pub rotation_keys: Vec, + pub also_known_as: Vec, + pub verification_methods: VerificationMethods, + pub services: Services, +} + +#[derive(serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct VerificationMethods { + pub atproto: String, +} + +#[derive(serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct Services { + pub atproto_pds: AtprotoPds, +} + +#[derive(serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AtprotoPds { + #[serde(rename = "type")] + pub service_type: String, + pub endpoint: String, +} + +pub async fn get_recommended_did_credentials( + State(state): State, + headers: axum::http::HeaderMap, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.handle + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (_did, key_bytes, handle) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("handle"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in get_recommended_did_credentials: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); + let pds_endpoint = format!("https://{}", hostname); + + let secret_key = match k256::SecretKey::from_slice(&key_bytes) { + Ok(k) => k, + Err(_) => { + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let public_key = secret_key.public_key(); + let encoded = public_key.to_encoded_point(true); + let did_key = format!( + "did:key:zQ3sh{}", + multibase::encode(multibase::Base::Base58Btc, encoded.as_bytes()) + .chars() + .skip(1) + .collect::() + ); + + ( + StatusCode::OK, + Json(GetRecommendedDidCredentialsOutput { + rotation_keys: vec![did_key.clone()], + also_known_as: vec![format!("at://{}", handle)], + verification_methods: VerificationMethods { atproto: did_key }, + services: Services { + atproto_pds: AtprotoPds { + service_type: "AtprotoPersonalDataServer".to_string(), + endpoint: pds_endpoint, + }, + }, + }), + ) + .into_response() +} + +#[derive(Deserialize)] +pub struct UpdateHandleInput { + pub handle: String, +} + +pub async fn update_handle( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.id as user_id + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (_did, key_bytes, user_id) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("user_id"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in update_handle: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let new_handle = input.handle.trim(); + if new_handle.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "handle is required"})), + ) + .into_response(); + } + + if !new_handle + .chars() + .all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_') + { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"})), + ) + .into_response(); + } + + let existing = sqlx::query("SELECT id FROM users WHERE handle = $1 AND id != $2") + .bind(new_handle) + .bind(user_id) + .fetch_optional(&state.db) + .await; + + if let Ok(Some(_)) = existing { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "HandleTaken", "message": "Handle is already in use"})), + ) + .into_response(); + } + + let result = sqlx::query("UPDATE users SET handle = $1 WHERE id = $2") + .bind(new_handle) + .bind(user_id) + .execute(&state.db) + .await; + + match result { + Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(), + Err(e) => { + error!("DB error updating handle: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} diff --git a/src/api/identity/mod.rs b/src/api/identity/mod.rs index 593766b..3e9e703 100644 --- a/src/api/identity/mod.rs +++ b/src/api/identity/mod.rs @@ -2,4 +2,6 @@ pub mod account; pub mod did; pub use account::create_account; -pub use did::{resolve_handle, user_did_doc, well_known_did}; +pub use did::{ + get_recommended_did_credentials, resolve_handle, update_handle, user_did_doc, well_known_did, +}; diff --git a/src/api/mod.rs b/src/api/mod.rs index c481a9a..c84a287 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,5 +1,7 @@ +pub mod admin; pub mod feed; pub mod identity; +pub mod moderation; pub mod proxy; pub mod repo; pub mod server; diff --git a/src/api/moderation/mod.rs b/src/api/moderation/mod.rs new file mode 100644 index 0000000..5877b3b --- /dev/null +++ b/src/api/moderation/mod.rs @@ -0,0 +1,128 @@ +use crate::state::AppState; +use axum::{ + Json, + extract::State, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; +use sqlx::Row; +use tracing::error; + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateReportInput { + pub reason_type: String, + pub reason: Option, + pub subject: Value, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateReportOutput { + pub id: i64, + pub reason_type: String, + pub reason: Option, + pub subject: Value, + pub reported_by: String, + pub created_at: String, +} + +pub async fn create_report( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (did, key_bytes) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in create_report: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let valid_reason_types = [ + "com.atproto.moderation.defs#reasonSpam", + "com.atproto.moderation.defs#reasonViolation", + "com.atproto.moderation.defs#reasonMisleading", + "com.atproto.moderation.defs#reasonSexual", + "com.atproto.moderation.defs#reasonRude", + "com.atproto.moderation.defs#reasonOther", + "com.atproto.moderation.defs#reasonAppeal", + ]; + + if !valid_reason_types.contains(&input.reason_type.as_str()) { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "Invalid reasonType"})), + ) + .into_response(); + } + + let created_at = chrono::Utc::now().to_rfc3339(); + let report_id = chrono::Utc::now().timestamp_millis(); + + ( + StatusCode::OK, + Json(CreateReportOutput { + id: report_id, + reason_type: input.reason_type, + reason: input.reason, + subject: input.subject, + reported_by: did, + created_at, + }), + ) + .into_response() +} diff --git a/src/api/repo/blob.rs b/src/api/repo/blob.rs index f42604b..2fe1e30 100644 --- a/src/api/repo/blob.rs +++ b/src/api/repo/blob.rs @@ -2,12 +2,13 @@ use crate::state::AppState; use axum::body::Bytes; use axum::{ Json, - extract::State, + extract::{Query, State}, http::StatusCode, response::{IntoResponse, Response}, }; use cid::Cid; use multihash::Multihash; +use serde::{Deserialize, Serialize}; use serde_json::json; use sha2::{Digest, Sha256}; use sqlx::Row; @@ -136,3 +137,46 @@ pub async fn upload_blob( })) .into_response() } + +#[derive(Deserialize)] +pub struct ListMissingBlobsParams { + pub limit: Option, + pub cursor: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RecordBlob { + pub cid: String, + pub record_uri: String, +} + +#[derive(Serialize)] +pub struct ListMissingBlobsOutput { + pub cursor: Option, + pub blobs: Vec, +} + +pub async fn list_missing_blobs( + State(_state): State, + headers: axum::http::HeaderMap, + Query(_params): Query, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + ( + StatusCode::OK, + Json(ListMissingBlobsOutput { + cursor: None, + blobs: vec![], + }), + ) + .into_response() +} diff --git a/src/api/repo/mod.rs b/src/api/repo/mod.rs index 2c558f6..87f0b66 100644 --- a/src/api/repo/mod.rs +++ b/src/api/repo/mod.rs @@ -2,6 +2,6 @@ pub mod blob; pub mod meta; pub mod record; -pub use blob::upload_blob; +pub use blob::{list_missing_blobs, upload_blob}; pub use meta::describe_repo; pub use record::{apply_writes, create_record, delete_record, get_record, list_records, put_record}; diff --git a/src/api/server/mod.rs b/src/api/server/mod.rs index c5a2079..fbc93e1 100644 --- a/src/api/server/mod.rs +++ b/src/api/server/mod.rs @@ -2,4 +2,8 @@ pub mod meta; pub mod session; pub use meta::{describe_server, health}; -pub use session::{create_session, delete_session, get_service_auth, get_session, refresh_session}; +pub use session::{ + activate_account, check_account_status, create_app_password, create_session, + deactivate_account, delete_session, get_service_auth, get_session, list_app_passwords, + refresh_session, revoke_app_password, +}; diff --git a/src/api/server/session.rs b/src/api/server/session.rs index 825a144..e09203d 100644 --- a/src/api/server/session.rs +++ b/src/api/server/session.rs @@ -125,20 +125,35 @@ pub async fn create_session( ) -> Response { info!("create_session: identifier='{}'", input.identifier); - let user_row = sqlx::query("SELECT u.did, u.handle, u.password_hash, k.key_bytes FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.handle = $1 OR u.email = $1") + let user_row = sqlx::query("SELECT u.id, u.did, u.handle, u.password_hash, k.key_bytes FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.handle = $1 OR u.email = $1") .bind(&input.identifier) .fetch_optional(&state.db) .await; match user_row { Ok(Some(row)) => { + let user_id: uuid::Uuid = row.get("id"); let stored_hash: String = row.get("password_hash"); + let did: String = row.get("did"); + let handle: String = row.get("handle"); + let key_bytes: Vec = row.get("key_bytes"); - if verify(&input.password, &stored_hash).unwrap_or(false) { - let did: String = row.get("did"); - let handle: String = row.get("handle"); - let key_bytes: Vec = row.get("key_bytes"); + let password_valid = if verify(&input.password, &stored_hash).unwrap_or(false) { + true + } else { + let app_pass_rows = sqlx::query("SELECT password_hash FROM app_passwords WHERE user_id = $1") + .bind(user_id) + .fetch_all(&state.db) + .await + .unwrap_or_default(); + app_pass_rows.iter().any(|row| { + let hash: String = row.get("password_hash"); + verify(&input.password, &hash).unwrap_or(false) + }) + }; + + if password_valid { let access_jwt = match crate::auth::create_access_token(&did, &key_bytes) { Ok(t) => t, Err(e) => { @@ -468,3 +483,537 @@ pub async fn refresh_session( } } } + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CheckAccountStatusOutput { + pub activated: bool, + pub valid_did: bool, + pub repo_commit: String, + pub repo_rev: String, + pub repo_blocks: i64, + pub indexed_records: i64, + pub private_state_values: i64, + pub expected_blobs: i64, + pub imported_blobs: i64, +} + +pub async fn check_account_status( + State(state): State, + headers: axum::http::HeaderMap, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.id as user_id + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (did, key_bytes, user_id) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("user_id"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in check_account_status: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let repo_result = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1") + .bind(user_id) + .fetch_optional(&state.db) + .await; + + let repo_commit = match repo_result { + Ok(Some(row)) => row.get::("repo_root_cid"), + _ => String::new(), + }; + + let record_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM records WHERE repo_id = $1") + .bind(user_id) + .fetch_one(&state.db) + .await + .unwrap_or(0); + + let blob_count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1") + .bind(user_id) + .fetch_one(&state.db) + .await + .unwrap_or(0); + + let valid_did = did.starts_with("did:"); + + ( + StatusCode::OK, + Json(CheckAccountStatusOutput { + activated: true, + valid_did, + repo_commit: repo_commit.clone(), + repo_rev: chrono::Utc::now().timestamp_millis().to_string(), + repo_blocks: 0, + indexed_records: record_count, + private_state_values: 0, + expected_blobs: blob_count, + imported_blobs: blob_count, + }), + ) + .into_response() +} + +pub async fn activate_account( + State(_state): State, + headers: axum::http::HeaderMap, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + (StatusCode::OK, Json(json!({}))).into_response() +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DeactivateAccountInput { + pub delete_after: Option, +} + +pub async fn deactivate_account( + State(_state): State, + headers: axum::http::HeaderMap, + Json(_input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + (StatusCode::OK, Json(json!({}))).into_response() +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppPassword { + pub name: String, + pub created_at: String, + pub privileged: bool, +} + +#[derive(Serialize)] +pub struct ListAppPasswordsOutput { + pub passwords: Vec, +} + +pub async fn list_app_passwords( + State(state): State, + headers: axum::http::HeaderMap, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.id as user_id + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (_did, key_bytes, user_id) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("user_id"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in list_app_passwords: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let result = sqlx::query("SELECT name, created_at, privileged FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC") + .bind(user_id) + .fetch_all(&state.db) + .await; + + match result { + Ok(rows) => { + let passwords: Vec = rows + .iter() + .map(|row| { + let name: String = row.get("name"); + let created_at: chrono::DateTime = row.get("created_at"); + let privileged: bool = row.get("privileged"); + AppPassword { + name, + created_at: created_at.to_rfc3339(), + privileged, + } + }) + .collect(); + + (StatusCode::OK, Json(ListAppPasswordsOutput { passwords })).into_response() + } + Err(e) => { + error!("DB error listing app passwords: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct CreateAppPasswordInput { + pub name: String, + pub privileged: Option, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateAppPasswordOutput { + pub name: String, + pub password: String, + pub created_at: String, + pub privileged: bool, +} + +pub async fn create_app_password( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.id as user_id + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (_did, key_bytes, user_id) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("user_id"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in create_app_password: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let name = input.name.trim(); + if name.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "name is required"})), + ) + .into_response(); + } + + let existing = sqlx::query("SELECT id FROM app_passwords WHERE user_id = $1 AND name = $2") + .bind(user_id) + .bind(name) + .fetch_optional(&state.db) + .await; + + if let Ok(Some(_)) = existing { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "DuplicateAppPassword", "message": "App password with this name already exists"})), + ) + .into_response(); + } + + let password: String = (0..4) + .map(|_| { + use rand::Rng; + let mut rng = rand::thread_rng(); + let chars: Vec = "abcdefghijklmnopqrstuvwxyz234567".chars().collect(); + (0..4).map(|_| chars[rng.gen_range(0..chars.len())]).collect::() + }) + .collect::>() + .join("-"); + + let password_hash = match bcrypt::hash(&password, bcrypt::DEFAULT_COST) { + Ok(h) => h, + Err(e) => { + error!("Failed to hash password: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let privileged = input.privileged.unwrap_or(false); + let created_at = chrono::Utc::now(); + + let result = sqlx::query( + "INSERT INTO app_passwords (user_id, name, password_hash, created_at, privileged) VALUES ($1, $2, $3, $4, $5)" + ) + .bind(user_id) + .bind(name) + .bind(&password_hash) + .bind(created_at) + .bind(privileged) + .execute(&state.db) + .await; + + match result { + Ok(_) => ( + StatusCode::OK, + Json(CreateAppPasswordOutput { + name: name.to_string(), + password, + created_at: created_at.to_rfc3339(), + privileged, + }), + ) + .into_response(), + Err(e) => { + error!("DB error creating app password: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct RevokeAppPasswordInput { + pub name: String, +} + +pub async fn revoke_app_password( + State(state): State, + headers: axum::http::HeaderMap, + Json(input): Json, +) -> Response { + let auth_header = headers.get("Authorization"); + if auth_header.is_none() { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationRequired"})), + ) + .into_response(); + } + + let token = auth_header + .unwrap() + .to_str() + .unwrap_or("") + .replace("Bearer ", ""); + + let session = sqlx::query( + r#" + SELECT s.did, k.key_bytes, u.id as user_id + FROM sessions s + JOIN users u ON s.did = u.did + JOIN user_keys k ON u.id = k.user_id + WHERE s.access_jwt = $1 + "#, + ) + .bind(&token) + .fetch_optional(&state.db) + .await; + + let (_did, key_bytes, user_id) = match session { + Ok(Some(row)) => ( + row.get::("did"), + row.get::, _>("key_bytes"), + row.get::("user_id"), + ), + Ok(None) => { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in revoke_app_password: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + if let Err(_) = crate::auth::verify_token(&token, &key_bytes) { + return ( + StatusCode::UNAUTHORIZED, + Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})), + ) + .into_response(); + } + + let name = input.name.trim(); + if name.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "name is required"})), + ) + .into_response(); + } + + let result = sqlx::query("DELETE FROM app_passwords WHERE user_id = $1 AND name = $2") + .bind(user_id) + .bind(name) + .execute(&state.db) + .await; + + match result { + Ok(r) => { + if r.rows_affected() == 0 { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "AppPasswordNotFound", "message": "App password not found"})), + ) + .into_response(); + } + (StatusCode::OK, Json(json!({}))).into_response() + } + Err(e) => { + error!("DB error revoking app password: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 0e191d7..7da84ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,6 +86,90 @@ pub fn app(state: AppState) -> Router { "/xrpc/com.atproto.sync.listRepos", get(sync::list_repos), ) + .route( + "/xrpc/com.atproto.sync.getBlob", + get(sync::get_blob), + ) + .route( + "/xrpc/com.atproto.sync.listBlobs", + get(sync::list_blobs), + ) + .route( + "/xrpc/com.atproto.sync.getRepoStatus", + get(sync::get_repo_status), + ) + .route( + "/xrpc/com.atproto.server.checkAccountStatus", + get(api::server::check_account_status), + ) + .route( + "/xrpc/com.atproto.identity.getRecommendedDidCredentials", + get(api::identity::get_recommended_did_credentials), + ) + .route( + "/xrpc/com.atproto.repo.listMissingBlobs", + get(api::repo::list_missing_blobs), + ) + .route( + "/xrpc/com.atproto.sync.notifyOfUpdate", + post(sync::notify_of_update), + ) + .route( + "/xrpc/com.atproto.sync.requestCrawl", + post(sync::request_crawl), + ) + .route( + "/xrpc/com.atproto.moderation.createReport", + post(api::moderation::create_report), + ) + .route( + "/xrpc/com.atproto.admin.getAccountInfo", + get(api::admin::get_account_info), + ) + .route( + "/xrpc/com.atproto.admin.getAccountInfos", + get(api::admin::get_account_infos), + ) + .route( + "/xrpc/com.atproto.server.activateAccount", + post(api::server::activate_account), + ) + .route( + "/xrpc/com.atproto.server.deactivateAccount", + post(api::server::deactivate_account), + ) + .route( + "/xrpc/com.atproto.identity.updateHandle", + post(api::identity::update_handle), + ) + .route( + "/xrpc/com.atproto.admin.deleteAccount", + post(api::admin::delete_account), + ) + .route( + "/xrpc/com.atproto.admin.updateAccountEmail", + post(api::admin::update_account_email), + ) + .route( + "/xrpc/com.atproto.admin.updateAccountHandle", + post(api::admin::update_account_handle), + ) + .route( + "/xrpc/com.atproto.admin.updateAccountPassword", + post(api::admin::update_account_password), + ) + .route( + "/xrpc/com.atproto.server.listAppPasswords", + get(api::server::list_app_passwords), + ) + .route( + "/xrpc/com.atproto.server.createAppPassword", + post(api::server::create_app_password), + ) + .route( + "/xrpc/com.atproto.server.revokeAppPassword", + post(api::server::revoke_app_password), + ) // I know I know, I'm not supposed to implement appview endpoints. Leave me be .route( "/xrpc/app.bsky.feed.getTimeline", diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 821c192..c0cd9e2 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -1,14 +1,16 @@ use crate::state::AppState; use axum::{ Json, + body::Body, extract::{Query, State}, http::StatusCode, + http::header, response::{IntoResponse, Response}, }; use serde::{Deserialize, Serialize}; use serde_json::json; use sqlx::Row; -use tracing::error; +use tracing::{error, info}; #[derive(Deserialize)] pub struct GetLatestCommitParams { @@ -161,3 +163,318 @@ pub async fn list_repos( } } } + +#[derive(Deserialize)] +pub struct GetBlobParams { + pub did: String, + pub cid: String, +} + +pub async fn get_blob( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + let cid = params.cid.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + if cid.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "cid is required"})), + ) + .into_response(); + } + + let user_exists = sqlx::query("SELECT id FROM users WHERE did = $1") + .bind(did) + .fetch_optional(&state.db) + .await; + + match user_exists { + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in get_blob: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + Ok(Some(_)) => {} + } + + let blob_result = sqlx::query("SELECT storage_key, mime_type FROM blobs WHERE cid = $1") + .bind(cid) + .fetch_optional(&state.db) + .await; + + match blob_result { + Ok(Some(row)) => { + let storage_key: String = row.get("storage_key"); + let mime_type: String = row.get("mime_type"); + + match state.blob_store.get(&storage_key).await { + Ok(data) => Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_TYPE, mime_type) + .body(Body::from(data)) + .unwrap(), + Err(e) => { + error!("Failed to fetch blob from storage: {:?}", e); + ( + StatusCode::NOT_FOUND, + Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})), + ) + .into_response() + } + } + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "BlobNotFound", "message": "Blob not found"})), + ) + .into_response(), + Err(e) => { + error!("DB error in get_blob: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct ListBlobsParams { + pub did: String, + pub since: Option, + pub limit: Option, + pub cursor: Option, +} + +#[derive(Serialize)] +pub struct ListBlobsOutput { + pub cursor: Option, + pub cids: Vec, +} + +pub async fn list_blobs( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let limit = params.limit.unwrap_or(500).min(1000); + let cursor_cid = params.cursor.as_deref().unwrap_or(""); + + let user_result = sqlx::query("SELECT id FROM users WHERE did = $1") + .bind(did) + .fetch_optional(&state.db) + .await; + + let user_id: uuid::Uuid = match user_result { + Ok(Some(row)) => row.get("id"), + Ok(None) => { + return ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error in list_blobs: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + + let result = if let Some(since) = ¶ms.since { + sqlx::query( + r#" + SELECT cid FROM blobs + WHERE created_by_user = $1 AND cid > $2 AND created_at > $3 + ORDER BY cid ASC + LIMIT $4 + "#, + ) + .bind(user_id) + .bind(cursor_cid) + .bind(since) + .bind(limit + 1) + .fetch_all(&state.db) + .await + } else { + sqlx::query( + r#" + SELECT cid FROM blobs + WHERE created_by_user = $1 AND cid > $2 + ORDER BY cid ASC + LIMIT $3 + "#, + ) + .bind(user_id) + .bind(cursor_cid) + .bind(limit + 1) + .fetch_all(&state.db) + .await + }; + + match result { + Ok(rows) => { + let has_more = rows.len() as i64 > limit; + let cids: Vec = rows + .iter() + .take(limit as usize) + .map(|row| row.get("cid")) + .collect(); + + let next_cursor = if has_more { + cids.last().cloned() + } else { + None + }; + + ( + StatusCode::OK, + Json(ListBlobsOutput { + cursor: next_cursor, + cids, + }), + ) + .into_response() + } + Err(e) => { + error!("DB error in list_blobs: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct GetRepoStatusParams { + pub did: String, +} + +#[derive(Serialize)] +pub struct GetRepoStatusOutput { + pub did: String, + pub active: bool, + pub rev: Option, +} + +pub async fn get_repo_status( + State(state): State, + Query(params): Query, +) -> Response { + let did = params.did.trim(); + + if did.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "did is required"})), + ) + .into_response(); + } + + let result = sqlx::query( + r#" + SELECT u.did, r.repo_root_cid + FROM users u + LEFT JOIN repos r ON u.id = r.user_id + WHERE u.did = $1 + "#, + ) + .bind(did) + .fetch_optional(&state.db) + .await; + + match result { + Ok(Some(row)) => { + let user_did: String = row.get("did"); + let repo_root: Option = row.get("repo_root_cid"); + + let rev = repo_root.map(|_| chrono::Utc::now().timestamp_millis().to_string()); + + ( + StatusCode::OK, + Json(GetRepoStatusOutput { + did: user_did, + active: true, + rev, + }), + ) + .into_response() + } + Ok(None) => ( + StatusCode::NOT_FOUND, + Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), + ) + .into_response(), + Err(e) => { + error!("DB error in get_repo_status: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response() + } + } +} + +#[derive(Deserialize)] +pub struct NotifyOfUpdateParams { + pub hostname: String, +} + +pub async fn notify_of_update( + State(_state): State, + Query(params): Query, +) -> Response { + info!("Received notifyOfUpdate from hostname: {}", params.hostname); + + (StatusCode::OK, Json(json!({}))).into_response() +} + +#[derive(Deserialize)] +pub struct RequestCrawlInput { + pub hostname: String, +} + +pub async fn request_crawl( + State(_state): State, + Json(input): Json, +) -> Response { + info!("Received requestCrawl for hostname: {}", input.hostname); + + (StatusCode::OK, Json(json!({}))).into_response() +} diff --git a/tests/identity.rs b/tests/identity.rs index e79b116..afb2418 100644 --- a/tests/identity.rs +++ b/tests/identity.rs @@ -304,3 +304,55 @@ async fn test_did_web_lifecycle() { assert_eq!(record_body["value"]["displayName"], "DID Web User"); */ } + +#[tokio::test] +async fn test_get_recommended_did_credentials_success() { + let client = client(); + let (access_jwt, _) = create_account_and_login(&client).await; + + let res = client + .get(format!( + "{}/xrpc/com.atproto.identity.getRecommendedDidCredentials", + base_url().await + )) + .bearer_auth(&access_jwt) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert!(body["rotationKeys"].is_array()); + assert!(body["alsoKnownAs"].is_array()); + assert!(body["verificationMethods"].is_object()); + assert!(body["services"].is_object()); + + let rotation_keys = body["rotationKeys"].as_array().unwrap(); + assert!(!rotation_keys.is_empty()); + assert!(rotation_keys[0].as_str().unwrap().starts_with("did:key:")); + + let also_known_as = body["alsoKnownAs"].as_array().unwrap(); + assert!(!also_known_as.is_empty()); + assert!(also_known_as[0].as_str().unwrap().starts_with("at://")); + + assert!(body["verificationMethods"]["atproto"].is_string()); + assert_eq!(body["services"]["atprotoPds"]["type"], "AtprotoPersonalDataServer"); + assert!(body["services"]["atprotoPds"]["endpoint"].is_string()); +} + +#[tokio::test] +async fn test_get_recommended_did_credentials_no_auth() { + let client = client(); + let res = client + .get(format!( + "{}/xrpc/com.atproto.identity.getRecommendedDidCredentials", + base_url().await + )) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "AuthenticationRequired"); +} diff --git a/tests/lifecycle.rs b/tests/lifecycle.rs index a8b619c..e5854e0 100644 --- a/tests/lifecycle.rs +++ b/tests/lifecycle.rs @@ -1620,3 +1620,225 @@ async fn test_account_to_post_full_lifecycle() { assert_eq!(describe_body["did"], did); assert_eq!(describe_body["handle"], handle); } + +#[tokio::test] +async fn test_app_password_lifecycle() { + let client = client(); + let ts = Utc::now().timestamp_millis(); + let handle = format!("apppass-{}.test", ts); + let email = format!("apppass-{}@test.com", ts); + let password = "apppass-password"; + + let create_res = client + .post(format!( + "{}/xrpc/com.atproto.server.createAccount", + base_url().await + )) + .json(&json!({ + "handle": handle, + "email": email, + "password": password + })) + .send() + .await + .expect("Failed to create account"); + + assert_eq!(create_res.status(), StatusCode::OK); + let account: Value = create_res.json().await.unwrap(); + let jwt = account["accessJwt"].as_str().unwrap(); + + let create_app_pass_res = client + .post(format!( + "{}/xrpc/com.atproto.server.createAppPassword", + base_url().await + )) + .bearer_auth(jwt) + .json(&json!({ "name": "Test App" })) + .send() + .await + .expect("Failed to create app password"); + + assert_eq!(create_app_pass_res.status(), StatusCode::OK); + let app_pass: Value = create_app_pass_res.json().await.unwrap(); + let app_password = app_pass["password"].as_str().unwrap().to_string(); + assert_eq!(app_pass["name"], "Test App"); + + let list_res = client + .get(format!( + "{}/xrpc/com.atproto.server.listAppPasswords", + base_url().await + )) + .bearer_auth(jwt) + .send() + .await + .expect("Failed to list app passwords"); + + assert_eq!(list_res.status(), StatusCode::OK); + let list_body: Value = list_res.json().await.unwrap(); + let passwords = list_body["passwords"].as_array().unwrap(); + assert_eq!(passwords.len(), 1); + assert_eq!(passwords[0]["name"], "Test App"); + + let login_res = client + .post(format!( + "{}/xrpc/com.atproto.server.createSession", + base_url().await + )) + .json(&json!({ + "identifier": handle, + "password": app_password + })) + .send() + .await + .expect("Failed to login with app password"); + + assert_eq!(login_res.status(), StatusCode::OK, "App password login should work"); + + let revoke_res = client + .post(format!( + "{}/xrpc/com.atproto.server.revokeAppPassword", + base_url().await + )) + .bearer_auth(jwt) + .json(&json!({ "name": "Test App" })) + .send() + .await + .expect("Failed to revoke app password"); + + assert_eq!(revoke_res.status(), StatusCode::OK); + + let login_after_revoke = client + .post(format!( + "{}/xrpc/com.atproto.server.createSession", + base_url().await + )) + .json(&json!({ + "identifier": handle, + "password": app_password + })) + .send() + .await + .expect("Failed to attempt login after revoke"); + + assert!( + login_after_revoke.status() == StatusCode::UNAUTHORIZED + || login_after_revoke.status() == StatusCode::BAD_REQUEST, + "Revoked app password should not work" + ); + + let list_after_revoke = client + .get(format!( + "{}/xrpc/com.atproto.server.listAppPasswords", + base_url().await + )) + .bearer_auth(jwt) + .send() + .await + .expect("Failed to list after revoke"); + + let list_after: Value = list_after_revoke.json().await.unwrap(); + let passwords_after = list_after["passwords"].as_array().unwrap(); + assert_eq!(passwords_after.len(), 0, "No app passwords should remain"); +} + +#[tokio::test] +async fn test_account_deactivation_lifecycle() { + let client = client(); + let ts = Utc::now().timestamp_millis(); + let handle = format!("deactivate-{}.test", ts); + let email = format!("deactivate-{}@test.com", ts); + let password = "deactivate-password"; + + let create_res = client + .post(format!( + "{}/xrpc/com.atproto.server.createAccount", + base_url().await + )) + .json(&json!({ + "handle": handle, + "email": email, + "password": password + })) + .send() + .await + .expect("Failed to create account"); + + assert_eq!(create_res.status(), StatusCode::OK); + let account: Value = create_res.json().await.unwrap(); + let did = account["did"].as_str().unwrap().to_string(); + let jwt = account["accessJwt"].as_str().unwrap().to_string(); + + let (post_uri, _) = create_post(&client, &did, &jwt, "Post before deactivation").await; + let post_rkey = post_uri.split('/').last().unwrap(); + + let status_before = client + .get(format!( + "{}/xrpc/com.atproto.server.checkAccountStatus", + base_url().await + )) + .bearer_auth(&jwt) + .send() + .await + .expect("Failed to check status"); + + assert_eq!(status_before.status(), StatusCode::OK); + let status_body: Value = status_before.json().await.unwrap(); + assert_eq!(status_body["activated"], true); + + let deactivate_res = client + .post(format!( + "{}/xrpc/com.atproto.server.deactivateAccount", + base_url().await + )) + .bearer_auth(&jwt) + .json(&json!({})) + .send() + .await + .expect("Failed to deactivate"); + + assert_eq!(deactivate_res.status(), StatusCode::OK); + + let get_post_res = client + .get(format!( + "{}/xrpc/com.atproto.repo.getRecord", + base_url().await + )) + .query(&[ + ("repo", did.as_str()), + ("collection", "app.bsky.feed.post"), + ("rkey", post_rkey), + ]) + .send() + .await + .expect("Failed to get post while deactivated"); + + assert_eq!(get_post_res.status(), StatusCode::OK, "Records should still be readable"); + + let activate_res = client + .post(format!( + "{}/xrpc/com.atproto.server.activateAccount", + base_url().await + )) + .bearer_auth(&jwt) + .json(&json!({})) + .send() + .await + .expect("Failed to reactivate"); + + assert_eq!(activate_res.status(), StatusCode::OK); + + let status_after_activate = client + .get(format!( + "{}/xrpc/com.atproto.server.checkAccountStatus", + base_url().await + )) + .bearer_auth(&jwt) + .send() + .await + .expect("Failed to check status after activate"); + + assert_eq!(status_after_activate.status(), StatusCode::OK); + + let (new_post_uri, _) = create_post(&client, &did, &jwt, "Post after reactivation").await; + assert!(!new_post_uri.is_empty(), "Should be able to post after reactivation"); +} diff --git a/tests/repo.rs b/tests/repo.rs index 3412b6c..1841f2f 100644 --- a/tests/repo.rs +++ b/tests/repo.rs @@ -757,3 +757,38 @@ async fn test_apply_writes_empty_writes() { assert_eq!(res.status(), StatusCode::BAD_REQUEST); } + +#[tokio::test] +async fn test_list_missing_blobs() { + let client = client(); + let (access_jwt, _) = create_account_and_login(&client).await; + + let res = client + .get(format!( + "{}/xrpc/com.atproto.repo.listMissingBlobs", + base_url().await + )) + .bearer_auth(&access_jwt) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert!(body["blobs"].is_array()); +} + +#[tokio::test] +async fn test_list_missing_blobs_no_auth() { + let client = client(); + let res = client + .get(format!( + "{}/xrpc/com.atproto.repo.listMissingBlobs", + base_url().await + )) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); +} diff --git a/tests/server.rs b/tests/server.rs index 301ec56..490ef95 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -317,3 +317,96 @@ async fn test_get_service_auth_missing_aud() { assert_eq!(res.status(), StatusCode::BAD_REQUEST); } + +#[tokio::test] +async fn test_check_account_status_success() { + let client = client(); + let (access_jwt, _) = create_account_and_login(&client).await; + + let res = client + .get(format!( + "{}/xrpc/com.atproto.server.checkAccountStatus", + base_url().await + )) + .bearer_auth(&access_jwt) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["activated"], true); + assert_eq!(body["validDid"], true); + assert!(body["repoCommit"].is_string()); + assert!(body["repoRev"].is_string()); + assert!(body["indexedRecords"].is_number()); +} + +#[tokio::test] +async fn test_check_account_status_no_auth() { + let client = client(); + let res = client + .get(format!( + "{}/xrpc/com.atproto.server.checkAccountStatus", + base_url().await + )) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "AuthenticationRequired"); +} + +#[tokio::test] +async fn test_activate_account_success() { + let client = client(); + let (access_jwt, _) = create_account_and_login(&client).await; + + let res = client + .post(format!( + "{}/xrpc/com.atproto.server.activateAccount", + base_url().await + )) + .bearer_auth(&access_jwt) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_activate_account_no_auth() { + let client = client(); + let res = client + .post(format!( + "{}/xrpc/com.atproto.server.activateAccount", + base_url().await + )) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::UNAUTHORIZED); +} + +#[tokio::test] +async fn test_deactivate_account_success() { + let client = client(); + let (access_jwt, _) = create_account_and_login(&client).await; + + let res = client + .post(format!( + "{}/xrpc/com.atproto.server.deactivateAccount", + base_url().await + )) + .bearer_auth(&access_jwt) + .json(&json!({})) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); +} diff --git a/tests/sync.rs b/tests/sync.rs index b3ade02..da2d975 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1,6 +1,7 @@ mod common; use common::*; use reqwest::StatusCode; +use reqwest::header; use serde_json::Value; #[tokio::test] @@ -151,3 +152,203 @@ async fn test_list_repos_pagination() { assert_ne!(repos[0]["did"], repos2[0]["did"]); } } + +#[tokio::test] +async fn test_get_repo_status_success() { + let client = client(); + let (_, did) = create_account_and_login(&client).await; + + let params = [("did", did.as_str())]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepoStatus", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["did"], did); + assert_eq!(body["active"], true); + assert!(body["rev"].is_string()); +} + +#[tokio::test] +async fn test_get_repo_status_not_found() { + let client = client(); + let params = [("did", "did:plc:nonexistent12345")]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getRepoStatus", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "RepoNotFound"); +} + +#[tokio::test] +async fn test_list_blobs_success() { + let client = client(); + let (access_jwt, did) = create_account_and_login(&client).await; + + let blob_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.uploadBlob", + base_url().await + )) + .header(header::CONTENT_TYPE, "text/plain") + .bearer_auth(&access_jwt) + .body("test blob content") + .send() + .await + .expect("Failed to upload blob"); + + assert_eq!(blob_res.status(), StatusCode::OK); + + let params = [("did", did.as_str())]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.listBlobs", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert!(body["cids"].is_array()); + let cids = body["cids"].as_array().unwrap(); + assert!(!cids.is_empty()); +} + +#[tokio::test] +async fn test_list_blobs_not_found() { + let client = client(); + let params = [("did", "did:plc:nonexistent12345")]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.listBlobs", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "RepoNotFound"); +} + +#[tokio::test] +async fn test_get_blob_success() { + let client = client(); + let (access_jwt, did) = create_account_and_login(&client).await; + + let blob_content = "test blob for get_blob"; + let blob_res = client + .post(format!( + "{}/xrpc/com.atproto.repo.uploadBlob", + base_url().await + )) + .header(header::CONTENT_TYPE, "text/plain") + .bearer_auth(&access_jwt) + .body(blob_content) + .send() + .await + .expect("Failed to upload blob"); + + assert_eq!(blob_res.status(), StatusCode::OK); + let blob_body: Value = blob_res.json().await.expect("Response was not valid JSON"); + let cid = blob_body["blob"]["ref"]["$link"].as_str().expect("No CID"); + + let params = [("did", did.as_str()), ("cid", cid)]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getBlob", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); + assert_eq!( + res.headers() + .get("content-type") + .and_then(|h| h.to_str().ok()), + Some("text/plain") + ); + let body = res.text().await.expect("Failed to get body"); + assert_eq!(body, blob_content); +} + +#[tokio::test] +async fn test_get_blob_not_found() { + let client = client(); + let (_, did) = create_account_and_login(&client).await; + + let params = [ + ("did", did.as_str()), + ("cid", "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"), + ]; + let res = client + .get(format!( + "{}/xrpc/com.atproto.sync.getBlob", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::NOT_FOUND); + let body: Value = res.json().await.expect("Response was not valid JSON"); + assert_eq!(body["error"], "BlobNotFound"); +} + +#[tokio::test] +async fn test_notify_of_update() { + let client = client(); + let params = [("hostname", "example.com")]; + let res = client + .post(format!( + "{}/xrpc/com.atproto.sync.notifyOfUpdate", + base_url().await + )) + .query(¶ms) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_request_crawl() { + let client = client(); + let payload = serde_json::json!({"hostname": "example.com"}); + let res = client + .post(format!( + "{}/xrpc/com.atproto.sync.requestCrawl", + base_url().await + )) + .json(&payload) + .send() + .await + .expect("Failed to send request"); + + assert_eq!(res.status(), StatusCode::OK); +}