diff --git a/.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json b/.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json new file mode 100644 index 0000000..20d935a --- /dev/null +++ b/.sqlx/query-7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n SELECT $1, record_uri, blob_cid\n FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "TextArray", + "TextArray" + ] + }, + "nullable": [] + }, + "hash": "7c914c71e0340325e99495a1867fea9c814b056bd978c67a0eab55ed61278197" +} diff --git a/.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json b/.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json new file mode 100644 index 0000000..ee8bf45 --- /dev/null +++ b/.sqlx/query-a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes)\n VALUES ($1, $2, $3, $4, $5, $6)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text", + "Text", + "Int4", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "a97815493ba7b9b20f6759e2e96a9000473ec5e85d865325500d2e193d5dcf8c" +} diff --git a/crates/tranquil-pds/src/api/actor/preferences.rs b/crates/tranquil-pds/src/api/actor/preferences.rs index 3737706..44d9d17 100644 --- a/crates/tranquil-pds/src/api/actor/preferences.rs +++ b/crates/tranquil-pds/src/api/actor/preferences.rs @@ -127,35 +127,67 @@ pub async fn put_preferences( )) .into_response(); } - let mut forbidden_prefs: Vec = Vec::new(); - for pref in &input.preferences { - let pref_str = serde_json::to_string(pref).unwrap_or_default(); - if pref_str.len() > MAX_PREFERENCE_SIZE { - return ApiError::InvalidRequest(format!( - "Preference too large: {} bytes exceeds limit of {}", - pref_str.len(), - MAX_PREFERENCE_SIZE - )) - .into_response(); - } - let pref_type = match pref.get("$type").and_then(|t| t.as_str()) { - Some(t) => t, - None => { - return ApiError::InvalidRequest("Preference is missing a $type".into()) - .into_response(); + enum PrefValidation { + Ok(Option), + TooLarge(usize), + MissingType, + WrongNamespace, + } + + let validation_results: Vec = input + .preferences + .iter() + .map(|pref| { + let pref_str = serde_json::to_string(pref).unwrap_or_default(); + if pref_str.len() > MAX_PREFERENCE_SIZE { + return PrefValidation::TooLarge(pref_str.len()); } - }; - if !pref_type.starts_with(APP_BSKY_NAMESPACE) { - return ApiError::InvalidRequest(format!( + let pref_type = match pref.get("$type").and_then(|t| t.as_str()) { + Some(t) => t, + None => return PrefValidation::MissingType, + }; + if !pref_type.starts_with(APP_BSKY_NAMESPACE) { + return PrefValidation::WrongNamespace; + } + if pref_type == PERSONAL_DETAILS_PREF && !has_full_access { + PrefValidation::Ok(Some(pref_type.to_string())) + } else { + PrefValidation::Ok(None) + } + }) + .collect(); + + if let Some(err) = validation_results.iter().find_map(|v| match v { + PrefValidation::TooLarge(size) => Some( + ApiError::InvalidRequest(format!( + "Preference too large: {} bytes exceeds limit of {}", + size, MAX_PREFERENCE_SIZE + )) + .into_response(), + ), + PrefValidation::MissingType => Some( + ApiError::InvalidRequest("Preference is missing a $type".into()).into_response(), + ), + PrefValidation::WrongNamespace => Some( + ApiError::InvalidRequest(format!( "Some preferences are not in the {} namespace", APP_BSKY_NAMESPACE )) - .into_response(); - } - if pref_type == PERSONAL_DETAILS_PREF && !has_full_access { - forbidden_prefs.push(pref_type.to_string()); - } + .into_response(), + ), + PrefValidation::Ok(_) => None, + }) { + return err; } + + let forbidden_prefs: Vec = validation_results + .into_iter() + .filter_map(|v| match v { + PrefValidation::Ok(Some(s)) => Some(s), + _ => None, + }) + .collect(); + if !forbidden_prefs.is_empty() { return ApiError::InvalidRequest(format!( "Do not have authorization to set preferences: {}", diff --git a/crates/tranquil-pds/src/api/repo/record/batch.rs b/crates/tranquil-pds/src/api/repo/record/batch.rs index 05960ec..95a62a8 100644 --- a/crates/tranquil-pds/src/api/repo/record/batch.rs +++ b/crates/tranquil-pds/src/api/repo/record/batch.rs @@ -343,35 +343,33 @@ pub async fn apply_writes( }) .collect(); - for collection in create_collections { - if let Err(e) = crate::auth::scope_check::check_repo_scope( - is_oauth, - scope.as_deref(), - crate::oauth::RepoAction::Create, - collection, - ) { - return e; - } - } - for collection in update_collections { - if let Err(e) = crate::auth::scope_check::check_repo_scope( - is_oauth, - scope.as_deref(), - crate::oauth::RepoAction::Update, - collection, - ) { - return e; - } - } - for collection in delete_collections { - if let Err(e) = crate::auth::scope_check::check_repo_scope( - is_oauth, - scope.as_deref(), - crate::oauth::RepoAction::Delete, - collection, - ) { - return e; - } + let scope_checks = create_collections + .iter() + .map(|c| (crate::oauth::RepoAction::Create, c)) + .chain( + update_collections + .iter() + .map(|c| (crate::oauth::RepoAction::Update, c)), + ) + .chain( + delete_collections + .iter() + .map(|c| (crate::oauth::RepoAction::Delete, c)), + ); + + if let Some(err) = scope_checks + .filter_map(|(action, collection)| { + crate::auth::scope_check::check_repo_scope( + is_oauth, + scope.as_deref(), + action, + collection, + ) + .err() + }) + .next() + { + return err; } } @@ -439,22 +437,29 @@ pub async fn apply_writes( return ApiError::InternalError(Some("Failed to persist MST".into())).into_response(); } }; - let mut new_mst_blocks = std::collections::BTreeMap::new(); - let mut old_mst_blocks = std::collections::BTreeMap::new(); - for key in &modified_keys { - if mst.blocks_for_path(key, &mut new_mst_blocks).await.is_err() { - return ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) + let (new_mst_blocks, old_mst_blocks) = { + let mut new_blocks = std::collections::BTreeMap::new(); + let mut old_blocks = std::collections::BTreeMap::new(); + for key in &modified_keys { + if mst.blocks_for_path(key, &mut new_blocks).await.is_err() { + return ApiError::InternalError(Some( + "Failed to get new MST blocks for path".into(), + )) .into_response(); - } - if original_mst - .blocks_for_path(key, &mut old_mst_blocks) - .await - .is_err() - { - return ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) + } + if original_mst + .blocks_for_path(key, &mut old_blocks) + .await + .is_err() + { + return ApiError::InternalError(Some( + "Failed to get old MST blocks for path".into(), + )) .into_response(); + } } - } + (new_blocks, old_blocks) + }; let mut relevant_blocks = new_mst_blocks.clone(); relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); let written_cids: Vec = tracking_store diff --git a/crates/tranquil-pds/src/api/repo/record/utils.rs b/crates/tranquil-pds/src/api/repo/record/utils.rs index 7b2f927..68f83e5 100644 --- a/crates/tranquil-pds/src/api/repo/record/utils.rs +++ b/crates/tranquil-pds/src/api/repo/record/utils.rs @@ -25,14 +25,12 @@ fn extract_blob_cids_recursive(value: &Value, blobs: &mut Vec) { { blobs.push(link.to_string()); } - for v in map.values() { - extract_blob_cids_recursive(v, blobs); - } + map.values() + .for_each(|v| extract_blob_cids_recursive(v, blobs)); } Value::Array(arr) => { - for v in arr { - extract_blob_cids_recursive(v, blobs); - } + arr.iter() + .for_each(|v| extract_blob_cids_recursive(v, blobs)); } _ => {} } diff --git a/crates/tranquil-pds/src/appview/mod.rs b/crates/tranquil-pds/src/appview/mod.rs index 918c9c5..750d8b2 100644 --- a/crates/tranquil-pds/src/appview/mod.rs +++ b/crates/tranquil-pds/src/appview/mod.rs @@ -231,25 +231,26 @@ impl DidResolver { } fn extract_service_endpoint(&self, doc: &DidDocument) -> Option { - for service in &doc.service { - if service.service_type == "AtprotoAppView" - || service.id.contains("atproto_appview") - || service.id.ends_with("#bsky_appview") - { - return Some(ResolvedService { - url: service.service_endpoint.clone(), - did: doc.id.clone(), - }); - } + if let Some(service) = doc.service.iter().find(|s| { + s.service_type == "AtprotoAppView" + || s.id.contains("atproto_appview") + || s.id.ends_with("#bsky_appview") + }) { + return Some(ResolvedService { + url: service.service_endpoint.clone(), + did: doc.id.clone(), + }); } - for service in &doc.service { - if service.service_type.contains("AppView") || service.id.contains("appview") { - return Some(ResolvedService { - url: service.service_endpoint.clone(), - did: doc.id.clone(), - }); - } + if let Some(service) = doc + .service + .iter() + .find(|s| s.service_type.contains("AppView") || s.id.contains("appview")) + { + return Some(ResolvedService { + url: service.service_endpoint.clone(), + did: doc.id.clone(), + }); } if let Some(service) = doc.service.first() diff --git a/crates/tranquil-pds/src/circuit_breaker.rs b/crates/tranquil-pds/src/circuit_breaker.rs index bece00c..55aafd9 100644 --- a/crates/tranquil-pds/src/circuit_breaker.rs +++ b/crates/tranquil-pds/src/circuit_breaker.rs @@ -265,9 +265,7 @@ mod tests { async fn test_circuit_breaker_half_open_closes_after_successes() { let cb = CircuitBreaker::new("test", 3, 2, 0); - for _ in 0..3 { - cb.record_failure().await; - } + futures::future::join_all((0..3).map(|_| cb.record_failure())).await; assert_eq!(cb.state().await, CircuitState::Open); tokio::time::sleep(Duration::from_millis(100)).await; @@ -285,9 +283,7 @@ mod tests { async fn test_circuit_breaker_half_open_reopens_on_failure() { let cb = CircuitBreaker::new("test", 3, 2, 0); - for _ in 0..3 { - cb.record_failure().await; - } + futures::future::join_all((0..3).map(|_| cb.record_failure())).await; tokio::time::sleep(Duration::from_millis(100)).await; cb.can_execute().await; diff --git a/crates/tranquil-pds/src/comms/service.rs b/crates/tranquil-pds/src/comms/service.rs index 57bf4d5..2513868 100644 --- a/crates/tranquil-pds/src/comms/service.rs +++ b/crates/tranquil-pds/src/comms/service.rs @@ -115,9 +115,7 @@ impl CommsService { return Ok(()); } debug!(count = items.len(), "Processing comms batch"); - for item in items { - self.process_item(item).await; - } + futures::future::join_all(items.into_iter().map(|item| self.process_item(item))).await; Ok(()) } diff --git a/crates/tranquil-pds/src/crawlers.rs b/crates/tranquil-pds/src/crawlers.rs index 3616e8a..499835f 100644 --- a/crates/tranquil-pds/src/crawlers.rs +++ b/crates/tranquil-pds/src/crawlers.rs @@ -91,7 +91,7 @@ impl Crawlers { self.mark_notified(); let circuit_breaker = self.circuit_breaker.clone(); - for crawler_url in &self.crawler_urls { + self.crawler_urls.iter().for_each(|crawler_url| { let url = format!( "{}/xrpc/com.atproto.sync.requestCrawl", crawler_url.trim_end_matches('/') @@ -136,7 +136,7 @@ impl Crawlers { } } }); - } + }); } } diff --git a/crates/tranquil-pds/src/delegation/scopes.rs b/crates/tranquil-pds/src/delegation/scopes.rs index 4064f97..c7905a8 100644 --- a/crates/tranquil-pds/src/delegation/scopes.rs +++ b/crates/tranquil-pds/src/delegation/scopes.rs @@ -57,18 +57,16 @@ pub fn intersect_scopes(requested: &str, granted: &str) -> String { return granted_set.into_iter().collect::>().join(" "); } - let mut result: Vec<&str> = Vec::new(); - - for requested_scope in &requested_set { - if granted_set.contains(requested_scope) { - result.push(requested_scope); - continue; - } - - if let Some(match_result) = find_matching_scope(requested_scope, &granted_set) { - result.push(match_result); - } - } + let mut result: Vec<&str> = requested_set + .iter() + .filter_map(|requested_scope| { + if granted_set.contains(requested_scope) { + Some(*requested_scope) + } else { + find_matching_scope(requested_scope, &granted_set) + } + }) + .collect(); result.sort(); result.join(" ") @@ -118,19 +116,20 @@ pub fn validate_delegation_scopes(scopes: &str) -> Result<(), String> { return Ok(()); } - for scope in scopes.split_whitespace() { - let (base, _) = split_scope(scope); - - if !is_valid_scope_prefix(base) { - return Err(format!("Invalid scope: {}", scope)); - } - } - - Ok(()) + scopes + .split_whitespace() + .try_for_each(|scope| { + let (base, _) = split_scope(scope); + if is_valid_scope_prefix(base) { + Ok(()) + } else { + Err(format!("Invalid scope: {}", scope)) + } + }) } fn is_valid_scope_prefix(base: &str) -> bool { - let valid_prefixes = [ + const VALID_PREFIXES: [&str; 7] = [ "atproto", "repo:", "blob:", @@ -140,13 +139,9 @@ fn is_valid_scope_prefix(base: &str) -> bool { "transition:", ]; - for prefix in valid_prefixes { - if base == prefix.trim_end_matches(':') || base.starts_with(prefix) { - return true; - } - } - - false + VALID_PREFIXES + .iter() + .any(|prefix| base == prefix.trim_end_matches(':') || base.starts_with(prefix)) } #[cfg(test)] diff --git a/crates/tranquil-pds/src/handle/mod.rs b/crates/tranquil-pds/src/handle/mod.rs index 4e0bd53..0ab1e96 100644 --- a/crates/tranquil-pds/src/handle/mod.rs +++ b/crates/tranquil-pds/src/handle/mod.rs @@ -25,18 +25,17 @@ pub async fn resolve_handle_dns(handle: &str) -> Result Result { @@ -95,15 +94,9 @@ pub fn is_service_domain_handle(handle: &str, hostname: &str) -> bool { let service_domains: Vec = std::env::var("PDS_SERVICE_HANDLE_DOMAINS") .map(|s| s.split(',').map(|d| d.trim().to_string()).collect()) .unwrap_or_else(|_| vec![hostname.to_string()]); - for domain in service_domains { - if handle.ends_with(&format!(".{}", domain)) { - return true; - } - if handle == domain { - return true; - } - } - false + service_domains + .iter() + .any(|domain| handle.ends_with(&format!(".{}", domain)) || handle == domain) } #[cfg(test)] diff --git a/crates/tranquil-pds/src/handle/reserved.rs b/crates/tranquil-pds/src/handle/reserved.rs index 74f5676..2497213 100644 --- a/crates/tranquil-pds/src/handle/reserved.rs +++ b/crates/tranquil-pds/src/handle/reserved.rs @@ -1029,19 +1029,12 @@ const FAMOUS_ACCOUNTS: &[&str] = &[ ]; pub static RESERVED_SUBDOMAINS: LazyLock> = LazyLock::new(|| { - let mut set = HashSet::with_capacity( - ATP_SPECIFIC.len() + COMMONLY_RESERVED.len() + FAMOUS_ACCOUNTS.len(), - ); - for s in ATP_SPECIFIC { - set.insert(*s); - } - for s in COMMONLY_RESERVED { - set.insert(*s); - } - for s in FAMOUS_ACCOUNTS { - set.insert(*s); - } - set + ATP_SPECIFIC + .iter() + .chain(COMMONLY_RESERVED.iter()) + .chain(FAMOUS_ACCOUNTS.iter()) + .copied() + .collect() }); pub fn is_reserved_subdomain(subdomain: &str) -> bool { diff --git a/crates/tranquil-pds/src/oauth/endpoints/authorize.rs b/crates/tranquil-pds/src/oauth/endpoints/authorize.rs index e1858b7..394449d 100644 --- a/crates/tranquil-pds/src/oauth/endpoints/authorize.rs +++ b/crates/tranquil-pds/src/oauth/endpoints/authorize.rs @@ -1352,41 +1352,43 @@ pub async fn consent_get( ) .await .unwrap_or(true); - let mut scopes = Vec::new(); - for scope in &requested_scopes { - let (category, required, description, display_name) = - if let Some(def) = crate::oauth::scopes::SCOPE_DEFINITIONS.get(*scope) { - ( - def.category.display_name().to_string(), - def.required, - def.description.to_string(), - def.display_name.to_string(), - ) - } else if scope.starts_with("ref:") { - ( - "Reference".to_string(), - false, - "Referenced scope".to_string(), - scope.to_string(), - ) - } else { - ( - "Other".to_string(), - false, - format!("Access to {}", scope), - scope.to_string(), - ) - }; - let granted = pref_map.get(*scope).copied(); - scopes.push(ScopeInfo { - scope: scope.to_string(), - category, - required, - description, - display_name, - granted, - }); - } + let scopes: Vec = requested_scopes + .iter() + .map(|scope| { + let (category, required, description, display_name) = + if let Some(def) = crate::oauth::scopes::SCOPE_DEFINITIONS.get(*scope) { + ( + def.category.display_name().to_string(), + def.required, + def.description.to_string(), + def.display_name.to_string(), + ) + } else if scope.starts_with("ref:") { + ( + "Reference".to_string(), + false, + "Referenced scope".to_string(), + scope.to_string(), + ) + } else { + ( + "Other".to_string(), + false, + format!("Access to {}", scope), + scope.to_string(), + ) + }; + let granted = pref_map.get(*scope).copied(); + ScopeInfo { + scope: scope.to_string(), + category, + required, + description, + display_name, + granted, + } + }) + .collect(); let (is_delegation, controller_did, controller_handle, delegation_level) = if let Some(ref ctrl_did) = request_data.controller_did { let ctrl_handle = diff --git a/crates/tranquil-pds/src/plc/mod.rs b/crates/tranquil-pds/src/plc/mod.rs index 389dd5e..2e26912 100644 --- a/crates/tranquil-pds/src/plc/mod.rs +++ b/crates/tranquil-pds/src/plc/mod.rs @@ -526,12 +526,12 @@ pub fn verify_operation_signature(op: &Value, rotation_keys: &[String]) -> Resul } let cbor_bytes = serde_ipld_dagcbor::to_vec(&unsigned_op) .map_err(|e| PlcError::Serialization(e.to_string()))?; - for key_did in rotation_keys { - if let Ok(true) = verify_signature_with_did_key(key_did, &cbor_bytes, &signature) { - return Ok(true); - } - } - Ok(false) + let verified = rotation_keys + .iter() + .any(|key_did| { + verify_signature_with_did_key(key_did, &cbor_bytes, &signature).unwrap_or(false) + }); + Ok(verified) } fn verify_signature_with_did_key( diff --git a/crates/tranquil-pds/src/scheduled.rs b/crates/tranquil-pds/src/scheduled.rs index 4de0e16..a35d328 100644 --- a/crates/tranquil-pds/src/scheduled.rs +++ b/crates/tranquil-pds/src/scheduled.rs @@ -14,6 +14,150 @@ use crate::repo::PostgresBlockStore; use crate::storage::{BackupStorage, BlobStorage}; use crate::sync::car::encode_car_header; +async fn update_genesis_blocks_cids(db: &PgPool, blocks_cids: &[String], seq: i64) -> Result<(), sqlx::Error> { + sqlx::query!( + "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", + blocks_cids, + seq + ) + .execute(db) + .await?; + Ok(()) +} + +async fn update_repo_rev(db: &PgPool, rev: &str, user_id: uuid::Uuid) -> Result<(), sqlx::Error> { + sqlx::query!( + "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", + rev, + user_id + ) + .execute(db) + .await?; + Ok(()) +} + +async fn insert_user_blocks(db: &PgPool, user_id: uuid::Uuid, block_cids: &[Vec]) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + block_cids + ) + .execute(db) + .await?; + Ok(()) +} + +async fn fetch_user_records(db: &PgPool, user_id: uuid::Uuid) -> Result, sqlx::Error> { + let rows = sqlx::query!( + "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", + user_id + ) + .fetch_all(db) + .await?; + Ok(rows.into_iter().map(|r| (r.collection, r.rkey, r.record_cid)).collect()) +} + +async fn insert_record_blobs(db: &PgPool, user_id: uuid::Uuid, record_uris: &[String], blob_cids: &[String]) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + INSERT INTO record_blobs (repo_id, record_uri, blob_cid) + SELECT $1, record_uri, blob_cid + FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid) + ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING + "#, + user_id, + record_uris, + blob_cids + ) + .execute(db) + .await?; + Ok(()) +} + +async fn delete_backup_record(db: &PgPool, id: uuid::Uuid) -> Result<(), sqlx::Error> { + sqlx::query!("DELETE FROM account_backups WHERE id = $1", id) + .execute(db) + .await?; + Ok(()) +} + +async fn fetch_old_backups( + db: &PgPool, + user_id: uuid::Uuid, + retention_count: i64, +) -> Result, sqlx::Error> { + let rows = sqlx::query!( + r#" + SELECT id, storage_key + FROM account_backups + WHERE user_id = $1 + ORDER BY created_at DESC + OFFSET $2 + "#, + user_id, + retention_count + ) + .fetch_all(db) + .await?; + Ok(rows.into_iter().map(|r| (r.id, r.storage_key)).collect()) +} + +async fn insert_backup_record( + db: &PgPool, + user_id: uuid::Uuid, + storage_key: &str, + repo_root_cid: &str, + repo_rev: &str, + block_count: i32, + size_bytes: i64, +) -> Result<(), sqlx::Error> { + sqlx::query!( + r#" + INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) + VALUES ($1, $2, $3, $4, $5, $6) + "#, + user_id, + storage_key, + repo_root_cid, + repo_rev, + block_count, + size_bytes + ) + .execute(db) + .await?; + Ok(()) +} + +struct GenesisCommitRow { + seq: i64, + did: String, + commit_cid: Option, +} + +async fn process_genesis_commit( + db: &PgPool, + block_store: &PostgresBlockStore, + row: GenesisCommitRow, +) -> Result<(String, i64), (i64, &'static str)> { + let commit_cid_str = row.commit_cid.ok_or((row.seq, "missing commit_cid"))?; + let commit_cid = Cid::from_str(&commit_cid_str).map_err(|_| (row.seq, "invalid CID"))?; + let block = block_store + .get(&commit_cid) + .await + .map_err(|_| (row.seq, "failed to fetch block"))? + .ok_or((row.seq, "block not found"))?; + let commit = Commit::from_cbor(&block).map_err(|_| (row.seq, "failed to parse commit"))?; + let blocks_cids = vec![commit.data.to_string(), commit_cid.to_string()]; + update_genesis_blocks_cids(db, &blocks_cids, row.seq) + .await + .map_err(|_| (row.seq, "failed to update"))?; + Ok((row.did, row.seq)) +} + pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBlockStore) { let broken_genesis_commits = match sqlx::query!( r#" @@ -44,69 +188,29 @@ pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBl "Backfilling blocks_cids for genesis commits" ); - let mut success = 0; - let mut failed = 0; - - for commit_row in broken_genesis_commits { - let commit_cid_str = match &commit_row.commit_cid { - Some(c) => c.clone(), - None => { - warn!(seq = commit_row.seq, "Genesis commit missing commit_cid"); - failed += 1; - continue; - } - }; - - let commit_cid = match Cid::from_str(&commit_cid_str) { - Ok(c) => c, - Err(_) => { - warn!(seq = commit_row.seq, "Invalid commit CID"); - failed += 1; - continue; - } - }; - - let block = match block_store.get(&commit_cid).await { - Ok(Some(b)) => b, - Ok(None) => { - warn!(seq = commit_row.seq, cid = %commit_cid_str, "Commit block not found in store"); - failed += 1; - continue; - } - Err(e) => { - warn!(seq = commit_row.seq, error = %e, "Failed to fetch commit block"); - failed += 1; - continue; - } - }; - - let commit = match Commit::from_cbor(&block) { - Ok(c) => c, - Err(e) => { - warn!(seq = commit_row.seq, error = %e, "Failed to parse commit"); - failed += 1; - continue; - } - }; - - let mst_root_cid = commit.data; - let blocks_cids: Vec = vec![mst_root_cid.to_string(), commit_cid.to_string()]; - - if let Err(e) = sqlx::query!( - "UPDATE repo_seq SET blocks_cids = $1 WHERE seq = $2", - &blocks_cids, - commit_row.seq + let results = futures::future::join_all(broken_genesis_commits.into_iter().map(|row| { + process_genesis_commit( + db, + &block_store, + GenesisCommitRow { + seq: row.seq, + did: row.did, + commit_cid: row.commit_cid, + }, ) - .execute(db) - .await - { - warn!(seq = commit_row.seq, error = %e, "Failed to update blocks_cids"); - failed += 1; - } else { - info!(seq = commit_row.seq, did = %commit_row.did, "Fixed genesis commit blocks_cids"); - success += 1; + })) + .await; + + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { + Ok((did, seq)) => { + info!(seq = seq, did = %did, "Fixed genesis commit blocks_cids"); + (s + 1, f) } - } + Err((seq, reason)) => { + warn!(seq = seq, reason = reason, "Failed to process genesis commit"); + (s, f + 1) + } + }); info!( success, @@ -114,6 +218,27 @@ pub async fn backfill_genesis_commit_blocks(db: &PgPool, block_store: PostgresBl ); } +async fn process_repo_rev( + db: &PgPool, + block_store: &PostgresBlockStore, + user_id: uuid::Uuid, + repo_root_cid: String, +) -> Result { + let cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?; + let block = block_store + .get(&cid) + .await + .ok() + .flatten() + .ok_or(user_id)?; + let commit = Commit::from_cbor(&block).map_err(|_| user_id)?; + let rev = commit.rev().to_string(); + update_repo_rev(db, &rev, user_id) + .await + .map_err(|_| user_id)?; + Ok(user_id) +} + pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { let repos_missing_rev = match sqlx::query!("SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL") @@ -137,54 +262,44 @@ pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { "Backfilling repo_rev for existing repos" ); - let mut success = 0; - let mut failed = 0; + let results = futures::future::join_all(repos_missing_rev.into_iter().map(|repo| { + process_repo_rev(db, &block_store, repo.user_id, repo.repo_root_cid) + })) + .await; - for repo in repos_missing_rev { - let cid = match Cid::from_str(&repo.repo_root_cid) { - Ok(c) => c, - Err(_) => { - failed += 1; - continue; + let (success, failed) = results + .iter() + .fold((0, 0), |(s, f), r| match r { + Ok(_) => (s + 1, f), + Err(user_id) => { + warn!(user_id = %user_id, "Failed to update repo_rev"); + (s, f + 1) } - }; - - let block = match block_store.get(&cid).await { - Ok(Some(b)) => b, - _ => { - failed += 1; - continue; - } - }; - - let commit = match Commit::from_cbor(&block) { - Ok(c) => c, - Err(_) => { - failed += 1; - continue; - } - }; - - let rev = commit.rev().to_string(); - - if let Err(e) = sqlx::query!( - "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", - rev, - repo.user_id - ) - .execute(db) - .await - { - warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); - failed += 1; - } else { - success += 1; - } - } + }); info!(success, failed, "Completed repo_rev backfill"); } +async fn process_user_blocks( + db: &PgPool, + block_store: &PostgresBlockStore, + user_id: uuid::Uuid, + repo_root_cid: String, +) -> Result<(uuid::Uuid, usize), uuid::Uuid> { + let root_cid = Cid::from_str(&repo_root_cid).map_err(|_| user_id)?; + let block_cids = collect_current_repo_blocks(block_store, &root_cid) + .await + .map_err(|_| user_id)?; + if block_cids.is_empty() { + return Err(user_id); + } + let count = block_cids.len(); + insert_user_blocks(db, user_id, &block_cids) + .await + .map_err(|_| user_id)?; + Ok((user_id, count)) +} + pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { let users_without_blocks = match sqlx::query!( r#" @@ -214,50 +329,21 @@ pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) "Backfilling user_blocks for existing repos" ); - let mut success = 0; - let mut failed = 0; + let results = futures::future::join_all(users_without_blocks.into_iter().map(|user| { + process_user_blocks(db, &block_store, user.user_id, user.repo_root_cid) + })) + .await; - for user in users_without_blocks { - let root_cid = match Cid::from_str(&user.repo_root_cid) { - Ok(c) => c, - Err(_) => { - failed += 1; - continue; - } - }; - - match collect_current_repo_blocks(&block_store, &root_cid).await { - Ok(block_cids) => { - if block_cids.is_empty() { - failed += 1; - continue; - } - - if let Err(e) = sqlx::query!( - r#" - INSERT INTO user_blocks (user_id, block_cid) - SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) - ON CONFLICT (user_id, block_cid) DO NOTHING - "#, - user.user_id, - &block_cids - ) - .execute(db) - .await - { - warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); - failed += 1; - } else { - info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); - success += 1; - } - } - Err(e) => { - warn!(user_id = %user.user_id, error = %e, "Failed to collect repo blocks for backfill"); - failed += 1; - } + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { + Ok((user_id, count)) => { + info!(user_id = %user_id, block_count = count, "Backfilled user_blocks"); + (s + 1, f) } - } + Err(user_id) => { + warn!(user_id = %user_id, "Failed to backfill user_blocks"); + (s, f + 1) + } + }); info!(success, failed, "Completed user_blocks backfill"); } @@ -314,6 +400,55 @@ pub async fn collect_current_repo_blocks( Ok(block_cids) } +async fn process_record_blobs( + db: &PgPool, + block_store: &PostgresBlockStore, + user_id: uuid::Uuid, + did: String, +) -> Result<(uuid::Uuid, String, usize), (uuid::Uuid, &'static str)> { + let records = fetch_user_records(db, user_id) + .await + .map_err(|_| (user_id, "failed to fetch records"))?; + + let mut batch_record_uris: Vec = Vec::new(); + let mut batch_blob_cids: Vec = Vec::new(); + + futures::future::join_all(records.into_iter().map(|(collection, rkey, record_cid)| { + let did = did.clone(); + async move { + let cid = Cid::from_str(&record_cid).ok()?; + let block_bytes = block_store.get(&cid).await.ok()??; + let record_ipld: Ipld = serde_ipld_dagcbor::from_slice(&block_bytes).ok()?; + let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); + Some( + blob_refs + .into_iter() + .map(|blob_ref| { + let record_uri = format!("at://{}/{}/{}", did, collection, rkey); + (record_uri, blob_ref.cid) + }) + .collect::>(), + ) + } + })) + .await + .into_iter() + .flatten() + .flatten() + .for_each(|(uri, cid)| { + batch_record_uris.push(uri); + batch_blob_cids.push(cid); + }); + + let blob_refs_found = batch_record_uris.len(); + if !batch_record_uris.is_empty() { + insert_record_blobs(db, user_id, &batch_record_uris, &batch_blob_cids) + .await + .map_err(|_| (user_id, "failed to insert"))?; + } + Ok((user_id, did, blob_refs_found)) +} + pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) { let users_needing_backfill = match sqlx::query!( r#" @@ -344,80 +479,23 @@ pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) "Backfilling record_blobs for existing repos" ); - let mut success = 0; - let mut failed = 0; + let results = futures::future::join_all(users_needing_backfill.into_iter().map(|user| { + process_record_blobs(db, &block_store, user.user_id, user.did) + })) + .await; - for user in users_needing_backfill { - let records = match sqlx::query!( - "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1", - user.user_id - ) - .fetch_all(db) - .await - { - Ok(r) => r, - Err(e) => { - warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill"); - failed += 1; - continue; - } - }; - - let mut batch_record_uris: Vec = Vec::new(); - let mut batch_blob_cids: Vec = Vec::new(); - - for record in records { - let record_cid = match Cid::from_str(&record.record_cid) { - Ok(c) => c, - Err(_) => continue, - }; - - let block_bytes = match block_store.get(&record_cid).await { - Ok(Some(b)) => b, - _ => continue, - }; - - let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) { - Ok(v) => v, - Err(_) => continue, - }; - - let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0); - for blob_ref in blob_refs { - let record_uri = format!("at://{}/{}/{}", user.did, record.collection, record.rkey); - batch_record_uris.push(record_uri); - batch_blob_cids.push(blob_ref.cid); + let (success, failed) = results.iter().fold((0, 0), |(s, f), r| match r { + Ok((user_id, did, blob_refs)) => { + if *blob_refs > 0 { + info!(user_id = %user_id, did = %did, blob_refs = blob_refs, "Backfilled record_blobs"); } + (s + 1, f) } - - let blob_refs_found = batch_record_uris.len(); - if !batch_record_uris.is_empty() { - if let Err(e) = sqlx::query!( - r#" - INSERT INTO record_blobs (repo_id, record_uri, blob_cid) - SELECT $1, record_uri, blob_cid - FROM UNNEST($2::text[], $3::text[]) AS t(record_uri, blob_cid) - ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING - "#, - user.user_id, - &batch_record_uris, - &batch_blob_cids - ) - .execute(db) - .await - { - warn!(error = %e, "Failed to batch insert record_blobs during backfill"); - } else { - info!( - user_id = %user.user_id, - did = %user.did, - blob_refs = blob_refs_found, - "Backfilled record_blobs" - ); - } + Err((user_id, reason)) => { + warn!(user_id = %user_id, reason = reason, "Failed to backfill record_blobs"); + (s, f + 1) } - success += 1; - } + }); info!(success, failed, "Completed record_blobs backfill"); } @@ -487,22 +565,16 @@ async fn process_scheduled_deletions( "Processing scheduled account deletions" ); - for account in accounts_to_delete { - if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { - warn!( - did = %account.did, - handle = %account.handle, - error = %e, - "Failed to delete scheduled account" - ); - } else { - info!( - did = %account.did, - handle = %account.handle, - "Successfully deleted scheduled account" - ); - } - } + futures::future::join_all(accounts_to_delete.into_iter().map(|account| async move { + let result = delete_account_data(db, blob_store, &account.did, &account.handle).await; + (account.did, account.handle, result) + })) + .await + .into_iter() + .for_each(|(did, handle, result)| match result { + Ok(()) => info!(did = %did, handle = %handle, "Successfully deleted scheduled account"), + Err(e) => warn!(did = %did, handle = %handle, error = %e, "Failed to delete scheduled account"), + }); Ok(()) } @@ -526,15 +598,15 @@ async fn delete_account_data( .await .map_err(|e| format!("DB error fetching blob keys: {}", e))?; - for storage_key in &blob_storage_keys { - if let Err(e) = blob_store.delete(storage_key).await { - warn!( - storage_key = %storage_key, - error = %e, - "Failed to delete blob from storage (continuing anyway)" - ); - } - } + futures::future::join_all(blob_storage_keys.iter().map(|storage_key| async move { + (storage_key, blob_store.delete(storage_key).await) + })) + .await + .into_iter() + .filter_map(|(key, result)| result.err().map(|e| (key, e))) + .for_each(|(key, e)| { + warn!(storage_key = %key, error = %e, "Failed to delete blob from storage (continuing anyway)"); + }); let mut tx = db .begin() @@ -624,6 +696,83 @@ pub async fn start_backup_tasks( } } +struct BackupResult { + did: String, + repo_rev: String, + size_bytes: i64, + block_count: i32, + user_id: uuid::Uuid, +} + +enum BackupOutcome { + Success(BackupResult), + Skipped(String, &'static str), + Failed(String, String), +} + +async fn process_single_backup( + db: &PgPool, + block_store: &PostgresBlockStore, + backup_storage: &BackupStorage, + user_id: uuid::Uuid, + did: String, + repo_root_cid: String, + repo_rev: Option, +) -> BackupOutcome { + let repo_rev = match repo_rev { + Some(rev) => rev, + None => return BackupOutcome::Skipped(did, "no repo_rev"), + }; + + let head_cid = match Cid::from_str(&repo_root_cid) { + Ok(c) => c, + Err(_) => return BackupOutcome::Skipped(did, "invalid repo_root_cid"), + }; + + let car_bytes = match generate_full_backup(db, block_store, user_id, &head_cid).await { + Ok(bytes) => bytes, + Err(e) => return BackupOutcome::Failed(did, format!("CAR generation: {}", e)), + }; + + let block_count = count_car_blocks(&car_bytes); + let size_bytes = car_bytes.len() as i64; + + let storage_key = match backup_storage.put_backup(&did, &repo_rev, &car_bytes).await { + Ok(key) => key, + Err(e) => return BackupOutcome::Failed(did, format!("S3 upload: {}", e)), + }; + + if let Err(e) = insert_backup_record( + db, + user_id, + &storage_key, + &repo_root_cid, + &repo_rev, + block_count, + size_bytes, + ) + .await + { + if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { + error!( + did = %did, + storage_key = %storage_key, + error = %rollback_err, + "Failed to rollback orphaned backup from S3" + ); + } + return BackupOutcome::Failed(did, format!("DB insert: {}", e)); + } + + BackupOutcome::Success(BackupResult { + did, + repo_rev, + size_bytes, + block_count, + user_id, + }) +} + async fn process_scheduled_backups( db: &PgPool, block_store: &PostgresBlockStore, @@ -665,88 +814,44 @@ async fn process_scheduled_backups( "Processing scheduled backups" ); - for user in users_needing_backup { - let repo_root_cid = user.repo_root_cid.clone(); - - let repo_rev = match &user.repo_rev { - Some(rev) => rev.clone(), - None => { - warn!(did = %user.did, "User has no repo_rev, skipping backup"); - continue; - } - }; - - let head_cid = match Cid::from_str(&repo_root_cid) { - Ok(c) => c, - Err(e) => { - warn!(did = %user.did, error = %e, "Invalid repo_root_cid, skipping backup"); - continue; - } - }; - - let car_result = generate_full_backup(db, block_store, user.user_id, &head_cid).await; - let car_bytes = match car_result { - Ok(bytes) => bytes, - Err(e) => { - warn!(did = %user.did, error = %e, "Failed to generate CAR for backup"); - continue; - } - }; - - let block_count = count_car_blocks(&car_bytes); - let size_bytes = car_bytes.len() as i64; - - let storage_key = match backup_storage - .put_backup(&user.did, &repo_rev, &car_bytes) - .await - { - Ok(key) => key, - Err(e) => { - warn!(did = %user.did, error = %e, "Failed to upload backup to storage"); - continue; - } - }; - - if let Err(e) = sqlx::query!( - r#" - INSERT INTO account_backups (user_id, storage_key, repo_root_cid, repo_rev, block_count, size_bytes) - VALUES ($1, $2, $3, $4, $5, $6) - "#, + let results = futures::future::join_all(users_needing_backup.into_iter().map(|user| { + process_single_backup( + db, + block_store, + backup_storage, user.user_id, - storage_key, - repo_root_cid, - repo_rev, - block_count, - size_bytes + user.did, + user.repo_root_cid, + user.repo_rev, ) - .execute(db) - .await - { - warn!(did = %user.did, error = %e, "Failed to insert backup record, rolling back S3 upload"); - if let Err(rollback_err) = backup_storage.delete_backup(&storage_key).await { - error!( - did = %user.did, - storage_key = %storage_key, - error = %rollback_err, - "Failed to rollback orphaned backup from S3" + })) + .await; + + futures::future::join_all(results.into_iter().map(|outcome| async move { + match outcome { + BackupOutcome::Success(result) => { + info!( + did = %result.did, + rev = %result.repo_rev, + size_bytes = result.size_bytes, + block_count = result.block_count, + "Created backup" ); + if let Err(e) = + cleanup_old_backups(db, backup_storage, result.user_id, retention_count).await + { + warn!(did = %result.did, error = %e, "Failed to cleanup old backups"); + } + } + BackupOutcome::Skipped(did, reason) => { + warn!(did = %did, reason = reason, "Skipped backup"); + } + BackupOutcome::Failed(did, error) => { + warn!(did = %did, error = %error, "Failed backup"); } - continue; } - - info!( - did = %user.did, - rev = %repo_rev, - size_bytes, - block_count, - "Created backup" - ); - - if let Err(e) = cleanup_old_backups(db, backup_storage, user.user_id, retention_count).await - { - warn!(did = %user.did, error = %e, "Failed to cleanup old backups"); - } - } + })) + .await; Ok(()) } @@ -877,36 +982,30 @@ async fn cleanup_old_backups( user_id: uuid::Uuid, retention_count: u32, ) -> Result<(), String> { - let old_backups = sqlx::query!( - r#" - SELECT id, storage_key - FROM account_backups - WHERE user_id = $1 - ORDER BY created_at DESC - OFFSET $2 - "#, - user_id, - retention_count as i64 - ) - .fetch_all(db) - .await - .map_err(|e| format!("DB error fetching old backups: {}", e))?; + let old_backups = fetch_old_backups(db, user_id, retention_count as i64) + .await + .map_err(|e| format!("DB error fetching old backups: {}", e))?; - for backup in old_backups { - if let Err(e) = backup_storage.delete_backup(&backup.storage_key).await { - warn!( - storage_key = %backup.storage_key, - error = %e, - "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" - ); - continue; + let results = futures::future::join_all(old_backups.into_iter().map(|(id, storage_key)| async move { + match backup_storage.delete_backup(&storage_key).await { + Ok(()) => match delete_backup_record(db, id).await { + Ok(()) => Ok(()), + Err(e) => Err(format!("DB delete failed for {}: {}", storage_key, e)), + }, + Err(e) => { + warn!( + storage_key = %storage_key, + error = %e, + "Failed to delete old backup from storage, skipping DB cleanup to avoid orphan" + ); + Ok(()) + } } + })) + .await; - sqlx::query!("DELETE FROM account_backups WHERE id = $1", backup.id) - .execute(db) - .await - .map_err(|e| format!("Failed to delete old backup record: {}", e))?; - } - - Ok(()) + results + .into_iter() + .find_map(|r| r.err()) + .map_or(Ok(()), Err) } diff --git a/crates/tranquil-pds/src/sync/listener.rs b/crates/tranquil-pds/src/sync/listener.rs index 9da8052..672830a 100644 --- a/crates/tranquil-pds/src/sync/listener.rs +++ b/crates/tranquil-pds/src/sync/listener.rs @@ -48,11 +48,11 @@ async fn listen_loop(state: AppState) -> anyhow::Result<()> { from_seq = catchup_start, "Broadcasting catch-up events" ); - for event in events { + events.into_iter().for_each(|event| { let seq = event.seq; let _ = state.firehose_tx.send(event); LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst); - } + }); } loop { let notification = listener.recv().await?; @@ -93,11 +93,11 @@ async fn listen_loop(state: AppState) -> anyhow::Result<()> { .await?; if !gap_events.is_empty() { debug!(count = gap_events.len(), "Filling sequence gap"); - for event in gap_events { + gap_events.into_iter().for_each(|event| { let seq = event.seq; let _ = state.firehose_tx.send(event); LAST_BROADCAST_SEQ.store(seq, Ordering::SeqCst); - } + }); } } let event = sqlx::query_as!( diff --git a/crates/tranquil-pds/src/util.rs b/crates/tranquil-pds/src/util.rs index 3ea4828..24fab14 100644 --- a/crates/tranquil-pds/src/util.rs +++ b/crates/tranquil-pds/src/util.rs @@ -257,11 +257,10 @@ mod tests { assert_eq!(parts[0].len(), 5); assert_eq!(parts[1].len(), 5); - for c in code.chars() { - if c != '-' { - assert!(BASE32_ALPHABET.contains(c)); - } - } + assert!(code + .chars() + .filter(|&c| c != '-') + .all(|c| BASE32_ALPHABET.contains(c))); } #[test] @@ -270,9 +269,7 @@ mod tests { let parts: Vec<&str> = code.split('-').collect(); assert_eq!(parts.len(), 3); - for part in parts { - assert_eq!(part.len(), 4); - } + assert!(parts.iter().all(|part| part.len() == 4)); } #[test] diff --git a/crates/tranquil-pds/src/validation/mod.rs b/crates/tranquil-pds/src/validation/mod.rs index 5105c5c..32ae6a9 100644 --- a/crates/tranquil-pds/src/validation/mod.rs +++ b/crates/tranquil-pds/src/validation/mod.rs @@ -534,7 +534,7 @@ pub fn validate_collection_nsid(collection: &str) -> Result<(), ValidationError> "Collection NSID must have at least 3 segments".to_string(), )); } - for part in &parts { + parts.iter().try_for_each(|part| { if part.is_empty() { return Err(ValidationError::InvalidRecord( "Collection NSID segments cannot be empty".to_string(), @@ -545,7 +545,8 @@ pub fn validate_collection_nsid(collection: &str) -> Result<(), ValidationError> "Collection NSID segments must be alphanumeric or hyphens".to_string(), )); } - } + Ok(()) + })?; Ok(()) } diff --git a/frontend/src/App.svelte b/frontend/src/App.svelte index fbbb45b..e04f900 100644 --- a/frontend/src/App.svelte +++ b/frontend/src/App.svelte @@ -35,11 +35,14 @@ import ActAs from './routes/ActAs.svelte' import Migration from './routes/Migration.svelte' import DidDocumentEditor from './routes/DidDocumentEditor.svelte' + import { _ } from './lib/i18n' initI18n() const auth = $derived(getAuthState()) let oauthCallbackPending = $state(hasOAuthCallback()) + let showSpinner = $state(false) + let loadingTimer: ReturnType | null = null function hasOAuthCallback(): boolean { if (window.location.pathname === '/app/migrate') { @@ -50,15 +53,33 @@ } $effect(() => { + loadingTimer = setTimeout(() => { + showSpinner = true + }, 5000) + initServerConfig() initAuth().then(({ oauthLoginCompleted }) => { if (oauthLoginCompleted) { navigate('/dashboard', { replace: true }) } oauthCallbackPending = false + if (loadingTimer) { + clearTimeout(loadingTimer) + loadingTimer = null + } }) + + return () => { + if (loadingTimer) { + clearTimeout(loadingTimer) + } + } }) + const isLoading = $derived( + auth.kind === 'loading' || $i18nLoading || oauthCallbackPending + ) + $effect(() => { if (auth.kind === 'loading') return const path = getCurrentPath() @@ -143,8 +164,15 @@
- {#if auth.kind === 'loading' || $i18nLoading || oauthCallbackPending} -
+ {#if isLoading} +
+ {#if showSpinner} +
+
+

{$_('common.loading')}

+
+ {/if} +
{:else} {/if} @@ -158,5 +186,20 @@ .loading { min-height: 100vh; + display: flex; + align-items: center; + justify-content: center; + } + + .loading-content { + display: flex; + flex-direction: column; + align-items: center; + gap: var(--space-4); + } + + .loading-content p { + margin: 0; + color: var(--text-secondary); } diff --git a/frontend/src/routes/OAuthConsent.svelte b/frontend/src/routes/OAuthConsent.svelte index 2123f13..3d2c0a4 100644 --- a/frontend/src/routes/OAuthConsent.svelte +++ b/frontend/src/routes/OAuthConsent.svelte @@ -27,6 +27,8 @@ } let loading = $state(true) + let showSpinner = $state(false) + let loadingTimer: ReturnType | null = null let error = $state(null) let submitting = $state(false) let consentData = $state(null) @@ -71,6 +73,11 @@ error = $_('oauth.error.genericError') } finally { loading = false + showSpinner = false + if (loadingTimer) { + clearTimeout(loadingTimer) + loadingTimer = null + } } } @@ -151,7 +158,17 @@ } $effect(() => { + loadingTimer = setTimeout(() => { + if (loading) { + showSpinner = true + } + }, 5000) fetchConsentData() + return () => { + if (loadingTimer) { + clearTimeout(loadingTimer) + } + } }) let scopeGroups = $derived(consentData ? groupScopesByCategory(consentData.scopes) : {}) @@ -159,7 +176,14 @@