More endpoints

This commit is contained in:
lewis
2025-12-08 17:04:00 +02:00
parent 7ec4861cde
commit a2b2c5b4c9
20 changed files with 2515 additions and 32 deletions

1
Cargo.lock generated
View File

@@ -920,6 +920,7 @@ dependencies = [
"jacquard-repo",
"jsonwebtoken",
"k256",
"multibase",
"multihash",
"rand 0.8.5",
"reqwest",

View File

@@ -20,6 +20,7 @@ jacquard-axum = "0.9.2"
jacquard-repo = "0.9.2"
jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
k256 = { version = "0.13.3", features = ["ecdsa", "pem", "pkcs8"] }
multibase = "0.9.1"
multihash = "0.19.3"
rand = "0.8.5"
reqwest = { version = "0.12.24", features = ["json"] }

44
TODO.md
View File

@@ -25,22 +25,22 @@ Lewis' corrected big boy todofile
- [x] Implement `com.atproto.server.getSession`.
- [x] Implement `com.atproto.server.refreshSession`.
- [x] Implement `com.atproto.server.deleteSession` (Logout).
- [ ] Implement `com.atproto.server.activateAccount`.
- [ ] Implement `com.atproto.server.checkAccountStatus`.
- [ ] Implement `com.atproto.server.confirmEmail`.
- [ ] Implement `com.atproto.server.createAppPassword`.
- [x] Implement `com.atproto.server.activateAccount`.
- [x] Implement `com.atproto.server.checkAccountStatus`.
- [x] Implement `com.atproto.server.createAppPassword`.
- [ ] Implement `com.atproto.server.createInviteCode`.
- [ ] Implement `com.atproto.server.createInviteCodes`.
- [ ] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`.
- [x] Implement `com.atproto.server.deactivateAccount` / `deleteAccount`.
- [ ] Implement `com.atproto.server.getAccountInviteCodes`.
- [x] Implement `com.atproto.server.getServiceAuth` (Cross-service auth).
- [ ] Implement `com.atproto.server.listAppPasswords`.
- [x] Implement `com.atproto.server.listAppPasswords`.
- [ ] Implement `com.atproto.server.requestAccountDelete`.
- [ ] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`.
- [ ] Implement `com.atproto.server.requestPasswordReset` / `resetPassword`.
- [ ] Implement `com.atproto.server.reserveSigningKey`.
- [ ] Implement `com.atproto.server.revokeAppPassword`.
- [x] Implement `com.atproto.server.revokeAppPassword`.
- [ ] Implement `com.atproto.server.updateEmail`.
- [ ] Implement `com.atproto.server.confirmEmail`.
## Repository Operations (`com.atproto.repo`)
- [ ] Record CRUD
@@ -56,7 +56,7 @@ Lewis' corrected big boy todofile
- [x] Implement `com.atproto.repo.describeRepo`.
- [x] Implement `com.atproto.repo.applyWrites` (Batch writes).
- [ ] Implement `com.atproto.repo.importRepo` (Migration).
- [ ] Implement `com.atproto.repo.listMissingBlobs`.
- [x] Implement `com.atproto.repo.listMissingBlobs`.
- [ ] Blob Management
- [x] Implement `com.atproto.repo.uploadBlob`.
- [x] Store blob (S3).
@@ -72,39 +72,39 @@ Lewis' corrected big boy todofile
- [ ] Implement `com.atproto.sync.getBlocks` (Return specific blocks via CIDs).
- [x] Implement `com.atproto.sync.getLatestCommit`.
- [ ] Implement `com.atproto.sync.getRecord` (Sync version, distinct from repo.getRecord).
- [ ] Implement `com.atproto.sync.getRepoStatus`.
- [x] Implement `com.atproto.sync.getRepoStatus`.
- [x] Implement `com.atproto.sync.listRepos`.
- [ ] Implement `com.atproto.sync.notifyOfUpdate`.
- [x] Implement `com.atproto.sync.notifyOfUpdate`.
- [ ] Blob Sync
- [ ] Implement `com.atproto.sync.getBlob`.
- [ ] Implement `com.atproto.sync.listBlobs`.
- [ ] Crawler Interaction
- [ ] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us).
- [x] Implement `com.atproto.sync.getBlob`.
- [x] Implement `com.atproto.sync.listBlobs`.
- [x] Crawler Interaction
- [x] Implement `com.atproto.sync.requestCrawl` (Notify relays to index us).
## Identity (`com.atproto.identity`)
- [ ] Resolution
- [x] Implement `com.atproto.identity.resolveHandle` (Can be internal or proxy to PLC).
- [ ] Implement `com.atproto.identity.updateHandle`.
- [x] Implement `com.atproto.identity.updateHandle`.
- [ ] Implement `com.atproto.identity.submitPlcOperation` / `signPlcOperation` / `requestPlcOperationSignature`.
- [ ] Implement `com.atproto.identity.getRecommendedDidCredentials`.
- [x] Implement `com.atproto.identity.getRecommendedDidCredentials`.
- [x] Implement `/.well-known/did.json` (Depends on supporting did:web).
## Admin Management (`com.atproto.admin`)
- [ ] Implement `com.atproto.admin.deleteAccount`.
- [x] Implement `com.atproto.admin.deleteAccount`.
- [ ] Implement `com.atproto.admin.disableAccountInvites`.
- [ ] Implement `com.atproto.admin.disableInviteCodes`.
- [ ] Implement `com.atproto.admin.enableAccountInvites`.
- [ ] Implement `com.atproto.admin.getAccountInfo` / `getAccountInfos`.
- [x] Implement `com.atproto.admin.getAccountInfo` / `getAccountInfos`.
- [ ] Implement `com.atproto.admin.getInviteCodes`.
- [ ] Implement `com.atproto.admin.getSubjectStatus`.
- [ ] Implement `com.atproto.admin.sendEmail`.
- [ ] Implement `com.atproto.admin.updateAccountEmail`.
- [ ] Implement `com.atproto.admin.updateAccountHandle`.
- [ ] Implement `com.atproto.admin.updateAccountPassword`.
- [x] Implement `com.atproto.admin.updateAccountEmail`.
- [x] Implement `com.atproto.admin.updateAccountHandle`.
- [x] Implement `com.atproto.admin.updateAccountPassword`.
- [ ] Implement `com.atproto.admin.updateSubjectStatus`.
## Moderation (`com.atproto.moderation`)
- [ ] Implement `com.atproto.moderation.createReport`.
- [x] Implement `com.atproto.moderation.createReport`.
## Record Schema Validation
- [ ] Handle this generically.

