Actor preferences

This commit is contained in:
lewis
2025-12-11 23:23:55 +02:00
parent 17a7f1dc2b
commit 2eb67eb688
25 changed files with 1136 additions and 90 deletions

View File

@@ -1,22 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id, handle FROM users WHERE LOWER(email) = $1",
"query": "SELECT name, value_json FROM account_preferences WHERE user_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
"name": "name",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "handle",
"type_info": "Text"
"name": "value_json",
"type_info": "Jsonb"
}
],
"parameters": {
"Left": [
"Text"
"Uuid"
]
},
"nullable": [
@@ -24,5 +24,5 @@
false
]
},
"hash": "40bd5b538224352dd8912e2c13a71b920ee3874f4acc12ebf4e6f62aae86c556"
"hash": "12351a50c151a1a4b0b74dcd2604427aed5e8e3ccc067f253c9e342a9b505941"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM account_preferences WHERE user_id = $1 AND (name = $2 OR name LIKE $3)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "48b80b34ff2ad6e43ed7596d4c609e8b3ec4e546c711e57d47097f617471c60d"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Jsonb"
]
},
"nullable": []
},
"hash": "80d4e5415cd065ee137cdcdaa69a6d7329352ca48cfb9b216a9598e3f1a5dbeb"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.actor.profile' AND rkey = 'self'",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "record_cid",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "94e290ff1acc15ccb8fd57fce25c7a4eea1e45c7339145d5af2741cc04348c8f"
}

View File

@@ -0,0 +1,46 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT\n email,\n handle,\n preferred_notification_channel as \"channel: NotificationChannel\"\n FROM users\n WHERE id = $1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "email",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "handle",
"type_info": "Text"
},
{
"ordinal": 2,
"name": "channel: NotificationChannel",
"type_info": {
"Custom": {
"name": "notification_channel",
"kind": {
"Enum": [
"email",
"discord",
"telegram",
"signal"
]
}
}
}
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "bfb9ee0187a0062cb83c9295cf266f56fed0edd0f9f154c1786f2b0cdbe39508"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT id FROM users WHERE LOWER(email) = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "id",
"type_info": "Uuid"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "d2a6047b9f8039025b19028b8db7935ea60bfff1698488cbaacc8785c85c94b4"
}

View File

@@ -46,6 +46,9 @@ tokio-tungstenite = { version = "0.28.0", features = ["native-tls"] }
urlencoding = "2.1"
uuid = { version = "1.19.0", features = ["v4", "fast-rng"] }
[features]
external-infra = []
[dev-dependencies]
ctor = "0.6.3"
iroh-car = "0.5.1"

61
TODO.md
View File

