Account lifecycle conf. vs ref

This commit is contained in:
lewis
2025-12-29 21:51:09 +02:00
parent c58255daa1
commit c302d2aea1
39 changed files with 1338 additions and 144 deletions

View File

@@ -100,11 +100,25 @@ AWS_SECRET_ACCESS_KEY=minioadmin
# Comma-separated list of available user domains
# AVAILABLE_USER_DOMAINS=example.com
# =============================================================================
# Server Metadata (returned by describeServer)
# =============================================================================
# Privacy policy URL (optional)
# PRIVACY_POLICY_URL=https://example.com/privacy
# Terms of service URL (optional)
# TERMS_OF_SERVICE_URL=https://example.com/terms
# Contact email address (optional)
# CONTACT_EMAIL=admin@example.com
# =============================================================================
# Rate Limiting
# =============================================================================
# Disable all rate limiting (testing only, NEVER in production)
# DISABLE_RATE_LIMITING=1
# =============================================================================
# Account Deletion
# =============================================================================
# How often to check for scheduled account deletions (default: 3600 = 1 hour)
# SCHEDULED_DELETE_CHECK_INTERVAL_SECS=3600
# =============================================================================
# Miscellaneous
# =============================================================================
# Allow HTTP for proxy requests (development only)

View File

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text"
]
},
"nullable": []
},
"hash": "14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"ByteaArray"
]
},
"nullable": []
},
"hash": "244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305"
}

View File

@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT u.id as user_id, r.repo_root_cid\n FROM users u\n JOIN repos r ON r.user_id = u.id\n WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "repo_root_cid",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d"
}

View File

@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "repo_root_cid",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "repo_rev",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
true
]
},
"hash": "49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO repo_seq (did, event_type, active, status)\n VALUES ($1, 'account', false, 'deleted')\n RETURNING seq\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "seq",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1"
}

View File

@@ -0,0 +1,28 @@
{
"db_name": "PostgreSQL",
"query": "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "repo_root_cid",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "repo_rev",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false,
true
]
},
"hash": "6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "count",
"type_info": "Int8"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
null
]
},
"hash": "908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"Text",
"Text"
]
},
"nullable": []
},
"hash": "94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"ByteaArray"
]
},
"nullable": []
},
"hash": "978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192"
}

View File

@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT did, handle\n FROM users\n WHERE delete_after IS NOT NULL\n AND delete_after < NOW()\n AND deactivated_at IS NOT NULL\n LIMIT 100\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "did",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "handle",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Uuid",
"ByteaArray"
]
},
"nullable": []
},
"hash": "9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444"
}

View File

@@ -0,0 +1,16 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Int8"
]
},
"nullable": []
},
"hash": "b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5"
}

View File

@@ -0,0 +1,15 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Timestamptz"
]
},
"nullable": []
},
"hash": "b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
"query": "UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
"describe": {
"columns": [],
"parameters": {
@@ -11,5 +11,5 @@
},
"nullable": []
},
"hash": "2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14"
"hash": "b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)\n VALUES ($1, 'commit', $2, $2, $3, $4, $5)\n RETURNING seq\n ",
"query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)\n VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)\n RETURNING seq\n ",
"describe": {
"columns": [
{
@@ -11,16 +11,18 @@
],
"parameters": {
"Left": [
"Text",
"Text",
"Text",
"Jsonb",
"TextArray",
"TextArray"
"TextArray",
"Text"
]
},
"nullable": [
false
]
},
"hash": "53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe"
"hash": "c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637"
}

View File

@@ -1,15 +0,0 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Text",
"Uuid"
]
},
"nullable": []
},
"hash": "f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3"
}

View File

@@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "SELECT storage_key as \"storage_key!\" FROM blobs WHERE created_by_user = $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "storage_key!",
"type_info": "Text"
}
],
"parameters": {
"Left": [
"Uuid"
]
},
"nullable": [
false
]
},
"hash": "f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852"
}

View File

@@ -0,0 +1,26 @@
{
"db_name": "PostgreSQL",
"query": "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "user_id",
"type_info": "Uuid"
},
{
"ordinal": 1,
"name": "repo_root_cid",
"type_info": "Text"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false
]
},
"hash": "f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3"
}

View File

@@ -0,0 +1 @@
ALTER TABLE users ADD COLUMN IF NOT EXISTS delete_after TIMESTAMPTZ;

View File

@@ -0,0 +1,7 @@
CREATE TABLE IF NOT EXISTS user_blocks (
user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
block_cid BYTEA NOT NULL,
PRIMARY KEY (user_id, block_cid)
);
CREATE INDEX IF NOT EXISTS idx_user_blocks_user_id ON user_blocks(user_id);

View File