View File

@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS app_passwords (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
privileged BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(user_id, name)
);

484
src/api/admin/mod.rs Normal file
View File

@@ -0,0 +1,484 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use tracing::error;
#[derive(Deserialize)]
pub struct GetAccountInfoParams {
pub did: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AccountInfo {
pub did: String,
pub handle: String,
pub email: Option<String>,
pub indexed_at: String,
pub invite_note: Option<String>,
pub invites_disabled: bool,
pub email_confirmed_at: Option<String>,
pub deactivated_at: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetAccountInfosOutput {
pub infos: Vec<AccountInfo>,
}
pub async fn get_account_info(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetAccountInfoParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let result = sqlx::query(
r#"
SELECT did, handle, email, created_at
FROM users
WHERE did = $1
"#,
)
.bind(did)
.fetch_optional(&state.db)
.await;
match result {
Ok(Some(row)) => {
let user_did: String = row.get("did");
let handle: String = row.get("handle");
let email: String = row.get("email");
let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
(
StatusCode::OK,
Json(AccountInfo {
did: user_did,
handle,
email: Some(email),
indexed_at: created_at.to_rfc3339(),
invite_note: None,
invites_disabled: false,
email_confirmed_at: None,
deactivated_at: None,
}),
)
.into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_account_info: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct GetAccountInfosParams {
pub dids: String,
}
pub async fn get_account_infos(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetAccountInfosParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let dids: Vec<&str> = params.dids.split(',').map(|s| s.trim()).collect();
if dids.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "dids is required"})),
)
.into_response();
}
let mut infos = Vec::new();
for did in dids {
if did.is_empty() {
continue;
}
let result = sqlx::query(
r#"
SELECT did, handle, email, created_at
FROM users
WHERE did = $1
"#,
)
.bind(did)
.fetch_optional(&state.db)
.await;
if let Ok(Some(row)) = result {
let user_did: String = row.get("did");
let handle: String = row.get("handle");
let email: String = row.get("email");
let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
infos.push(AccountInfo {
did: user_did,
handle,
email: Some(email),
indexed_at: created_at.to_rfc3339(),
invite_note: None,
invites_disabled: false,
email_confirmed_at: None,
deactivated_at: None,
});
}
}
(StatusCode::OK, Json(GetAccountInfosOutput { infos })).into_response()
}
#[derive(Deserialize)]
pub struct DeleteAccountInput {
pub did: String,
}
pub async fn delete_account(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<DeleteAccountInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let user = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(did)
.fetch_optional(&state.db)
.await;
let user_id: uuid::Uuid = match user {
Ok(Some(row)) => row.get("id"),
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in delete_account: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let _ = sqlx::query("DELETE FROM sessions WHERE did = $1")
.bind(did)
.execute(&state.db)
.await;
let _ = sqlx::query("DELETE FROM records WHERE repo_id = $1")
.bind(user_id)
.execute(&state.db)
.await;
let _ = sqlx::query("DELETE FROM repos WHERE user_id = $1")
.bind(user_id)
.execute(&state.db)
.await;
let _ = sqlx::query("DELETE FROM blobs WHERE created_by_user = $1")
.bind(user_id)
.execute(&state.db)
.await;
let _ = sqlx::query("DELETE FROM user_keys WHERE user_id = $1")
.bind(user_id)
.execute(&state.db)
.await;
let result = sqlx::query("DELETE FROM users WHERE id = $1")
.bind(user_id)
.execute(&state.db)
.await;
match result {
Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
Err(e) => {
error!("DB error deleting account: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountEmailInput {
pub account: String,
pub email: String,
}
pub async fn update_account_email(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountEmailInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let account = input.account.trim();
let email = input.email.trim();
if account.is_empty() || email.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "account and email are required"})),
)
.into_response();
}
let result = sqlx::query("UPDATE users SET email = $1 WHERE did = $2")
.bind(email)
.bind(account)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating email: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountHandleInput {
pub did: String,
pub handle: String,
}
pub async fn update_account_handle(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountHandleInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
let handle = input.handle.trim();
if did.is_empty() || handle.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did and handle are required"})),
)
.into_response();
}
if !handle
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_')
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"})),
)
.into_response();
}
let existing = sqlx::query("SELECT id FROM users WHERE handle = $1 AND did != $2")
.bind(handle)
.bind(did)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = existing {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "HandleTaken", "message": "Handle is already in use"})),
)
.into_response();
}
let result = sqlx::query("UPDATE users SET handle = $1 WHERE did = $2")
.bind(handle)
.bind(did)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating handle: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountPasswordInput {
pub did: String,
pub password: String,
}
pub async fn update_account_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
let password = input.password.trim();
if did.is_empty() || password.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did and password are required"})),
)
.into_response();
}
let password_hash = match bcrypt::hash(password, bcrypt::DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Failed to hash password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let result = sqlx::query("UPDATE users SET password_hash = $1 WHERE did = $2")
.bind(&password_hash)
.bind(did)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

View File

@@ -245,3 +245,258 @@ pub async fn verify_did_web(did: &str, hostname: &str, handle: &str) -> Result<(
}
}
}
#[derive(serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetRecommendedDidCredentialsOutput {
pub rotation_keys: Vec<String>,
pub also_known_as: Vec<String>,
pub verification_methods: VerificationMethods,
pub services: Services,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VerificationMethods {
pub atproto: String,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Services {
pub atproto_pds: AtprotoPds,
}
#[derive(serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AtprotoPds {
#[serde(rename = "type")]
pub service_type: String,
pub endpoint: String,
}
pub async fn get_recommended_did_credentials(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.handle
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, handle) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<String, _>("handle"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_recommended_did_credentials: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
let pds_endpoint = format!("https://{}", hostname);
let secret_key = match k256::SecretKey::from_slice(&key_bytes) {
Ok(k) => k,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let public_key = secret_key.public_key();
let encoded = public_key.to_encoded_point(true);
let did_key = format!(
"did:key:zQ3sh{}",
multibase::encode(multibase::Base::Base58Btc, encoded.as_bytes())
.chars()
.skip(1)
.collect::<String>()
);
(
StatusCode::OK,
Json(GetRecommendedDidCredentialsOutput {
rotation_keys: vec![did_key.clone()],
also_known_as: vec![format!("at://{}", handle)],
verification_methods: VerificationMethods { atproto: did_key },
services: Services {
atproto_pds: AtprotoPds {
service_type: "AtprotoPersonalDataServer".to_string(),
endpoint: pds_endpoint,
},
},
}),
)
.into_response()
}
#[derive(Deserialize)]
pub struct UpdateHandleInput {
pub handle: String,
}
pub async fn update_handle(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateHandleInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<uuid::Uuid, _>("user_id"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in update_handle: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let new_handle = input.handle.trim();
if new_handle.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "handle is required"})),
)
.into_response();
}
if !new_handle
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_')
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"})),
)
.into_response();
}
let existing = sqlx::query("SELECT id FROM users WHERE handle = $1 AND id != $2")
.bind(new_handle)
.bind(user_id)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = existing {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "HandleTaken", "message": "Handle is already in use"})),
)
.into_response();
}
let result = sqlx::query("UPDATE users SET handle = $1 WHERE id = $2")
.bind(new_handle)
.bind(user_id)
.execute(&state.db)
.await;
match result {
Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
Err(e) => {
error!("DB error updating handle: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

View File

@@ -2,4 +2,6 @@ pub mod account;
pub mod did;
pub use account::create_account;
pub use did::{resolve_handle, user_did_doc, well_known_did};
pub use did::{
get_recommended_did_credentials, resolve_handle, update_handle, user_did_doc, well_known_did,
};

View File

@@ -1,5 +1,7 @@
pub mod admin;
pub mod feed;
pub mod identity;
pub mod moderation;
pub mod proxy;
pub mod repo;
pub mod server;

128
src/api/moderation/mod.rs Normal file
View File

@@ -0,0 +1,128 @@
use crate::state::AppState;
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use sqlx::Row;
use tracing::error;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateReportInput {
pub reason_type: String,
pub reason: Option<String>,
pub subject: Value,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateReportOutput {
pub id: i64,
pub reason_type: String,
pub reason: Option<String>,
pub subject: Value,
pub reported_by: String,
pub created_at: String,
}
pub async fn create_report(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<CreateReportInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (did, key_bytes) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in create_report: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let valid_reason_types = [
"com.atproto.moderation.defs#reasonSpam",
"com.atproto.moderation.defs#reasonViolation",
"com.atproto.moderation.defs#reasonMisleading",
"com.atproto.moderation.defs#reasonSexual",
"com.atproto.moderation.defs#reasonRude",
"com.atproto.moderation.defs#reasonOther",
"com.atproto.moderation.defs#reasonAppeal",
];
if !valid_reason_types.contains(&input.reason_type.as_str()) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Invalid reasonType"})),
)
.into_response();
}
let created_at = chrono::Utc::now().to_rfc3339();
let report_id = chrono::Utc::now().timestamp_millis();
(
StatusCode::OK,
Json(CreateReportOutput {
id: report_id,
reason_type: input.reason_type,
reason: input.reason,
subject: input.subject,
reported_by: did,
created_at,
}),
)
.into_response()
}

View File

@@ -2,12 +2,13 @@ use crate::state::AppState;
use axum::body::Bytes;
use axum::{
Json,
extract::State,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use cid::Cid;
use multihash::Multihash;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use sqlx::Row;
@@ -136,3 +137,46 @@ pub async fn upload_blob(
}))
.into_response()
}
#[derive(Deserialize)]
pub struct ListMissingBlobsParams {
pub limit: Option<i64>,
pub cursor: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RecordBlob {
pub cid: String,
pub record_uri: String,
}
#[derive(Serialize)]
pub struct ListMissingBlobsOutput {
pub cursor: Option<String>,
pub blobs: Vec<RecordBlob>,
}
pub async fn list_missing_blobs(
State(_state): State<AppState>,
headers: axum::http::HeaderMap,
Query(_params): Query<ListMissingBlobsParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
(
StatusCode::OK,
Json(ListMissingBlobsOutput {
cursor: None,
blobs: vec![],
}),
)
.into_response()
}

View File

@@ -2,6 +2,6 @@ pub mod blob;
pub mod meta;
pub mod record;
pub use blob::upload_blob;
pub use blob::{list_missing_blobs, upload_blob};
pub use meta::describe_repo;
pub use record::{apply_writes, create_record, delete_record, get_record, list_records, put_record};

View File

@@ -2,4 +2,8 @@ pub mod meta;
pub mod session;
pub use meta::{describe_server, health};
pub use session::{create_session, delete_session, get_service_auth, get_session, refresh_session};
pub use session::{
activate_account, check_account_status, create_app_password, create_session,
deactivate_account, delete_session, get_service_auth, get_session, list_app_passwords,
refresh_session, revoke_app_password,
};

View File

@@ -125,20 +125,35 @@ pub async fn create_session(
) -> Response {
info!("create_session: identifier='{}'", input.identifier);
let user_row = sqlx::query("SELECT u.did, u.handle, u.password_hash, k.key_bytes FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.handle = $1 OR u.email = $1")
let user_row = sqlx::query("SELECT u.id, u.did, u.handle, u.password_hash, k.key_bytes FROM users u JOIN user_keys k ON u.id = k.user_id WHERE u.handle = $1 OR u.email = $1")
.bind(&input.identifier)
.fetch_optional(&state.db)
.await;
match user_row {
Ok(Some(row)) => {
let user_id: uuid::Uuid = row.get("id");
let stored_hash: String = row.get("password_hash");
let did: String = row.get("did");
let handle: String = row.get("handle");
let key_bytes: Vec<u8> = row.get("key_bytes");
if verify(&input.password, &stored_hash).unwrap_or(false) {
let did: String = row.get("did");
let handle: String = row.get("handle");
let key_bytes: Vec<u8> = row.get("key_bytes");
let password_valid = if verify(&input.password, &stored_hash).unwrap_or(false) {
true
} else {
let app_pass_rows = sqlx::query("SELECT password_hash FROM app_passwords WHERE user_id = $1")
.bind(user_id)
.fetch_all(&state.db)
.await
.unwrap_or_default();
app_pass_rows.iter().any(|row| {
let hash: String = row.get("password_hash");
verify(&input.password, &hash).unwrap_or(false)
})
};
if password_valid {
let access_jwt = match crate::auth::create_access_token(&did, &key_bytes) {
Ok(t) => t,
Err(e) => {
@@ -468,3 +483,537 @@ pub async fn refresh_session(
}
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CheckAccountStatusOutput {
pub activated: bool,
pub valid_did: bool,
pub repo_commit: String,
pub repo_rev: String,
pub repo_blocks: i64,
pub indexed_records: i64,
pub private_state_values: i64,
pub expected_blobs: i64,
pub imported_blobs: i64,
}
pub async fn check_account_status(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (did, key_bytes, user_id) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<uuid::Uuid, _>("user_id"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in check_account_status: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let repo_result = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
.bind(user_id)
.fetch_optional(&state.db)
.await;
let repo_commit = match repo_result {
Ok(Some(row)) => row.get::<String, _>("repo_root_cid"),
_ => String::new(),
};
let record_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM records WHERE repo_id = $1")
.bind(user_id)
.fetch_one(&state.db)
.await
.unwrap_or(0);
let blob_count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1")
.bind(user_id)
.fetch_one(&state.db)
.await
.unwrap_or(0);
let valid_did = did.starts_with("did:");
(
StatusCode::OK,
Json(CheckAccountStatusOutput {
activated: true,
valid_did,
repo_commit: repo_commit.clone(),
repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
repo_blocks: 0,
indexed_records: record_count,
private_state_values: 0,
expected_blobs: blob_count,
imported_blobs: blob_count,
}),
)
.into_response()
}
pub async fn activate_account(
State(_state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeactivateAccountInput {
pub delete_after: Option<String>,
}
pub async fn deactivate_account(
State(_state): State<AppState>,
headers: axum::http::HeaderMap,
Json(_input): Json<DeactivateAccountInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppPassword {
pub name: String,
pub created_at: String,
pub privileged: bool,
}
#[derive(Serialize)]
pub struct ListAppPasswordsOutput {
pub passwords: Vec<AppPassword>,
}
pub async fn list_app_passwords(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<uuid::Uuid, _>("user_id"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in list_app_passwords: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let result = sqlx::query("SELECT name, created_at, privileged FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC")
.bind(user_id)
.fetch_all(&state.db)
.await;
match result {
Ok(rows) => {
let passwords: Vec<AppPassword> = rows
.iter()
.map(|row| {
let name: String = row.get("name");
let created_at: chrono::DateTime<chrono::Utc> = row.get("created_at");
let privileged: bool = row.get("privileged");
AppPassword {
name,
created_at: created_at.to_rfc3339(),
privileged,
}
})
.collect();
(StatusCode::OK, Json(ListAppPasswordsOutput { passwords })).into_response()
}
Err(e) => {
error!("DB error listing app passwords: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct CreateAppPasswordInput {
pub name: String,
pub privileged: Option<bool>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAppPasswordOutput {
pub name: String,
pub password: String,
pub created_at: String,
pub privileged: bool,
}
pub async fn create_app_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<CreateAppPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<uuid::Uuid, _>("user_id"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in create_app_password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let name = input.name.trim();
if name.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "name is required"})),
)
.into_response();
}
let existing = sqlx::query("SELECT id FROM app_passwords WHERE user_id = $1 AND name = $2")
.bind(user_id)
.bind(name)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = existing {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "DuplicateAppPassword", "message": "App password with this name already exists"})),
)
.into_response();
}
let password: String = (0..4)
.map(|_| {
use rand::Rng;
let mut rng = rand::thread_rng();
let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect();
(0..4).map(|_| chars[rng.gen_range(0..chars.len())]).collect::<String>()
})
.collect::<Vec<String>>()
.join("-");
let password_hash = match bcrypt::hash(&password, bcrypt::DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Failed to hash password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let privileged = input.privileged.unwrap_or(false);
let created_at = chrono::Utc::now();
let result = sqlx::query(
"INSERT INTO app_passwords (user_id, name, password_hash, created_at, privileged) VALUES ($1, $2, $3, $4, $5)"
)
.bind(user_id)
.bind(name)
.bind(&password_hash)
.bind(created_at)
.bind(privileged)
.execute(&state.db)
.await;
match result {
Ok(_) => (
StatusCode::OK,
Json(CreateAppPasswordOutput {
name: name.to_string(),
password,
created_at: created_at.to_rfc3339(),
privileged,
}),
)
.into_response(),
Err(e) => {
error!("DB error creating app password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct RevokeAppPasswordInput {
pub name: String,
}
pub async fn revoke_app_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<RevokeAppPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
)
.bind(&token)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (
row.get::<String, _>("did"),
row.get::<Vec<u8>, _>("key_bytes"),
row.get::<uuid::Uuid, _>("user_id"),
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in revoke_app_password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let name = input.name.trim();
if name.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "name is required"})),
)
.into_response();
}
let result = sqlx::query("DELETE FROM app_passwords WHERE user_id = $1 AND name = $2")
.bind(user_id)
.bind(name)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AppPasswordNotFound", "message": "App password not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error revoking app password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

View File

@@ -86,6 +86,90 @@ pub fn app(state: AppState) -> Router {
"/xrpc/com.atproto.sync.listRepos",
get(sync::list_repos),
)
.route(
"/xrpc/com.atproto.sync.getBlob",
get(sync::get_blob),
)
.route(
"/xrpc/com.atproto.sync.listBlobs",
get(sync::list_blobs),
)
.route(
"/xrpc/com.atproto.sync.getRepoStatus",
get(sync::get_repo_status),
)
.route(
"/xrpc/com.atproto.server.checkAccountStatus",
get(api::server::check_account_status),
)
.route(
"/xrpc/com.atproto.identity.getRecommendedDidCredentials",
get(api::identity::get_recommended_did_credentials),
)
.route(
"/xrpc/com.atproto.repo.listMissingBlobs",
get(api::repo::list_missing_blobs),
)
.route(
"/xrpc/com.atproto.sync.notifyOfUpdate",
post(sync::notify_of_update),
)
.route(
"/xrpc/com.atproto.sync.requestCrawl",
post(sync::request_crawl),
)
.route(
"/xrpc/com.atproto.moderation.createReport",
post(api::moderation::create_report),
)
.route(
"/xrpc/com.atproto.admin.getAccountInfo",
get(api::admin::get_account_info),
)
.route(
"/xrpc/com.atproto.admin.getAccountInfos",
get(api::admin::get_account_infos),
)
.route(
"/xrpc/com.atproto.server.activateAccount",
post(api::server::activate_account),
)
.route(
"/xrpc/com.atproto.server.deactivateAccount",
post(api::server::deactivate_account),
)
.route(
"/xrpc/com.atproto.identity.updateHandle",
post(api::identity::update_handle),
)
.route(
"/xrpc/com.atproto.admin.deleteAccount",
post(api::admin::delete_account),
)
.route(
"/xrpc/com.atproto.admin.updateAccountEmail",
post(api::admin::update_account_email),
)
.route(
"/xrpc/com.atproto.admin.updateAccountHandle",
post(api::admin::update_account_handle),
)
.route(
"/xrpc/com.atproto.admin.updateAccountPassword",
post(api::admin::update_account_password),
)
.route(
"/xrpc/com.atproto.server.listAppPasswords",
get(api::server::list_app_passwords),
)
.route(
"/xrpc/com.atproto.server.createAppPassword",
post(api::server::create_app_password),
)
.route(
"/xrpc/com.atproto.server.revokeAppPassword",
post(api::server::revoke_app_password),
)
// I know I know, I'm not supposed to implement appview endpoints. Leave me be
.route(
"/xrpc/app.bsky.feed.getTimeline",

View File

@@ -1,14 +1,16 @@
use crate::state::AppState;
use axum::{
Json,
body::Body,
extract::{Query, State},
http::StatusCode,
http::header,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::Row;
use tracing::error;
use tracing::{error, info};
#[derive(Deserialize)]
pub struct GetLatestCommitParams {
@@ -161,3 +163,318 @@ pub async fn list_repos(
}
}
}
#[derive(Deserialize)]
pub struct GetBlobParams {
pub did: String,
pub cid: String,
}
pub async fn get_blob(
State(state): State<AppState>,
Query(params): Query<GetBlobParams>,
) -> Response {
let did = params.did.trim();
let cid = params.cid.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
if cid.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
)
.into_response();
}
let user_exists = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(did)
.fetch_optional(&state.db)
.await;
match user_exists {
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_blob: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Ok(Some(_)) => {}
}
let blob_result = sqlx::query("SELECT storage_key, mime_type FROM blobs WHERE cid = $1")
.bind(cid)
.fetch_optional(&state.db)
.await;
match blob_result {
Ok(Some(row)) => {
let storage_key: String = row.get("storage_key");
let mime_type: String = row.get("mime_type");
match state.blob_store.get(&storage_key).await {
Ok(data) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, mime_type)
.body(Body::from(data))
.unwrap(),
Err(e) => {
error!("Failed to fetch blob from storage: {:?}", e);
(
StatusCode::NOT_FOUND,
Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
)
.into_response()
}
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_blob: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct ListBlobsParams {
pub did: String,
pub since: Option<String>,
pub limit: Option<i64>,
pub cursor: Option<String>,
}
#[derive(Serialize)]
pub struct ListBlobsOutput {
pub cursor: Option<String>,
pub cids: Vec<String>,
}
pub async fn list_blobs(
State(state): State<AppState>,
Query(params): Query<ListBlobsParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let limit = params.limit.unwrap_or(500).min(1000);
let cursor_cid = params.cursor.as_deref().unwrap_or("");
let user_result = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(did)
.fetch_optional(&state.db)
.await;
let user_id: uuid::Uuid = match user_result {
Ok(Some(row)) => row.get("id"),
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in list_blobs: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let result = if let Some(since) = &params.since {
sqlx::query(
r#"
SELECT cid FROM blobs
WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
ORDER BY cid ASC
LIMIT $4
"#,
)
.bind(user_id)
.bind(cursor_cid)
.bind(since)
.bind(limit + 1)
.fetch_all(&state.db)
.await
} else {
sqlx::query(
r#"
SELECT cid FROM blobs
WHERE created_by_user = $1 AND cid > $2
ORDER BY cid ASC
LIMIT $3
"#,
)
.bind(user_id)
.bind(cursor_cid)
.bind(limit + 1)
.fetch_all(&state.db)
.await
};
match result {
Ok(rows) => {
let has_more = rows.len() as i64 > limit;
let cids: Vec<String> = rows
.iter()
.take(limit as usize)
.map(|row| row.get("cid"))
.collect();
let next_cursor = if has_more {
cids.last().cloned()
} else {
None
};
(
StatusCode::OK,
Json(ListBlobsOutput {
cursor: next_cursor,
cids,
}),
)
.into_response()
}
Err(e) => {
error!("DB error in list_blobs: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct GetRepoStatusParams {
pub did: String,
}
#[derive(Serialize)]
pub struct GetRepoStatusOutput {
pub did: String,
pub active: bool,
pub rev: Option<String>,
}
pub async fn get_repo_status(
State(state): State<AppState>,
Query(params): Query<GetRepoStatusParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let result = sqlx::query(
r#"
SELECT u.did, r.repo_root_cid
FROM users u
LEFT JOIN repos r ON u.id = r.user_id
WHERE u.did = $1
"#,
)
.bind(did)
.fetch_optional(&state.db)
.await;
match result {
Ok(Some(row)) => {
let user_did: String = row.get("did");
let repo_root: Option<String> = row.get("repo_root_cid");
let rev = repo_root.map(|_| chrono::Utc::now().timestamp_millis().to_string());
(
StatusCode::OK,
Json(GetRepoStatusOutput {
did: user_did,
active: true,
rev,
}),
)
.into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_repo_status: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct NotifyOfUpdateParams {
pub hostname: String,
}
pub async fn notify_of_update(
State(_state): State<AppState>,
Query(params): Query<NotifyOfUpdateParams>,
) -> Response {
info!("Received notifyOfUpdate from hostname: {}", params.hostname);
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct RequestCrawlInput {
pub hostname: String,
}
pub async fn request_crawl(
State(_state): State<AppState>,
Json(input): Json<RequestCrawlInput>,
) -> Response {
info!("Received requestCrawl for hostname: {}", input.hostname);
(StatusCode::OK, Json(json!({}))).into_response()
}

View File

@@ -304,3 +304,55 @@ async fn test_did_web_lifecycle() {
assert_eq!(record_body["value"]["displayName"], "DID Web User");
*/
}
#[tokio::test]
async fn test_get_recommended_did_credentials_success() {
let client = client();
let (access_jwt, _) = create_account_and_login(&client).await;
let res = client
.get(format!(
"{}/xrpc/com.atproto.identity.getRecommendedDidCredentials",
base_url().await
))
.bearer_auth(&access_jwt)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert!(body["rotationKeys"].is_array());
assert!(body["alsoKnownAs"].is_array());
assert!(body["verificationMethods"].is_object());
assert!(body["services"].is_object());
let rotation_keys = body["rotationKeys"].as_array().unwrap();
assert!(!rotation_keys.is_empty());
assert!(rotation_keys[0].as_str().unwrap().starts_with("did:key:"));
let also_known_as = body["alsoKnownAs"].as_array().unwrap();
assert!(!also_known_as.is_empty());
assert!(also_known_as[0].as_str().unwrap().starts_with("at://"));
assert!(body["verificationMethods"]["atproto"].is_string());
assert_eq!(body["services"]["atprotoPds"]["type"], "AtprotoPersonalDataServer");
assert!(body["services"]["atprotoPds"]["endpoint"].is_string());
}
#[tokio::test]
async fn test_get_recommended_did_credentials_no_auth() {
let client = client();
let res = client
.get(format!(
"{}/xrpc/com.atproto.identity.getRecommendedDidCredentials",
base_url().await
))
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "AuthenticationRequired");
}

View File

@@ -1620,3 +1620,225 @@ async fn test_account_to_post_full_lifecycle() {
assert_eq!(describe_body["did"], did);
assert_eq!(describe_body["handle"], handle);
}
#[tokio::test]
async fn test_app_password_lifecycle() {
let client = client();
let ts = Utc::now().timestamp_millis();
let handle = format!("apppass-{}.test", ts);
let email = format!("apppass-{}@test.com", ts);
let password = "apppass-password";
let create_res = client
.post(format!(
"{}/xrpc/com.atproto.server.createAccount",
base_url().await
))
.json(&json!({
"handle": handle,
"email": email,
"password": password
}))
.send()
.await
.expect("Failed to create account");
assert_eq!(create_res.status(), StatusCode::OK);
let account: Value = create_res.json().await.unwrap();
let jwt = account["accessJwt"].as_str().unwrap();
let create_app_pass_res = client
.post(format!(
"{}/xrpc/com.atproto.server.createAppPassword",
base_url().await
))
.bearer_auth(jwt)
.json(&json!({ "name": "Test App" }))
.send()
.await
.expect("Failed to create app password");
assert_eq!(create_app_pass_res.status(), StatusCode::OK);
let app_pass: Value = create_app_pass_res.json().await.unwrap();
let app_password = app_pass["password"].as_str().unwrap().to_string();
assert_eq!(app_pass["name"], "Test App");
let list_res = client
.get(format!(
"{}/xrpc/com.atproto.server.listAppPasswords",
base_url().await
))
.bearer_auth(jwt)
.send()
.await
.expect("Failed to list app passwords");
assert_eq!(list_res.status(), StatusCode::OK);
let list_body: Value = list_res.json().await.unwrap();
let passwords = list_body["passwords"].as_array().unwrap();
assert_eq!(passwords.len(), 1);
assert_eq!(passwords[0]["name"], "Test App");
let login_res = client
.post(format!(
"{}/xrpc/com.atproto.server.createSession",
base_url().await
))
.json(&json!({
"identifier": handle,
"password": app_password
}))
.send()
.await
.expect("Failed to login with app password");
assert_eq!(login_res.status(), StatusCode::OK, "App password login should work");
let revoke_res = client
.post(format!(
"{}/xrpc/com.atproto.server.revokeAppPassword",
base_url().await
))
.bearer_auth(jwt)
.json(&json!({ "name": "Test App" }))
.send()
.await
.expect("Failed to revoke app password");
assert_eq!(revoke_res.status(), StatusCode::OK);
let login_after_revoke = client
.post(format!(
"{}/xrpc/com.atproto.server.createSession",
base_url().await
))
.json(&json!({
"identifier": handle,
"password": app_password
}))
.send()
.await
.expect("Failed to attempt login after revoke");
assert!(
login_after_revoke.status() == StatusCode::UNAUTHORIZED
|| login_after_revoke.status() == StatusCode::BAD_REQUEST,
"Revoked app password should not work"
);
let list_after_revoke = client
.get(format!(
"{}/xrpc/com.atproto.server.listAppPasswords",
base_url().await
))
.bearer_auth(jwt)
.send()
.await
.expect("Failed to list after revoke");
let list_after: Value = list_after_revoke.json().await.unwrap();
let passwords_after = list_after["passwords"].as_array().unwrap();
assert_eq!(passwords_after.len(), 0, "No app passwords should remain");
}
#[tokio::test]
async fn test_account_deactivation_lifecycle() {
let client = client();
let ts = Utc::now().timestamp_millis();
let handle = format!("deactivate-{}.test", ts);
let email = format!("deactivate-{}@test.com", ts);
let password = "deactivate-password";
let create_res = client
.post(format!(
"{}/xrpc/com.atproto.server.createAccount",
base_url().await
))
.json(&json!({
"handle": handle,
"email": email,
"password": password
}))
.send()
.await
.expect("Failed to create account");
assert_eq!(create_res.status(), StatusCode::OK);
let account: Value = create_res.json().await.unwrap();
let did = account["did"].as_str().unwrap().to_string();
let jwt = account["accessJwt"].as_str().unwrap().to_string();
let (post_uri, _) = create_post(&client, &did, &jwt, "Post before deactivation").await;
let post_rkey = post_uri.split('/').last().unwrap();
let status_before = client
.get(format!(
"{}/xrpc/com.atproto.server.checkAccountStatus",
base_url().await
))
.bearer_auth(&jwt)
.send()
.await
.expect("Failed to check status");
assert_eq!(status_before.status(), StatusCode::OK);
let status_body: Value = status_before.json().await.unwrap();
assert_eq!(status_body["activated"], true);
let deactivate_res = client
.post(format!(
"{}/xrpc/com.atproto.server.deactivateAccount",
base_url().await
))
.bearer_auth(&jwt)
.json(&json!({}))
.send()
.await
.expect("Failed to deactivate");
assert_eq!(deactivate_res.status(), StatusCode::OK);
let get_post_res = client
.get(format!(
"{}/xrpc/com.atproto.repo.getRecord",
base_url().await
))
.query(&[
("repo", did.as_str()),
("collection", "app.bsky.feed.post"),
("rkey", post_rkey),
])
.send()
.await
.expect("Failed to get post while deactivated");
assert_eq!(get_post_res.status(), StatusCode::OK, "Records should still be readable");
let activate_res = client
.post(format!(
"{}/xrpc/com.atproto.server.activateAccount",
base_url().await
))
.bearer_auth(&jwt)
.json(&json!({}))
.send()
.await
.expect("Failed to reactivate");
assert_eq!(activate_res.status(), StatusCode::OK);
let status_after_activate = client
.get(format!(
"{}/xrpc/com.atproto.server.checkAccountStatus",
base_url().await
))
.bearer_auth(&jwt)
.send()
.await
.expect("Failed to check status after activate");
assert_eq!(status_after_activate.status(), StatusCode::OK);
let (new_post_uri, _) = create_post(&client, &did, &jwt, "Post after reactivation").await;
assert!(!new_post_uri.is_empty(), "Should be able to post after reactivation");
}

View File

@@ -757,3 +757,38 @@ async fn test_apply_writes_empty_writes() {
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_list_missing_blobs() {
let client = client();
let (access_jwt, _) = create_account_and_login(&client).await;
let res = client
.get(format!(
"{}/xrpc/com.atproto.repo.listMissingBlobs",
base_url().await
))
.bearer_auth(&access_jwt)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert!(body["blobs"].is_array());
}
#[tokio::test]
async fn test_list_missing_blobs_no_auth() {
let client = client();
let res = client
.get(format!(
"{}/xrpc/com.atproto.repo.listMissingBlobs",
base_url().await
))
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
}

View File

@@ -317,3 +317,96 @@ async fn test_get_service_auth_missing_aud() {
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn test_check_account_status_success() {
let client = client();
let (access_jwt, _) = create_account_and_login(&client).await;
let res = client
.get(format!(
"{}/xrpc/com.atproto.server.checkAccountStatus",
base_url().await
))
.bearer_auth(&access_jwt)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["activated"], true);
assert_eq!(body["validDid"], true);
assert!(body["repoCommit"].is_string());
assert!(body["repoRev"].is_string());
assert!(body["indexedRecords"].is_number());
}
#[tokio::test]
async fn test_check_account_status_no_auth() {
let client = client();
let res = client
.get(format!(
"{}/xrpc/com.atproto.server.checkAccountStatus",
base_url().await
))
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "AuthenticationRequired");
}
#[tokio::test]
async fn test_activate_account_success() {
let client = client();
let (access_jwt, _) = create_account_and_login(&client).await;
let res = client
.post(format!(
"{}/xrpc/com.atproto.server.activateAccount",
base_url().await
))
.bearer_auth(&access_jwt)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_activate_account_no_auth() {
let client = client();
let res = client
.post(format!(
"{}/xrpc/com.atproto.server.activateAccount",
base_url().await
))
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn test_deactivate_account_success() {
let client = client();
let (access_jwt, _) = create_account_and_login(&client).await;
let res = client
.post(format!(
"{}/xrpc/com.atproto.server.deactivateAccount",
base_url().await
))
.bearer_auth(&access_jwt)
.json(&json!({}))
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
}

View File

@@ -1,6 +1,7 @@
mod common;
use common::*;
use reqwest::StatusCode;
use reqwest::header;
use serde_json::Value;
#[tokio::test]
@@ -151,3 +152,203 @@ async fn test_list_repos_pagination() {
assert_ne!(repos[0]["did"], repos2[0]["did"]);
}
}
#[tokio::test]
async fn test_get_repo_status_success() {
let client = client();
let (_, did) = create_account_and_login(&client).await;
let params = [("did", did.as_str())];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepoStatus",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["did"], did);
assert_eq!(body["active"], true);
assert!(body["rev"].is_string());
}
#[tokio::test]
async fn test_get_repo_status_not_found() {
let client = client();
let params = [("did", "did:plc:nonexistent12345")];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getRepoStatus",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "RepoNotFound");
}
#[tokio::test]
async fn test_list_blobs_success() {
let client = client();
let (access_jwt, did) = create_account_and_login(&client).await;
let blob_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.uploadBlob",
base_url().await
))
.header(header::CONTENT_TYPE, "text/plain")
.bearer_auth(&access_jwt)
.body("test blob content")
.send()
.await
.expect("Failed to upload blob");
assert_eq!(blob_res.status(), StatusCode::OK);
let params = [("did", did.as_str())];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.listBlobs",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert!(body["cids"].is_array());
let cids = body["cids"].as_array().unwrap();
assert!(!cids.is_empty());
}
#[tokio::test]
async fn test_list_blobs_not_found() {
let client = client();
let params = [("did", "did:plc:nonexistent12345")];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.listBlobs",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "RepoNotFound");
}
#[tokio::test]
async fn test_get_blob_success() {
let client = client();
let (access_jwt, did) = create_account_and_login(&client).await;
let blob_content = "test blob for get_blob";
let blob_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.uploadBlob",
base_url().await
))
.header(header::CONTENT_TYPE, "text/plain")
.bearer_auth(&access_jwt)
.body(blob_content)
.send()
.await
.expect("Failed to upload blob");
assert_eq!(blob_res.status(), StatusCode::OK);
let blob_body: Value = blob_res.json().await.expect("Response was not valid JSON");
let cid = blob_body["blob"]["ref"]["$link"].as_str().expect("No CID");
let params = [("did", did.as_str()), ("cid", cid)];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getBlob",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(
res.headers()
.get("content-type")
.and_then(|h| h.to_str().ok()),
Some("text/plain")
);
let body = res.text().await.expect("Failed to get body");
assert_eq!(body, blob_content);
}
#[tokio::test]
async fn test_get_blob_not_found() {
let client = client();
let (_, did) = create_account_and_login(&client).await;
let params = [
("did", did.as_str()),
("cid", "bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku"),
];
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.getBlob",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::NOT_FOUND);
let body: Value = res.json().await.expect("Response was not valid JSON");
assert_eq!(body["error"], "BlobNotFound");
}
#[tokio::test]
async fn test_notify_of_update() {
let client = client();
let params = [("hostname", "example.com")];
let res = client
.post(format!(
"{}/xrpc/com.atproto.sync.notifyOfUpdate",
base_url().await
))
.query(&params)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
}
#[tokio::test]
async fn test_request_crawl() {
let client = client();
let payload = serde_json::json!({"hostname": "example.com"});
let res = client
.post(format!(
"{}/xrpc/com.atproto.sync.requestCrawl",
base_url().await
))
.json(&payload)
.send()
.await
.expect("Failed to send request");
assert_eq!(res.status(), StatusCode::OK);
}