@@ -162,10 +162,10 @@ I've tried to ensure that this codebase is not vulnerable to the following:
These endpoints need to be implemented at the PDS level (not just proxied to appview).
### Actor (`app.bsky.actor`)
- [ ] Implement `app.bsky.actor.getPreferences` (user preferences storage).
- [ ] Implement `app.bsky.actor.putPreferences` (update user preferences).
- [ ] Implement `app.bsky.actor.getProfile` (PDS-level with proxy fallback).
- [ ] Implement `app.bsky.actor.getProfiles` (PDS-level with proxy fallback).
- [x] Implement `app.bsky.actor.getPreferences` (user preferences storage).
- [x] Implement `app.bsky.actor.putPreferences` (update user preferences).
- [x] Implement `app.bsky.actor.getProfile` (PDS-level with proxy fallback).
- [x] Implement `app.bsky.actor.getProfiles` (PDS-level with proxy fallback).
### Feed (`app.bsky.feed`)
These are implemented at PDS level to enable local-first reads:
@@ -190,9 +190,9 @@ These are implemented at PDS level to enable local-first reads:
## Preference Storage
User preferences (for app.bsky.actor.getPreferences/putPreferences):
- [ ] Create preferences table for storing user app preferences.
- [ ] Implement `app.bsky.actor.getPreferences` handler (read from postgres, proxy fallback).
- [ ] Implement `app.bsky.actor.putPreferences` handler (write to postgres).
- [x] Create preferences table for storing user app preferences.
- [x] Implement `app.bsky.actor.getPreferences` handler (read from postgres, proxy fallback).
- [x] Implement `app.bsky.actor.putPreferences` handler (write to postgres).
## Infrastructure & Core Components
- [x] Sequencer (Event Log)
@@ -221,6 +221,7 @@ User preferences (for app.bsky.actor.getPreferences/putPreferences):
- [ ] Telegram bot sender
- [ ] Signal bot sender
- [x] Helper functions for common notification types (welcome, password reset, email verification, etc.)
- [x] Respect user's `preferred_notification_channel` setting for non-email-specific notifications
- [ ] Image Processing
- [ ] Implement image resize/formatting pipeline (for blob uploads).
- [x] IPLD & MST
@@ -230,3 +231,49 @@ User preferences (for app.bsky.actor.getPreferences/putPreferences):
- [ ] DID PLC Operations (Sign rotation keys).
- [ ] Fix any remaining TODOs in the code, everywhere, full stop.
## Web Management UI
A single-page web app for account management. The frontend (JS framework) calls existing ATProto XRPC endpoints - no server-side rendering or bespoke HTML form handlers.
### Architecture
- [ ] Static SPA served from PDS (or separate static host)
- [ ] Frontend authenticates via OAuth 2.1 flow (same as any ATProto client)
- [ ] All operations use standard XRPC endpoints (existing + new PDS-specific ones below)
- [ ] No server-side sessions or CSRF - pure API client
### PDS-Specific XRPC Endpoints (new)
Absolutely subject to change, "bspds" isn't even the real name of this pds thus far :D
Anyway... endpoints for PDS settings not covered by standard ATProto:
- [ ] `com.bspds.account.getNotificationPrefs` - get preferred channel, verified channels
- [ ] `com.bspds.account.updateNotificationPrefs` - set preferred channel
- [ ] `com.bspds.account.getNotificationHistory` - list past notifications
- [ ] `com.bspds.account.verifyChannel` - initiate verification for Discord/Telegram/Signal
- [ ] `com.bspds.account.confirmChannelVerification` - confirm with code
- [ ] `com.bspds.admin.getServerStats` - user count, storage usage, etc.
### Frontend Views
Uses existing ATProto endpoints where possible:
**User Dashboard**
- [ ] Account overview (uses `com.atproto.server.getSession`, `com.atproto.admin.getAccountInfo`)
- [ ] Active sessions view (needs new endpoint or extend existing)
- [ ] App passwords (uses `com.atproto.server.listAppPasswords`, `createAppPassword`, `revokeAppPassword`)
- [ ] Invite codes (uses `com.atproto.server.getAccountInviteCodes`, `createInviteCode`)
**Notification Preferences**
- [ ] Channel selector (uses `com.bspds.account.*` endpoints above)
- [ ] Verification flows for Discord/Telegram/Signal
- [ ] Notification history view
**Account Settings**
- [ ] Email change (uses `com.atproto.server.requestEmailUpdate`, `updateEmail`)
- [ ] Password change (uses `com.atproto.server.requestPasswordReset`, `resetPassword`)
- [ ] Handle change (uses `com.atproto.identity.updateHandle`)
- [ ] Account deletion (uses `com.atproto.server.requestAccountDelete`, `deleteAccount`)
- [ ] Data export (uses `com.atproto.sync.getRepo`)
**Admin Dashboard** (privileged users only)
- [ ] User list (uses `com.atproto.admin.getAccountInfos` with pagination)
- [ ] User detail/actions (uses `com.atproto.admin.*` endpoints)
- [ ] Invite management (uses `com.atproto.admin.getInviteCodes`, `disableInviteCodes`)
- [ ] Server stats (uses `com.bspds.admin.getServerStats`)

View File

@@ -0,0 +1,12 @@
CREATE TABLE IF NOT EXISTS account_preferences (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
name TEXT NOT NULL,
value_json JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(user_id, name)
);
CREATE INDEX IF NOT EXISTS idx_account_preferences_user_id ON account_preferences(user_id);
CREATE INDEX IF NOT EXISTS idx_account_preferences_name ON account_preferences(name);

5
src/api/actor/mod.rs Normal file
View File

@@ -0,0 +1,5 @@
mod preferences;
mod profile;
pub use preferences::{get_preferences, put_preferences};
pub use profile::{get_profile, get_profiles};

