diff --git a/.sqlx/query-1091e22985de9cd9d24f55975eaa8bb1ea40bbf5237b328a031ba9d3b3d5a5e0.json b/.sqlx/query-1091e22985de9cd9d24f55975eaa8bb1ea40bbf5237b328a031ba9d3b3d5a5e0.json new file mode 100644 index 0000000..d487897 --- /dev/null +++ b/.sqlx/query-1091e22985de9cd9d24f55975eaa8bb1ea40bbf5237b328a031ba9d3b3d5a5e0.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT uk.key_bytes, uk.encryption_version\n FROM user_keys uk\n JOIN users u ON uk.user_id = u.id\n WHERE u.did = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "key_bytes", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "encryption_version", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "1091e22985de9cd9d24f55975eaa8bb1ea40bbf5237b328a031ba9d3b3d5a5e0" +} diff --git a/.sqlx/query-1261d14a3763b98464b212d001c9a11da30a55869128320de6d62693415953f5.json b/.sqlx/query-1261d14a3763b98464b212d001c9a11da30a55869128320de6d62693415953f5.json deleted file mode 100644 index 77d0b01..0000000 --- a/.sqlx/query-1261d14a3763b98464b212d001c9a11da30a55869128320de6d62693415953f5.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT cid, data FROM blocks", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "cid", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "data", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false, - false - ] - }, - "hash": "1261d14a3763b98464b212d001c9a11da30a55869128320de6d62693415953f5" -} diff --git a/.sqlx/query-239555df14c147a09096beb28f2ff0b093523c27e9527cd1f623c9a87b05b532.json b/.sqlx/query-130dd93754cc36188e01255166aa65603f909c2b181fa0caa7796c62d4bc60e1.json similarity index 85% rename from .sqlx/query-239555df14c147a09096beb28f2ff0b093523c27e9527cd1f623c9a87b05b532.json rename to .sqlx/query-130dd93754cc36188e01255166aa65603f909c2b181fa0caa7796c62d4bc60e1.json index 503820c..70a561f 100644 --- a/.sqlx/query-239555df14c147a09096beb28f2ff0b093523c27e9527cd1f623c9a87b05b532.json +++ b/.sqlx/query-130dd93754cc36188e01255166aa65603f909c2b181fa0caa7796c62d4bc60e1.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n ", + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n ", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "ordinal": 12, "name": "status", "type_info": "Text" + }, + { + "ordinal": 13, + "name": "rev", + "type_info": "Text" } ], "parameters": { @@ -87,8 +92,9 @@ true, true, true, + true, true ] }, - "hash": "239555df14c147a09096beb28f2ff0b093523c27e9527cd1f623c9a87b05b532" + "hash": "130dd93754cc36188e01255166aa65603f909c2b181fa0caa7796c62d4bc60e1" } diff --git a/.sqlx/query-3792c455a955b8cf2c70c8aa76635083354c87330101ea0ea69d30f2a5b4b960.json b/.sqlx/query-48f2ff46677b5dc26a4ca9ac0b4e86ebdb3a9862d006aa0e21bfa2d7b25a8f71.json similarity index 65% rename from .sqlx/query-3792c455a955b8cf2c70c8aa76635083354c87330101ea0ea69d30f2a5b4b960.json rename to .sqlx/query-48f2ff46677b5dc26a4ca9ac0b4e86ebdb3a9862d006aa0e21bfa2d7b25a8f71.json index 1e90686..04dc643 100644 --- a/.sqlx/query-3792c455a955b8cf2c70c8aa76635083354c87330101ea0ea69d30f2a5b4b960.json +++ b/.sqlx/query-48f2ff46677b5dc26a4ca9ac0b4e86ebdb3a9862d006aa0e21bfa2d7b25a8f71.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid)\n VALUES ($1, 'sync', $2)\n RETURNING seq\n ", + "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, rev)\n VALUES ($1, 'sync', $2, $3)\n RETURNING seq\n ", "describe": { "columns": [ { @@ -11,6 +11,7 @@ ], "parameters": { "Left": [ + "Text", "Text", "Text" ] @@ -19,5 +20,5 @@ false ] }, - "hash": "3792c455a955b8cf2c70c8aa76635083354c87330101ea0ea69d30f2a5b4b960" + "hash": "48f2ff46677b5dc26a4ca9ac0b4e86ebdb3a9862d006aa0e21bfa2d7b25a8f71" } diff --git a/.sqlx/query-1bff90667ece9e1e44e20e3477df1473bafa06610e54f10d9b3cb2cc06469854.json b/.sqlx/query-5c322bbdf9cecab9077c937bd322e49200ac2b8931da1dfe6e55d56087fc1d35.json similarity index 83% rename from .sqlx/query-1bff90667ece9e1e44e20e3477df1473bafa06610e54f10d9b3cb2cc06469854.json rename to .sqlx/query-5c322bbdf9cecab9077c937bd322e49200ac2b8931da1dfe6e55d56087fc1d35.json index cfcb8ac..ca23869 100644 --- a/.sqlx/query-1bff90667ece9e1e44e20e3477df1473bafa06610e54f10d9b3cb2cc06469854.json +++ b/.sqlx/query-5c322bbdf9cecab9077c937bd322e49200ac2b8931da1dfe6e55d56087fc1d35.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n LIMIT $2\n ", + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1 AND seq < $2\n ORDER BY seq ASC\n ", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "ordinal": 12, "name": "status", "type_info": "Text" + }, + { + "ordinal": 13, + "name": "rev", + "type_info": "Text" } ], "parameters": { @@ -88,8 +93,9 @@ true, true, true, + true, true ] }, - "hash": "1bff90667ece9e1e44e20e3477df1473bafa06610e54f10d9b3cb2cc06469854" + "hash": "5c322bbdf9cecab9077c937bd322e49200ac2b8931da1dfe6e55d56087fc1d35" } diff --git a/.sqlx/query-44b78996f9799398f384d9aebb36d01c27738d4677b7cae7ea6697f3f5135388.json b/.sqlx/query-7a4016fed3eb3a16d6eb267013751af47ad6e8c9595711fe6c9d41121f904cb4.json similarity index 85% rename from .sqlx/query-44b78996f9799398f384d9aebb36d01c27738d4677b7cae7ea6697f3f5135388.json rename to .sqlx/query-7a4016fed3eb3a16d6eb267013751af47ad6e8c9595711fe6c9d41121f904cb4.json index 5f566ab..d0a4470 100644 --- a/.sqlx/query-44b78996f9799398f384d9aebb36d01c27738d4677b7cae7ea6697f3f5135388.json +++ b/.sqlx/query-7a4016fed3eb3a16d6eb267013751af47ad6e8c9595711fe6c9d41121f904cb4.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status\n FROM repo_seq\n WHERE seq = $1\n ", + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq = $1\n ", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "ordinal": 12, "name": "status", "type_info": "Text" + }, + { + "ordinal": 13, + "name": "rev", + "type_info": "Text" } ], "parameters": { @@ -87,8 +92,9 @@ true, true, true, + true, true ] }, - "hash": "44b78996f9799398f384d9aebb36d01c27738d4677b7cae7ea6697f3f5135388" + "hash": "7a4016fed3eb3a16d6eb267013751af47ad6e8c9595711fe6c9d41121f904cb4" } diff --git a/.sqlx/query-777386dcbf2aa2785a6c16abdccbd8f751893039fb6bc2363d9760ca0d8a8a56.json b/.sqlx/query-cc28150d6e1e1823a918d6dcf7744209614ef7b8298c210ddd65ac44da5c551a.json similarity index 82% rename from .sqlx/query-777386dcbf2aa2785a6c16abdccbd8f751893039fb6bc2363d9760ca0d8a8a56.json rename to .sqlx/query-cc28150d6e1e1823a918d6dcf7744209614ef7b8298c210ddd65ac44da5c551a.json index e52bb8e..fe02aaf 100644 --- a/.sqlx/query-777386dcbf2aa2785a6c16abdccbd8f751893039fb6bc2363d9760ca0d8a8a56.json +++ b/.sqlx/query-cc28150d6e1e1823a918d6dcf7744209614ef7b8298c210ddd65ac44da5c551a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status\n FROM repo_seq\n WHERE seq > $1 AND seq < $2\n ORDER BY seq ASC\n ", + "query": "\n SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev\n FROM repo_seq\n WHERE seq > $1\n ORDER BY seq ASC\n LIMIT $2\n ", "describe": { "columns": [ { @@ -67,6 +67,11 @@ "ordinal": 12, "name": "status", "type_info": "Text" + }, + { + "ordinal": 13, + "name": "rev", + "type_info": "Text" } ], "parameters": { @@ -88,8 +93,9 @@ true, true, true, + true, true ] }, - "hash": "777386dcbf2aa2785a6c16abdccbd8f751893039fb6bc2363d9760ca0d8a8a56" + "hash": "cc28150d6e1e1823a918d6dcf7744209614ef7b8298c210ddd65ac44da5c551a" } diff --git a/Dockerfile b/Dockerfile index d5274da..1a82cf2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,22 +1,23 @@ -# Stage 1: Build frontend with Deno FROM denoland/deno:alpine AS frontend-builder WORKDIR /frontend COPY frontend/ ./ RUN deno task build -# Stage 2: Build Rust backend + FROM rust:1.92-alpine AS builder -RUN apk add ca-certificates openssl openssl-dev pkgconfig +RUN apk add ca-certificates openssl openssl-dev openssl-libs-static pkgconfig musl-dev WORKDIR /app COPY Cargo.toml Cargo.lock ./ -RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build --release && rm -rf src COPY src ./src COPY tests ./tests COPY migrations ./migrations COPY .sqlx ./.sqlx -RUN touch src/main.rs && cargo build --release -# Stage 3: Final image +RUN --mount=type=cache,target=/usr/local/cargo/registry \ + --mount=type=cache,target=/app/target \ + cargo build --release && \ + cp target/release/tranquil-pds /tmp/tranquil-pds + FROM alpine:3.23 -COPY --from=builder /app/target/release/tranquil-pds /usr/local/bin/tranquil-pds +COPY --from=builder /tmp/tranquil-pds /usr/local/bin/tranquil-pds COPY --from=builder /app/migrations /app/migrations COPY --from=frontend-builder /frontend/dist /app/frontend/dist WORKDIR /app diff --git a/migrations/20251238_add_rev_to_repo_seq.sql b/migrations/20251238_add_rev_to_repo_seq.sql new file mode 100644 index 0000000..9a1e0a4 --- /dev/null +++ b/migrations/20251238_add_rev_to_repo_seq.sql @@ -0,0 +1 @@ +ALTER TABLE repo_seq ADD COLUMN rev TEXT; diff --git a/src/api/repo/import.rs b/src/api/repo/import.rs index 15066fc..35fd520 100644 --- a/src/api/repo/import.rs +++ b/src/api/repo/import.rs @@ -1,4 +1,5 @@ use crate::api::ApiError; +use crate::api::repo::record::create_signed_commit; use crate::state::AppState; use crate::sync::import::{ImportError, apply_import, parse_car}; use crate::sync::verify::CarVerifier; @@ -9,6 +10,9 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response}, }; +use jacquard::types::{integer::LimitedU32, string::Tid}; +use jacquard_repo::storage::BlockStore; +use k256::ecdsa::SigningKey; use serde_json::json; use tracing::{debug, error, info, warn}; @@ -312,14 +316,114 @@ pub async fn import_repo( .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_BLOCKS); match apply_import(&state.db, user_id, root, blocks, max_blocks).await { - Ok(records) => { + Ok(import_result) => { info!( "Successfully imported {} records for user {}", - records.len(), + import_result.records.len(), did ); + let key_row = match sqlx::query!( + r#"SELECT uk.key_bytes, uk.encryption_version + FROM user_keys uk + JOIN users u ON uk.user_id = u.id + WHERE u.did = $1"#, + did + ) + .fetch_optional(&state.db) + .await + { + Ok(Some(row)) => row, + Ok(None) => { + error!("No signing key found for user {}", did); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError", "message": "Signing key not found"})), + ) + .into_response(); + } + Err(e) => { + error!("DB error fetching signing key: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + let key_bytes = match crate::config::decrypt_key(&key_row.key_bytes, key_row.encryption_version) { + Ok(k) => k, + Err(e) => { + error!("Failed to decrypt signing key: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + let signing_key = match SigningKey::from_slice(&key_bytes) { + Ok(k) => k, + Err(e) => { + error!("Invalid signing key: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + let new_rev = Tid::now(LimitedU32::MIN); + let new_rev_str = new_rev.to_string(); + let (commit_bytes, _sig) = match create_signed_commit( + did, + import_result.data_cid, + &new_rev_str, + None, + &signing_key, + ) { + Ok(result) => result, + Err(e) => { + error!("Failed to create new commit: {}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + let new_root_cid: cid::Cid = match state.block_store.put(&commit_bytes).await { + Ok(cid) => cid, + Err(e) => { + error!("Failed to store new commit block: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + let new_root_str = new_root_cid.to_string(); + if let Err(e) = sqlx::query!( + "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", + new_root_str, + user_id + ) + .execute(&state.db) + .await + { + error!("Failed to update repo root: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + info!( + "Created new commit for imported repo: cid={}, rev={}", + new_root_str, new_rev_str + ); if !is_migration { - if let Err(e) = sequence_import_event(&state, did, &root.to_string()).await { + if let Err(e) = sequence_import_event(&state, did, &new_root_str).await { warn!("Failed to sequence import event: {:?}", e); } } diff --git a/src/api/repo/record/utils.rs b/src/api/repo/record/utils.rs index 31c2329..52e2a6c 100644 --- a/src/api/repo/record/utils.rs +++ b/src/api/repo/record/utils.rs @@ -321,7 +321,7 @@ pub async fn commit_and_log( .await .map_err(|e| format!("Failed to commit transaction: {}", e))?; if is_account_active { - let _ = sequence_sync_event(state, did, &new_root_cid.to_string()).await; + let _ = sequence_sync_event(state, did, &new_root_cid.to_string(), Some(&rev_str)).await; } Ok(CommitResult { commit_cid: new_root_cid, @@ -470,15 +470,17 @@ pub async fn sequence_sync_event( state: &AppState, did: &str, commit_cid: &str, + rev: Option<&str>, ) -> Result { let seq_row = sqlx::query!( r#" - INSERT INTO repo_seq (did, event_type, commit_cid) - VALUES ($1, 'sync', $2) + INSERT INTO repo_seq (did, event_type, commit_cid, rev) + VALUES ($1, 'sync', $2, $3) RETURNING seq "#, did, commit_cid, + rev, ) .fetch_one(&state.db) .await diff --git a/src/api/server/account_status.rs b/src/api/server/account_status.rs index 02b8894..eb3a024 100644 --- a/src/api/server/account_status.rs +++ b/src/api/server/account_status.rs @@ -8,10 +8,14 @@ use axum::{ response::{IntoResponse, Response}, }; use bcrypt::verify; +use cid::Cid; use chrono::{Duration, Utc}; +use jacquard_repo::commit::Commit; +use jacquard_repo::storage::BlockStore; use k256::ecdsa::SigningKey; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::str::FromStr; use tracing::{error, info, warn}; use uuid::Uuid; @@ -245,8 +249,24 @@ async fn assert_valid_did_document_for_service( } } else if did.starts_with("did:web:") { let client = reqwest::Client::new(); - let did_path = &did[8..]; - let url = format!("https://{}/.well-known/did.json", did_path.replace(':', "/")); + let host_and_path = &did[8..]; + let decoded = host_and_path.replace("%3A", ":"); + let parts: Vec<&str> = decoded.split(':').collect(); + let (host, path_parts) = if parts.len() > 1 && parts[1].chars().all(|c| c.is_ascii_digit()) { + (format!("{}:{}", parts[0], parts[1]), parts[2..].to_vec()) + } else { + (parts[0].to_string(), parts[1..].to_vec()) + }; + let scheme = if host.starts_with("localhost") || host.starts_with("127.") || host.contains(':') { + "http" + } else { + "https" + }; + let url = if path_parts.is_empty() { + format!("{}://{}/.well-known/did.json", scheme, host) + } else { + format!("{}://{}/{}/did.json", scheme, host, path_parts.join("/")) + }; let resp = client.get(&url).send().await.map_err(|e| { warn!("Failed to fetch did:web document for {}: {:?}", did, e); ( @@ -381,8 +401,17 @@ pub async fn activate_account( .ok() .flatten(); if let Some(root_cid) = repo_root { + let rev = if let Ok(cid) = Cid::from_str(&root_cid) { + if let Ok(Some(block)) = state.block_store.get(&cid).await { + Commit::from_cbor(&block).ok().map(|c| c.rev().to_string()) + } else { + None + } + } else { + None + }; if let Err(e) = - crate::api::repo::record::sequence_sync_event(&state, &did, &root_cid).await + crate::api::repo::record::sequence_sync_event(&state, &did, &root_cid, rev.as_deref()).await { warn!("Failed to sequence sync event for activation: {}", e); } diff --git a/src/sync/firehose.rs b/src/sync/firehose.rs index f5a147f..f87438d 100644 --- a/src/sync/firehose.rs +++ b/src/sync/firehose.rs @@ -17,4 +17,5 @@ pub struct SequencedEvent { pub handle: Option, pub active: Option, pub status: Option, + pub rev: Option, } diff --git a/src/sync/import.rs b/src/sync/import.rs index a8a6bd9..b8fbc3b 100644 --- a/src/sync/import.rs +++ b/src/sync/import.rs @@ -255,6 +255,11 @@ pub struct CommitInfo { pub prev: Option, } +pub struct ImportResult { + pub records: Vec, + pub data_cid: Cid, +} + fn extract_commit_info(commit: &Ipld) -> Result<(Cid, CommitInfo), ImportError> { let obj = match commit { Ipld::Map(m) => m, @@ -299,7 +304,7 @@ pub async fn apply_import( root: Cid, blocks: HashMap, max_blocks: usize, -) -> Result, ImportError> { +) -> Result { if blocks.len() > max_blocks { return Err(ImportError::SizeLimitExceeded); } @@ -352,14 +357,6 @@ pub async fn apply_import( .await?; } } - let root_str = root.to_string(); - sqlx::query!( - "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", - root_str, - user_id - ) - .execute(&mut *tx) - .await?; sqlx::query!("DELETE FROM records WHERE repo_id = $1", user_id) .execute(&mut *tx) .await?; @@ -385,7 +382,7 @@ pub async fn apply_import( blocks.len(), records.len() ); - Ok(records) + Ok(ImportResult { records, data_cid }) } #[cfg(test)] diff --git a/src/sync/listener.rs b/src/sync/listener.rs index 7ce61dd..9da8052 100644 --- a/src/sync/listener.rs +++ b/src/sync/listener.rs @@ -33,7 +33,7 @@ async fn listen_loop(state: AppState) -> anyhow::Result<()> { let events = sqlx::query_as!( SequencedEvent, r#" - SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev FROM repo_seq WHERE seq > $1 ORDER BY seq ASC @@ -81,7 +81,7 @@ async fn listen_loop(state: AppState) -> anyhow::Result<()> { let gap_events = sqlx::query_as!( SequencedEvent, r#" - SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev FROM repo_seq WHERE seq > $1 AND seq < $2 ORDER BY seq ASC @@ -103,7 +103,7 @@ async fn listen_loop(state: AppState) -> anyhow::Result<()> { let event = sqlx::query_as!( SequencedEvent, r#" - SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev FROM repo_seq WHERE seq = $1 "#, diff --git a/src/sync/subscribe_repos.rs b/src/sync/subscribe_repos.rs index 5bb7ee0..612c312 100644 --- a/src/sync/subscribe_repos.rs +++ b/src/sync/subscribe_repos.rs @@ -66,7 +66,7 @@ async fn handle_socket_inner( let events = sqlx::query_as!( SequencedEvent, r#" - SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status + SELECT seq, did, created_at, event_type, commit_cid, prev_cid, prev_data_cid, ops, blobs, blocks_cids, handle, active, status, rev FROM repo_seq WHERE seq > $1 ORDER BY seq ASC diff --git a/src/sync/util.rs b/src/sync/util.rs index 7aff5d1..e8c6bde 100644 --- a/src/sync/util.rs +++ b/src/sync/util.rs @@ -112,8 +112,12 @@ async fn format_sync_event( .get(&commit_cid) .await? .ok_or_else(|| anyhow::anyhow!("Commit block not found"))?; - let rev = extract_rev_from_commit_bytes(&commit_bytes) - .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; + let rev = if let Some(ref stored_rev) = event.rev { + stored_rev.clone() + } else { + extract_rev_from_commit_bytes(&commit_bytes) + .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? + }; let car_bytes = write_car_blocks(commit_cid, Some(commit_bytes), BTreeMap::new()).await?; let frame = SyncFrame { did: event.did.clone(), @@ -251,8 +255,12 @@ fn format_sync_event_with_prefetched( let commit_bytes = prefetched .get(&commit_cid) .ok_or_else(|| anyhow::anyhow!("Commit block not found in prefetched"))?; - let rev = extract_rev_from_commit_bytes(commit_bytes) - .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))?; + let rev = if let Some(ref stored_rev) = event.rev { + stored_rev.clone() + } else { + extract_rev_from_commit_bytes(commit_bytes) + .ok_or_else(|| anyhow::anyhow!("Could not extract rev from commit"))? + }; let car_bytes = futures::executor::block_on(write_car_blocks( commit_cid, Some(commit_bytes.clone()), diff --git a/tests/identity.rs b/tests/identity.rs index bf7cb7f..e431af3 100644 --- a/tests/identity.rs +++ b/tests/identity.rs @@ -357,10 +357,10 @@ async fn test_get_recommended_did_credentials_success() { assert!(also_known_as[0].as_str().unwrap().starts_with("at://")); assert!(body["verificationMethods"]["atproto"].is_string()); assert_eq!( - body["services"]["atprotoPds"]["type"], + body["services"]["atproto_pds"]["type"], "AtprotoPersonalDataServer" ); - assert!(body["services"]["atprotoPds"]["endpoint"].is_string()); + assert!(body["services"]["atproto_pds"]["endpoint"].is_string()); } #[tokio::test] diff --git a/tests/import_with_verification.rs b/tests/import_with_verification.rs index d0a3aff..cc9a9bd 100644 --- a/tests/import_with_verification.rs +++ b/tests/import_with_verification.rs @@ -10,7 +10,6 @@ use serde_json::json; use sha2::{Digest, Sha256}; use sqlx::PgPool; use std::collections::BTreeMap; -use std::str::FromStr; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate};