Initial did:web and objsto impl

This commit is contained in:
Lewis
2025-12-06 12:55:28 +02:00
parent db272e9f4f
commit 9da8823d18
25 changed files with 2470 additions and 1365 deletions

354
src/api/identity.rs Normal file
View File

@@ -0,0 +1,354 @@
use axum::{
extract::{State, Path},
Json,
response::{IntoResponse, Response},
http::StatusCode,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::state::AppState;
use sqlx::Row;
use bcrypt::{hash, DEFAULT_COST};
use tracing::{info, error};
use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
use jacquard::types::{string::Tid, did::Did, integer::LimitedU32};
use std::sync::Arc;
use k256::SecretKey;
use rand::rngs::OsRng;
use base64::Engine;
#[derive(Deserialize)]
pub struct CreateAccountInput {
pub handle: String,
pub email: String,
pub password: String,
#[serde(rename = "inviteCode")]
pub invite_code: Option<String>,
pub did: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
pub access_jwt: String,
pub refresh_jwt: String,
pub handle: String,
pub did: String,
}
pub async fn create_account(
State(state): State<AppState>,
Json(input): Json<CreateAccountInput>,
) -> Response {
info!("create_account hit: {}", input.handle);
if input.handle.contains('!') || input.handle.contains('@') {
return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}))).into_response();
}
let did = if let Some(d) = &input.did {
if d.trim().is_empty() {
format!("did:plc:{}", uuid::Uuid::new_v4())
} else {
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
let _expected_prefix = format!("did:web:{}", hostname);
// TODO: should verify we are the authority for it if it matches our hostname.
// TODO: if it's an external did:web, we should technically verify ownership via ServiceAuth, but skipping for now.
d.clone()
}
} else {
format!("did:plc:{}", uuid::Uuid::new_v4())
};
let mut tx = match state.db.begin().await {
Ok(tx) => tx,
Err(e) => {
error!("Error starting transaction: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let exists_query = sqlx::query("SELECT 1 FROM users WHERE handle = $1")
.bind(&input.handle)
.fetch_optional(&mut *tx)
.await;
match exists_query {
Ok(Some(_)) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "HandleTaken", "message": "Handle already taken"}))).into_response(),
Err(e) => {
error!("Error checking handle: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
Ok(None) => {}
}
if let Some(code) = &input.invite_code {
let invite_query = sqlx::query("SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE")
.bind(code)
.fetch_optional(&mut *tx)
.await;
match invite_query {
Ok(Some(row)) => {
let uses: i32 = row.get("available_uses");
if uses <= 0 {
return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
}
let update_invite = sqlx::query("UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1")
.bind(code)
.execute(&mut *tx)
.await;
if let Err(e) = update_invite {
error!("Error updating invite code: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
},
Ok(None) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"}))).into_response(),
Err(e) => {
error!("Error checking invite code: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
}
}
let password_hash = match hash(&input.password, DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Error hashing password: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let user_insert = sqlx::query("INSERT INTO users (handle, email, did, password_hash) VALUES ($1, $2, $3, $4) RETURNING id")
.bind(&input.handle)
.bind(&input.email)
.bind(&did)
.bind(&password_hash)
.fetch_one(&mut *tx)
.await;
let user_id: uuid::Uuid = match user_insert {
Ok(row) => row.get("id"),
Err(e) => {
error!("Error inserting user: {:?}", e);
// TODO: Check for unique constraint violation on email/did specifically
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let secret_key = SecretKey::random(&mut OsRng);
let secret_key_bytes = secret_key.to_bytes();
let key_insert = sqlx::query("INSERT INTO user_keys (user_id, key_bytes) VALUES ($1, $2)")
.bind(user_id)
.bind(&secret_key_bytes[..])
.execute(&mut *tx)
.await;
if let Err(e) = key_insert {
error!("Error inserting user key: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
let mst = Mst::new(Arc::new(state.block_store.clone()));
let mst_root = match mst.root().await {
Ok(c) => c,
Err(e) => {
error!("Error creating MST root: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let did_obj = match Did::new(&did) {
Ok(d) => d,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
};
let rev = Tid::now(LimitedU32::MIN);
let commit = Commit::new_unsigned(
did_obj,
mst_root,
rev,
None
);
let commit_bytes = match commit.to_cbor() {
Ok(b) => b,
Err(e) => {
error!("Error serializing genesis commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let commit_cid = match state.block_store.put(&commit_bytes).await {
Ok(c) => c,
Err(e) => {
error!("Error saving genesis commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let repo_insert = sqlx::query("INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)")
.bind(user_id)
.bind(commit_cid.to_string())
.execute(&mut *tx)
.await;
if let Err(e) = repo_insert {
error!("Error initializing repo: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
if let Some(code) = &input.invite_code {
let use_insert = sqlx::query("INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)")
.bind(code)
.bind(user_id)
.execute(&mut *tx)
.await;
if let Err(e) = use_insert {
error!("Error recording invite usage: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
}
let access_jwt = crate::auth::create_access_token(&did, &secret_key_bytes[..]).map_err(|e| {
error!("Error creating access token: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
});
let access_jwt = match access_jwt {
Ok(t) => t,
Err(r) => return r,
};
let refresh_jwt = crate::auth::create_refresh_token(&did, &secret_key_bytes[..]).map_err(|e| {
error!("Error creating refresh token: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
});
let refresh_jwt = match refresh_jwt {
Ok(t) => t,
Err(r) => return r,
};
let session_insert = sqlx::query("INSERT INTO sessions (access_jwt, refresh_jwt, did) VALUES ($1, $2, $3)")
.bind(&access_jwt)
.bind(&refresh_jwt)
.bind(&did)
.execute(&mut *tx)
.await;
if let Err(e) = session_insert {
error!("Error inserting session: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
if let Err(e) = tx.commit().await {
error!("Error committing transaction: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
(StatusCode::OK, Json(CreateAccountOutput {
access_jwt,
refresh_jwt,
handle: input.handle,
did,
})).into_response()
}
fn get_jwk(key_bytes: &[u8]) -> serde_json::Value {
use k256::elliptic_curve::sec1::ToEncodedPoint;
let secret_key = SecretKey::from_slice(key_bytes).expect("Invalid key length");
let public_key = secret_key.public_key();
let encoded = public_key.to_encoded_point(false);
let x = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(encoded.x().unwrap());
let y = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(encoded.y().unwrap());
json!({
"kty": "EC",
"crv": "secp256k1",
"x": x,
"y": y
})
}
pub async fn well_known_did(State(_state): State<AppState>) -> impl IntoResponse {
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
// Kinda for local dev, encode hostname if it contains port
let did = if hostname.contains(':') {
format!("did:web:{}", hostname.replace(':', "%3A"))
} else {
format!("did:web:{}", hostname)
};
Json(json!({
"@context": ["https://www.w3.org/ns/did/v1"],
"id": did,
"service": [{
"id": "#atproto_pds",
"type": "AtprotoPersonalDataServer",
"serviceEndpoint": format!("https://{}", hostname)
}]
}))
}
pub async fn user_did_doc(
State(state): State<AppState>,
Path(handle): Path<String>,
) -> Response {
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
let user = sqlx::query("SELECT id, did FROM users WHERE handle = $1")
.bind(&handle)
.fetch_optional(&state.db)
.await;
let (user_id, did) = match user {
Ok(Some(row)) => {
let id: uuid::Uuid = row.get("id");
let d: String = row.get("did");
(id, d)
},
Ok(None) => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound"}))).into_response(),
Err(e) => {
error!("DB Error: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
},
};
if !did.starts_with("did:web:") {
return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "User is not did:web"}))).into_response();
}
let key_row = sqlx::query("SELECT key_bytes FROM user_keys WHERE user_id = $1")
.bind(user_id)
.fetch_optional(&state.db)
.await;
let key_bytes: Vec<u8> = match key_row {
Ok(Some(row)) => row.get("key_bytes"),
_ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(),
};
let jwk = get_jwk(&key_bytes);
Json(json!({
"@context": ["https://www.w3.org/ns/did/v1", "https://w3id.org/security/suites/jws-2020/v1"],
"id": did,
"alsoKnownAs": [format!("at://{}", handle)],
"verificationMethod": [{
"id": format!("{}#atproto", did),
"type": "JsonWebKey2020",
"controller": did,
"publicKeyJwk": jwk
}],
"service": [{
"id": "#atproto_pds",
"type": "AtprotoPersonalDataServer",
"serviceEndpoint": format!("https://{}", hostname)
}]
})).into_response()
}

View File

@@ -1,3 +1,4 @@
pub mod server;
pub mod repo;
pub mod proxy;
pub mod identity;

View File

@@ -1,5 +1,5 @@
use axum::{
extract::State,
extract::{State, Query},
Json,
response::{IntoResponse, Response},
http::StatusCode,
@@ -15,6 +15,9 @@ use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
use jacquard::types::{string::{Nsid, Tid}, did::Did, integer::LimitedU32};
use tracing::error;
use std::sync::Arc;
use sha2::{Sha256, Digest};
use multihash::Multihash;
use axum::body::Bytes;
#[derive(Deserialize)]
#[allow(dead_code)]
@@ -219,3 +222,668 @@ pub async fn create_record(
};
(StatusCode::OK, Json(output)).into_response()
}
#[derive(Deserialize)]
#[allow(dead_code)]
pub struct PutRecordInput {
pub repo: String,
pub collection: String,
pub rkey: String,
pub validate: Option<bool>,
pub record: serde_json::Value,
#[serde(rename = "swapCommit")]
pub swap_commit: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PutRecordOutput {
pub uri: String,
pub cid: String,
}
pub async fn put_record(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<PutRecordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
}
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
let session = sqlx::query(
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
)
.bind(&token)
.fetch_optional(&state.db)
.await
.unwrap_or(None);
let (did, key_bytes) = match session {
Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
}
if input.repo != did {
return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
}
let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(&did)
.fetch_optional(&state.db)
.await;
let user_id: uuid::Uuid = match user_query {
Ok(Some(row)) => row.get("id"),
_ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
};
let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
.bind(user_id)
.fetch_optional(&state.db)
.await;
let current_root_cid = match repo_root_query {
Ok(Some(row)) => {
let cid_str: String = row.get("repo_root_cid");
Cid::from_str(&cid_str).ok()
},
_ => None,
};
if current_root_cid.is_none() {
error!("Repo root not found for user {}", did);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
}
let current_root_cid = current_root_cid.unwrap();
let commit_bytes = match state.block_store.get(&current_root_cid).await {
Ok(Some(b)) => b,
Ok(None) => {
error!("Commit block not found: {}", current_root_cid);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response();
},
Err(e) => {
error!("Failed to load commit block: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to load commit block"}))).into_response();
}
};
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => {
error!("Failed to parse commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to parse commit"}))).into_response();
}
};
let mst_root = commit.data;
let store = Arc::new(state.block_store.clone());
let mst = Mst::load(store.clone(), mst_root, None);
let collection_nsid = match input.collection.parse::<Nsid>() {
Ok(n) => n,
Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
};
let rkey = input.rkey.clone();
let mut record_bytes = Vec::new();
if let Err(e) = serde_ipld_dagcbor::to_writer(&mut record_bytes, &input.record) {
error!("Error serializing record: {:?}", e);
return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidRecord", "message": "Failed to serialize record"}))).into_response();
}
let record_cid = match state.block_store.put(&record_bytes).await {
Ok(c) => c,
Err(e) => {
error!("Failed to save record block: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save record block"}))).into_response();
}
};
let key = format!("{}/{}", collection_nsid, rkey);
if let Err(e) = mst.update(&key, record_cid).await {
error!("Failed to update MST: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to update MST: {:?}", e)}))).into_response();
}
let new_mst_root = match mst.root().await {
Ok(c) => c,
Err(e) => {
error!("Failed to get new MST root: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response();
}
};
let did_obj = match Did::new(&did) {
Ok(d) => d,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
};
let rev = Tid::now(LimitedU32::MIN);
let new_commit = Commit::new_unsigned(
did_obj,
new_mst_root,
rev,
Some(current_root_cid)
);
let new_commit_bytes = match new_commit.to_cbor() {
Ok(b) => b,
Err(e) => {
error!("Failed to serialize new commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response();
}
};
let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
Ok(c) => c,
Err(e) => {
error!("Failed to save new commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response();
}
};
let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
.bind(new_root_cid.to_string())
.bind(user_id)
.execute(&state.db)
.await;
if let Err(e) = update_repo {
error!("Failed to update repo root in DB: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response();
}
let record_insert = sqlx::query(
"INSERT INTO records (repo_id, collection, rkey, record_cid) VALUES ($1, $2, $3, $4)
ON CONFLICT (repo_id, collection, rkey) DO UPDATE SET record_cid = $4, created_at = NOW()"
)
.bind(user_id)
.bind(&input.collection)
.bind(&rkey)
.bind(record_cid.to_string())
.execute(&state.db)
.await;
if let Err(e) = record_insert {
error!("Error inserting record index: {:?}", e);
}
let output = PutRecordOutput {
uri: format!("at://{}/{}/{}", input.repo, input.collection, rkey),
cid: record_cid.to_string(),
};
(StatusCode::OK, Json(output)).into_response()
}
#[derive(Deserialize)]
pub struct GetRecordInput {
pub repo: String,
pub collection: String,
pub rkey: String,
pub cid: Option<String>,
}
pub async fn get_record(
State(state): State<AppState>,
Query(input): Query<GetRecordInput>,
) -> Response {
let user_row = if input.repo.starts_with("did:") {
sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
} else {
sqlx::query("SELECT id FROM users WHERE handle = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
};
let user_id: uuid::Uuid = match user_row {
Ok(Some(row)) => row.get("id"),
_ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
};
let record_row = sqlx::query("SELECT record_cid FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3")
.bind(user_id)
.bind(&input.collection)
.bind(&input.rkey)
.fetch_optional(&state.db)
.await;
let record_cid_str: String = match record_row {
Ok(Some(row)) => row.get("record_cid"),
_ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record not found"}))).into_response(),
};
if let Some(expected_cid) = &input.cid {
if &record_cid_str != expected_cid {
return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Record CID mismatch"}))).into_response();
}
}
let cid = match Cid::from_str(&record_cid_str) {
Ok(c) => c,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid CID in DB"}))).into_response(),
};
let block = match state.block_store.get(&cid).await {
Ok(Some(b)) => b,
_ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Record block not found"}))).into_response(),
};
let value: serde_json::Value = match serde_ipld_dagcbor::from_slice(&block) {
Ok(v) => v,
Err(e) => {
error!("Failed to deserialize record: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
Json(json!({
"uri": format!("at://{}/{}/{}", input.repo, input.collection, input.rkey),
"cid": record_cid_str,
"value": value
})).into_response()
}
#[derive(Deserialize)]
pub struct DeleteRecordInput {
pub repo: String,
pub collection: String,
pub rkey: String,
#[serde(rename = "swapRecord")]
pub swap_record: Option<String>,
#[serde(rename = "swapCommit")]
pub swap_commit: Option<String>,
}
pub async fn delete_record(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<DeleteRecordInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
}
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
let session = sqlx::query(
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
)
.bind(&token)
.fetch_optional(&state.db)
.await
.unwrap_or(None);
let (did, key_bytes) = match session {
Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
}
if input.repo != did {
return (StatusCode::FORBIDDEN, Json(json!({"error": "InvalidRepo", "message": "Repo does not match authenticated user"}))).into_response();
}
let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(&did)
.fetch_optional(&state.db)
.await;
let user_id: uuid::Uuid = match user_query {
Ok(Some(row)) => row.get("id"),
_ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "User not found"}))).into_response(),
};
let repo_root_query = sqlx::query("SELECT repo_root_cid FROM repos WHERE user_id = $1")
.bind(user_id)
.fetch_optional(&state.db)
.await;
let current_root_cid = match repo_root_query {
Ok(Some(row)) => {
let cid_str: String = row.get("repo_root_cid");
Cid::from_str(&cid_str).ok()
},
_ => None,
};
if current_root_cid.is_none() {
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Repo root not found"}))).into_response();
}
let current_root_cid = current_root_cid.unwrap();
let commit_bytes = match state.block_store.get(&current_root_cid).await {
Ok(Some(b)) => b,
Ok(None) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Commit block not found"}))).into_response(),
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to load commit block: {:?}", e)}))).into_response(),
};
let commit = match Commit::from_cbor(&commit_bytes) {
Ok(c) => c,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to parse commit: {:?}", e)}))).into_response(),
};
let mst_root = commit.data;
let store = Arc::new(state.block_store.clone());
let mst = Mst::load(store.clone(), mst_root, None);
let collection_nsid = match input.collection.parse::<Nsid>() {
Ok(n) => n,
Err(_) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidCollection"}))).into_response(),
};
let key = format!("{}/{}", collection_nsid, input.rkey);
// TODO: Check swapRecord if provided? Skipping for brevity/robustness
if let Err(e) = mst.delete(&key).await {
error!("Failed to delete from MST: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": format!("Failed to delete from MST: {:?}", e)}))).into_response();
}
let new_mst_root = match mst.root().await {
Ok(c) => c,
Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to get new MST root"}))).into_response(),
};
let did_obj = match Did::new(&did) {
Ok(d) => d,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
};
let rev = Tid::now(LimitedU32::MIN);
let new_commit = Commit::new_unsigned(
did_obj,
new_mst_root,
rev,
Some(current_root_cid)
);
let new_commit_bytes = match new_commit.to_cbor() {
Ok(b) => b,
Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to serialize new commit"}))).into_response(),
};
let new_root_cid = match state.block_store.put(&new_commit_bytes).await {
Ok(c) => c,
Err(_e) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to save new commit"}))).into_response(),
};
let update_repo = sqlx::query("UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2")
.bind(new_root_cid.to_string())
.bind(user_id)
.execute(&state.db)
.await;
if let Err(e) = update_repo {
error!("Failed to update repo root in DB: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to update repo root in DB"}))).into_response();
}
let record_delete = sqlx::query("DELETE FROM records WHERE repo_id = $1 AND collection = $2 AND rkey = $3")
.bind(user_id)
.bind(&input.collection)
.bind(&input.rkey)
.execute(&state.db)
.await;
if let Err(e) = record_delete {
error!("Error deleting record index: {:?}", e);
}
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
pub struct ListRecordsInput {
pub repo: String,
pub collection: String,
pub limit: Option<i32>,
pub cursor: Option<String>,
#[serde(rename = "rkeyStart")]
pub rkey_start: Option<String>,
#[serde(rename = "rkeyEnd")]
pub rkey_end: Option<String>,
pub reverse: Option<bool>,
}
#[derive(Serialize)]
pub struct ListRecordsOutput {
pub cursor: Option<String>,
pub records: Vec<serde_json::Value>,
}
pub async fn list_records(
State(state): State<AppState>,
Query(input): Query<ListRecordsInput>,
) -> Response {
let user_row = if input.repo.starts_with("did:") {
sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
} else {
sqlx::query("SELECT id FROM users WHERE handle = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
};
let user_id: uuid::Uuid = match user_row {
Ok(Some(row)) => row.get("id"),
_ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
};
let limit = input.limit.unwrap_or(50).clamp(1, 100);
let reverse = input.reverse.unwrap_or(false);
// Simplistic query construction - no sophisticated cursor handling or rkey ranges for now, just basic pagination
// TODO: Implement rkeyStart/End and correct cursor logic
let query_str = format!(
"SELECT rkey, record_cid FROM records WHERE repo_id = $1 AND collection = $2 {} ORDER BY rkey {} LIMIT {}",
if let Some(_c) = &input.cursor {
if reverse { "AND rkey < $3" } else { "AND rkey > $3" }
} else {
""
},
if reverse { "DESC" } else { "ASC" },
limit
);
let mut query = sqlx::query(&query_str)
.bind(user_id)
.bind(&input.collection);
if let Some(c) = &input.cursor {
query = query.bind(c);
}
let rows = match query.fetch_all(&state.db).await {
Ok(r) => r,
Err(e) => {
error!("Error listing records: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let mut records = Vec::new();
let mut last_rkey = None;
for row in rows {
let rkey: String = row.get("rkey");
let cid_str: String = row.get("record_cid");
last_rkey = Some(rkey.clone());
if let Ok(cid) = Cid::from_str(&cid_str) {
if let Ok(Some(block)) = state.block_store.get(&cid).await {
if let Ok(value) = serde_ipld_dagcbor::from_slice::<serde_json::Value>(&block) {
records.push(json!({
"uri": format!("at://{}/{}/{}", input.repo, input.collection, rkey),
"cid": cid_str,
"value": value
}));
}
}
}
}
Json(ListRecordsOutput {
cursor: last_rkey,
records,
}).into_response()
}
#[derive(Deserialize)]
pub struct DescribeRepoInput {
pub repo: String,
}
pub async fn describe_repo(
State(state): State<AppState>,
Query(input): Query<DescribeRepoInput>,
) -> Response {
let user_row = if input.repo.starts_with("did:") {
sqlx::query("SELECT id, handle, did FROM users WHERE did = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
} else {
sqlx::query("SELECT id, handle, did FROM users WHERE handle = $1")
.bind(&input.repo)
.fetch_optional(&state.db)
.await
};
let (user_id, handle, did) = match user_row {
Ok(Some(row)) => (row.get::<uuid::Uuid, _>("id"), row.get::<String, _>("handle"), row.get::<String, _>("did")),
_ => return (StatusCode::NOT_FOUND, Json(json!({"error": "NotFound", "message": "Repo not found"}))).into_response(),
};
let collections_query = sqlx::query("SELECT DISTINCT collection FROM records WHERE repo_id = $1")
.bind(user_id)
.fetch_all(&state.db)
.await;
let collections: Vec<String> = match collections_query {
Ok(rows) => rows.iter().map(|r| r.get("collection")).collect(),
Err(_) => Vec::new(),
};
let did_doc = json!({
"id": did,
"alsoKnownAs": [format!("at://{}", handle)]
});
Json(json!({
"handle": handle,
"did": did,
"didDoc": did_doc,
"collections": collections,
"handleIsCorrect": true
})).into_response()
}
pub async fn upload_blob(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
body: Bytes,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationRequired"}))).into_response();
}
let token = auth_header.unwrap().to_str().unwrap_or("").replace("Bearer ", "");
let session = sqlx::query(
"SELECT s.did, k.key_bytes FROM sessions s JOIN users u ON s.did = u.did JOIN user_keys k ON u.id = k.user_id WHERE s.access_jwt = $1"
)
.bind(&token)
.fetch_optional(&state.db)
.await
.unwrap_or(None);
let (did, key_bytes) = match session {
Some(row) => (row.get::<String, _>("did"), row.get::<Vec<u8>, _>("key_bytes")),
None => return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed"}))).into_response(),
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"}))).into_response();
}
let mime_type = headers.get("content-type")
.and_then(|h| h.to_str().ok())
.unwrap_or("application/octet-stream")
.to_string();
let size = body.len() as i64;
let data = body.to_vec();
let mut hasher = Sha256::new();
hasher.update(&data);
let hash = hasher.finalize();
let multihash = Multihash::wrap(0x12, &hash).unwrap();
let cid = Cid::new_v1(0x55, multihash);
let cid_str = cid.to_string();
let storage_key = format!("blobs/{}", cid_str);
if let Err(e) = state.blob_store.put(&storage_key, &data).await {
error!("Failed to upload blob to storage: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Failed to store blob"}))).into_response();
}
let user_query = sqlx::query("SELECT id FROM users WHERE did = $1")
.bind(&did)
.fetch_optional(&state.db)
.await;
let user_id: uuid::Uuid = match user_query {
Ok(Some(row)) => row.get("id"),
_ => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response(),
};
let insert = sqlx::query(
"INSERT INTO blobs (cid, mime_type, size_bytes, created_by_user, storage_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (cid) DO NOTHING"
)
.bind(&cid_str)
.bind(&mime_type)
.bind(size)
.bind(user_id)
.bind(&storage_key)
.execute(&state.db)
.await;
if let Err(e) = insert {
error!("Failed to insert blob record: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
Json(json!({
"blob": {
"ref": {
"$link": cid_str
},
"mimeType": mime_type,
"size": size
}
})).into_response()
}

View File

@@ -8,13 +8,8 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::state::AppState;
use sqlx::Row;
use bcrypt::{hash, verify, DEFAULT_COST};
use bcrypt::verify;
use tracing::{info, error, warn};
use jacquard_repo::{mst::Mst, commit::Commit, storage::BlockStore};
use jacquard::types::{string::Tid, did::Did, integer::LimitedU32};
use std::sync::Arc;
use k256::SecretKey;
use rand::rngs::OsRng;
pub async fn describe_server() -> impl IntoResponse {
let domains_str = std::env::var("AVAILABLE_USER_DOMAINS").unwrap_or_else(|_| "example.com".to_string());
@@ -35,233 +30,6 @@ pub async fn health(State(state): State<AppState>) -> impl IntoResponse {
}
}
#[derive(Deserialize)]
pub struct CreateAccountInput {
pub handle: String,
pub email: String,
pub password: String,
#[serde(rename = "inviteCode")]
pub invite_code: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateAccountOutput {
pub access_jwt: String,
pub refresh_jwt: String,
pub handle: String,
pub did: String,
}
pub async fn create_account(
State(state): State<AppState>,
Json(input): Json<CreateAccountInput>,
) -> Response {
info!("create_account hit: {}", input.handle);
if input.handle.contains('!') || input.handle.contains('@') {
return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidHandle", "message": "Handle contains invalid characters"}))).into_response();
}
let mut tx = match state.db.begin().await {
Ok(tx) => tx,
Err(e) => {
error!("Error starting transaction: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let exists_query = sqlx::query("SELECT 1 FROM users WHERE handle = $1")
.bind(&input.handle)
.fetch_optional(&mut *tx)
.await;
match exists_query {
Ok(Some(_)) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "HandleTaken", "message": "Handle already taken"}))).into_response(),
Err(e) => {
error!("Error checking handle: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
Ok(None) => {}
}
if let Some(code) = &input.invite_code {
let invite_query = sqlx::query("SELECT available_uses FROM invite_codes WHERE code = $1 FOR UPDATE")
.bind(code)
.fetch_optional(&mut *tx)
.await;
match invite_query {
Ok(Some(row)) => {
let uses: i32 = row.get("available_uses");
if uses <= 0 {
return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code exhausted"}))).into_response();
}
let update_invite = sqlx::query("UPDATE invite_codes SET available_uses = available_uses - 1 WHERE code = $1")
.bind(code)
.execute(&mut *tx)
.await;
if let Err(e) = update_invite {
error!("Error updating invite code: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
},
Ok(None) => return (StatusCode::BAD_REQUEST, Json(json!({"error": "InvalidInviteCode", "message": "Invite code not found"}))).into_response(),
Err(e) => {
error!("Error checking invite code: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
}
}
let did = format!("did:plc:{}", uuid::Uuid::new_v4());
let password_hash = match hash(&input.password, DEFAULT_COST) {
Ok(h) => h,
Err(e) => {
error!("Error hashing password: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let user_insert = sqlx::query("INSERT INTO users (handle, email, did, password_hash) VALUES ($1, $2, $3, $4) RETURNING id")
.bind(&input.handle)
.bind(&input.email)
.bind(&did)
.bind(&password_hash)
.fetch_one(&mut *tx)
.await;
let user_id: uuid::Uuid = match user_insert {
Ok(row) => row.get("id"),
Err(e) => {
error!("Error inserting user: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let secret_key = SecretKey::random(&mut OsRng);
let secret_key_bytes = secret_key.to_bytes();
let key_insert = sqlx::query("INSERT INTO user_keys (user_id, key_bytes) VALUES ($1, $2)")
.bind(user_id)
.bind(&secret_key_bytes[..])
.execute(&mut *tx)
.await;
if let Err(e) = key_insert {
error!("Error inserting user key: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
let store = Arc::new(state.block_store.clone());
let mst = Mst::new(store.clone());
let mst_root = match mst.root().await {
Ok(c) => c,
Err(e) => {
error!("Error creating MST root: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let did_obj = match Did::new(&did) {
Ok(d) => d,
Err(_) => return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError", "message": "Invalid DID"}))).into_response(),
};
let rev = Tid::now(LimitedU32::MIN);
let commit = Commit::new_unsigned(
did_obj,
mst_root,
rev,
None
);
let commit_bytes = match commit.to_cbor() {
Ok(b) => b,
Err(e) => {
error!("Error serializing genesis commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let commit_cid = match state.block_store.put(&commit_bytes).await {
Ok(c) => c,
Err(e) => {
error!("Error saving genesis commit: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
};
let repo_insert = sqlx::query("INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)")
.bind(user_id)
.bind(commit_cid.to_string())
.execute(&mut *tx)
.await;
if let Err(e) = repo_insert {
error!("Error initializing repo: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
if let Some(code) = &input.invite_code {
let use_insert = sqlx::query("INSERT INTO invite_code_uses (code, used_by_user) VALUES ($1, $2)")
.bind(code)
.bind(user_id)
.execute(&mut *tx)
.await;
if let Err(e) = use_insert {
error!("Error recording invite usage: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
}
let access_jwt = crate::auth::create_access_token(&did, &secret_key_bytes[..]).map_err(|e| {
error!("Error creating access token: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
});
let access_jwt = match access_jwt {
Ok(t) => t,
Err(r) => return r,
};
let refresh_jwt = crate::auth::create_refresh_token(&did, &secret_key_bytes[..]).map_err(|e| {
error!("Error creating refresh token: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response()
});
let refresh_jwt = match refresh_jwt {
Ok(t) => t,
Err(r) => return r,
};
let session_insert = sqlx::query("INSERT INTO sessions (access_jwt, refresh_jwt, did) VALUES ($1, $2, $3)")
.bind(&access_jwt)
.bind(&refresh_jwt)
.bind(&did)
.execute(&mut *tx)
.await;
if let Err(e) = session_insert {
error!("Error inserting session: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
if let Err(e) = tx.commit().await {
error!("Error committing transaction: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"}))).into_response();
}
(StatusCode::OK, Json(CreateAccountOutput {
access_jwt,
refresh_jwt,
handle: input.handle,
did,
})).into_response()
}
#[derive(Deserialize)]
pub struct CreateSessionInput {
pub identifier: String,
@@ -515,7 +283,7 @@ pub async fn refresh_session(
}
},
Ok(None) => {
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid refresh token"}))).into_response();
return (StatusCode::UNAUTHORIZED, Json(json!({"error": "AuthenticationFailed", "message": "Invalid refresh token"}))).into_response();
},
Err(e) => {
error!("Database error fetching session: {:?}", e);
@@ -523,3 +291,4 @@ pub async fn refresh_session(
}
}
}