View File

@@ -0,0 +1,233 @@
use crate::state::AppState;
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
const APP_BSKY_NAMESPACE: &str = "app.bsky";
#[derive(Serialize)]
pub struct GetPreferencesOutput {
pub preferences: Vec<Value>,
}
pub async fn get_preferences(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
) -> Response {
let token = match crate::auth::extract_bearer_token_from_header(
headers.get("Authorization").and_then(|h| h.to_str().ok()),
) {
Some(t) => t,
None => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
};
let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
Ok(user) => user,
Err(_) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
};
let user_id: uuid::Uuid =
match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
.fetch_optional(&state.db)
.await
{
Ok(Some(id)) => id,
_ => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "User not found"})),
)
.into_response();
}
};
let prefs_result = sqlx::query!(
"SELECT name, value_json FROM account_preferences WHERE user_id = $1",
user_id
)
.fetch_all(&state.db)
.await;
let prefs = match prefs_result {
Ok(rows) => rows,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "Failed to fetch preferences"})),
)
.into_response();
}
};
let preferences: Vec<Value> = prefs
.into_iter()
.filter(|row| {
row.name == APP_BSKY_NAMESPACE || row.name.starts_with(&format!("{}.", APP_BSKY_NAMESPACE))
})
.filter_map(|row| {
if row.name == "app.bsky.actor.defs#declaredAgePref" {
return None;
}
serde_json::from_value(row.value_json).ok()
})
.collect();
(StatusCode::OK, Json(GetPreferencesOutput { preferences })).into_response()
}
#[derive(Deserialize)]
pub struct PutPreferencesInput {
pub preferences: Vec<Value>,
}
pub async fn put_preferences(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(input): Json<PutPreferencesInput>,
) -> Response {
let token = match crate::auth::extract_bearer_token_from_header(
headers.get("Authorization").and_then(|h| h.to_str().ok()),
) {
Some(t) => t,
None => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationRequired"})),
)
.into_response();
}
};
let auth_user = match crate::auth::validate_bearer_token(&state.db, &token).await {
Ok(user) => user,
Err(_) => {
return (
StatusCode::UNAUTHORIZED,
Json(json!({"error": "AuthenticationFailed"})),
)
.into_response();
}
};
let user_id: uuid::Uuid =
match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", auth_user.did)
.fetch_optional(&state.db)
.await
{
Ok(Some(id)) => id,
_ => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "User not found"})),
)
.into_response();
}
};
for pref in &input.preferences {
let pref_type = match pref.get("$type").and_then(|t| t.as_str()) {
Some(t) => t,
None => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Preference missing $type field"})),
)
.into_response();
}
};
if !pref_type.starts_with(APP_BSKY_NAMESPACE) {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": format!("Invalid preference namespace: {}", pref_type)})),
)
.into_response();
}
if pref_type == "app.bsky.actor.defs#declaredAgePref" {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "declaredAgePref is read-only"})),
)
.into_response();
}
}
let mut tx = match state.db.begin().await {
Ok(tx) => tx,
Err(_) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "Failed to start transaction"})),
)
.into_response();
}
};
let delete_result = sqlx::query!(
"DELETE FROM account_preferences WHERE user_id = $1 AND (name = $2 OR name LIKE $3)",
user_id,
APP_BSKY_NAMESPACE,
format!("{}.%", APP_BSKY_NAMESPACE)
)
.execute(&mut *tx)
.await;
if delete_result.is_err() {
let _ = tx.rollback().await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "Failed to clear preferences"})),
)
.into_response();
}
for pref in input.preferences {
let pref_type = pref.get("$type").and_then(|t| t.as_str()).unwrap();
let insert_result = sqlx::query!(
"INSERT INTO account_preferences (user_id, name, value_json) VALUES ($1, $2, $3)",
user_id,
pref_type,
pref
)
.execute(&mut *tx)
.await;
if insert_result.is_err() {
let _ = tx.rollback().await;
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "Failed to save preference"})),
)
.into_response();
}
}
if let Err(_) = tx.commit().await {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError", "message": "Failed to commit transaction"})),
)
.into_response();
}
StatusCode::OK.into_response()
}

206
src/api/actor/profile.rs Normal file
View File

