More email-related endpoints

This commit is contained in:
Lewis
2025-12-09 22:52:22 +02:00
parent 39096a217e
commit 264e61eb14
20 changed files with 820 additions and 160 deletions

View File

@@ -0,0 +1,52 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT s.did, k.key_bytes, u.id as user_id, u.email_confirmation_code, u.email_confirmation_code_expires_at, u.email_pending_verification\n FROM sessions s\n JOIN users u ON s.did = u.did\n JOIN user_keys k ON u.id = k.user_id\n WHERE s.access_jwt = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "did",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "key_bytes",
"type_info": "Bytea"
},
{
"ordinal": 2,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "email_confirmation_code",
"type_info": "Text"
},
{
"ordinal": 4,
"name": "email_confirmation_code_expires_at",
"type_info": "Timestamptz"
},
{
"ordinal": 5,
"name": "email_pending_verification",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
true,
true,
true
]
},
"hash": "3377750b73c3831cbd6c96b971ea8b6d4da38f1bc740afce3136d86c27b8ce8d"
}

View File

@@ -0,0 +1,17 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE users SET email_pending_verification = $1, email_confirmation_code = $2, email_confirmation_code_expires_at = $3 WHERE id = $4",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Timestamptz",
"Uuid"
]
},
"nullable": []
},
"hash": "4b9243c9ef4bf260d179a778536e815c8d563017ecda7dc530aeeebd5362d190"
}

View File

@@ -0,0 +1,40 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT s.did, k.key_bytes, u.id as user_id, u.handle\n FROM sessions s\n JOIN users u ON s.did = u.did\n JOIN user_keys k ON u.id = k.user_id\n WHERE s.access_jwt = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "did",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "key_bytes",
"type_info": "Bytea"
},
{
"ordinal": 2,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 3,
"name": "handle",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false,
false
]
},
"hash": "6a233f0ca94195935bf32ee749c8429c2292bb3907f129e06aff033a31681175"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE users SET email = $1, email_pending_verification = NULL, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE id = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "76a239da5103f43b16a768d6970cc7e04d9d27c88cc54072818033a03bf53057"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT 1 as one FROM users WHERE LOWER(email) = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "one",
"type_info": "Int4"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
null
]
},
"hash": "8c9c899187a8b19747b1c25dbac1501de14985beafcfed5f0a23549e18da2c19"
}

View File

