Files were getting too large, logically split them

This commit is contained in:
lewis
2025-12-09 23:15:19 +02:00
parent 264e61eb14
commit 0c57965638
16 changed files with 3628 additions and 3494 deletions

564
src/api/admin/account.rs Normal file
View File

@@ -0,0 +1,564 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, warn};
#[derive(Deserialize)]
pub struct GetAccountInfoParams {
pub did: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AccountInfo {
pub did: String,
pub handle: String,
pub email: Option<String>,
pub indexed_at: String,
pub invite_note: Option<String>,
pub invites_disabled: bool,
pub email_confirmed_at: Option<String>,
pub deactivated_at: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetAccountInfosOutput {
pub infos: Vec<AccountInfo>,
}
pub async fn get_account_info(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetAccountInfoParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let result = sqlx::query!(
r#"
SELECT did, handle, email, created_at
FROM users
WHERE did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
match result {
Ok(Some(row)) => {
(
StatusCode::OK,
Json(AccountInfo {
did: row.did,
handle: row.handle,
email: Some(row.email),
indexed_at: row.created_at.to_rfc3339(),
invite_note: None,
invites_disabled: false,
email_confirmed_at: None,
deactivated_at: None,
}),
)
.into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_account_info: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct GetAccountInfosParams {
pub dids: String,
}
pub async fn get_account_infos(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetAccountInfosParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let dids: Vec<&str> = params.dids.split(',').map(|s| s.trim()).collect();
if dids.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "dids is required"})),
)
.into_response();
}
let mut infos = Vec::new();
for did in dids {
if did.is_empty() {
continue;
}
let result = sqlx::query!(
r#"
SELECT did, handle, email, created_at
FROM users
WHERE did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
if let Ok(Some(row)) = result {
infos.push(AccountInfo {
did: row.did,
handle: row.handle,
email: Some(row.email),
indexed_at: row.created_at.to_rfc3339(),
invite_note: None,
invites_disabled: false,
email_confirmed_at: None,
deactivated_at: None,
});
}
}
(StatusCode::OK, Json(GetAccountInfosOutput { infos })).into_response()
}
#[derive(Deserialize)]
pub struct DeleteAccountInput {
pub did: String,
}
pub async fn delete_account(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<DeleteAccountInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let user = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in delete_account: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let _ = sqlx::query!("DELETE FROM sessions WHERE did = $1", did)
.execute(&state.db)
.await;
let _ = sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id)
.execute(&state.db)
.await;
let _ = sqlx::query!("DELETE FROM repos WHERE user_id = $1", user_id)
.execute(&state.db)
.await;
let _ = sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
.execute(&state.db)
.await;
let _ = sqlx::query!("DELETE FROM user_keys WHERE user_id = $1", user_id)
.execute(&state.db)
.await;
let result = sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
.execute(&state.db)
.await;
match result {
Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
Err(e) => {
error!("DB error deleting account: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountEmailInput {
pub account: String,
pub email: String,
}
pub async fn update_account_email(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountEmailInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let account = input.account.trim();
let email = input.email.trim();
if account.is_empty() || email.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "account and email are required"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET email = $1 WHERE did = $2", email, account)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating email: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountHandleInput {
pub did: String,
pub handle: String,
}
pub async fn update_account_handle(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountHandleInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
let handle = input.handle.trim();
if did.is_empty() || handle.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did and handle are required"})),
)
.into_response();
}
if !handle
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_')
{
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"})),
)
.into_response();
}
let existing = sqlx::query!("SELECT id FROM users WHERE handle = $1 AND did != $2", handle, did)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = existing {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "HandleTaken", "message": "Handle is already in use"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET handle = $1 WHERE did = $2", handle, did)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating handle: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct UpdateAccountPasswordInput {
pub did: String,
pub password: String,
}
pub async fn update_account_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateAccountPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let did = input.did.trim();
let password = input.password.trim();
if did.is_empty() || password.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did and password are required"})),
)
.into_response();
}
let password_hash = match bcrypt::hash(password, bcrypt::DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Failed to hash password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let result = sqlx::query!("UPDATE users SET password_hash = $1 WHERE did = $2", password_hash, did)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error updating password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SendEmailInput {
pub recipient_did: String,
pub sender_did: String,
pub content: String,
pub subject: Option<String>,
pub comment: Option<String>,
}
#[derive(Serialize)]
pub struct SendEmailOutput {
pub sent: bool,
}
pub async fn send_email(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<SendEmailInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let recipient_did = input.recipient_did.trim();
let content = input.content.trim();
if recipient_did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "recipientDid is required"})),
)
.into_response();
}
if content.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "content is required"})),
)
.into_response();
}
let user = sqlx::query!(
"SELECT id, email, handle FROM users WHERE did = $1",
recipient_did
)
.fetch_optional(&state.db)
.await;
let (user_id, email, handle) = match user {
Ok(Some(row)) => (row.id, row.email, row.handle),
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Recipient account not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in send_email: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
let subject = input
.subject
.clone()
.unwrap_or_else(|| format!("Message from {}", hostname));
let notification = crate::notifications::NewNotification::email(
user_id,
crate::notifications::NotificationType::AdminEmail,
email,
subject,
content.to_string(),
);
let result = crate::notifications::enqueue_notification(&state.db, notification).await;
match result {
Ok(_) => {
tracing::info!(
"Admin email queued for {} ({})",
handle,
recipient_did
);
(StatusCode::OK, Json(SendEmailOutput { sent: true })).into_response()
}
Err(e) => {
warn!("Failed to enqueue admin email: {:?}", e);
(StatusCode::OK, Json(SendEmailOutput { sent: false })).into_response()
}
}
}

323
src/api/admin/invite.rs Normal file
View File

@@ -0,0 +1,323 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::error;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DisableInviteCodesInput {
pub codes: Option<Vec<String>>,
pub accounts: Option<Vec<String>>,
}
pub async fn disable_invite_codes(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<DisableInviteCodesInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
if let Some(codes) = &input.codes {
for code in codes {
let _ = sqlx::query!("UPDATE invite_codes SET disabled = TRUE WHERE code = $1", code)
.execute(&state.db)
.await;
}
}
if let Some(accounts) = &input.accounts {
for account in accounts {
let user = sqlx::query!("SELECT id FROM users WHERE did = $1", account)
.fetch_optional(&state.db)
.await;
if let Ok(Some(user_row)) = user {
let _ = sqlx::query!(
"UPDATE invite_codes SET disabled = TRUE WHERE created_by_user = $1",
user_row.id
)
.execute(&state.db)
.await;
}
}
}
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct GetInviteCodesParams {
pub sort: Option<String>,
pub limit: Option<i64>,
pub cursor: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InviteCodeInfo {
pub code: String,
pub available: i32,
pub disabled: bool,
pub for_account: String,
pub created_by: String,
pub created_at: String,
pub uses: Vec<InviteCodeUseInfo>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InviteCodeUseInfo {
pub used_by: String,
pub used_at: String,
}
#[derive(Serialize)]
pub struct GetInviteCodesOutput {
pub cursor: Option<String>,
pub codes: Vec<InviteCodeInfo>,
}
pub async fn get_invite_codes(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetInviteCodesParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let limit = params.limit.unwrap_or(100).min(500);
let sort = params.sort.as_deref().unwrap_or("recent");
let order_clause = match sort {
"usage" => "available_uses DESC",
_ => "created_at DESC",
};
let codes_result = if let Some(cursor) = &params.cursor {
sqlx::query_as::<_, (String, i32, Option<bool>, uuid::Uuid, chrono::DateTime<chrono::Utc>)>(&format!(
r#"
SELECT ic.code, ic.available_uses, ic.disabled, ic.created_by_user, ic.created_at
FROM invite_codes ic
WHERE ic.created_at < (SELECT created_at FROM invite_codes WHERE code = $1)
ORDER BY {}
LIMIT $2
"#,
order_clause
))
.bind(cursor)
.bind(limit)
.fetch_all(&state.db)
.await
} else {
sqlx::query_as::<_, (String, i32, Option<bool>, uuid::Uuid, chrono::DateTime<chrono::Utc>)>(&format!(
r#"
SELECT ic.code, ic.available_uses, ic.disabled, ic.created_by_user, ic.created_at
FROM invite_codes ic
ORDER BY {}
LIMIT $1
"#,
order_clause
))
.bind(limit)
.fetch_all(&state.db)
.await
};
let codes_rows = match codes_result {
Ok(rows) => rows,
Err(e) => {
error!("DB error fetching invite codes: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut codes = Vec::new();
for (code, available_uses, disabled, created_by_user, created_at) in &codes_rows {
let creator_did = sqlx::query_scalar!("SELECT did FROM users WHERE id = $1", created_by_user)
.fetch_optional(&state.db)
.await
.ok()
.flatten()
.unwrap_or_else(|| "unknown".to_string());
let uses_result = sqlx::query!(
r#"
SELECT u.did, icu.used_at
FROM invite_code_uses icu
JOIN users u ON icu.used_by_user = u.id
WHERE icu.code = $1
ORDER BY icu.used_at DESC
"#,
code
)
.fetch_all(&state.db)
.await;
let uses = match uses_result {
Ok(use_rows) => use_rows
.iter()
.map(|u| InviteCodeUseInfo {
used_by: u.did.clone(),
used_at: u.used_at.to_rfc3339(),
})
.collect(),
Err(_) => Vec::new(),
};
codes.push(InviteCodeInfo {
code: code.clone(),
available: *available_uses,
disabled: disabled.unwrap_or(false),
for_account: creator_did.clone(),
created_by: creator_did,
created_at: created_at.to_rfc3339(),
uses,
});
}
let next_cursor = if codes_rows.len() == limit as usize {
codes_rows.last().map(|(code, _, _, _, _)| code.clone())
} else {
None
};
(
StatusCode::OK,
Json(GetInviteCodesOutput {
cursor: next_cursor,
codes,
}),
)
.into_response()
}
#[derive(Deserialize)]
pub struct DisableAccountInvitesInput {
pub account: String,
}
pub async fn disable_account_invites(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<DisableAccountInvitesInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let account = input.account.trim();
if account.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "account is required"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET invites_disabled = TRUE WHERE did = $1", account)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error disabling account invites: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct EnableAccountInvitesInput {
pub account: String,
}
pub async fn enable_account_invites(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<EnableAccountInvitesInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let account = input.account.trim();
if account.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "account is required"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET invites_disabled = FALSE WHERE did = $1", account)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AccountNotFound", "message": "Account not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error enabling account invites: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

File diff suppressed because it is too large Load Diff

356
src/api/admin/status.rs Normal file
View File

@@ -0,0 +1,356 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::error;
#[derive(Deserialize)]
pub struct GetSubjectStatusParams {
pub did: Option<String>,
pub uri: Option<String>,
pub blob: Option<String>,
}
#[derive(Serialize)]
pub struct SubjectStatus {
pub subject: serde_json::Value,
pub takedown: Option<StatusAttr>,
pub deactivated: Option<StatusAttr>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StatusAttr {
pub applied: bool,
pub r#ref: Option<String>,
}
pub async fn get_subject_status(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetSubjectStatusParams>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
if params.did.is_none() && params.uri.is_none() && params.blob.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Must provide did, uri, or blob"})),
)
.into_response();
}
if let Some(did) = &params.did {
let user = sqlx::query!(
"SELECT did, deactivated_at, takedown_ref FROM users WHERE did = $1",
did
)
.fetch_optional(&state.db)
.await;
match user {
Ok(Some(row)) => {
let deactivated = row.deactivated_at.map(|_| StatusAttr {
applied: true,
r#ref: None,
});
let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
applied: true,
r#ref: Some(r.clone()),
});
return (
StatusCode::OK,
Json(SubjectStatus {
subject: json!({
"$type": "com.atproto.admin.defs#repoRef",
"did": row.did
}),
takedown,
deactivated,
}),
)
.into_response();
}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_subject_status: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
}
if let Some(uri) = &params.uri {
let record = sqlx::query!(
"SELECT r.id, r.takedown_ref FROM records r WHERE r.record_cid = $1",
uri
)
.fetch_optional(&state.db)
.await;
match record {
Ok(Some(row)) => {
let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
applied: true,
r#ref: Some(r.clone()),
});
return (
StatusCode::OK,
Json(SubjectStatus {
subject: json!({
"$type": "com.atproto.repo.strongRef",
"uri": uri,
"cid": uri
}),
takedown,
deactivated: None,
}),
)
.into_response();
}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_subject_status: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
}
if let Some(blob_cid) = &params.blob {
let blob = sqlx::query!("SELECT cid, takedown_ref FROM blobs WHERE cid = $1", blob_cid)
.fetch_optional(&state.db)
.await;
match blob {
Ok(Some(row)) => {
let takedown = row.takedown_ref.as_ref().map(|r| StatusAttr {
applied: true,
r#ref: Some(r.clone()),
});
return (
StatusCode::OK,
Json(SubjectStatus {
subject: json!({
"$type": "com.atproto.admin.defs#repoBlobRef",
"did": "",
"cid": row.cid
}),
takedown,
deactivated: None,
}),
)
.into_response();
}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "SubjectNotFound", "message": "Subject not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_subject_status: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
}
(
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
)
.into_response()
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateSubjectStatusInput {
pub subject: serde_json::Value,
pub takedown: Option<StatusAttrInput>,
pub deactivated: Option<StatusAttrInput>,
}
#[derive(Deserialize)]
pub struct StatusAttrInput {
pub apply: bool,
pub r#ref: Option<String>,
}
pub async fn update_subject_status(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<UpdateSubjectStatusInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let subject_type = input.subject.get("$type").and_then(|t| t.as_str());
match subject_type {
Some("com.atproto.admin.defs#repoRef") => {
let did = input.subject.get("did").and_then(|d| d.as_str());
if let Some(did) = did {
if let Some(takedown) = &input.takedown {
let takedown_ref = if takedown.apply {
takedown.r#ref.clone()
} else {
None
};
let _ = sqlx::query!(
"UPDATE users SET takedown_ref = $1 WHERE did = $2",
takedown_ref,
did
)
.execute(&state.db)
.await;
}
if let Some(deactivated) = &input.deactivated {
if deactivated.apply {
let _ = sqlx::query!(
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
did
)
.execute(&state.db)
.await;
} else {
let _ = sqlx::query!(
"UPDATE users SET deactivated_at = NULL WHERE did = $1",
did
)
.execute(&state.db)
.await;
}
}
return (
StatusCode::OK,
Json(json!({
"subject": input.subject,
"takedown": input.takedown.as_ref().map(|t| json!({
"applied": t.apply,
"ref": t.r#ref
})),
"deactivated": input.deactivated.as_ref().map(|d| json!({
"applied": d.apply
}))
})),
)
.into_response();
}
}
Some("com.atproto.repo.strongRef") => {
let uri = input.subject.get("uri").and_then(|u| u.as_str());
if let Some(uri) = uri {
if let Some(takedown) = &input.takedown {
let takedown_ref = if takedown.apply {
takedown.r#ref.clone()
} else {
None
};
let _ = sqlx::query!(
"UPDATE records SET takedown_ref = $1 WHERE record_cid = $2",
takedown_ref,
uri
)
.execute(&state.db)
.await;
}
return (
StatusCode::OK,
Json(json!({
"subject": input.subject,
"takedown": input.takedown.as_ref().map(|t| json!({
"applied": t.apply,
"ref": t.r#ref
}))
})),
)
.into_response();
}
}
Some("com.atproto.admin.defs#repoBlobRef") => {
let cid = input.subject.get("cid").and_then(|c| c.as_str());
if let Some(cid) = cid {
if let Some(takedown) = &input.takedown {
let takedown_ref = if takedown.apply {
takedown.r#ref.clone()
} else {
None
};
let _ = sqlx::query!(
"UPDATE blobs SET takedown_ref = $1 WHERE cid = $2",
takedown_ref,
cid
)
.execute(&state.db)
.await;
}
return (
StatusCode::OK,
Json(json!({
"subject": input.subject,
"takedown": input.takedown.as_ref().map(|t| json!({
"applied": t.apply,
"ref": t.r#ref
}))
})),
)
.into_response();
}
}
_ => {}
}
(
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Invalid subject type"})),
)
.into_response()
}

View File

@@ -0,0 +1,393 @@
use crate::state::AppState;
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use chrono::{Duration, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{error, info, warn};
use uuid::Uuid;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CheckAccountStatusOutput {
pub activated: bool,
pub valid_did: bool,
pub repo_commit: String,
pub repo_rev: String,
pub repo_blocks: i64,
pub indexed_records: i64,
pub private_state_values: i64,
pub expected_blobs: i64,
pub imported_blobs: i64,
}
pub async fn check_account_status(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (did, key_bytes, user_id) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in check_account_status: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let user_status = sqlx::query!("SELECT deactivated_at FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let deactivated_at = match user_status {
Ok(Some(row)) => row.deactivated_at,
_ => None,
};
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
.fetch_optional(&state.db)
.await;
let repo_commit = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
_ => String::new(),
};
let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
.fetch_one(&state.db)
.await
.unwrap_or(Some(0))
.unwrap_or(0);
let blob_count: i64 =
sqlx::query_scalar!("SELECT COUNT(*) FROM blobs WHERE created_by_user = $1", user_id)
.fetch_one(&state.db)
.await
.unwrap_or(Some(0))
.unwrap_or(0);
let valid_did = did.starts_with("did:");
(
StatusCode::OK,
Json(CheckAccountStatusOutput {
activated: deactivated_at.is_none(),
valid_did,
repo_commit: repo_commit.clone(),
repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
repo_blocks: 0,
indexed_records: record_count,
private_state_values: 0,
expected_blobs: blob_count,
imported_blobs: blob_count,
}),
)
.into_response()
}
pub async fn activate_account(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (did, key_bytes) = match session {
Ok(Some(row)) => (row.did, row.key_bytes),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in activate_account: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET deactivated_at = NULL WHERE did = $1", did)
.execute(&state.db)
.await;
match result {
Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
Err(e) => {
error!("DB error activating account: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DeactivateAccountInput {
pub delete_after: Option<String>,
}
pub async fn deactivate_account(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(_input): Json<DeactivateAccountInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (did, key_bytes) = match session {
Ok(Some(row)) => (row.did, row.key_bytes),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in deactivate_account: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let result = sqlx::query!("UPDATE users SET deactivated_at = NOW() WHERE did = $1", did)
.execute(&state.db)
.await;
match result {
Ok(_) => (StatusCode::OK, Json(json!({}))).into_response(),
Err(e) => {
error!("DB error deactivating account: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
pub async fn request_account_delete(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, u.id as user_id, u.email, u.handle, k.key_bytes
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (did, user_id, email, handle, key_bytes) = match session {
Ok(Some(row)) => (row.did, row.user_id, row.email, row.handle, row.key_bytes),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in request_account_delete: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let confirmation_token = Uuid::new_v4().to_string();
let expires_at = Utc::now() + Duration::minutes(15);
let insert = sqlx::query!(
"INSERT INTO account_deletion_requests (token, did, expires_at) VALUES ($1, $2, $3)",
confirmation_token,
did,
expires_at
)
.execute(&state.db)
.await;
if let Err(e) = insert {
error!("DB error creating deletion token: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_account_deletion(
&state.db,
user_id,
&email,
&handle,
&confirmation_token,
&hostname,
)
.await
{
warn!("Failed to enqueue account deletion notification: {:?}", e);
}
info!("Account deletion requested for user {}", did);
(StatusCode::OK, Json(json!({}))).into_response()
}

View File

@@ -0,0 +1,366 @@
use crate::state::AppState;
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::error;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppPassword {
pub name: String,
pub created_at: String,
pub privileged: bool,
}
#[derive(Serialize)]
pub struct ListAppPasswordsOutput {
pub passwords: Vec<AppPassword>,
}
pub async fn list_app_passwords(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in list_app_passwords: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let result = sqlx::query!("SELECT name, created_at, privileged FROM app_passwords WHERE user_id = $1 ORDER BY created_at DESC", user_id)
.fetch_all(&state.db)
.await;
match result {
Ok(rows) => {
let passwords: Vec<AppPassword> = rows
.iter()
.map(|row| {
AppPassword {
name: row.name.clone(),
created_at: row.created_at.to_rfc3339(),
privileged: row.privileged,
}
})
.collect();
(StatusCode::OK, Json(ListAppPasswordsOutput { passwords })).into_response()
}
Err(e) => {
error!("DB error listing app passwords: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct CreateAppPasswordInput {
pub name: String,
pub privileged: Option<bool>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAppPasswordOutput {
pub name: String,
pub password: String,
pub created_at: String,
pub privileged: bool,
}
pub async fn create_app_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<CreateAppPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in create_app_password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let name = input.name.trim();
if name.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "name is required"})),
)
.into_response();
}
let existing = sqlx::query!("SELECT id FROM app_passwords WHERE user_id = $1 AND name = $2", user_id, name)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = existing {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "DuplicateAppPassword", "message": "App password with this name already exists"})),
)
.into_response();
}
let password: String = (0..4)
.map(|_| {
use rand::Rng;
let mut rng = rand::thread_rng();
let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect();
(0..4).map(|_| chars[rng.gen_range(0..chars.len())]).collect::<String>()
})
.collect::<Vec<String>>()
.join("-");
let password_hash = match bcrypt::hash(&password, bcrypt::DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Failed to hash password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let privileged = input.privileged.unwrap_or(false);
let created_at = chrono::Utc::now();
let result = sqlx::query!(
"INSERT INTO app_passwords (user_id, name, password_hash, created_at, privileged) VALUES ($1, $2, $3, $4, $5)",
user_id,
name,
password_hash,
created_at,
privileged
)
.execute(&state.db)
.await;
match result {
Ok(_) => (
StatusCode::OK,
Json(CreateAppPasswordOutput {
name: name.to_string(),
password,
created_at: created_at.to_rfc3339(),
privileged,
}),
)
.into_response(),
Err(e) => {
error!("DB error creating app password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct RevokeAppPasswordInput {
pub name: String,
}
pub async fn revoke_app_password(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<RevokeAppPasswordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in revoke_app_password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let name = input.name.trim();
if name.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "name is required"})),
)
.into_response();
}
let result = sqlx::query!("DELETE FROM app_passwords WHERE user_id = $1 AND name = $2", user_id, name)
.execute(&state.db)
.await;
match result {
Ok(r) => {
if r.rows_affected() == 0 {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "AppPasswordNotFound", "message": "App password not found"})),
)
.into_response();
}
(StatusCode::OK, Json(json!({}))).into_response()
}
Err(e) => {
error!("DB error revoking app password: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

288
src/api/server/email.rs Normal file
View File

@@ -0,0 +1,288 @@
use crate::state::AppState;
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use chrono::{Duration, Utc};
use rand::Rng;
use serde::Deserialize;
use serde_json::json;
use tracing::{error, info, warn};
fn generate_confirmation_code() -> String {
let mut rng = rand::thread_rng();
let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect();
let part1: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
let part2: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
format!("{}-{}", part1, part2)
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RequestEmailUpdateInput {
pub email: String,
}
pub async fn request_email_update(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<RequestEmailUpdateInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id, u.handle
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id, handle) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id, row.handle),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in request_email_update: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let email = input.email.trim().to_lowercase();
if email.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "email is required"})),
)
.into_response();
}
let exists = sqlx::query!("SELECT 1 as one FROM users WHERE LOWER(email) = $1", email)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = exists {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "EmailTaken", "message": "Email already taken"})),
)
.into_response();
}
let code = generate_confirmation_code();
let expires_at = Utc::now() + Duration::minutes(10);
let update = sqlx::query!(
"UPDATE users SET email_pending_verification = $1, email_confirmation_code = $2, email_confirmation_code_expires_at = $3 WHERE id = $4",
email,
code,
expires_at,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error setting email update code: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_email_update(
&state.db,
user_id,
&email,
&handle,
&code,
&hostname,
)
.await
{
warn!("Failed to enqueue email update notification: {:?}", e);
}
info!("Email update requested for user {}", user_id);
(StatusCode::OK, Json(json!({ "tokenRequired": true }))).into_response()
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmEmailInput {
pub email: String,
pub token: String,
}
pub async fn confirm_email(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<ConfirmEmailInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id, u.email_confirmation_code, u.email_confirmation_code_expires_at, u.email_pending_verification
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id, stored_code, expires_at, email_pending_verification) = match session {
Ok(Some(row)) => (
row.did,
row.key_bytes,
row.user_id,
row.email_confirmation_code,
row.email_confirmation_code_expires_at,
row.email_pending_verification,
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in confirm_email: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let email = input.email.trim().to_lowercase();
let confirmation_code = input.token.trim();
if email_pending_verification.is_none() || stored_code.is_none() || expires_at.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "No pending email update found"})),
)
.into_response();
}
let email_pending_verification = email_pending_verification.unwrap();
if email_pending_verification != email {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Email does not match pending update"})),
)
.into_response();
}
if stored_code.unwrap() != confirmation_code {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidToken", "message": "Invalid token"})),
)
.into_response();
}
if Utc::now() > expires_at.unwrap() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
)
.into_response();
}
let update = sqlx::query!(
"UPDATE users SET email = $1, email_pending_verification = NULL, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE id = $2",
email_pending_verification,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error finalizing email update: {:?}", e);
if e.as_database_error().map(|db_err| db_err.is_unique_violation()).unwrap_or(false) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "EmailTaken", "message": "Email already taken"})),
)
.into_response();
}
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
info!("Email updated for user {}", user_id);
(StatusCode::OK, Json(json!({}))).into_response()
}