@@ -0,0 +1,206 @@
use crate::state::AppState;
use axum::{
extract::{Query, State},
http::StatusCode,
response::{IntoResponse, Response},
Json,
};
use jacquard_repo::storage::BlockStore;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use tracing::{error, info};
#[derive(Deserialize)]
pub struct GetProfileParams {
pub actor: String,
}
#[derive(Deserialize)]
pub struct GetProfilesParams {
pub actors: String,
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct ProfileViewDetailed {
pub did: String,
pub handle: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub avatar: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub banner: Option<String>,
#[serde(flatten)]
pub extra: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize)]
pub struct GetProfilesOutput {
pub profiles: Vec<ProfileViewDetailed>,
}
async fn get_local_profile_record(state: &AppState, did: &str) -> Option<Value> {
let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await
.ok()??;
let record_row = sqlx::query!(
"SELECT record_cid FROM records WHERE repo_id = $1 AND collection = 'app.bsky.actor.profile' AND rkey = 'self'",
user_id
)
.fetch_optional(&state.db)
.await
.ok()??;
let cid: cid::Cid = record_row.record_cid.parse().ok()?;
let block_bytes = state.block_store.get(&cid).await.ok()??;
serde_ipld_dagcbor::from_slice(&block_bytes).ok()
}
fn munge_profile_with_local(profile: &mut ProfileViewDetailed, local_record: &Value) {
if let Some(display_name) = local_record.get("displayName").and_then(|v| v.as_str()) {
profile.display_name = Some(display_name.to_string());
}
if let Some(description) = local_record.get("description").and_then(|v| v.as_str()) {
profile.description = Some(description.to_string());
}
}
async fn proxy_to_appview(
method: &str,
params: &HashMap<String, String>,
auth_header: Option<&str>,
) -> Result<(StatusCode, Value), Response> {
let appview_url = match std::env::var("APPVIEW_URL") {
Ok(url) => url,
Err(_) => {
return Err(
(StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError", "message": "No upstream AppView configured"}))).into_response()
);
}
};
let target_url = format!("{}/xrpc/{}", appview_url, method);
info!("Proxying GET request to {}", target_url);
let client = Client::new();
let mut request_builder = client.get(&target_url).query(params);
if let Some(auth) = auth_header {
request_builder = request_builder.header("Authorization", auth);
}
match request_builder.send().await {
Ok(resp) => {
let status = StatusCode::from_u16(resp.status().as_u16()).unwrap_or(StatusCode::BAD_GATEWAY);
match resp.json::<Value>().await {
Ok(body) => Ok((status, body)),
Err(e) => {
error!("Error parsing proxy response: {:?}", e);
Err((StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response())
}
}
}
Err(e) => {
error!("Error sending proxy request: {:?}", e);
if e.is_timeout() {
Err((StatusCode::GATEWAY_TIMEOUT, Json(json!({"error": "UpstreamTimeout"}))).into_response())
} else {
Err((StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError"}))).into_response())
}
}
}
}
pub async fn get_profile(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetProfileParams>,
) -> Response {
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
let auth_did = auth_header.and_then(|h| {
let token = crate::auth::extract_bearer_token_from_header(Some(h))?;
crate::auth::get_did_from_token(&token).ok()
});
let mut query_params = HashMap::new();
query_params.insert("actor".to_string(), params.actor.clone());
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfile", &query_params, auth_header).await {
Ok(r) => r,
Err(e) => return e,
};
if !status.is_success() {
return (status, Json(body)).into_response();
}
let mut profile: ProfileViewDetailed = match serde_json::from_value(body) {
Ok(p) => p,
Err(_) => {
return (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError", "message": "Invalid profile response"}))).into_response();
}
};
if let Some(ref did) = auth_did {
if profile.did == *did {
if let Some(local_record) = get_local_profile_record(&state, did).await {
munge_profile_with_local(&mut profile, &local_record);
}
}
}
(StatusCode::OK, Json(profile)).into_response()
}
pub async fn get_profiles(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Query(params): Query<GetProfilesParams>,
) -> Response {
let auth_header = headers.get("Authorization").and_then(|h| h.to_str().ok());
let auth_did = auth_header.and_then(|h| {
let token = crate::auth::extract_bearer_token_from_header(Some(h))?;
crate::auth::get_did_from_token(&token).ok()
});
let mut query_params = HashMap::new();
query_params.insert("actors".to_string(), params.actors.clone());
let (status, body) = match proxy_to_appview("app.bsky.actor.getProfiles", &query_params, auth_header).await {
Ok(r) => r,
Err(e) => return e,
};
if !status.is_success() {
return (status, Json(body)).into_response();
}
let mut output: GetProfilesOutput = match serde_json::from_value(body) {
Ok(p) => p,
Err(_) => {
return (StatusCode::BAD_GATEWAY, Json(json!({"error": "UpstreamError", "message": "Invalid profiles response"}))).into_response();
}
};
if let Some(ref did) = auth_did {
for profile in &mut output.profiles {
if profile.did == *did {
if let Some(local_record) = get_local_profile_record(&state, did).await {
munge_profile_with_local(profile, &local_record);
}
break;
}
}
}
(StatusCode::OK, Json(output)).into_response()
}

View File

@@ -415,16 +415,8 @@ pub async fn create_account(
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_welcome_email(
&state.db,
user_id,
&input.email,
&input.handle,
&hostname,
)
.await
{
warn!("Failed to enqueue welcome email: {:?}", e);
if let Err(e) = crate::notifications::enqueue_welcome(&state.db, user_id, &hostname).await {
warn!("Failed to enqueue welcome notification: {:?}", e);
}
(

View File

@@ -1,3 +1,4 @@
pub mod actor;
pub mod admin;
pub mod feed;
pub mod identity;

View File

@@ -247,11 +247,11 @@ pub async fn request_account_delete(
}
};
let user = match sqlx::query!("SELECT id, email, handle FROM users WHERE did = $1", did)
let user_id = match sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
.await
{
Ok(Some(row)) => row,
Ok(Some(id)) => id,
_ => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
@@ -260,9 +260,6 @@ pub async fn request_account_delete(
.into_response();
}
};
let user_id = user.id;
let email = user.email;
let handle = user.handle;
let confirmation_token = Uuid::new_v4().to_string();
let expires_at = Utc::now() + Duration::minutes(15);
@@ -286,15 +283,8 @@ pub async fn request_account_delete(
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_account_deletion(
&state.db,
user_id,
&email,
&handle,
&confirmation_token,
&hostname,
)
.await
if let Err(e) =
crate::notifications::enqueue_account_deletion(&state.db, user_id, &confirmation_token, &hostname).await
{
warn!("Failed to enqueue account deletion notification: {:?}", e);
}

View File

@@ -38,15 +38,12 @@ pub async fn request_password_reset(
.into_response();
}
let user = sqlx::query!(
"SELECT id, handle FROM users WHERE LOWER(email) = $1",
email
)
.fetch_optional(&state.db)
.await;
let user = sqlx::query!("SELECT id FROM users WHERE LOWER(email) = $1", email)
.fetch_optional(&state.db)
.await;
let (user_id, handle) = match user {
Ok(Some(row)) => (row.id, row.handle),
let user_id = match user {
Ok(Some(row)) => row.id,
Ok(None) => {
info!("Password reset requested for unknown email: {}", email);
return (StatusCode::OK, Json(json!({}))).into_response();
@@ -83,15 +80,8 @@ pub async fn request_password_reset(
}
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
if let Err(e) = crate::notifications::enqueue_password_reset(
&state.db,
user_id,
&email,
&handle,
&code,
&hostname,
)
.await
if let Err(e) =
crate::notifications::enqueue_password_reset(&state.db, user_id, &code, &hostname).await
{
warn!("Failed to enqueue password reset notification: {:?}", e);
}

View File

@@ -1,3 +1,4 @@
#[allow(deprecated)]
use aes_gcm::{
Aes256Gcm, KeyInit, Nonce,
aead::Aead,
@@ -127,6 +128,7 @@ impl AuthConfig {
let mut nonce_bytes = [0u8; 12];
rand::thread_rng().fill_bytes(&mut nonce_bytes);
#[allow(deprecated)]
let nonce = Nonce::from_slice(&nonce_bytes);
let ciphertext = cipher
@@ -148,6 +150,7 @@ impl AuthConfig {
let cipher = Aes256Gcm::new_from_slice(&self.key_encryption_key)
.map_err(|e| format!("Failed to create cipher: {}", e))?;
#[allow(deprecated)]
let nonce = Nonce::from_slice(&encrypted[..12]);
let ciphertext = &encrypted[12..];

View File

@@ -262,6 +262,22 @@ pub fn app(state: AppState) -> Router {
"/xrpc/com.atproto.admin.sendEmail",
post(api::admin::send_email),
)
.route(
"/xrpc/app.bsky.actor.getPreferences",
get(api::actor::get_preferences),
)
.route(
"/xrpc/app.bsky.actor.putPreferences",
post(api::actor::put_preferences),
)
.route(
"/xrpc/app.bsky.actor.getProfile",
get(api::actor::get_profile),
)
.route(
"/xrpc/app.bsky.actor.getProfiles",
get(api::actor::get_profiles),
)
// I know I know, I'm not supposed to implement appview endpoints. Leave me be
.route(
"/xrpc/app.bsky.feed.getTimeline",

View File

@@ -5,7 +5,7 @@ mod types;
pub use sender::{EmailSender, NotificationSender};
pub use service::{
enqueue_account_deletion, enqueue_email_update, enqueue_email_verification,
enqueue_notification, enqueue_password_reset, enqueue_welcome_email, NotificationService,
enqueue_notification, enqueue_password_reset, enqueue_welcome, NotificationService,
};
pub use types::{
NewNotification, NotificationChannel, NotificationStatus, NotificationType, QueuedNotification,

View File

@@ -254,25 +254,57 @@ pub async fn enqueue_notification(db: &PgPool, notification: NewNotification) ->
.await
}
pub async fn enqueue_welcome_email(
pub struct UserNotificationPrefs {
pub channel: NotificationChannel,
pub email: String,
pub handle: String,
}
pub async fn get_user_notification_prefs(
db: &PgPool,
user_id: Uuid,
) -> Result<UserNotificationPrefs, sqlx::Error> {
let row = sqlx::query!(
r#"
SELECT
email,
handle,
preferred_notification_channel as "channel: NotificationChannel"
FROM users
WHERE id = $1
"#,
user_id
)
.fetch_one(db)
.await?;
Ok(UserNotificationPrefs {
channel: row.channel,
email: row.email,
handle: row.handle,
})
}
pub async fn enqueue_welcome(
db: &PgPool,
user_id: Uuid,
email: &str,
handle: &str,
hostname: &str,
) -> Result<Uuid, sqlx::Error> {
let prefs = get_user_notification_prefs(db, user_id).await?;
let body = format!(
"Welcome to {}!\n\nYour handle is: @{}\n\nThank you for joining us.",
hostname, handle
hostname, prefs.handle
);
enqueue_notification(
db,
NewNotification::email(
NewNotification::new(
user_id,
prefs.channel,
super::types::NotificationType::Welcome,
email.to_string(),
format!("Welcome to {}", hostname),
prefs.email.clone(),
Some(format!("Welcome to {}", hostname)),
body,
),
)
@@ -308,23 +340,24 @@ pub async fn enqueue_email_verification(
pub async fn enqueue_password_reset(
db: &PgPool,
user_id: Uuid,
email: &str,
handle: &str,
code: &str,
hostname: &str,
) -> Result<Uuid, sqlx::Error> {
let prefs = get_user_notification_prefs(db, user_id).await?;
let body = format!(
"Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this email.",
handle, code
"Hello @{},\n\nYour password reset code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please ignore this message.",
prefs.handle, code
);
enqueue_notification(
db,
NewNotification::email(
NewNotification::new(
user_id,
prefs.channel,
super::types::NotificationType::PasswordReset,
email.to_string(),
format!("Password Reset - {}", hostname),
prefs.email.clone(),
Some(format!("Password Reset - {}", hostname)),
body,
),
)
@@ -360,23 +393,24 @@ pub async fn enqueue_email_update(
pub async fn enqueue_account_deletion(
db: &PgPool,
user_id: Uuid,
email: &str,
handle: &str,
code: &str,
hostname: &str,
) -> Result<Uuid, sqlx::Error> {
let prefs = get_user_notification_prefs(db, user_id).await?;
let body = format!(
"Hello @{},\n\nYour account deletion confirmation code is: {}\n\nThis code will expire in 10 minutes.\n\nIf you did not request this, please secure your account immediately.",
handle, code
prefs.handle, code
);
enqueue_notification(
db,
NewNotification::email(
NewNotification::new(
user_id,
prefs.channel,
super::types::NotificationType::AccountDeletion,
email.to_string(),
format!("Account Deletion Request - {}", hostname),
prefs.email.clone(),
Some(format!("Account Deletion Request - {}", hostname)),
body,
),
)

View File

@@ -63,6 +63,25 @@ pub struct NewNotification {
}
impl NewNotification {
pub fn new(
user_id: Uuid,
channel: NotificationChannel,
notification_type: NotificationType,
recipient: String,
subject: Option<String>,
body: String,
) -> Self {
Self {
user_id,
channel,
notification_type,
recipient,
subject,
body,
metadata: None,
}
}
pub fn email(
user_id: Uuid,
notification_type: NotificationType,
@@ -70,14 +89,13 @@ impl NewNotification {
subject: String,
body: String,
) -> Self {
Self {
Self::new(
user_id,
channel: NotificationChannel::Email,
NotificationChannel::Email,
notification_type,
recipient,
subject: Some(subject),
Some(subject),
body,
metadata: None,
}
)
}
}

375
tests/actor.rs Normal file
View File

@@ -0,0 +1,375 @@
mod common;
use common::{base_url, client, create_account_and_login};
use serde_json::{json, Value};
#[tokio::test]
async fn test_get_preferences_empty() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
assert!(body.get("preferences").is_some());
assert!(body["preferences"].as_array().unwrap().is_empty());
}
#[tokio::test]
async fn test_get_preferences_no_auth() {
let client = client();
let base = base_url().await;
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 401);
}
#[tokio::test]
async fn test_put_preferences_success() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#adultContentPref",
"enabled": true
},
{
"$type": "app.bsky.actor.defs#contentLabelPref",
"label": "nsfw",
"visibility": "warn"
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
let prefs_arr = body["preferences"].as_array().unwrap();
assert_eq!(prefs_arr.len(), 2);
let adult_pref = prefs_arr.iter().find(|p| {
p.get("$type").and_then(|t| t.as_str()) == Some("app.bsky.actor.defs#adultContentPref")
});
assert!(adult_pref.is_some());
assert_eq!(adult_pref.unwrap()["enabled"], true);
}
#[tokio::test]
async fn test_put_preferences_no_auth() {
let client = client();
let base = base_url().await;
let prefs = json!({
"preferences": []
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 401);
}
#[tokio::test]
async fn test_put_preferences_missing_type() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"enabled": true
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 400);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"], "InvalidRequest");
}
#[tokio::test]
async fn test_put_preferences_invalid_namespace() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"$type": "com.example.somePref",
"value": "test"
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 400);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"], "InvalidRequest");
}
#[tokio::test]
async fn test_put_preferences_read_only_rejected() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#declaredAgePref",
"isOverAge18": true
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 400);
let body: Value = resp.json().await.unwrap();
assert_eq!(body["error"], "InvalidRequest");
}
#[tokio::test]
async fn test_put_preferences_replaces_all() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs1 = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#adultContentPref",
"enabled": true
},
{
"$type": "app.bsky.actor.defs#contentLabelPref",
"label": "nsfw",
"visibility": "warn"
}
]
});
client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs1)
.send()
.await
.unwrap();
let prefs2 = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#threadViewPref",
"sort": "newest"
}
]
});
client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs2)
.send()
.await
.unwrap();
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
let prefs_arr = body["preferences"].as_array().unwrap();
assert_eq!(prefs_arr.len(), 1);
assert_eq!(prefs_arr[0]["$type"], "app.bsky.actor.defs#threadViewPref");
}
#[tokio::test]
async fn test_put_preferences_saved_feeds() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#savedFeedsPrefV2",
"items": [
{
"type": "feed",
"value": "at://did:plc:example/app.bsky.feed.generator/my-feed",
"pinned": true
}
]
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
let prefs_arr = body["preferences"].as_array().unwrap();
assert_eq!(prefs_arr.len(), 1);
let saved_feeds = &prefs_arr[0];
assert_eq!(saved_feeds["$type"], "app.bsky.actor.defs#savedFeedsPrefV2");
assert!(saved_feeds["items"].as_array().unwrap().len() == 1);
}
#[tokio::test]
async fn test_put_preferences_muted_words() {
let client = client();
let base = base_url().await;
let (token, _did) = create_account_and_login(&client).await;
let prefs = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#mutedWordsPref",
"items": [
{
"value": "spoiler",
"targets": ["content", "tag"],
"actorTarget": "all"
}
]
}
]
});
let resp = client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.json(&prefs)
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.unwrap();
let body: Value = resp.json().await.unwrap();
let prefs_arr = body["preferences"].as_array().unwrap();
assert_eq!(prefs_arr[0]["$type"], "app.bsky.actor.defs#mutedWordsPref");
}
#[tokio::test]
async fn test_preferences_isolation_between_users() {
let client = client();
let base = base_url().await;
let (token1, _did1) = create_account_and_login(&client).await;
let (token2, _did2) = create_account_and_login(&client).await;
let prefs1 = json!({
"preferences": [
{
"$type": "app.bsky.actor.defs#adultContentPref",
"enabled": true
}
]
});
client
.post(format!("{}/xrpc/app.bsky.actor.putPreferences", base))
.header("Authorization", format!("Bearer {}", token1))
.json(&prefs1)
.send()
.await
.unwrap();
let resp = client
.get(format!("{}/xrpc/app.bsky.actor.getPreferences", base))
.header("Authorization", format!("Bearer {}", token2))
.send()
.await
.unwrap();
assert_eq!(resp.status(), 200);
let body: Value = resp.json().await.unwrap();
assert!(body["preferences"].as_array().unwrap().is_empty());
}