@@ -35,12 +35,12 @@ Lewis' corrected big boy todofile
- [x] Implement `com.atproto.server.getServiceAuth` (Cross-service auth).
- [x] Implement `com.atproto.server.listAppPasswords`.
- [x] Implement `com.atproto.server.requestAccountDelete`.
- [ ] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`.
- [x] Implement `com.atproto.server.requestEmailConfirmation` / `requestEmailUpdate`.
- [x] Implement `com.atproto.server.requestPasswordReset` / `resetPassword`.
- [ ] Implement `com.atproto.server.reserveSigningKey`.
- [x] Implement `com.atproto.server.revokeAppPassword`.
- [ ] Implement `com.atproto.server.updateEmail`.
- [ ] Implement `com.atproto.server.confirmEmail`.
- [x] Implement `com.atproto.server.confirmEmail`.
## Repository Operations (`com.atproto.repo`)
- [ ] Record CRUD

View File

@@ -0,0 +1,157 @@
CREATE TYPE notification_channel AS ENUM ('email', 'discord', 'telegram', 'signal');
CREATE TYPE notification_status AS ENUM ('pending', 'processing', 'sent', 'failed');
CREATE TYPE notification_type AS ENUM (
'welcome',
'email_verification',
'password_reset',
'email_update',
'account_deletion',
'admin_email'
);
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
handle TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
did TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- status & moderation
deactivated_at TIMESTAMPTZ,
invites_disabled BOOLEAN DEFAULT FALSE,
takedown_ref TEXT,
-- notifs
preferred_notification_channel notification_channel NOT NULL DEFAULT 'email',
-- auth & verification
password_reset_code TEXT,
password_reset_code_expires_at TIMESTAMPTZ,
email_pending_verification TEXT,
email_confirmation_code TEXT,
email_confirmation_code_expires_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS idx_users_password_reset_code ON users(password_reset_code) WHERE password_reset_code IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_users_email_confirmation_code ON users(email_confirmation_code) WHERE email_confirmation_code IS NOT NULL;
CREATE TABLE IF NOT EXISTS invite_codes (
code TEXT PRIMARY KEY,
available_uses INT NOT NULL DEFAULT 1,
created_by_user UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
disabled BOOLEAN DEFAULT FALSE
);
CREATE TABLE IF NOT EXISTS invite_code_uses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code TEXT NOT NULL REFERENCES invite_codes(code),
used_by_user UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
used_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(code, used_by_user)
);
-- TODO: encrypt at rest!
CREATE TABLE IF NOT EXISTS user_keys (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
key_bytes BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS repos (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
repo_root_cid TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- content addressable storage
CREATE TABLE IF NOT EXISTS blocks (
cid BYTEA PRIMARY KEY,
data BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- denormalized index for fast queries
CREATE TABLE IF NOT EXISTS records (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
repo_id UUID NOT NULL REFERENCES repos(user_id) ON DELETE CASCADE,
collection TEXT NOT NULL,
rkey TEXT NOT NULL,
record_cid TEXT NOT NULL,
takedown_ref TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(repo_id, collection, rkey)
);
CREATE TABLE IF NOT EXISTS blobs (
cid TEXT PRIMARY KEY,
mime_type TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
created_by_user UUID NOT NULL REFERENCES users(id),
storage_key TEXT NOT NULL,
takedown_ref TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS sessions (
access_jwt TEXT PRIMARY KEY,
refresh_jwt TEXT NOT NULL UNIQUE,
did TEXT NOT NULL REFERENCES users(did) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS app_passwords (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
privileged BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(user_id, name)
);
-- naughty list
CREATE TABLE reports (
id BIGINT PRIMARY KEY,
reason_type TEXT NOT NULL,
reason TEXT,
subject_json JSONB NOT NULL,
reported_by_did TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS account_deletion_requests (
token TEXT PRIMARY KEY,
did TEXT NOT NULL REFERENCES users(did) ON DELETE CASCADE,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS notification_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
channel notification_channel NOT NULL DEFAULT 'email',
notification_type notification_type NOT NULL,
status notification_status NOT NULL DEFAULT 'pending',
recipient TEXT NOT NULL,
subject TEXT,
body TEXT NOT NULL,
metadata JSONB,
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_notification_queue_status_scheduled
ON notification_queue(status, scheduled_for)
WHERE status = 'pending';
CREATE INDEX idx_notification_queue_user_id ON notification_queue(user_id);

View File

@@ -1,80 +0,0 @@
-- A very basic schema to get started.
-- TODO: PRODUCTIONIZE BABY
CREATE TABLE IF NOT EXISTS users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
handle TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
did TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS invite_codes (
code TEXT PRIMARY KEY,
available_uses INT NOT NULL DEFAULT 1,
created_by_user UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS invite_code_uses (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
code TEXT NOT NULL REFERENCES invite_codes(code),
used_by_user UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
used_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(code, used_by_user)
);
-- OIII THIS TABLE CONTAINS PLAINTEXT PRIVATE KEYS, TODO: encrypt at rest!
CREATE TABLE IF NOT EXISTS user_keys (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
-- Storing as raw bytes
-- secp256k1 is 32 bytes
key_bytes BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS repos (
user_id UUID PRIMARY KEY REFERENCES users(id) ON DELETE CASCADE,
repo_root_cid TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS blocks (
cid BYTEA PRIMARY KEY,
data BYTEA NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- A denormalized table to quickly query for records
-- TODO: Do I actually need this?
CREATE TABLE IF NOT EXISTS records (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
repo_id UUID NOT NULL REFERENCES repos(user_id) ON DELETE CASCADE,
collection TEXT NOT NULL,
rkey TEXT NOT NULL,
record_cid TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(repo_id, collection, rkey)
);
CREATE TABLE IF NOT EXISTS blobs (
cid TEXT PRIMARY KEY,
mime_type TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
created_by_user UUID NOT NULL REFERENCES users(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- The key/path in the S3 bucket
storage_key TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sessions (
access_jwt TEXT PRIMARY KEY,
refresh_jwt TEXT NOT NULL UNIQUE,
did TEXT NOT NULL REFERENCES users(did) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@@ -1,9 +0,0 @@
CREATE TABLE IF NOT EXISTS app_passwords (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
password_hash TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
privileged BOOLEAN NOT NULL DEFAULT FALSE,
UNIQUE(user_id, name)
);

View File

@@ -1,11 +0,0 @@
ALTER TABLE users ADD COLUMN deactivated_at TIMESTAMPTZ;
-- * reports u *
CREATE TABLE reports (
id BIGINT PRIMARY KEY,
reason_type TEXT NOT NULL,
reason TEXT,
subject_json JSONB NOT NULL,
reported_by_did TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@@ -1,3 +0,0 @@
ALTER TABLE invite_codes ADD COLUMN disabled BOOLEAN DEFAULT FALSE;
ALTER TABLE users ADD COLUMN invites_disabled BOOLEAN DEFAULT FALSE;

View File

@@ -1,5 +0,0 @@
ALTER TABLE users ADD COLUMN takedown_ref TEXT;
ALTER TABLE records ADD COLUMN takedown_ref TEXT;
ALTER TABLE blobs ADD COLUMN takedown_ref TEXT;

View File

@@ -1,6 +0,0 @@
CREATE TABLE IF NOT EXISTS account_deletion_requests (
token TEXT PRIMARY KEY,
did TEXT NOT NULL REFERENCES users(did) ON DELETE CASCADE,
expires_at TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

View File

@@ -1,36 +0,0 @@
CREATE TYPE notification_channel AS ENUM ('email', 'discord', 'telegram', 'signal');
CREATE TYPE notification_status AS ENUM ('pending', 'processing', 'sent', 'failed');
CREATE TYPE notification_type AS ENUM (
'welcome',
'email_verification',
'password_reset',
'email_update',
'account_deletion'
);
CREATE TABLE IF NOT EXISTS notification_queue (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
channel notification_channel NOT NULL DEFAULT 'email',
notification_type notification_type NOT NULL,
status notification_status NOT NULL DEFAULT 'pending',
recipient TEXT NOT NULL,
subject TEXT,
body TEXT NOT NULL,
metadata JSONB,
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
scheduled_for TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
CREATE INDEX idx_notification_queue_status_scheduled
ON notification_queue(status, scheduled_for)
WHERE status = 'pending';
CREATE INDEX idx_notification_queue_user_id ON notification_queue(user_id);
ALTER TABLE users ADD COLUMN IF NOT EXISTS preferred_notification_channel notification_channel NOT NULL DEFAULT 'email';

View File

@@ -1,4 +0,0 @@
ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_code TEXT;
ALTER TABLE users ADD COLUMN IF NOT EXISTS password_reset_code_expires_at TIMESTAMPTZ;
CREATE INDEX IF NOT EXISTS idx_users_password_reset_code ON users(password_reset_code) WHERE password_reset_code IS NOT NULL;

View File

@@ -1 +0,0 @@
ALTER TYPE notification_type ADD VALUE IF NOT EXISTS 'admin_email';

View File

@@ -5,8 +5,8 @@ pub mod session;
pub use invite::{create_invite_code, create_invite_codes, get_account_invite_codes};
pub use meta::{describe_server, health};
pub use session::{
activate_account, check_account_status, create_app_password, create_session,
activate_account, check_account_status, confirm_email, create_app_password, create_session,
deactivate_account, delete_session, get_service_auth, get_session, list_app_passwords,
refresh_session, request_account_delete, request_password_reset, reset_password,
revoke_app_password,
refresh_session, request_account_delete, request_email_update, request_password_reset,
reset_password, revoke_app_password,
};

View File

@@ -1419,3 +1419,271 @@ pub async fn reset_password(
(StatusCode::OK, Json(json!({}))).into_response()
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RequestEmailUpdateInput {
pub email: String,
}
pub async fn request_email_update(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<RequestEmailUpdateInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id, u.handle
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id, handle) = match session {
Ok(Some(row)) => (row.did, row.key_bytes, row.user_id, row.handle),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in request_email_update: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let email = input.email.trim().to_lowercase();
if email.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "email is required"})),
)
.into_response();
}
let exists = sqlx::query!("SELECT 1 as one FROM users WHERE LOWER(email) = $1", email)
.fetch_optional(&state.db)
.await;
if let Ok(Some(_)) = exists {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "EmailTaken", "message": "Email already taken"})),
)
.into_response();
}
let code = generate_reset_code();
let expires_at = Utc::now() + Duration::minutes(10);
let update = sqlx::query!(
"UPDATE users SET email_pending_verification = $1, email_confirmation_code = $2, email_confirmation_code_expires_at = $3 WHERE id = $4",
email,
code,
expires_at,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error setting email update code: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_email_update(
&state.db,
user_id,
&email,
&handle,
&code,
&hostname,
)
.await
{
warn!("Failed to enqueue email update notification: {:?}", e);
}
info!("Email update requested for user {}", user_id);
(StatusCode::OK, Json(json!({ "tokenRequired": true }))).into_response()
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfirmEmailInput {
pub email: String,
pub token: String,
}
pub async fn confirm_email(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<ConfirmEmailInput>,
) -> Response {
let auth_header = headers.get("Authorization");
if auth_header.is_none() {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
let token = auth_header
.unwrap()
.to_str()
.unwrap_or("")
.replace("Bearer ", "");
let session = sqlx::query!(
r#"
SELECT s.did, k.key_bytes, u.id as user_id, u.email_confirmation_code, u.email_confirmation_code_expires_at, u.email_pending_verification
FROM sessions s
JOIN users u ON s.did = u.did
JOIN user_keys k ON u.id = k.user_id
WHERE s.access_jwt = $1
"#,
token
)
.fetch_optional(&state.db)
.await;
let (_did, key_bytes, user_id, stored_code, expires_at, email_pending_verification) = match session {
Ok(Some(row)) => (
row.did,
row.key_bytes,
row.user_id,
row.email_confirmation_code,
row.email_confirmation_code_expires_at,
row.email_pending_verification,
),
Ok(None) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
Err(e) => {
error!("DB error in confirm_email: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(_) = crate::auth::verify_token(&token, &key_bytes) {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed", "message": "Invalid token signature"})),
)
.into_response();
}
let email = input.email.trim().to_lowercase();
let confirmation_code = input.token.trim();
if email_pending_verification.is_none() || stored_code.is_none() || expires_at.is_none() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "No pending email update found"})),
)
.into_response();
}
let email_pending_verification = email_pending_verification.unwrap();
if email_pending_verification != email {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Email does not match pending update"})),
)
.into_response();
}
if stored_code.unwrap() != confirmation_code {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidToken", "message": "Invalid token"})),
)
.into_response();
}
if Utc::now() > expires_at.unwrap() {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "ExpiredToken", "message": "Token has expired"})),
)
.into_response();
}
let update = sqlx::query!(
"UPDATE users SET email = $1, email_pending_verification = NULL, email_confirmation_code = NULL, email_confirmation_code_expires_at = NULL WHERE id = $2",
email_pending_verification,
user_id
)
.execute(&state.db)
.await;
if let Err(e) = update {
error!("DB error finalizing email update: {:?}", e);
if e.as_database_error().map(|db_err| db_err.is_unique_violation()).unwrap_or(false) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "EmailTaken", "message": "Email already taken"})),
)
.into_response();
}
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
info!("Email updated for user {}", user_id);
(StatusCode::OK, Json(json!({}))).into_response()
}

View File

@@ -163,6 +163,14 @@ pub fn app(state: AppState) -> Router {
"/xrpc/com.atproto.server.resetPassword",
post(api::server::reset_password),
)
.route(
"/xrpc/com.atproto.server.requestEmailUpdate",
post(api::server::request_email_update),
)
.route(
"/xrpc/com.atproto.server.confirmEmail",
post(api::server::confirm_email),
)
.route(
"/xrpc/com.atproto.identity.updateHandle",
post(api::identity::update_handle),

236
tests/email_update.rs Normal file
View File

@@ -0,0 +1,236 @@
mod common;
use reqwest::StatusCode;
use serde_json::{json, Value};
use sqlx::PgPool;
async fn get_pool() -> PgPool {
let conn_str = common::get_db_connection_string().await;
sqlx::postgres::PgPoolOptions::new()
.max_connections(5)
.connect(&conn_str)
.await
.expect("Failed to connect to test database")
}
#[tokio::test]
async fn test_email_update_flow_success() {
let client = common::client();
let base_url = common::base_url().await;
let pool = get_pool().await;
let handle = format!("emailup_{}", uuid::Uuid::new_v4());
let email = format!("{}@example.com", handle);
let payload = json!({
"handle": handle,
"email": email,
"password": "password"
});
let res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url))
.json(&payload)
.send()
.await
.expect("Failed to create account");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Invalid JSON");
let access_jwt = body["accessJwt"].as_str().expect("No accessJwt");
let new_email = format!("new_{}@example.com", handle);
let res = client
.post(format!("{}/xrpc/com.atproto.server.requestEmailUpdate", base_url))
.bearer_auth(access_jwt)
.json(&json!({"email": new_email}))
.send()
.await
.expect("Failed to request email update");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Invalid JSON");
assert_eq!(body["tokenRequired"], true);
let user = sqlx::query!(
"SELECT email_pending_verification, email_confirmation_code, email FROM users WHERE handle = $1",
handle
)
.fetch_one(&pool)
.await
.expect("User not found");
assert_eq!(user.email_pending_verification.as_deref(), Some(new_email.as_str()));
assert!(user.email_confirmation_code.is_some());
let code = user.email_confirmation_code.unwrap();
let res = client
.post(format!("{}/xrpc/com.atproto.server.confirmEmail", base_url))
.bearer_auth(access_jwt)
.json(&json!({
"email": new_email,
"token": code
}))
.send()
.await
.expect("Failed to confirm email");
assert_eq!(res.status(), StatusCode::OK);
let user = sqlx::query!(
"SELECT email, email_pending_verification, email_confirmation_code FROM users WHERE handle = $1",
handle
)
.fetch_one(&pool)
.await
.expect("User not found");
assert_eq!(user.email, new_email);
assert!(user.email_pending_verification.is_none());
assert!(user.email_confirmation_code.is_none());
}
#[tokio::test]
async fn test_request_email_update_taken_email() {
let client = common::client();
let base_url = common::base_url().await;
let handle1 = format!("emailup_taken1_{}", uuid::Uuid::new_v4());
let email1 = format!("{}@example.com", handle1);
let res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url))
.json(&json!({
"handle": handle1,
"email": email1,
"password": "password"
}))
.send()
.await
.expect("Failed to create account 1");
assert_eq!(res.status(), StatusCode::OK);
let handle2 = format!("emailup_taken2_{}", uuid::Uuid::new_v4());
let email2 = format!("{}@example.com", handle2);
let res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url))
.json(&json!({
"handle": handle2,
"email": email2,
"password": "password"
}))
.send()
.await
.expect("Failed to create account 2");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Invalid JSON");
let access_jwt2 = body["accessJwt"].as_str().expect("No accessJwt");
let res = client
.post(format!("{}/xrpc/com.atproto.server.requestEmailUpdate", base_url))
.bearer_auth(access_jwt2)
.json(&json!({"email": email1}))
.send()
.await
.expect("Failed to request email update");
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
let body: Value = res.json().await.expect("Invalid JSON");
assert_eq!(body["error"], "EmailTaken");
}
#[tokio::test]
async fn test_confirm_email_invalid_token() {
let client = common::client();
let base_url = common::base_url().await;
let handle = format!("emailup_inv_{}", uuid::Uuid::new_v4());
let email = format!("{}@example.com", handle);
let res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url))
.json(&json!({
"handle": handle,
"email": email,
"password": "password"
}))
.send()
.await
.expect("Failed to create account");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Invalid JSON");
let access_jwt = body["accessJwt"].as_str().expect("No accessJwt");
let new_email = format!("new_{}@example.com", handle);
let res = client
.post(format!("{}/xrpc/com.atproto.server.requestEmailUpdate", base_url))
.bearer_auth(access_jwt)
.json(&json!({"email": new_email}))
.send()
.await
.expect("Failed to request email update");
assert_eq!(res.status(), StatusCode::OK);
let res = client
.post(format!("{}/xrpc/com.atproto.server.confirmEmail", base_url))
.bearer_auth(access_jwt)
.json(&json!({
"email": new_email,
"token": "wrong-token"
}))
.send()
.await
.expect("Failed to confirm email");
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
let body: Value = res.json().await.expect("Invalid JSON");
assert_eq!(body["error"], "InvalidToken");
}
#[tokio::test]
async fn test_confirm_email_wrong_email() {
let client = common::client();
let base_url = common::base_url().await;
let pool = get_pool().await;
let handle = format!("emailup_wrong_{}", uuid::Uuid::new_v4());
let email = format!("{}@example.com", handle);
let res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base_url))
.json(&json!({
"handle": handle,
"email": email,
"password": "password"
}))
.send()
.await
.expect("Failed to create account");
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await.expect("Invalid JSON");
let access_jwt = body["accessJwt"].as_str().expect("No accessJwt");
let new_email = format!("new_{}@example.com", handle);
let res = client
.post(format!("{}/xrpc/com.atproto.server.requestEmailUpdate", base_url))
.bearer_auth(access_jwt)
.json(&json!({"email": new_email}))
.send()
.await
.expect("Failed to request email update");
assert_eq!(res.status(), StatusCode::OK);
let user = sqlx::query!("SELECT email_confirmation_code FROM users WHERE handle = $1", handle)
.fetch_one(&pool)
.await
.expect("User not found");
let code = user.email_confirmation_code.unwrap();
let res = client
.post(format!("{}/xrpc/com.atproto.server.confirmEmail", base_url))
.bearer_auth(access_jwt)
.json(&json!({
"email": "another_random@example.com",
"token": code
}))
.send()
.await
.expect("Failed to confirm email");
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
let body: Value = res.json().await.expect("Invalid JSON");
assert_eq!(body["message"], "Email does not match pending update");
}