@@ -886,10 +886,12 @@ pub async fn create_delegated_account(
}
};
let commit_cid_str = commit_cid.to_string();
let rev_str = rev.as_ref().to_string();
if let Err(e) = sqlx::query!(
"INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
"INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
user_id,
commit_cid_str
commit_cid_str,
rev_str
)
.execute(&mut *tx)
.await
@@ -901,6 +903,26 @@ pub async fn create_delegated_account(
)
.into_response();
}
let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
if let Err(e) = sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user_id,
&genesis_block_cids
)
.execute(&mut *tx)
.await
{
error!("Error inserting user_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
if let Some(ref code) = input.invite_code {
let _ = sqlx::query!(

View File

@@ -57,9 +57,9 @@ pub struct CreateAccountOutput {
pub handle: String,
pub did: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub access_jwt: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub refresh_jwt: Option<String>,
pub did_doc: Option<serde_json::Value>,
pub access_jwt: String,
pub refresh_jwt: String,
pub verification_required: bool,
pub verification_channel: String,
}
@@ -624,9 +624,10 @@ pub async fn create_account(
StatusCode::OK,
Json(CreateAccountOutput {
handle: handle.clone(),
did,
access_jwt: Some(access_meta.token),
refresh_jwt: Some(refresh_meta.token),
did: did.clone(),
did_doc: state.did_resolver.resolve_did_document(&did).await,
access_jwt: access_meta.token,
refresh_jwt: refresh_meta.token,
verification_required: false,
verification_channel: "email".to_string(),
}),
@@ -912,10 +913,12 @@ pub async fn create_account(
}
};
let commit_cid_str = commit_cid.to_string();
let rev_str = rev.as_ref().to_string();
let repo_insert = sqlx::query!(
"INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
"INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
user_id,
commit_cid_str
commit_cid_str,
rev_str
)
.execute(&mut *tx)
.await;
@@ -927,6 +930,26 @@ pub async fn create_account(
)
.into_response();
}
let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
if let Err(e) = sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user_id,
&genesis_block_cids
)
.execute(&mut *tx)
.await
{
error!("Error inserting user_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
if let Some(code) = &input.invite_code
&& !code.trim().is_empty()
{
@@ -965,6 +988,21 @@ pub async fn create_account(
{
warn!("Failed to sequence account event for {}: {}", did, e);
}
if let Err(e) =
crate::api::repo::record::sequence_empty_commit_event(&state, &did).await
{
warn!("Failed to sequence commit event for {}: {}", did, e);
}
if let Err(e) = crate::api::repo::record::sequence_sync_event(
&state,
&did,
&commit_cid_str,
Some(rev.as_ref()),
)
.await
{
warn!("Failed to sequence sync event for {}: {}", did, e);
}
let profile_record = json!({
"$type": "app.bsky.actor.profile",
"displayName": input.handle
@@ -1023,71 +1061,50 @@ pub async fn create_account(
}
}
let (access_jwt, refresh_jwt) = if is_migration {
info!(
"[MIGRATION] createAccount: Creating session tokens for migration did={}",
did
);
let access_meta = match crate::auth::create_access_token_with_metadata(
&did,
&secret_key_bytes,
) {
Ok(m) => m,
Err(e) => {
error!(
"[MIGRATION] createAccount: Error creating access token for migration: {:?}",
e
);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
let refresh_meta = match crate::auth::create_refresh_token_with_metadata(
&did,
&secret_key_bytes,
) {
Ok(m) => m,
Err(e) => {
error!(
"[MIGRATION] createAccount: Error creating refresh token for migration: {:?}",
e
);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(e) = sqlx::query!(
"INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
did,
access_meta.jti,
refresh_meta.jti,
access_meta.expires_at,
refresh_meta.expires_at
)
.execute(&state.db)
.await
{
error!("[MIGRATION] createAccount: Error creating session for migration: {:?}", e);
let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes)
{
Ok(m) => m,
Err(e) => {
error!("createAccount: Error creating access token: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
info!(
"[MIGRATION] createAccount: Session created successfully for did={}",
did
);
(Some(access_meta.token), Some(refresh_meta.token))
} else {
(None, None)
};
let refresh_meta =
match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) {
Ok(m) => m,
Err(e) => {
error!("createAccount: Error creating refresh token: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
};
if let Err(e) = sqlx::query!(
"INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)",
did,
access_meta.jti,
refresh_meta.jti,
access_meta.expires_at,
refresh_meta.expires_at
)
.execute(&state.db)
.await
{
error!("createAccount: Error creating session: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
let did_doc = state.did_resolver.resolve_did_document(&did).await;
if is_migration {
info!(
@@ -1101,11 +1118,13 @@ pub async fn create_account(
Json(CreateAccountOutput {
handle: handle.clone(),
did,
access_jwt,
refresh_jwt,
did_doc,
access_jwt: access_meta.token,
refresh_jwt: refresh_meta.token,
verification_required: !is_migration,
verification_channel: verification_channel.to_string(),
}),
)
.into_response()
}

View File

@@ -315,7 +315,7 @@ pub async fn import_repo(
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_BLOCKS);
match apply_import(&state.db, user_id, root, blocks, max_blocks).await {
match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await {
Ok(import_result) => {
info!(
"Successfully imported {} records for user {}",
@@ -405,8 +405,9 @@ pub async fn import_repo(
};
let new_root_str = new_root_cid.to_string();
if let Err(e) = sqlx::query!(
"UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2",
"UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3",
new_root_str,
&new_rev_str,
user_id
)
.execute(&state.db)
@@ -419,6 +420,27 @@ pub async fn import_repo(
)
.into_response();
}
let mut all_block_cids: Vec<Vec<u8>> = blocks.keys().map(|c| c.to_bytes()).collect();
all_block_cids.push(new_root_cid.to_bytes());
if let Err(e) = sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user_id,
&all_block_cids
)
.execute(&state.db)
.await
{
error!("Failed to insert user_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
info!(
"Created new commit for imported repo: cid={}, rev={}",
new_root_str, new_rev_str

View File

@@ -173,13 +173,34 @@ pub async fn commit_and_log(
.flatten()
.unwrap_or(false);
sqlx::query!(
"UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2",
"UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3",
new_root_cid.to_string(),
&rev_str,
user_id
)
.execute(&mut *tx)
.await
.map_err(|e| format!("DB Error (repos): {}", e))?;
let mut all_block_cids: Vec<Vec<u8>> = blocks_cids
.iter()
.filter_map(|s| Cid::from_str(s).ok())
.map(|c| c.to_bytes())
.collect();
all_block_cids.push(new_root_cid.to_bytes());
if !all_block_cids.is_empty() {
sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user_id,
&all_block_cids
)
.execute(&mut *tx)
.await
.map_err(|e| format!("DB Error (user_blocks): {}", e))?;
}
let mut upsert_collections: Vec<String> = Vec::new();
let mut upsert_rkeys: Vec<String> = Vec::new();
let mut upsert_cids: Vec<String> = Vec::new();
@@ -492,8 +513,8 @@ pub async fn sequence_sync_event(
}
pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<i64, String> {
let repo_root = sqlx::query_scalar!(
"SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
let repo_info = sqlx::query!(
"SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
did
)
.fetch_optional(&state.db)
@@ -503,17 +524,20 @@ pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result<
let ops = serde_json::json!([]);
let blobs: Vec<String> = vec![];
let blocks_cids: Vec<String> = vec![];
let prev_cid: Option<&str> = None;
let seq_row = sqlx::query!(
r#"
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)
VALUES ($1, 'commit', $2, $2, $3, $4, $5)
INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)
VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)
RETURNING seq
"#,
did,
repo_root,
repo_info.repo_root_cid,
prev_cid,
ops,
&blobs,
&blocks_cids
&blocks_cids,
repo_info.repo_rev
)
.fetch_one(&state.db)
.await

View File

@@ -83,14 +83,38 @@ pub async fn check_account_status(
_ => None,
};
let repo_result = sqlx::query!(
"SELECT repo_root_cid FROM repos WHERE user_id = $1",
"SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1",
user_id
)
.fetch_optional(&state.db)
.await;
let repo_commit = match repo_result {
Ok(Some(row)) => row.repo_root_cid,
_ => String::new(),
let (repo_commit, repo_rev_from_db) = match repo_result {
Ok(Some(row)) => (row.repo_root_cid, row.repo_rev),
_ => (String::new(), None),
};
let block_count: i64 =
sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id)
.fetch_one(&state.db)
.await
.unwrap_or(Some(0))
.unwrap_or(0);
let repo_rev = if let Some(rev) = repo_rev_from_db {
rev
} else if !repo_commit.is_empty() {
if let Ok(cid) = Cid::from_str(&repo_commit) {
if let Ok(Some(block)) = state.block_store.get(&cid).await {
Commit::from_cbor(&block)
.ok()
.map(|c| c.rev().to_string())
.unwrap_or_default()
} else {
String::new()
}
} else {
String::new()
}
} else {
String::new()
};
let record_count: i64 =
sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id)
@@ -106,15 +130,15 @@ pub async fn check_account_status(
.await
.unwrap_or(Some(0))
.unwrap_or(0);
let valid_did = did.starts_with("did:");
let valid_did = is_valid_did_for_service(&state.db, &did).await;
(
StatusCode::OK,
Json(CheckAccountStatusOutput {
activated: deactivated_at.is_none(),
valid_did,
repo_commit: repo_commit.clone(),
repo_rev: chrono::Utc::now().timestamp_millis().to_string(),
repo_blocks: 0,
repo_rev,
repo_blocks: block_count as i64,
indexed_records: record_count,
private_state_values: 0,
expected_blobs: blob_count,
@@ -124,9 +148,16 @@ pub async fn check_account_status(
.into_response()
}
async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool {
assert_valid_did_document_for_service(db, did, false)
.await
.is_ok()
}
async fn assert_valid_did_document_for_service(
db: &sqlx::PgPool,
did: &str,
with_retry: bool,
) -> Result<(), (StatusCode, Json<serde_json::Value>)> {
let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string());
let expected_endpoint = format!("https://{}", hostname);
@@ -134,9 +165,10 @@ async fn assert_valid_did_document_for_service(
if did.starts_with("did:plc:") {
let plc_client = PlcClient::new(None);
let max_attempts = if with_retry { 5 } else { 1 };
let mut last_error = None;
let mut doc_data = None;
for attempt in 0..5 {
for attempt in 0..max_attempts {
if attempt > 0 {
let delay_ms = 500 * (1 << (attempt - 1));
info!(
@@ -196,6 +228,28 @@ async fn assert_valid_did_document_for_service(
}
};
let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok();
if let Some(ref expected_rotation_key) = server_rotation_key {
let rotation_keys = doc_data
.get("rotationKeys")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|k| k.as_str())
.collect::<Vec<_>>()
})
.unwrap_or_default();
if !rotation_keys.contains(&expected_rotation_key.as_str()) {
return Err((
StatusCode::BAD_REQUEST,
Json(json!({
"error": "InvalidRequest",
"message": "Server rotation key not included in PLC DID data"
})),
));
}
}
let doc_signing_key = doc_data
.get("verificationMethods")
.and_then(|v| v.get("atproto"))
@@ -378,7 +432,7 @@ pub async fn activate_account(
did
);
let did_validation_start = std::time::Instant::now();
if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did).await {
if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await {
info!(
"[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})",
did,
@@ -511,7 +565,7 @@ pub struct DeactivateAccountInput {
pub async fn deactivate_account(
State(state): State<AppState>,
headers: axum::http::HeaderMap,
Json(_input): Json<DeactivateAccountInput>,
Json(input): Json<DeactivateAccountInput>,
) -> Response {
let extracted = match crate::auth::extract_auth_token_from_header(
headers.get("Authorization").and_then(|h| h.to_str().ok()),
@@ -548,6 +602,12 @@ pub async fn deactivate_account(
return e;
}
let delete_after: Option<chrono::DateTime<chrono::Utc>> = input
.delete_after
.as_ref()
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc));
let did = auth_user.did;
let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did)
.fetch_optional(&state.db)
@@ -555,8 +615,9 @@ pub async fn deactivate_account(
.ok()
.flatten();
let result = sqlx::query!(
"UPDATE users SET deactivated_at = NOW() WHERE did = $1",
did
"UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1",
did,
delete_after
)
.execute(&state.db)
.await;
@@ -693,6 +754,14 @@ pub async fn delete_account(
)
.into_response();
}
const OLD_PASSWORD_MAX_LENGTH: usize = 512;
if password.len() > OLD_PASSWORD_MAX_LENGTH {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})),
)
.into_response();
}
if token.is_empty() {
return (
StatusCode::BAD_REQUEST,
@@ -842,18 +911,35 @@ pub async fn delete_account(
)
.into_response();
}
if let Err(e) = crate::api::repo::record::sequence_account_event(
let account_seq = crate::api::repo::record::sequence_account_event(
&state,
did,
false,
Some("deleted"),
)
.await
{
warn!(
"Failed to sequence account deletion event for {}: {}",
did, e
);
.await;
match account_seq {
Ok(seq) => {
if let Err(e) = sqlx::query!(
"DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
did,
seq
)
.execute(&state.db)
.await
{
warn!(
"Failed to cleanup sequences for deleted account {}: {}",
did, e
);
}
}
Err(e) => {
warn!(
"Failed to sequence account deletion event for {}: {}",
did, e
);
}
}
let _ = state.cache.delete(&format!("handle:{}", handle)).await;
info!("Account {} deleted successfully", did);

View File

@@ -32,10 +32,20 @@ pub async fn describe_server() -> impl IntoResponse {
let invite_code_required = std::env::var("INVITE_CODE_REQUIRED")
.map(|v| v == "true" || v == "1")
.unwrap_or(false);
let privacy_policy = std::env::var("PRIVACY_POLICY_URL").ok();
let terms_of_service = std::env::var("TERMS_OF_SERVICE_URL").ok();
let contact_email = std::env::var("CONTACT_EMAIL").ok();
Json(json!({
"availableUserDomains": domains,
"inviteCodeRequired": invite_code_required,
"did": format!("did:web:{}", pds_hostname),
"links": {
"privacyPolicy": privacy_policy,
"termsOfService": terms_of_service
},
"contact": {
"email": contact_email
},
"version": env!("CARGO_PKG_VERSION"),
"availableCommsChannels": get_available_comms_channels()
}))

View File

@@ -612,10 +612,12 @@ pub async fn create_passkey_account(
}
};
let commit_cid_str = commit_cid.to_string();
let rev_str = rev.as_ref().to_string();
if let Err(e) = sqlx::query!(
"INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)",
"INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)",
user_id,
commit_cid_str
commit_cid_str,
rev_str
)
.execute(&mut *tx)
.await
@@ -627,6 +629,26 @@ pub async fn create_passkey_account(
)
.into_response();
}
let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()];
if let Err(e) = sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user_id,
&genesis_block_cids
)
.execute(&mut *tx)
.await
{
error!("Error inserting user_blocks: {:?}", e);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": "InternalError"})),
)
.into_response();
}
if let Some(ref code) = input.invite_code {
let _ = sqlx::query!(

View File

@@ -15,6 +15,7 @@ pub mod oauth;
pub mod plc;
pub mod rate_limit;
pub mod repo;
pub mod scheduled;
pub mod state;
pub mod storage;
pub mod sync;

View File

@@ -5,6 +5,7 @@ use tokio::sync::watch;
use tracing::{error, info, warn};
use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
use tranquil_pds::state::AppState;
#[tokio::main]
@@ -28,6 +29,13 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let backfill_db = state.db.clone();
let backfill_block_store = state.block_store.clone();
tokio::spawn(async move {
backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await;
backfill_user_blocks(&backfill_db, backfill_block_store).await;
});
let mut comms_service = CommsService::new(state.db.clone());
if let Some(email_sender) = EmailSender::from_env() {
@@ -63,13 +71,19 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
Some(tokio::spawn(start_crawlers_service(
crawlers,
firehose_rx,
shutdown_rx,
shutdown_rx.clone(),
)))
} else {
warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)");
None
};
let scheduled_handle = tokio::spawn(start_scheduled_tasks(
state.db.clone(),
state.blob_store.clone(),
shutdown_rx,
));
let app = tranquil_pds::app(state);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
info!("listening on {}", addr);
@@ -88,6 +102,8 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
handle.await.ok();
}
scheduled_handle.await.ok();
if let Err(e) = server_result {
return Err(format!("Server error: {}", e).into());
}

368
src/scheduled.rs Normal file
View File

@@ -0,0 +1,368 @@
use cid::Cid;
use jacquard_repo::commit::Commit;
use jacquard_repo::storage::BlockStore;
use ipld_core::ipld::Ipld;
use sqlx::PgPool;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use crate::repo::PostgresBlockStore;
use crate::storage::BlobStorage;
pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) {
let repos_missing_rev = match sqlx::query!(
"SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL"
)
.fetch_all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
error!("Failed to query repos for backfill: {}", e);
return;
}
};
if repos_missing_rev.is_empty() {
debug!("No repos need repo_rev backfill");
return;
}
info!(
count = repos_missing_rev.len(),
"Backfilling repo_rev for existing repos"
);
let mut success = 0;
let mut failed = 0;
for repo in repos_missing_rev {
let cid = match Cid::from_str(&repo.repo_root_cid) {
Ok(c) => c,
Err(_) => {
failed += 1;
continue;
}
};
let block = match block_store.get(&cid).await {
Ok(Some(b)) => b,
_ => {
failed += 1;
continue;
}
};
let commit = match Commit::from_cbor(&block) {
Ok(c) => c,
Err(_) => {
failed += 1;
continue;
}
};
let rev = commit.rev().to_string();
if let Err(e) = sqlx::query!(
"UPDATE repos SET repo_rev = $1 WHERE user_id = $2",
rev,
repo.user_id
)
.execute(db)
.await
{
warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev");
failed += 1;
} else {
success += 1;
}
}
info!(success, failed, "Completed repo_rev backfill");
}
pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) {
let users_without_blocks = match sqlx::query!(
r#"
SELECT u.id as user_id, r.repo_root_cid
FROM users u
JOIN repos r ON r.user_id = u.id
WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)
"#
)
.fetch_all(db)
.await
{
Ok(rows) => rows,
Err(e) => {
error!("Failed to query users for user_blocks backfill: {}", e);
return;
}
};
if users_without_blocks.is_empty() {
debug!("No users need user_blocks backfill");
return;
}
info!(
count = users_without_blocks.len(),
"Backfilling user_blocks for existing repos"
);
let mut success = 0;
let mut failed = 0;
for user in users_without_blocks {
let root_cid = match Cid::from_str(&user.repo_root_cid) {
Ok(c) => c,
Err(_) => {
failed += 1;
continue;
}
};
let mut block_cids: Vec<Vec<u8>> = Vec::new();
let mut to_visit = vec![root_cid];
let mut visited = std::collections::HashSet::new();
while let Some(cid) = to_visit.pop() {
if visited.contains(&cid) {
continue;
}
visited.insert(cid);
block_cids.push(cid.to_bytes());
let block = match block_store.get(&cid).await {
Ok(Some(b)) => b,
_ => continue,
};
if let Ok(commit) = Commit::from_cbor(&block) {
to_visit.push(commit.data);
if let Some(prev) = commit.prev {
to_visit.push(prev);
}
} else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::<Ipld>(&block) {
if let Ipld::Map(ref obj) = ipld {
if let Some(Ipld::Link(left_cid)) = obj.get("l") {
to_visit.push(*left_cid);
}
if let Some(Ipld::List(entries)) = obj.get("e") {
for entry in entries {
if let Ipld::Map(entry_obj) = entry {
if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") {
to_visit.push(*tree_cid);
}
if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") {
to_visit.push(*val_cid);
}
}
}
}
}
}
}
if block_cids.is_empty() {
failed += 1;
continue;
}
if let Err(e) = sqlx::query!(
r#"
INSERT INTO user_blocks (user_id, block_cid)
SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)
ON CONFLICT (user_id, block_cid) DO NOTHING
"#,
user.user_id,
&block_cids
)
.execute(db)
.await
{
warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks");
failed += 1;
} else {
info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks");
success += 1;
}
}
info!(success, failed, "Completed user_blocks backfill");
}
pub async fn start_scheduled_tasks(
db: PgPool,
blob_store: Arc<dyn BlobStorage>,
mut shutdown_rx: watch::Receiver<bool>,
) {
let check_interval = Duration::from_secs(
std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(3600),
);
info!(
check_interval_secs = check_interval.as_secs(),
"Starting scheduled tasks service"
);
let mut ticker = interval(check_interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = shutdown_rx.changed() => {
if *shutdown_rx.borrow() {
info!("Scheduled tasks service shutting down");
break;
}
}
_ = ticker.tick() => {
if let Err(e) = process_scheduled_deletions(&db, &blob_store).await {
error!("Error processing scheduled deletions: {}", e);
}
}
}
}
}
async fn process_scheduled_deletions(
db: &PgPool,
blob_store: &Arc<dyn BlobStorage>,
) -> Result<(), String> {
let accounts_to_delete = sqlx::query!(
r#"
SELECT did, handle
FROM users
WHERE delete_after IS NOT NULL
AND delete_after < NOW()
AND deactivated_at IS NOT NULL
LIMIT 100
"#
)
.fetch_all(db)
.await
.map_err(|e| format!("DB error fetching accounts to delete: {}", e))?;
if accounts_to_delete.is_empty() {
debug!("No accounts scheduled for deletion");
return Ok(());
}
info!(
count = accounts_to_delete.len(),
"Processing scheduled account deletions"
);
for account in accounts_to_delete {
if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await {
warn!(
did = %account.did,
handle = %account.handle,
error = %e,
"Failed to delete scheduled account"
);
} else {
info!(
did = %account.did,
handle = %account.handle,
"Successfully deleted scheduled account"
);
}
}
Ok(())
}
async fn delete_account_data(
db: &PgPool,
blob_store: &Arc<dyn BlobStorage>,
did: &str,
_handle: &str,
) -> Result<(), String> {
let user_id: uuid::Uuid = sqlx::query_scalar!(
"SELECT id FROM users WHERE did = $1",
did
)
.fetch_one(db)
.await
.map_err(|e| format!("DB error fetching user: {}", e))?;
let blob_storage_keys: Vec<String> = sqlx::query_scalar!(
r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#,
user_id
)
.fetch_all(db)
.await
.map_err(|e| format!("DB error fetching blob keys: {}", e))?;
for storage_key in &blob_storage_keys {
if let Err(e) = blob_store.delete(storage_key).await {
warn!(
storage_key = %storage_key,
error = %e,
"Failed to delete blob from storage (continuing anyway)"
);
}
}
let mut tx = db
.begin()
.await
.map_err(|e| format!("Failed to begin transaction: {}", e))?;
sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id)
.execute(&mut *tx)
.await
.map_err(|e| format!("Failed to delete blobs: {}", e))?;
sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
.execute(&mut *tx)
.await
.map_err(|e| format!("Failed to delete user: {}", e))?;
let account_seq = sqlx::query_scalar!(
r#"
INSERT INTO repo_seq (did, event_type, active, status)
VALUES ($1, 'account', false, 'deleted')
RETURNING seq
"#,
did
)
.fetch_one(&mut *tx)
.await
.map_err(|e| format!("Failed to sequence account deletion: {}", e))?;
sqlx::query!(
"DELETE FROM repo_seq WHERE did = $1 AND seq != $2",
did,
account_seq
)
.execute(&mut *tx)
.await
.map_err(|e| format!("Failed to cleanup sequences: {}", e))?;
tx.commit()
.await
.map_err(|e| format!("Failed to commit transaction: {}", e))?;
sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq))
.execute(db)
.await
.map_err(|e| format!("Failed to notify: {}", e))?;
info!(
did = %did,
blob_count = blob_storage_keys.len(),
"Deleted account data including blobs from storage"
);
Ok(())
}

View File

@@ -101,6 +101,7 @@ pub struct CommitFrameBuilder {
pub ops_json: serde_json::Value,
pub blobs: Vec<String>,
pub time: chrono::DateTime<chrono::Utc>,
pub rev: Option<String>,
}
impl CommitFrameBuilder {
@@ -122,7 +123,8 @@ impl CommitFrameBuilder {
.iter()
.filter_map(|s| Cid::from_str(s).ok())
.collect();
let rev = placeholder_rev();
let rev = self.rev.unwrap_or_else(placeholder_rev);
let since = self.prev_cid_str.as_ref().map(|_| rev.clone());
Ok(CommitFrame {
seq: self.seq,
rebase: false,
@@ -130,7 +132,7 @@ impl CommitFrameBuilder {
repo: self.did,
commit: commit_cid,
rev,
since: self.prev_cid_str.as_ref().map(|_| placeholder_rev()),
since,
blocks: Vec::new(),
ops,
blobs,
@@ -161,6 +163,7 @@ impl TryFrom<SequencedEvent> for CommitFrame {
ops_json: event.ops.unwrap_or_default(),
blobs: event.blobs.unwrap_or_default(),
time: event.created_at,
rev: event.rev,
};
builder.build()
}

279
tests/account_lifecycle.rs Normal file
View File

@@ -0,0 +1,279 @@
mod common;
mod helpers;
use common::*;
use reqwest::StatusCode;
use serde_json::{Value, json};
#[tokio::test]
async fn test_check_account_status_returns_correct_block_count() {
let client = client();
let base = base_url().await;
let (access_jwt, did) = create_account_and_login(&client).await;
let status1 = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
assert_eq!(status1.status(), StatusCode::OK);
let body1: Value = status1.json().await.unwrap();
let initial_blocks = body1["repoBlocks"].as_i64().unwrap();
assert!(initial_blocks >= 2, "New account should have at least 2 blocks (commit + empty MST)");
let create_res = client
.post(format!("{}/xrpc/com.atproto.repo.createRecord", base))
.bearer_auth(&access_jwt)
.json(&json!({
"repo": did,
"collection": "app.bsky.feed.post",
"record": {
"$type": "app.bsky.feed.post",
"text": "Test post for block counting",
"createdAt": chrono::Utc::now().to_rfc3339()
}
}))
.send()
.await
.unwrap();
assert_eq!(create_res.status(), StatusCode::OK);
let create_body: Value = create_res.json().await.unwrap();
let rkey = create_body["uri"].as_str().unwrap().split('/').last().unwrap().to_string();
let status2 = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
let body2: Value = status2.json().await.unwrap();
let after_create_blocks = body2["repoBlocks"].as_i64().unwrap();
assert!(after_create_blocks > initial_blocks, "Block count should increase after creating a record");
let delete_res = client
.post(format!("{}/xrpc/com.atproto.repo.deleteRecord", base))
.bearer_auth(&access_jwt)
.json(&json!({
"repo": did,
"collection": "app.bsky.feed.post",
"rkey": rkey
}))
.send()
.await
.unwrap();
assert_eq!(delete_res.status(), StatusCode::OK);
let status3 = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
let body3: Value = status3.json().await.unwrap();
let after_delete_blocks = body3["repoBlocks"].as_i64().unwrap();
assert!(
after_delete_blocks >= after_create_blocks,
"Block count should not decrease after deleting a record (was {}, now {})",
after_create_blocks,
after_delete_blocks
);
}
#[tokio::test]
async fn test_check_account_status_returns_valid_repo_rev() {
let client = client();
let base = base_url().await;
let (access_jwt, _) = create_account_and_login(&client).await;
let status = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
assert_eq!(status.status(), StatusCode::OK);
let body: Value = status.json().await.unwrap();
let repo_rev = body["repoRev"].as_str().unwrap();
assert!(!repo_rev.is_empty(), "repoRev should not be empty");
assert!(repo_rev.chars().all(|c| c.is_alphanumeric()), "repoRev should be alphanumeric TID");
}
#[tokio::test]
async fn test_check_account_status_valid_did_is_true_for_active_account() {
let client = client();
let base = base_url().await;
let (access_jwt, _) = create_account_and_login(&client).await;
let status = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
assert_eq!(status.status(), StatusCode::OK);
let body: Value = status.json().await.unwrap();
assert_eq!(body["validDid"], true, "validDid should be true for active account with correct DID document");
assert_eq!(body["activated"], true, "activated should be true for active account");
}
#[tokio::test]
async fn test_deactivate_account_with_delete_after() {
let client = client();
let base = base_url().await;
let (access_jwt, _) = create_account_and_login(&client).await;
let future_time = chrono::Utc::now() + chrono::Duration::hours(24);
let delete_after = future_time.to_rfc3339();
let deactivate = client
.post(format!("{}/xrpc/com.atproto.server.deactivateAccount", base))
.bearer_auth(&access_jwt)
.json(&json!({
"deleteAfter": delete_after
}))
.send()
.await
.unwrap();
assert_eq!(deactivate.status(), StatusCode::OK);
let status = client
.get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base))
.bearer_auth(&access_jwt)
.send()
.await
.unwrap();
assert_eq!(status.status(), StatusCode::OK);
let body: Value = status.json().await.unwrap();
assert_eq!(body["activated"], false, "Account should be deactivated");
}
#[tokio::test]
async fn test_create_account_returns_did_doc() {
let client = client();
let base = base_url().await;
let handle = format!("diddoctest-{}", uuid::Uuid::new_v4());
let payload = json!({
"handle": handle,
"email": format!("{}@example.com", handle),
"password": "Testpass123!"
});
let create_res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base))
.json(&payload)
.send()
.await
.unwrap();
assert_eq!(create_res.status(), StatusCode::OK);
let body: Value = create_res.json().await.unwrap();
assert!(body["accessJwt"].is_string(), "accessJwt should always be returned");
assert!(body["refreshJwt"].is_string(), "refreshJwt should always be returned");
assert!(body["did"].is_string(), "did should be returned");
if body["didDoc"].is_object() {
let did_doc = &body["didDoc"];
assert!(did_doc["id"].is_string(), "didDoc should have id field");
}
}
#[tokio::test]
async fn test_create_account_always_returns_tokens() {
let client = client();
let base = base_url().await;
let handle = format!("tokentest-{}", uuid::Uuid::new_v4());
let payload = json!({
"handle": handle,
"email": format!("{}@example.com", handle),
"password": "Testpass123!"
});
let create_res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base))
.json(&payload)
.send()
.await
.unwrap();
assert_eq!(create_res.status(), StatusCode::OK);
let body: Value = create_res.json().await.unwrap();
let access_jwt = body["accessJwt"].as_str().expect("accessJwt should be present");
let refresh_jwt = body["refreshJwt"].as_str().expect("refreshJwt should be present");
assert!(!access_jwt.is_empty(), "accessJwt should not be empty");
assert!(!refresh_jwt.is_empty(), "refreshJwt should not be empty");
let parts: Vec<&str> = access_jwt.split('.').collect();
assert_eq!(parts.len(), 3, "accessJwt should be a valid JWT with 3 parts");
}
#[tokio::test]
async fn test_describe_server_has_links_and_contact() {
let client = client();
let base = base_url().await;
let describe = client
.get(format!("{}/xrpc/com.atproto.server.describeServer", base))
.send()
.await
.unwrap();
assert_eq!(describe.status(), StatusCode::OK);
let body: Value = describe.json().await.unwrap();
assert!(body.get("links").is_some(), "describeServer should include links object");
assert!(body.get("contact").is_some(), "describeServer should include contact object");
let links = &body["links"];
assert!(links.get("privacyPolicy").is_some() || links["privacyPolicy"].is_null(),
"links should have privacyPolicy field (can be null)");
assert!(links.get("termsOfService").is_some() || links["termsOfService"].is_null(),
"links should have termsOfService field (can be null)");
let contact = &body["contact"];
assert!(contact.get("email").is_some() || contact["email"].is_null(),
"contact should have email field (can be null)");
}
#[tokio::test]
async fn test_delete_account_password_max_length() {
let client = client();
let base = base_url().await;
let handle = format!("pwdlentest-{}", uuid::Uuid::new_v4());
let payload = json!({
"handle": handle,
"email": format!("{}@example.com", handle),
"password": "Testpass123!"
});
let create_res = client
.post(format!("{}/xrpc/com.atproto.server.createAccount", base))
.json(&payload)
.send()
.await
.unwrap();
assert_eq!(create_res.status(), StatusCode::OK);
let body: Value = create_res.json().await.unwrap();
let did = body["did"].as_str().unwrap();
let too_long_password = "a".repeat(600);
let delete_res = client
.post(format!("{}/xrpc/com.atproto.server.deleteAccount", base))
.json(&json!({
"did": did,
"password": too_long_password,
"token": "fake-token"
}))
.send()
.await
.unwrap();
assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST);
let error_body: Value = delete_res.json().await.unwrap();
assert!(error_body["message"].as_str().unwrap().contains("password length")
|| error_body["error"].as_str().unwrap() == "InvalidRequest");
}

View File

@@ -466,8 +466,11 @@ async fn create_account_and_login_internal(client: &Client, make_admin: bool) ->
.await
.expect("Failed to mark user as admin");
}
let verification_required = body["verificationRequired"].as_bool().unwrap_or(true);
if let Some(access_jwt) = body["accessJwt"].as_str() {
return (access_jwt.to_string(), did);
if !verification_required {
return (access_jwt.to_string(), did);
}
}
let body_text: String = sqlx::query_scalar!(
"SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1",

View File

@@ -8,6 +8,7 @@ use serde_json::Value;
async fn test_list_blobs_success() {
let client = client();
let (access_jwt, did) = create_account_and_login(&client).await;
let unique_content = format!("test blob content {}", uuid::Uuid::new_v4());
let blob_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.uploadBlob",
@@ -15,7 +16,7 @@ async fn test_list_blobs_success() {
))
.header(header::CONTENT_TYPE, "text/plain")
.bearer_auth(&access_jwt)
.body("test blob content")
.body(unique_content)
.send()
.await
.expect("Failed to upload blob");

View File

@@ -162,7 +162,7 @@ async fn test_list_repos_shows_status_field() {
let res = client
.get(format!(
"{}/xrpc/com.atproto.sync.listRepos",
"{}/xrpc/com.atproto.sync.listRepos?limit=1000",
base_url().await
))
.send()

View File

@@ -552,7 +552,8 @@ async fn test_sync_repo_export_lifecycle() {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await;
}
let blob_data = b"blob data for sync export test";
let blob_data = format!("blob data for sync export test {}", uuid::Uuid::new_v4());
let blob_bytes = blob_data.as_bytes().to_vec();
let upload_res = client
.post(format!(
"{}/xrpc/com.atproto.repo.uploadBlob",
@@ -560,7 +561,7 @@ async fn test_sync_repo_export_lifecycle() {
))
.header(header::CONTENT_TYPE, "application/octet-stream")
.bearer_auth(&jwt)
.body(blob_data.to_vec())
.body(blob_bytes.clone())
.send()
.await
.expect("Failed to upload blob");
@@ -631,7 +632,7 @@ async fn test_sync_repo_export_lifecycle() {
let retrieved_blob = get_blob_res.bytes().await.unwrap();
assert_eq!(
retrieved_blob.as_ref(),
blob_data,
blob_bytes.as_slice(),
"Retrieved blob should match uploaded data"
);
let latest_commit_res = client