View File

@@ -1,7 +1,7 @@
mod common;
use bspds::notifications::{
enqueue_notification, enqueue_welcome_email, NewNotification, NotificationChannel,
enqueue_notification, enqueue_welcome, NewNotification, NotificationChannel,
NotificationStatus, NotificationType,
};
use sqlx::PgPool;
@@ -64,19 +64,19 @@ async fn test_enqueue_notification() {
}
#[tokio::test]
async fn test_enqueue_welcome_email() {
async fn test_enqueue_welcome() {
let pool = get_pool().await;
let (_, did) = common::create_account_and_login(&common::client()).await;
let user_id: uuid::Uuid = sqlx::query_scalar!("SELECT id FROM users WHERE did = $1", did)
let user_row = sqlx::query!("SELECT id, email, handle FROM users WHERE did = $1", did)
.fetch_one(&pool)
.await
.expect("User not found");
let notification_id = enqueue_welcome_email(&pool, user_id, "user@example.com", "testhandle", "example.com")
let notification_id = enqueue_welcome(&pool, user_row.id, "example.com")
.await
.expect("Failed to enqueue welcome email");
.expect("Failed to enqueue welcome notification");
let row = sqlx::query!(
r#"
@@ -92,9 +92,9 @@ async fn test_enqueue_welcome_email() {
.await
.expect("Notification not found");
assert_eq!(row.recipient, "user@example.com");
assert_eq!(row.recipient, user_row.email);
assert_eq!(row.subject.as_deref(), Some("Welcome to example.com"));
assert!(row.body.contains("@testhandle"));
assert!(row.body.contains(&format!("@{}", user_row.handle)));
assert_eq!(row.notification_type, NotificationType::Welcome);
}

View File

@@ -1428,7 +1428,7 @@ async fn test_state_with_special_chars() {
let mock_client = setup_mock_client_metadata(redirect_uri).await;
let client_id = mock_client.uri();
let (code_verifier, code_challenge) = generate_pkce();
let (_code_verifier, code_challenge) = generate_pkce();
let special_state = "state=with&special=chars&plus+more";
let par_body: Value = http_client

View File

@@ -11,7 +11,6 @@ fn create_dpop_proof(
iat_offset_secs: i64,
) -> String {
use p256::ecdsa::{SigningKey, Signature, signature::Signer};
use p256::elliptic_curve::sec1::ToEncodedPoint;
let signing_key = SigningKey::random(&mut rand::thread_rng());
let verifying_key = signing_key.verifying_key();