mirror of https://tangled.org/tranquil.farm/tranquil-pds
synced 2026-02-08 21:30:08 +00:00
Fixed migration problem
.sqlx/query-297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791.json (generated, new file, 16 lines)
@@ -0,0 +1,16 @@
{
  "db_name": "PostgreSQL",
  "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n VALUES ($1, $2, $3)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ",
  "describe": {
    "columns": [],
    "parameters": {
      "Left": [
        "Uuid",
        "Text",
        "Text"
      ]
    },
    "nullable": []
  },
  "hash": "297e5495004fa601f86b3ada9e512815d4b7d73aacf3f3654628c93e5db8b791"
}
@@ -1,23 +0,0 @@
{
  "db_name": "PostgreSQL",
  "query": "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
  "describe": {
    "columns": [
      {
        "ordinal": 0,
        "name": "one",
        "type_info": "Int4"
      }
    ],
    "parameters": {
      "Left": [
        "Text",
        "Uuid"
      ]
    },
    "nullable": [
      null
    ]
  },
  "hash": "423bbfd2ddf9b41d3bb339b8b94ac47524dc9233ec70cf2b6c5e9bc2de49b22d"
}
@@ -1,28 +0,0 @@
{
  "db_name": "PostgreSQL",
  "query": "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1",
  "describe": {
    "columns": [
      {
        "ordinal": 0,
        "name": "repo_root_cid",
        "type_info": "Text"
      },
      {
        "ordinal": 1,
        "name": "repo_rev",
        "type_info": "Text"
      }
    ],
    "parameters": {
      "Left": [
        "Text"
      ]
    },
    "nullable": [
      false,
      true
    ]
  },
  "hash": "49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f"
}
.sqlx/query-6f88c5e63c1beb47733daed5295492d59c649a35ef78414c62dcdf4d0b2a3115.json (generated, new file, 30 lines)
@@ -0,0 +1,30 @@
{
  "db_name": "PostgreSQL",
  "query": "\n SELECT rb.blob_cid, rb.record_uri\n FROM record_blobs rb\n LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id\n WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2\n ORDER BY rb.blob_cid\n LIMIT $3\n ",
  "describe": {
    "columns": [
      {
        "ordinal": 0,
        "name": "blob_cid",
        "type_info": "Text"
      },
      {
        "ordinal": 1,
        "name": "record_uri",
        "type_info": "Text"
      }
    ],
    "parameters": {
      "Left": [
        "Uuid",
        "Text",
        "Int8"
      ]
    },
    "nullable": [
      false,
      false
    ]
  },
  "hash": "6f88c5e63c1beb47733daed5295492d59c649a35ef78414c62dcdf4d0b2a3115"
}
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
+  "query": "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
   "describe": {
     "columns": [
       {
@@ -21,10 +21,7 @@
     ],
     "parameters": {
       "Left": [
-        "Uuid",
-        "Text",
-        "Text",
-        "Int8"
+        "Uuid"
       ]
     },
     "nullable": [
@@ -33,5 +30,5 @@
       false
     ]
   },
-  "hash": "ff7899984ea138f1e608fa862def47402369a428ac9116c653890e5fcaa0015b"
+  "hash": "72d9db2d1287fa43f69666a5259d3243e5d87807551565948ab99f1400b8cc4c"
 }
.sqlx/query-9f993908f6ab139a1a8c6f75a1147e6ee6ceac794350fc4543bb93e62748ced2.json (generated, new file, 22 lines)
@@ -0,0 +1,22 @@
{
  "db_name": "PostgreSQL",
  "query": "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
  "describe": {
    "columns": [
      {
        "ordinal": 0,
        "name": "count",
        "type_info": "Int8"
      }
    ],
    "parameters": {
      "Left": [
        "Uuid"
      ]
    },
    "nullable": [
      null
    ]
  },
  "hash": "9f993908f6ab139a1a8c6f75a1147e6ee6ceac794350fc4543bb93e62748ced2"
}
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
+  "query": "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
   "describe": {
     "columns": [
       {
@@ -11,6 +11,7 @@
     ],
     "parameters": {
       "Left": [
-        "Text"
+        "Text",
+        "Text"
       ]
     },
@@ -18,5 +19,5 @@
       false
     ]
   },
-  "hash": "95165c49f57bb8e130bf1d8444566e1f9521777f994573a4f3cdee809fb63fd7"
+  "hash": "c4a99ff3485bfe5971b2a2c4144097ec168f9feb8c2500d5d4693c94ff6dbc75"
 }
.sqlx/query-e155d44cb2bd48ff141a27c51f34dfebeb628992a03f4bd6b10ade365ef8dc5e.json (generated, new file, 16 lines)
@@ -0,0 +1,16 @@
{
  "db_name": "PostgreSQL",
  "query": "\n INSERT INTO record_blobs (repo_id, record_uri, blob_cid)\n VALUES ($1, $2, $3)\n ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING\n ",
  "describe": {
    "columns": [],
    "parameters": {
      "Left": [
        "Uuid",
        "Text",
        "Text"
      ]
    },
    "nullable": []
  },
  "hash": "e155d44cb2bd48ff141a27c51f34dfebeb628992a03f4bd6b10ade365ef8dc5e"
}
.sqlx/query-f3a7d87d9479500a9ddff82ea6de30334870a272d1a06cd003181b11d8f3b304.json (generated, new file, 26 lines)
@@ -0,0 +1,26 @@
{
  "db_name": "PostgreSQL",
  "query": "\n SELECT DISTINCT u.id as user_id, u.did\n FROM users u\n JOIN records r ON r.repo_id = u.id\n WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)\n LIMIT 100\n ",
  "describe": {
    "columns": [
      {
        "ordinal": 0,
        "name": "user_id",
        "type_info": "Uuid"
      },
      {
        "ordinal": 1,
        "name": "did",
        "type_info": "Text"
      }
    ],
    "parameters": {
      "Left": []
    },
    "nullable": [
      false,
      false
    ]
  },
  "hash": "f3a7d87d9479500a9ddff82ea6de30334870a272d1a06cd003181b11d8f3b304"
}
@@ -1,9 +0,0 @@
# Known Issues

## Account migration from bsky.social

Migrating your account from bsky.social to this PDS works, but Bluesky's appview may not recognize your new signing key. This means you can post and your followers will see it, but some authenticated requests might fail with "jwt signature does not match jwt issuer".

We've been trying hard to verify that our side is correct (PLC updated, signing keys match, relays have the account) but something about how we're emitting events isn't triggering Bluesky's appview to refresh its identity data. Still investigating.

No workaround yet.
@@ -621,6 +621,14 @@
            <div class="code-block">
              <pre>{`{
  "id": "${flow.state.sourceDid}",
  "verificationMethod": [
    {
      "id": "${flow.state.sourceDid}#atproto",
      "type": "Multikey",
      "controller": "${flow.state.sourceDid}",
      "publicKeyMultibase": "${flow.state.targetVerificationMethod?.replace('did:key:', '') || '...'}"
    }
  ],
  "service": [
    {
      "id": "#atproto_pds",
@@ -65,6 +65,7 @@ export function createInboundMigrationFlow() {
     error: null,
     requires2FA: false,
     twoFactorCode: "",
+    targetVerificationMethod: null,
   });

   let sourceClient: AtprotoClient | null = null;
@@ -372,6 +373,9 @@ export function createInboundMigrationFlow() {
    }

    if (state.sourceDid.startsWith("did:web:")) {
      const credentials = await localClient.getRecommendedDidCredentials();
      state.targetVerificationMethod =
        credentials.verificationMethods?.atproto || null;
      setStep("did-web-update");
    } else {
      setProgress({ currentOperation: "Requesting PLC operation token..." });
@@ -406,6 +410,9 @@ export function createInboundMigrationFlow() {
        state.targetPassword,
      );
      if (state.sourceDid.startsWith("did:web:")) {
        const credentials = await localClient.getRecommendedDidCredentials();
        state.targetVerificationMethod =
          credentials.verificationMethods?.atproto || null;
        setStep("did-web-update");
      } else {
        await sourceClient.requestPlcOperationSignature();
@@ -620,6 +627,7 @@ export function createInboundMigrationFlow() {
       error: null,
       requires2FA: false,
       twoFactorCode: "",
+      targetVerificationMethod: null,
     };
     sourceClient = null;
     clearMigrationState();
@@ -56,6 +56,7 @@ export interface InboundMigrationState {
   error: string | null;
   requires2FA: boolean;
   twoFactorCode: string;
+  targetVerificationMethod: string | null;
 }

 export interface OutboundMigrationState {
migrations/20251243_record_blobs.sql (new file, 11 lines)
@@ -0,0 +1,11 @@
CREATE TABLE record_blobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    repo_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    record_uri TEXT NOT NULL,
    blob_cid TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(repo_id, record_uri, blob_cid)
);

CREATE INDEX idx_record_blobs_repo_id ON record_blobs(repo_id);
CREATE INDEX idx_record_blobs_blob_cid ON record_blobs(blob_cid);
@@ -222,8 +222,9 @@ pub async fn submit_plc_operation(
         }
     }
     match sqlx::query!(
-        "INSERT INTO repo_seq (did, event_type) VALUES ($1, 'identity') RETURNING seq",
-        did
+        "INSERT INTO repo_seq (did, event_type, handle) VALUES ($1, 'identity', $2) RETURNING seq",
+        did,
+        user.handle
     )
     .fetch_one(&state.db)
     .await
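
The handle stored alongside the identity event here lines up with com.atproto.sync.subscribeRepos, whose #identity message may carry the account's current handle so relays and appviews can refresh their cached identity data. A rough sketch of the event shape per the published lexicon (the values below are made up, not from this commit):

// Illustrative only: an #identity event as described by the com.atproto.sync.subscribeRepos lexicon.
// The seq, did, and handle values are invented for the example.
interface IdentityEvent {
  seq: number;
  did: string;
  time: string;
  handle?: string; // optional; carrying it lets consumers update their handle cache without a separate lookup
}

const example: IdentityEvent = {
  seq: 12345,
  did: "did:plc:examplexyz",
  time: new Date().toISOString(),
  handle: "alice.example.com",
};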
@@ -1,7 +1,6 @@
use crate::auth::{ServiceTokenVerifier, is_service_token};
use crate::delegation::{self, DelegationActionType};
use crate::state::AppState;
use crate::sync::import::find_blob_refs_ipld;
use axum::body::Bytes;
use axum::{
    Json,
@@ -10,14 +9,11 @@
    response::{IntoResponse, Response},
};
use cid::Cid;
use ipld_core::ipld::Ipld;
use jacquard_repo::storage::BlockStore;
use multihash::Multihash;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::str::FromStr;
use tracing::{debug, error, warn};
use tracing::{debug, error};

const MAX_BLOB_SIZE: usize = 10_000_000_000;
const MAX_VIDEO_BLOB_SIZE: usize = 10_000_000_000;
@@ -303,26 +299,26 @@ pub async fn list_missing_blobs(
         }
     };
     let limit = params.limit.unwrap_or(500).clamp(1, 1000);
-    let cursor_str = params.cursor.unwrap_or_default();
-    let (cursor_collection, cursor_rkey) = if cursor_str.contains('|') {
-        let parts: Vec<&str> = cursor_str.split('|').collect();
-        (parts[0].to_string(), parts[1].to_string())
-    } else {
-        (String::new(), String::new())
-    };
-    let records_query = sqlx::query!(
-        "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1 AND (collection, rkey) > ($2, $3) ORDER BY collection, rkey LIMIT $4",
+    let cursor_cid = params.cursor.as_deref().unwrap_or("");
+    let missing_query = sqlx::query!(
+        r#"
+        SELECT rb.blob_cid, rb.record_uri
+        FROM record_blobs rb
+        LEFT JOIN blobs b ON rb.blob_cid = b.cid AND b.created_by_user = rb.repo_id
+        WHERE rb.repo_id = $1 AND b.cid IS NULL AND rb.blob_cid > $2
+        ORDER BY rb.blob_cid
+        LIMIT $3
+        "#,
         user_id,
-        cursor_collection,
-        cursor_rkey,
-        limit
+        cursor_cid,
+        limit + 1
     )
     .fetch_all(&state.db)
     .await;
-    let records = match records_query {
+    let rows = match missing_query {
         Ok(r) => r,
         Err(e) => {
-            error!("DB error fetching records: {:?}", e);
+            error!("DB error fetching missing blobs: {:?}", e);
             return (
                 StatusCode::INTERNAL_SERVER_ERROR,
                 Json(json!({"error": "InternalError"})),
@@ -330,65 +326,25 @@ pub async fn list_missing_blobs(
             .into_response();
         }
     };
-    let mut missing_blobs = Vec::new();
-    let mut last_cursor = None;
-    for row in &records {
-        let collection = &row.collection;
-        let rkey = &row.rkey;
-        let record_cid_str = &row.record_cid;
-        last_cursor = Some(format!("{}|{}", collection, rkey));
-        let record_cid = match Cid::from_str(record_cid_str) {
-            Ok(c) => c,
-            Err(_) => continue,
-        };
-        let block_bytes = match state.block_store.get(&record_cid).await {
-            Ok(Some(b)) => b,
-            _ => continue,
-        };
-        let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
-            Ok(v) => v,
-            Err(e) => {
-                warn!("Failed to parse record {} as IPLD: {:?}", record_cid_str, e);
-                continue;
-            }
-        };
-        let blob_refs = find_blob_refs_ipld(&record_ipld, 0);
-        for blob_ref in blob_refs {
-            let blob_cid_str = blob_ref.cid;
-            let exists = sqlx::query!(
-                "SELECT 1 as one FROM blobs WHERE cid = $1 AND created_by_user = $2",
-                blob_cid_str,
-                user_id
-            )
-            .fetch_optional(&state.db)
-            .await;
-            match exists {
-                Ok(None) => {
-                    missing_blobs.push(RecordBlob {
-                        cid: blob_cid_str,
-                        record_uri: format!("at://{}/{}/{}", did, collection, rkey),
-                    });
-                }
-                Err(e) => {
-                    error!("DB error checking blob existence: {:?}", e);
-                }
-                _ => {}
-            }
-        }
-    }
-    // if we fetched fewer records than limit, we are done, so cursor is None.
-    // otherwise, cursor is the last one we saw.
-    // ...right?
-    let next_cursor = if records.len() < limit as usize {
-        None
+    let has_more = rows.len() > limit as usize;
+    let blobs: Vec<RecordBlob> = rows
+        .into_iter()
+        .take(limit as usize)
+        .map(|row| RecordBlob {
+            cid: row.blob_cid,
+            record_uri: row.record_uri,
+        })
+        .collect();
+    let next_cursor = if has_more {
+        blobs.last().map(|b| b.cid.clone())
     } else {
-        last_cursor
+        None
     };
     (
         StatusCode::OK,
         Json(ListMissingBlobsOutput {
             cursor: next_cursor,
-            blobs: missing_blobs,
+            blobs,
         }),
     )
     .into_response()
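
The rewritten endpoint pages by blob CID: it asks the database for limit + 1 rows, returns at most limit of them, and only sets a cursor when the extra row proved more results exist. A hedged sketch of how a client might walk that cursor (the host name and helper are illustrative, not part of this commit; parameter and field names follow the com.atproto.repo.listMissingBlobs lexicon):

// Illustrative sketch: drain com.atproto.repo.listMissingBlobs page by page.
// "pds.example.com" and accessJwt are placeholders.
async function listAllMissingBlobs(accessJwt: string): Promise<{ cid: string; recordUri: string }[]> {
  const all: { cid: string; recordUri: string }[] = [];
  let cursor: string | undefined;
  do {
    const url = new URL("https://pds.example.com/xrpc/com.atproto.repo.listMissingBlobs");
    url.searchParams.set("limit", "500");
    if (cursor) url.searchParams.set("cursor", cursor); // cursor is the last blob CID of the previous page
    const res = await fetch(url, { headers: { Authorization: `Bearer ${accessJwt}` } });
    const page = await res.json();
    all.push(...page.blobs);
    cursor = page.cursor; // absent once no missing blobs remain
  } while (cursor);
  return all;
}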
@@ -322,6 +322,32 @@ pub async fn import_repo(
        import_result.records.len(),
        did
    );
    let mut blob_ref_count = 0;
    for record in &import_result.records {
        for blob_ref in &record.blob_refs {
            let record_uri = format!("at://{}/{}/{}", did, record.collection, record.rkey);
            if let Err(e) = sqlx::query!(
                r#"
                INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                VALUES ($1, $2, $3)
                ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                "#,
                user_id,
                record_uri,
                blob_ref.cid
            )
            .execute(&state.db)
            .await
            {
                warn!("Failed to insert record_blob for {}: {:?}", record_uri, e);
            } else {
                blob_ref_count += 1;
            }
        }
    }
    if blob_ref_count > 0 {
        info!("Recorded {} blob references for imported repo", blob_ref_count);
    }
    let key_row = match sqlx::query!(
        r#"SELECT uk.key_bytes, uk.encryption_version
           FROM user_keys uk
@@ -122,7 +122,7 @@ pub async fn check_account_status(
     .await
     .unwrap_or(Some(0))
     .unwrap_or(0);
-    let blob_count: i64 = sqlx::query_scalar!(
+    let imported_blobs: i64 = sqlx::query_scalar!(
         "SELECT COUNT(*) FROM blobs WHERE created_by_user = $1",
         user_id
     )
@@ -130,6 +130,14 @@
     .await
     .unwrap_or(Some(0))
     .unwrap_or(0);
+    let expected_blobs: i64 = sqlx::query_scalar!(
+        "SELECT COUNT(DISTINCT blob_cid) FROM record_blobs WHERE repo_id = $1",
+        user_id
+    )
+    .fetch_one(&state.db)
+    .await
+    .unwrap_or(Some(0))
+    .unwrap_or(0);
     let valid_did = is_valid_did_for_service(&state.db, &did).await;
     (
         StatusCode::OK,
@@ -141,8 +149,8 @@
             repo_blocks: block_count as i64,
             indexed_records: record_count,
             private_state_values: 0,
-            expected_blobs: blob_count,
-            imported_blobs: blob_count,
+            expected_blobs,
+            imported_blobs,
         }),
     )
     .into_response()
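
Splitting expected_blobs (distinct blob CIDs referenced in record_blobs) from imported_blobs (blobs actually stored for the account) is what lets a migration tool tell whether blob transfer has finished. A hedged client-side sketch (the host and helper name are illustrative; the output fields follow the com.atproto.server.checkAccountStatus lexicon):

// Illustrative sketch: poll checkAccountStatus during a migration and compare the two counters.
// "pds.example.com" and accessJwt are placeholders.
async function blobsFullyImported(accessJwt: string): Promise<boolean> {
  const res = await fetch("https://pds.example.com/xrpc/com.atproto.server.checkAccountStatus", {
    headers: { Authorization: `Bearer ${accessJwt}` },
  });
  const status = await res.json();
  // expectedBlobs counts distinct blob CIDs referenced by records;
  // importedBlobs counts blobs actually present on this PDS.
  return status.importedBlobs >= status.expectedBlobs;
}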
@@ -5,7 +5,7 @@ use tokio::sync::watch;
 use tracing::{error, info, warn};
 use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender};
 use tranquil_pds::crawlers::{Crawlers, start_crawlers_service};
-use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
+use tranquil_pds::scheduled::{backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks};
 use tranquil_pds::state::AppState;

 #[tokio::main]
@@ -34,7 +34,8 @@ async fn run() -> Result<(), Box<dyn std::error::Error>> {
     tokio::spawn(async move {
         backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()).await;
         backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await;
-        backfill_user_blocks(&backfill_db, backfill_block_store).await;
+        backfill_user_blocks(&backfill_db, backfill_block_store.clone()).await;
+        backfill_record_blobs(&backfill_db, backfill_block_store).await;
     });

     let mut comms_service = CommsService::new(state.db.clone());
src/scheduled.rs (106 lines changed)
@@ -293,6 +293,112 @@ pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore)
    info!(success, failed, "Completed user_blocks backfill");
}

pub async fn backfill_record_blobs(db: &PgPool, block_store: PostgresBlockStore) {
    let users_needing_backfill = match sqlx::query!(
        r#"
        SELECT DISTINCT u.id as user_id, u.did
        FROM users u
        JOIN records r ON r.repo_id = u.id
        WHERE NOT EXISTS (SELECT 1 FROM record_blobs rb WHERE rb.repo_id = u.id)
        LIMIT 100
        "#
    )
    .fetch_all(db)
    .await
    {
        Ok(rows) => rows,
        Err(e) => {
            error!("Failed to query users for record_blobs backfill: {}", e);
            return;
        }
    };

    if users_needing_backfill.is_empty() {
        debug!("No users need record_blobs backfill");
        return;
    }

    info!(
        count = users_needing_backfill.len(),
        "Backfilling record_blobs for existing repos"
    );

    let mut success = 0;
    let mut failed = 0;

    for user in users_needing_backfill {
        let records = match sqlx::query!(
            "SELECT collection, rkey, record_cid FROM records WHERE repo_id = $1",
            user.user_id
        )
        .fetch_all(db)
        .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(user_id = %user.user_id, error = %e, "Failed to fetch records for backfill");
                failed += 1;
                continue;
            }
        };

        let mut blob_refs_found = 0;
        for record in records {
            let record_cid = match Cid::from_str(&record.record_cid) {
                Ok(c) => c,
                Err(_) => continue,
            };

            let block_bytes = match block_store.get(&record_cid).await {
                Ok(Some(b)) => b,
                _ => continue,
            };

            let record_ipld: Ipld = match serde_ipld_dagcbor::from_slice(&block_bytes) {
                Ok(v) => v,
                Err(_) => continue,
            };

            let blob_refs = crate::sync::import::find_blob_refs_ipld(&record_ipld, 0);
            for blob_ref in blob_refs {
                let record_uri = format!(
                    "at://{}/{}/{}",
                    user.did, record.collection, record.rkey
                );
                if let Err(e) = sqlx::query!(
                    r#"
                    INSERT INTO record_blobs (repo_id, record_uri, blob_cid)
                    VALUES ($1, $2, $3)
                    ON CONFLICT (repo_id, record_uri, blob_cid) DO NOTHING
                    "#,
                    user.user_id,
                    record_uri,
                    blob_ref.cid
                )
                .execute(db)
                .await
                {
                    warn!(error = %e, "Failed to insert record_blob during backfill");
                } else {
                    blob_refs_found += 1;
                }
            }
        }

        if blob_refs_found > 0 {
            info!(
                user_id = %user.user_id,
                did = %user.did,
                blob_refs = blob_refs_found,
                "Backfilled record_blobs"
            );
        }
        success += 1;
    }

    info!(success, failed, "Completed record_blobs backfill");
}

pub async fn start_scheduled_tasks(
    db: PgPool,
    blob_store: Arc<dyn BlobStorage>,