View File

@@ -1,12 +1,19 @@
pub mod account_status;
pub mod app_password;
pub mod email;
pub mod invite;
pub mod meta;
pub mod password;
pub mod session;
pub use account_status::{
activate_account, check_account_status, deactivate_account, request_account_delete,
};
pub use app_password::{create_app_password, list_app_passwords, revoke_app_password};
pub use email::{confirm_email, request_email_update};
pub use invite::{create_invite_code, create_invite_codes, get_account_invite_codes};
pub use meta::{describe_server, health};
pub use password::{request_password_reset, reset_password};
pub use session::{
activate_account, check_account_status, confirm_email, create_app_password, create_session,
deactivate_account, delete_session, get_service_auth, get_session, list_app_passwords,
refresh_session, request_account_delete, request_email_update, request_password_reset,
reset_password, revoke_app_password,
create_session, delete_session, get_service_auth, get_session, refresh_session,
};

221
src/api/server/password.rs Normal file
View File

@@ -0,0 +1,221 @@
use crate::state::AppState;
use axum::{
Json,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
};
use bcrypt::{hash, DEFAULT_COST};
use chrono::{Duration, Utc};
use rand::Rng;
use serde::Deserialize;
use serde_json::json;
use tracing::{error, info, warn};
fn generate_reset_code() -> String {
let mut rng = rand::thread_rng();
let chars: Vec<char> = "abcdefghijklmnopqrstuvwxyz234567".chars().collect();
let part1: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
let part2: String = (0..5).map(|_| chars[rng.gen_range(0..chars.len())]).collect();
format!("{}-{}", part1, part2)
}
#[derive(Deserialize)]
pub struct RequestPasswordResetInput {
pub email: String,
}
pub async fn request_password_reset(
State(state): State<AppState>,
Json(input): Json<RequestPasswordResetInput>,
) -> Response {
let email = input.email.trim().to_lowercase();
if email.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "email is required"})),
)
.into_response();
}
let user = sqlx::query!(
"SELECT id, handle FROM users WHERE LOWER(email) = $1",
email
)
.fetch_optional(&state.db)
.await;
let (user_id, handle) = match user {
Ok(Some(row)) => (row.id, row.handle),
Ok(None) => {
info!("Password reset requested for unknown email: {}", email);
return (StatusCode::OK, Json(json!({}))).into_response();
}
Err(e) => {
error!("DB error in request_password_reset: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let code = generate_reset_code();
let expires_at = Utc::now() + Duration::minutes(10);
let update = sqlx::query!(
"UPDATE users SET password_reset_code = $1, password_reset_code_expires_at = $2 WHERE id = $3",
code,
expires_at,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error setting reset code: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_password_reset(
&state.db,
user_id,
&email,
&handle,
&code,
&hostname,
)
.await
{
warn!("Failed to enqueue password reset notification: {:?}", e);
}
info!("Password reset requested for user {}", user_id);
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct ResetPasswordInput {
pub token: String,
pub password: String,
}
pub async fn reset_password(
State(state): State<AppState>,
Json(input): Json<ResetPasswordInput>,
) -> Response {
let token = input.token.trim();
let password = &input.password;
if token.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidToken", "message": "token is required"})),
)
.into_response();
}
if password.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "password is required"})),
)
.into_response();
}
let user = sqlx::query!(
"SELECT id, password_reset_code, password_reset_code_expires_at FROM users WHERE password_reset_code = $1",
token
)
.fetch_optional(&state.db)
.await;
let (user_id, expires_at) = match user {
Ok(Some(row)) => {
let expires = row.password_reset_code_expires_at;
(row.id, expires)
}
Ok(None) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
)
.into_response();
}
Err(e) => {
error!("DB error in reset_password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Some(exp) = expires_at {
if Utc::now() > exp {
let _ = sqlx::query!(
"UPDATE users SET password_reset_code = NULL, password_reset_code_expires_at = NULL WHERE id = $1",
user_id
)
.execute(&state.db)
.await;
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
)
.into_response();
}
} else {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidToken", "message": "Invalid or expired token"})),
)
.into_response();
}
let password_hash = match hash(password, DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Failed to hash password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let update = sqlx::query!(
"UPDATE users SET password_hash = $1, password_reset_code = NULL, password_reset_code_expires_at = NULL WHERE id = $2",
password_hash,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error updating password: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let _ = sqlx::query!("DELETE FROM sessions WHERE did = (SELECT did FROM users WHERE id = $1)", user_id)
.execute(&state.db)
.await;
info!("Password reset completed for user {}", user_id);
(StatusCode::OK, Json(json!({}))).into_response()
}

File diff suppressed because it is too large Load Diff

229
src/sync/blob.rs Normal file
View File

@@ -0,0 +1,229 @@
use crate::state::AppState;
use axum::{
Json,
body::Body,
extract::{Query, State},
http::StatusCode,
http::header,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::error;
#[derive(Deserialize)]
pub struct GetBlobParams {
pub did: String,
pub cid: String,
}
pub async fn get_blob(
State(state): State<AppState>,
Query(params): Query<GetBlobParams>,
) -> Response {
let did = params.did.trim();
let cid = params.cid.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
if cid.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "cid is required"})),
)
.into_response();
}
let user_exists = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
match user_exists {
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_blob: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Ok(Some(_)) => {}
}
let blob_result = sqlx::query!("SELECT storage_key, mime_type FROM blobs WHERE cid = $1", cid)
.fetch_optional(&state.db)
.await;
match blob_result {
Ok(Some(row)) => {
let storage_key = &row.storage_key;
let mime_type = &row.mime_type;
match state.blob_store.get(&storage_key).await {
Ok(data) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, mime_type)
.body(Body::from(data))
.unwrap(),
Err(e) => {
error!("Failed to fetch blob from storage: {:?}", e);
(
StatusCode::NOT_FOUND,
Json(json!({"error": "BlobNotFound", "message": "Blob not found in storage"})),
)
.into_response()
}
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "BlobNotFound", "message": "Blob not found"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_blob: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct ListBlobsParams {
pub did: String,
pub since: Option<String>,
pub limit: Option<i64>,
pub cursor: Option<String>,
}
#[derive(Serialize)]
pub struct ListBlobsOutput {
pub cursor: Option<String>,
pub cids: Vec<String>,
}
pub async fn list_blobs(
State(state): State<AppState>,
Query(params): Query<ListBlobsParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let limit = params.limit.unwrap_or(500).min(1000);
let cursor_cid = params.cursor.as_deref().unwrap_or("");
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user_result {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in list_blobs: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let cids_result: Result<Vec<String>, sqlx::Error> = if let Some(since) = &params.since {
let since_time = chrono::DateTime::parse_from_rfc3339(since)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
sqlx::query!(
r#"
SELECT cid FROM blobs
WHERE created_by_user = $1 AND cid > $2 AND created_at > $3
ORDER BY cid ASC
LIMIT $4
"#,
user_id,
cursor_cid,
since_time,
limit + 1
)
.fetch_all(&state.db)
.await
.map(|rows| rows.into_iter().map(|r| r.cid).collect())
} else {
sqlx::query!(
r#"
SELECT cid FROM blobs
WHERE created_by_user = $1 AND cid > $2
ORDER BY cid ASC
LIMIT $3
"#,
user_id,
cursor_cid,
limit + 1
)
.fetch_all(&state.db)
.await
.map(|rows| rows.into_iter().map(|r| r.cid).collect())
};
match cids_result {
Ok(cids) => {
let has_more = cids.len() as i64 > limit;
let cids: Vec<String> = cids
.into_iter()
.take(limit as usize)
.collect();
let next_cursor = if has_more {
cids.last().cloned()
} else {
None
};
(
StatusCode::OK,
Json(ListBlobsOutput {
cursor: next_cursor,
cids,
}),
)
.into_response()
}
Err(e) => {
error!("DB error in list_blobs: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

32
src/sync/car.rs Normal file
View File

@@ -0,0 +1,32 @@
use cid::Cid;
use std::io::Write;
pub fn write_varint<W: Write>(mut writer: W, mut value: u64) -> std::io::Result<()> {
loop {
let mut byte = (value & 0x7F) as u8;
value >>= 7;
if value != 0 {
byte |= 0x80;
}
writer.write_all(&[byte])?;
if value == 0 {
break;
}
}
Ok(())
}
pub fn ld_write<W: Write>(mut writer: W, data: &[u8]) -> std::io::Result<()> {
write_varint(&mut writer, data.len() as u64)?;
writer.write_all(data)?;
Ok(())
}
pub fn encode_car_header(root_cid: &Cid) -> Vec<u8> {
let header = serde_ipld_dagcbor::to_vec(&serde_json::json!({
"version": 1u64,
"roots": [root_cid.to_bytes()]
}))
.unwrap_or_default();
header
}

227
src/sync/commit.rs Normal file
View File

@@ -0,0 +1,227 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::error;
#[derive(Deserialize)]
pub struct GetLatestCommitParams {
pub did: String,
}
#[derive(Serialize)]
pub struct GetLatestCommitOutput {
pub cid: String,
pub rev: String,
}
pub async fn get_latest_commit(
State(state): State<AppState>,
Query(params): Query<GetLatestCommitParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let result = sqlx::query!(
r#"
SELECT r.repo_root_cid
FROM repos r
JOIN users u ON r.user_id = u.id
WHERE u.did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
match result {
Ok(Some(row)) => {
(
StatusCode::OK,
Json(GetLatestCommitOutput {
cid: row.repo_root_cid,
rev: chrono::Utc::now().timestamp_millis().to_string(),
}),
)
.into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_latest_commit: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct ListReposParams {
pub limit: Option<i64>,
pub cursor: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RepoInfo {
pub did: String,
pub head: String,
pub rev: String,
pub active: bool,
}
#[derive(Serialize)]
pub struct ListReposOutput {
pub cursor: Option<String>,
pub repos: Vec<RepoInfo>,
}
pub async fn list_repos(
State(state): State<AppState>,
Query(params): Query<ListReposParams>,
) -> Response {
let limit = params.limit.unwrap_or(50).min(1000);
let cursor_did = params.cursor.as_deref().unwrap_or("");
let result = sqlx::query!(
r#"
SELECT u.did, r.repo_root_cid
FROM repos r
JOIN users u ON r.user_id = u.id
WHERE u.did > $1
ORDER BY u.did ASC
LIMIT $2
"#,
cursor_did,
limit + 1
)
.fetch_all(&state.db)
.await;
match result {
Ok(rows) => {
let has_more = rows.len() as i64 > limit;
let repos: Vec<RepoInfo> = rows
.iter()
.take(limit as usize)
.map(|row| {
RepoInfo {
did: row.did.clone(),
head: row.repo_root_cid.clone(),
rev: chrono::Utc::now().timestamp_millis().to_string(),
active: true,
}
})
.collect();
let next_cursor = if has_more {
repos.last().map(|r| r.did.clone())
} else {
None
};
(
StatusCode::OK,
Json(ListReposOutput {
cursor: next_cursor,
repos,
}),
)
.into_response()
}
Err(e) => {
error!("DB error in list_repos: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}
#[derive(Deserialize)]
pub struct GetRepoStatusParams {
pub did: String,
}
#[derive(Serialize)]
pub struct GetRepoStatusOutput {
pub did: String,
pub active: bool,
pub rev: Option<String>,
}
pub async fn get_repo_status(
State(state): State<AppState>,
Query(params): Query<GetRepoStatusParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let result = sqlx::query!(
r#"
SELECT u.did, r.repo_root_cid
FROM users u
LEFT JOIN repos r ON u.id = r.user_id
WHERE u.did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
match result {
Ok(Some(row)) => {
let rev = Some(chrono::Utc::now().timestamp_millis().to_string());
(
StatusCode::OK,
Json(GetRepoStatusOutput {
did: row.did,
active: true,
rev,
}),
)
.into_response()
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response(),
Err(e) => {
error!("DB error in get_repo_status: {:?}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response()
}
}
}

40
src/sync/crawl.rs Normal file
View File

@@ -0,0 +1,40 @@
use crate::state::AppState;
use axum::{
Json,
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
};
use serde::Deserialize;
use serde_json::json;
use tracing::info;
#[derive(Deserialize)]
pub struct NotifyOfUpdateParams {
pub hostname: String,
}
pub async fn notify_of_update(
State(_state): State<AppState>,
Query(params): Query<NotifyOfUpdateParams>,
) -> Response {
info!("Received notifyOfUpdate from hostname: {}", params.hostname);
info!("TODO: Queue job for notifyOfUpdate (not implemented)");
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct RequestCrawlInput {
pub hostname: String,
}
pub async fn request_crawl(
State(_state): State<AppState>,
Json(input): Json<RequestCrawlInput>,
) -> Response {
info!("Received requestCrawl for hostname: {}", input.hostname);
info!("TODO: Queue job for requestCrawl (not implemented)");
(StatusCode::OK, Json(json!({}))).into_response()
}

File diff suppressed because it is too large Load Diff

556
src/sync/repo.rs Normal file
View File

@@ -0,0 +1,556 @@
use crate::state::AppState;
use crate::sync::car::{encode_car_header, ld_write};
use axum::{
Json,
body::Body,
extract::{Query, State},
http::StatusCode,
http::header,
response::{IntoResponse, Response},
};
use bytes::Bytes;
use cid::Cid;
use jacquard_repo::{commit::Commit, storage::BlockStore};
use serde::Deserialize;
use serde_json::json;
use std::collections::HashSet;
use tracing::error;
#[derive(Deserialize)]
pub struct GetBlocksParams {
pub did: String,
pub cids: String,
}
pub async fn get_blocks(
State(state): State<AppState>,
Query(params): Query<GetBlocksParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let cid_strings: Vec<&str> = params.cids.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()).collect();
if cid_strings.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "cids is required"})),
)
.into_response();
}
let repo_result = sqlx::query!(
r#"
SELECT r.repo_root_cid
FROM repos r
JOIN users u ON r.user_id = u.id
WHERE u.did = $1
"#,
did
)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut requested_cids: Vec<Cid> = Vec::new();
for cid_str in &cid_strings {
match cid_str.parse::<Cid>() {
Ok(c) => requested_cids.push(c),
Err(e) => {
error!("Failed to parse CID '{}': {:?}", cid_str, e);
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": format!("Invalid CID: {}", cid_str)})),
)
.into_response();
}
}
}
let mut buf = Vec::new();
let car_header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &car_header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for cid in &requested_cids {
let cid_bytes = cid.to_bytes();
let block_result = sqlx::query!(
"SELECT data FROM blocks WHERE cid = $1",
&cid_bytes
)
.fetch_optional(&state.db)
.await;
match block_result {
Ok(Some(row)) => {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid_bytes);
block_data.extend_from_slice(&row.data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "BlockNotFound", "message": format!("Block not found: {}", cid)})),
)
.into_response();
}
Err(e) => {
error!("DB error fetching block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}
#[derive(Deserialize)]
pub struct GetRepoParams {
pub did: String,
pub since: Option<String>,
}
pub async fn get_repo(
State(state): State<AppState>,
Query(params): Query<GetRepoParams>,
) -> Response {
let did = params.did.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user_result {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_repo: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
)
.into_response();
}
Err(e) => {
error!("DB error in get_repo: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let commit_bytes = match state.block_store.get(&root_cid).await {
Ok(Some(b)) => b,
Ok(None) => {
error!("Commit block not found: {}", root_cid);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Err(e) => {
error!("Failed to load commit block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => {
error!("Failed to parse commit: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
let mut visited: HashSet<Vec<u8>> = HashSet::new();
collected_blocks.push((root_cid, commit_bytes.clone()));
visited.insert(root_cid.to_bytes());
let mst_root_cid = commit.data;
if !visited.contains(&mst_root_cid.to_bytes()) {
visited.insert(mst_root_cid.to_bytes());
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
collected_blocks.push((mst_root_cid, data));
}
}
let records = sqlx::query!("SELECT record_cid FROM records WHERE repo_id = $1", user_id)
.fetch_all(&state.db)
.await
.unwrap_or_default();
for record in records {
if let Ok(cid) = record.record_cid.parse::<Cid>() {
if !visited.contains(&cid.to_bytes()) {
visited.insert(cid.to_bytes());
if let Ok(Some(data)) = state.block_store.get(&cid).await {
collected_blocks.push((cid, data));
}
}
}
}
let mut buf = Vec::new();
let car_header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &car_header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for (cid, data) in &collected_blocks {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid.to_bytes());
block_data.extend_from_slice(data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}
#[derive(Deserialize)]
pub struct GetRecordParams {
pub did: String,
pub collection: String,
pub rkey: String,
}
pub async fn get_record(
State(state): State<AppState>,
Query(params): Query<GetRecordParams>,
) -> Response {
let did = params.did.trim();
let collection = params.collection.trim();
let rkey = params.rkey.trim();
if did.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "did is required"})),
)
.into_response();
}
if collection.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "collection is required"})),
)
.into_response();
}
if rkey.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "rkey is required"})),
)
.into_response();
}
let user_result = sqlx::query!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await;
let user_id = match user_result {
Ok(Some(row)) => row.id,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let record_result = sqlx::query!(
"SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3",
user_id,
collection,
rkey
)
.fetch_optional(&state.db)
.await;
let record_cid_str = match record_result {
Ok(Some(row)) => row.record_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RecordNotFound", "message": "Record not found"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let record_cid = match record_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse record CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let repo_result = sqlx::query!("SELECT repo_root_cid FROM repos WHERE user_id = $1", user_id)
.fetch_optional(&state.db)
.await;
let repo_root_cid_str = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RepoNotFound", "message": "Repository not initialized"})),
)
.into_response();
}
Err(e) => {
error!("DB error in sync get_record: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let root_cid = match repo_root_cid_str.parse::<Cid>() {
Ok(c) => c,
Err(e) => {
error!("Failed to parse root CID: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mut collected_blocks: Vec<(Cid, Bytes)> = Vec::new();
let commit_bytes = match state.block_store.get(&root_cid).await {
Ok(Some(b)) => b,
Ok(None) => {
error!("Commit block not found: {}", root_cid);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
Err(e) => {
error!("Failed to load commit block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
collected_blocks.push((root_cid, commit_bytes.clone()));
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => {
error!("Failed to parse commit: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let mst_root_cid = commit.data;
if let Ok(Some(data)) = state.block_store.get(&mst_root_cid).await {
collected_blocks.push((mst_root_cid, data));
}
if let Ok(Some(data)) = state.block_store.get(&record_cid).await {
collected_blocks.push((record_cid, data));
} else {
return (
StatusCode::NOT_FOUND,
Json(json!({"error": "RecordNotFound", "message": "Record block not found"})),
)
.into_response();
}
let mut buf = Vec::new();
let car_header = encode_car_header(&root_cid);
if let Err(e) = ld_write(&mut buf, &car_header) {
error!("Failed to write CAR header: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
for (cid, data) in &collected_blocks {
let mut block_data = Vec::new();
block_data.extend_from_slice(&cid.to_bytes());
block_data.extend_from_slice(data);
if let Err(e) = ld_write(&mut buf, &block_data) {
error!("Failed to write block: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
}
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "application/vnd.ipld.car")
.body(Body::from(buf))
.unwrap()
}