From c302d2aea14f0827d1ae6ec35d6489bb4819ebc0 Mon Sep 17 00:00:00 2001 From: lewis Date: Mon, 29 Dec 2025 21:51:09 +0200 Subject: [PATCH] Account lifecycle conf. vs ref --- .env.example | 14 + ...1373eecd788e508246b5ad84e31b1adbdd2c1.json | 15 - ...00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json | 15 + ...2c586335c795003ce6090fb5af2b107208305.json | 16 + ...5d32a4a970f61c84f0539f4c4ee484afdce7d.json | 26 ++ ...090f68e0083610d07e609825d528ef58ade1f.json | 28 ++ ...a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json | 22 ++ ...7737de7f6dc39c3f2509b55bb6c4580e3d2ee.json | 28 ++ ...cfe52980f0e02f2908b17cdd00fc679e6da36.json | 22 ++ ...faf7edc34b5952143334b8fc834350894478f.json | 16 + ...f0f78b7dd5d3d50162deb535c583796afe192.json | 15 + ...0ebe833c85fe4aa1b77af1ce67dd8fcda507a.json | 26 ++ ...5008e3a32ba603f09fcdbbc29bf23cb870444.json | 15 + ...880febb5185f7b7babb616f9c0f1f7016f59e.json | 16 + ...d66de27fde3af24b4a7a3fce7e098812e38a5.json | 15 + ...ae0c23e0095f42585e2ecb962422d9a45ef17.json | 15 + ...a10530815dd4737e4c4b821f5b26756b63ba.json} | 4 +- ...b06cad977bf73bf3bb0fd3fc88938d875637.json} | 8 +- ...388654a783bddb111b1f9aa04507f176980d3.json | 15 - ...06dacd72b33d04c1e2b98475018ad25485852.json | 22 ++ ...5421853c6ef710bee0170430416485f41a0c3.json | 26 ++ migrations/20251239_add_delete_after.sql | 1 + migrations/20251240_add_block_count.sql | 7 + src/api/delegation.rs | 26 +- src/api/identity/account.rs | 155 ++++---- src/api/repo/import.rs | 26 +- src/api/repo/record/utils.rs | 38 +- src/api/server/account_status.rs | 124 +++++- src/api/server/meta.rs | 10 + src/api/server/passkey_account.rs | 26 +- src/lib.rs | 1 + src/main.rs | 18 +- src/scheduled.rs | 368 ++++++++++++++++++ src/sync/frame.rs | 7 +- tests/account_lifecycle.rs | 279 +++++++++++++ tests/common/mod.rs | 5 +- tests/sync_blob.rs | 3 +- tests/sync_conformance.rs | 2 +- tests/sync_repo.rs | 7 +- 39 files changed, 1338 insertions(+), 144 deletions(-) delete mode 100644 .sqlx/query-14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1.json create mode 100644 .sqlx/query-244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json create mode 100644 .sqlx/query-3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305.json create mode 100644 .sqlx/query-3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d.json create mode 100644 .sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json create mode 100644 .sqlx/query-51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json create mode 100644 .sqlx/query-6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee.json create mode 100644 .sqlx/query-908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36.json create mode 100644 .sqlx/query-94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f.json create mode 100644 .sqlx/query-978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192.json create mode 100644 .sqlx/query-9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a.json create mode 100644 .sqlx/query-9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444.json create mode 100644 .sqlx/query-a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e.json create mode 100644 .sqlx/query-b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5.json create mode 100644 .sqlx/query-b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17.json rename .sqlx/{query-2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14.json => query-b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba.json} (54%) rename .sqlx/{query-53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe.json => query-c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637.json} (58%) delete mode 100644 .sqlx/query-f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3.json create mode 100644 .sqlx/query-f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852.json create mode 100644 .sqlx/query-f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3.json create mode 100644 migrations/20251239_add_delete_after.sql create mode 100644 migrations/20251240_add_block_count.sql create mode 100644 src/scheduled.rs create mode 100644 tests/account_lifecycle.rs diff --git a/.env.example b/.env.example index 491ef48..05651a7 100644 --- a/.env.example +++ b/.env.example @@ -100,11 +100,25 @@ AWS_SECRET_ACCESS_KEY=minioadmin # Comma-separated list of available user domains # AVAILABLE_USER_DOMAINS=example.com # ============================================================================= +# Server Metadata (returned by describeServer) +# ============================================================================= +# Privacy policy URL (optional) +# PRIVACY_POLICY_URL=https://example.com/privacy +# Terms of service URL (optional) +# TERMS_OF_SERVICE_URL=https://example.com/terms +# Contact email address (optional) +# CONTACT_EMAIL=admin@example.com +# ============================================================================= # Rate Limiting # ============================================================================= # Disable all rate limiting (testing only, NEVER in production) # DISABLE_RATE_LIMITING=1 # ============================================================================= +# Account Deletion +# ============================================================================= +# How often to check for scheduled account deletions (default: 3600 = 1 hour) +# SCHEDULED_DELETE_CHECK_INTERVAL_SECS=3600 +# ============================================================================= # Miscellaneous # ============================================================================= # Allow HTTP for proxy requests (development only) diff --git a/.sqlx/query-14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1.json b/.sqlx/query-14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1.json deleted file mode 100644 index 1df1055..0000000 --- a/.sqlx/query-14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid", - "Text" - ] - }, - "nullable": [] - }, - "hash": "14a68a119586aa980fb7b64646c1373eecd788e508246b5ad84e31b1adbdd2c1" -} diff --git a/.sqlx/query-244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json b/.sqlx/query-244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json new file mode 100644 index 0000000..cab1328 --- /dev/null +++ b/.sqlx/query-244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "244b55cedfe51f834337141d3bb00e48a1c9277be3e6f0e7e6231a0f3e53a7a4" +} diff --git a/.sqlx/query-3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305.json b/.sqlx/query-3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305.json new file mode 100644 index 0000000..f305667 --- /dev/null +++ b/.sqlx/query-3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "3567e730c1fe4dee7753a53b71c2c586335c795003ce6090fb5af2b107208305" +} diff --git a/.sqlx/query-3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d.json b/.sqlx/query-3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d.json new file mode 100644 index 0000000..7d33a95 --- /dev/null +++ b/.sqlx/query-3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT u.id as user_id, r.repo_root_cid\n FROM users u\n JOIN repos r ON r.user_id = u.id\n WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id)\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "repo_root_cid", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "3f13f59e14ca24d4523be38a0b95d32a4a970f61c84f0539f4c4ee484afdce7d" +} diff --git a/.sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json b/.sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json new file mode 100644 index 0000000..26c08c7 --- /dev/null +++ b/.sqlx/query-49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "repo_root_cid", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "repo_rev", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "49f01f438353a771fd42473fee5090f68e0083610d07e609825d528ef58ade1f" +} diff --git a/.sqlx/query-51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json b/.sqlx/query-51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json new file mode 100644 index 0000000..e0d2f15 --- /dev/null +++ b/.sqlx/query-51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO repo_seq (did, event_type, active, status)\n VALUES ($1, 'account', false, 'deleted')\n RETURNING seq\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "seq", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Text" + ] + }, + "nullable": [ + false + ] + }, + "hash": "51da09ecbd806c8ee59acfbe333a3eace1c428f5bb5130dff0cccf14e4bdb4c1" +} diff --git a/.sqlx/query-6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee.json b/.sqlx/query-6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee.json new file mode 100644 index 0000000..19f42d5 --- /dev/null +++ b/.sqlx/query-6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "repo_root_cid", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "repo_rev", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "6b3704b48a690ea278019a70a977737de7f6dc39c3f2509b55bb6c4580e3d2ee" +} diff --git a/.sqlx/query-908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36.json b/.sqlx/query-908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36.json new file mode 100644 index 0000000..3189208 --- /dev/null +++ b/.sqlx/query-908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "908e74d3c4c6e429133adb7074dcfe52980f0e02f2908b17cdd00fc679e6da36" +} diff --git a/.sqlx/query-94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f.json b/.sqlx/query-94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f.json new file mode 100644 index 0000000..0e6be84 --- /dev/null +++ b/.sqlx/query-94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Text" + ] + }, + "nullable": [] + }, + "hash": "94683841b256b65ed2ac4806206faf7edc34b5952143334b8fc834350894478f" +} diff --git a/.sqlx/query-978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192.json b/.sqlx/query-978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192.json new file mode 100644 index 0000000..e9c33f5 --- /dev/null +++ b/.sqlx/query-978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "978ec276ffa89b539b5365e8106f0f78b7dd5d3d50162deb535c583796afe192" +} diff --git a/.sqlx/query-9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a.json b/.sqlx/query-9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a.json new file mode 100644 index 0000000..8be6f58 --- /dev/null +++ b/.sqlx/query-9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT did, handle\n FROM users\n WHERE delete_after IS NOT NULL\n AND delete_after < NOW()\n AND deactivated_at IS NOT NULL\n LIMIT 100\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "did", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "handle", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "9c1d6f38011f8070e058ef4c9100ebe833c85fe4aa1b77af1ce67dd8fcda507a" +} diff --git a/.sqlx/query-9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444.json b/.sqlx/query-9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444.json new file mode 100644 index 0000000..904728d --- /dev/null +++ b/.sqlx/query-9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO user_blocks (user_id, block_cid)\n SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid)\n ON CONFLICT (user_id, block_cid) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "9f461c44be23d43feb8491422dd5008e3a32ba603f09fcdbbc29bf23cb870444" +} diff --git a/.sqlx/query-a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e.json b/.sqlx/query-a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e.json new file mode 100644 index 0000000..fd81a47 --- /dev/null +++ b/.sqlx/query-a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Text", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "a9e604216b880a8e1be9b4cec84880febb5185f7b7babb616f9c0f1f7016f59e" +} diff --git a/.sqlx/query-b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5.json b/.sqlx/query-b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5.json new file mode 100644 index 0000000..09f619e --- /dev/null +++ b/.sqlx/query-b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "b6d6548acb89d6384cd226f6ed0d66de27fde3af24b4a7a3fce7e098812e38a5" +} diff --git a/.sqlx/query-b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17.json b/.sqlx/query-b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17.json new file mode 100644 index 0000000..4b2dfbc --- /dev/null +++ b/.sqlx/query-b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "b7432d134013ff1f64389dda715ae0c23e0095f42585e2ecb962422d9a45ef17" +} diff --git a/.sqlx/query-2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14.json b/.sqlx/query-b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba.json similarity index 54% rename from .sqlx/query-2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14.json rename to .sqlx/query-b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba.json index 8900d0a..ec340ec 100644 --- a/.sqlx/query-2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14.json +++ b/.sqlx/query-b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", + "query": "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", "describe": { "columns": [], "parameters": { @@ -11,5 +11,5 @@ }, "nullable": [] }, - "hash": "2588479ef83ed45a5d0dee599636f195ca38c5df164e225dcb1b829b497c8f14" + "hash": "b8de174efc5f897e688bc1fb5c49a10530815dd4737e4c4b821f5b26756b63ba" } diff --git a/.sqlx/query-53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe.json b/.sqlx/query-c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637.json similarity index 58% rename from .sqlx/query-53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe.json rename to .sqlx/query-c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637.json index 40c18af..5732741 100644 --- a/.sqlx/query-53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe.json +++ b/.sqlx/query-c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids)\n VALUES ($1, 'commit', $2, $2, $3, $4, $5)\n RETURNING seq\n ", + "query": "\n INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev)\n VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7)\n RETURNING seq\n ", "describe": { "columns": [ { @@ -11,16 +11,18 @@ ], "parameters": { "Left": [ + "Text", "Text", "Text", "Jsonb", "TextArray", - "TextArray" + "TextArray", + "Text" ] }, "nullable": [ false ] }, - "hash": "53b0ea60a759f8bb37d01461fd0769dcc683e796287e41d5180340296286fcbe" + "hash": "c9067e3e62c22fe92a135fa0c6c2b06cad977bf73bf3bb0fd3fc88938d875637" } diff --git a/.sqlx/query-f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3.json b/.sqlx/query-f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3.json deleted file mode 100644 index a54a145..0000000 --- a/.sqlx/query-f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Text", - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "f1e88d447915b116f887c378253388654a783bddb111b1f9aa04507f176980d3" -} diff --git a/.sqlx/query-f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852.json b/.sqlx/query-f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852.json new file mode 100644 index 0000000..218f7cd --- /dev/null +++ b/.sqlx/query-f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT storage_key as \"storage_key!\" FROM blobs WHERE created_by_user = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "storage_key!", + "type_info": "Text" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "f59010ecdd7f782489e0e03288a06dacd72b33d04c1e2b98475018ad25485852" +} diff --git a/.sqlx/query-f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3.json b/.sqlx/query-f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3.json new file mode 100644 index 0000000..b4f38c7 --- /dev/null +++ b/.sqlx/query-f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3.json @@ -0,0 +1,26 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "user_id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "repo_root_cid", + "type_info": "Text" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false, + false + ] + }, + "hash": "f90c58a4e9dc9c28a682405fb7d5421853c6ef710bee0170430416485f41a0c3" +} diff --git a/migrations/20251239_add_delete_after.sql b/migrations/20251239_add_delete_after.sql new file mode 100644 index 0000000..d893d1a --- /dev/null +++ b/migrations/20251239_add_delete_after.sql @@ -0,0 +1 @@ +ALTER TABLE users ADD COLUMN IF NOT EXISTS delete_after TIMESTAMPTZ; diff --git a/migrations/20251240_add_block_count.sql b/migrations/20251240_add_block_count.sql new file mode 100644 index 0000000..6b46cb5 --- /dev/null +++ b/migrations/20251240_add_block_count.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS user_blocks ( + user_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE, + block_cid BYTEA NOT NULL, + PRIMARY KEY (user_id, block_cid) +); + +CREATE INDEX IF NOT EXISTS idx_user_blocks_user_id ON user_blocks(user_id); diff --git a/src/api/delegation.rs b/src/api/delegation.rs index f62abdd..85b26f6 100644 --- a/src/api/delegation.rs +++ b/src/api/delegation.rs @@ -886,10 +886,12 @@ pub async fn create_delegated_account( } }; let commit_cid_str = commit_cid.to_string(); + let rev_str = rev.as_ref().to_string(); if let Err(e) = sqlx::query!( - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", user_id, - commit_cid_str + commit_cid_str, + rev_str ) .execute(&mut *tx) .await @@ -901,6 +903,26 @@ pub async fn create_delegated_account( ) .into_response(); } + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; + if let Err(e) = sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + &genesis_block_cids + ) + .execute(&mut *tx) + .await + { + error!("Error inserting user_blocks: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } if let Some(ref code) = input.invite_code { let _ = sqlx::query!( diff --git a/src/api/identity/account.rs b/src/api/identity/account.rs index c052eff..d751aa6 100644 --- a/src/api/identity/account.rs +++ b/src/api/identity/account.rs @@ -57,9 +57,9 @@ pub struct CreateAccountOutput { pub handle: String, pub did: String, #[serde(skip_serializing_if = "Option::is_none")] - pub access_jwt: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub refresh_jwt: Option, + pub did_doc: Option, + pub access_jwt: String, + pub refresh_jwt: String, pub verification_required: bool, pub verification_channel: String, } @@ -624,9 +624,10 @@ pub async fn create_account( StatusCode::OK, Json(CreateAccountOutput { handle: handle.clone(), - did, - access_jwt: Some(access_meta.token), - refresh_jwt: Some(refresh_meta.token), + did: did.clone(), + did_doc: state.did_resolver.resolve_did_document(&did).await, + access_jwt: access_meta.token, + refresh_jwt: refresh_meta.token, verification_required: false, verification_channel: "email".to_string(), }), @@ -912,10 +913,12 @@ pub async fn create_account( } }; let commit_cid_str = commit_cid.to_string(); + let rev_str = rev.as_ref().to_string(); let repo_insert = sqlx::query!( - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", user_id, - commit_cid_str + commit_cid_str, + rev_str ) .execute(&mut *tx) .await; @@ -927,6 +930,26 @@ pub async fn create_account( ) .into_response(); } + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; + if let Err(e) = sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + &genesis_block_cids + ) + .execute(&mut *tx) + .await + { + error!("Error inserting user_blocks: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } if let Some(code) = &input.invite_code && !code.trim().is_empty() { @@ -965,6 +988,21 @@ pub async fn create_account( { warn!("Failed to sequence account event for {}: {}", did, e); } + if let Err(e) = + crate::api::repo::record::sequence_empty_commit_event(&state, &did).await + { + warn!("Failed to sequence commit event for {}: {}", did, e); + } + if let Err(e) = crate::api::repo::record::sequence_sync_event( + &state, + &did, + &commit_cid_str, + Some(rev.as_ref()), + ) + .await + { + warn!("Failed to sequence sync event for {}: {}", did, e); + } let profile_record = json!({ "$type": "app.bsky.actor.profile", "displayName": input.handle @@ -1023,71 +1061,50 @@ pub async fn create_account( } } - let (access_jwt, refresh_jwt) = if is_migration { - info!( - "[MIGRATION] createAccount: Creating session tokens for migration did={}", - did - ); - let access_meta = match crate::auth::create_access_token_with_metadata( - &did, - &secret_key_bytes, - ) { - Ok(m) => m, - Err(e) => { - error!( - "[MIGRATION] createAccount: Error creating access token for migration: {:?}", - e - ); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({"error": "InternalError"})), - ) - .into_response(); - } - }; - let refresh_meta = match crate::auth::create_refresh_token_with_metadata( - &did, - &secret_key_bytes, - ) { - Ok(m) => m, - Err(e) => { - error!( - "[MIGRATION] createAccount: Error creating refresh token for migration: {:?}", - e - ); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!({"error": "InternalError"})), - ) - .into_response(); - } - }; - if let Err(e) = sqlx::query!( - "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", - did, - access_meta.jti, - refresh_meta.jti, - access_meta.expires_at, - refresh_meta.expires_at - ) - .execute(&state.db) - .await - { - error!("[MIGRATION] createAccount: Error creating session for migration: {:?}", e); + let access_meta = match crate::auth::create_access_token_with_metadata(&did, &secret_key_bytes) + { + Ok(m) => m, + Err(e) => { + error!("createAccount: Error creating access token: {:?}", e); return ( StatusCode::INTERNAL_SERVER_ERROR, Json(json!({"error": "InternalError"})), ) .into_response(); } - info!( - "[MIGRATION] createAccount: Session created successfully for did={}", - did - ); - (Some(access_meta.token), Some(refresh_meta.token)) - } else { - (None, None) }; + let refresh_meta = + match crate::auth::create_refresh_token_with_metadata(&did, &secret_key_bytes) { + Ok(m) => m, + Err(e) => { + error!("createAccount: Error creating refresh token: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + }; + if let Err(e) = sqlx::query!( + "INSERT INTO session_tokens (did, access_jti, refresh_jti, access_expires_at, refresh_expires_at) VALUES ($1, $2, $3, $4, $5)", + did, + access_meta.jti, + refresh_meta.jti, + access_meta.expires_at, + refresh_meta.expires_at + ) + .execute(&state.db) + .await + { + error!("createAccount: Error creating session: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } + + let did_doc = state.did_resolver.resolve_did_document(&did).await; if is_migration { info!( @@ -1101,11 +1118,13 @@ pub async fn create_account( Json(CreateAccountOutput { handle: handle.clone(), did, - access_jwt, - refresh_jwt, + did_doc, + access_jwt: access_meta.token, + refresh_jwt: refresh_meta.token, verification_required: !is_migration, verification_channel: verification_channel.to_string(), }), ) .into_response() } + diff --git a/src/api/repo/import.rs b/src/api/repo/import.rs index fc703ed..c6c3c48 100644 --- a/src/api/repo/import.rs +++ b/src/api/repo/import.rs @@ -315,7 +315,7 @@ pub async fn import_repo( .ok() .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_BLOCKS); - match apply_import(&state.db, user_id, root, blocks, max_blocks).await { + match apply_import(&state.db, user_id, root, blocks.clone(), max_blocks).await { Ok(import_result) => { info!( "Successfully imported {} records for user {}", @@ -405,8 +405,9 @@ pub async fn import_repo( }; let new_root_str = new_root_cid.to_string(); if let Err(e) = sqlx::query!( - "UPDATE repos SET repo_root_cid = $1, updated_at = NOW() WHERE user_id = $2", + "UPDATE repos SET repo_root_cid = $1, repo_rev = $2, updated_at = NOW() WHERE user_id = $3", new_root_str, + &new_rev_str, user_id ) .execute(&state.db) @@ -419,6 +420,27 @@ pub async fn import_repo( ) .into_response(); } + let mut all_block_cids: Vec> = blocks.keys().map(|c| c.to_bytes()).collect(); + all_block_cids.push(new_root_cid.to_bytes()); + if let Err(e) = sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + &all_block_cids + ) + .execute(&state.db) + .await + { + error!("Failed to insert user_blocks: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } info!( "Created new commit for imported repo: cid={}, rev={}", new_root_str, new_rev_str diff --git a/src/api/repo/record/utils.rs b/src/api/repo/record/utils.rs index bbb0805..6ea56f1 100644 --- a/src/api/repo/record/utils.rs +++ b/src/api/repo/record/utils.rs @@ -173,13 +173,34 @@ pub async fn commit_and_log( .flatten() .unwrap_or(false); sqlx::query!( - "UPDATE repos SET repo_root_cid = $1 WHERE user_id = $2", + "UPDATE repos SET repo_root_cid = $1, repo_rev = $2 WHERE user_id = $3", new_root_cid.to_string(), + &rev_str, user_id ) .execute(&mut *tx) .await .map_err(|e| format!("DB Error (repos): {}", e))?; + let mut all_block_cids: Vec> = blocks_cids + .iter() + .filter_map(|s| Cid::from_str(s).ok()) + .map(|c| c.to_bytes()) + .collect(); + all_block_cids.push(new_root_cid.to_bytes()); + if !all_block_cids.is_empty() { + sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + &all_block_cids + ) + .execute(&mut *tx) + .await + .map_err(|e| format!("DB Error (user_blocks): {}", e))?; + } let mut upsert_collections: Vec = Vec::new(); let mut upsert_rkeys: Vec = Vec::new(); let mut upsert_cids: Vec = Vec::new(); @@ -492,8 +513,8 @@ pub async fn sequence_sync_event( } pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result { - let repo_root = sqlx::query_scalar!( - "SELECT r.repo_root_cid FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", + let repo_info = sqlx::query!( + "SELECT r.repo_root_cid, r.repo_rev FROM repos r JOIN users u ON r.user_id = u.id WHERE u.did = $1", did ) .fetch_optional(&state.db) @@ -503,17 +524,20 @@ pub async fn sequence_empty_commit_event(state: &AppState, did: &str) -> Result< let ops = serde_json::json!([]); let blobs: Vec = vec![]; let blocks_cids: Vec = vec![]; + let prev_cid: Option<&str> = None; let seq_row = sqlx::query!( r#" - INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids) - VALUES ($1, 'commit', $2, $2, $3, $4, $5) + INSERT INTO repo_seq (did, event_type, commit_cid, prev_cid, ops, blobs, blocks_cids, rev) + VALUES ($1, 'commit', $2, $3::TEXT, $4, $5, $6, $7) RETURNING seq "#, did, - repo_root, + repo_info.repo_root_cid, + prev_cid, ops, &blobs, - &blocks_cids + &blocks_cids, + repo_info.repo_rev ) .fetch_one(&state.db) .await diff --git a/src/api/server/account_status.rs b/src/api/server/account_status.rs index 07aa08d..f1e7bad 100644 --- a/src/api/server/account_status.rs +++ b/src/api/server/account_status.rs @@ -83,14 +83,38 @@ pub async fn check_account_status( _ => None, }; let repo_result = sqlx::query!( - "SELECT repo_root_cid FROM repos WHERE user_id = $1", + "SELECT repo_root_cid, repo_rev FROM repos WHERE user_id = $1", user_id ) .fetch_optional(&state.db) .await; - let repo_commit = match repo_result { - Ok(Some(row)) => row.repo_root_cid, - _ => String::new(), + let (repo_commit, repo_rev_from_db) = match repo_result { + Ok(Some(row)) => (row.repo_root_cid, row.repo_rev), + _ => (String::new(), None), + }; + let block_count: i64 = + sqlx::query_scalar!("SELECT COUNT(*) FROM user_blocks WHERE user_id = $1", user_id) + .fetch_one(&state.db) + .await + .unwrap_or(Some(0)) + .unwrap_or(0); + let repo_rev = if let Some(rev) = repo_rev_from_db { + rev + } else if !repo_commit.is_empty() { + if let Ok(cid) = Cid::from_str(&repo_commit) { + if let Ok(Some(block)) = state.block_store.get(&cid).await { + Commit::from_cbor(&block) + .ok() + .map(|c| c.rev().to_string()) + .unwrap_or_default() + } else { + String::new() + } + } else { + String::new() + } + } else { + String::new() }; let record_count: i64 = sqlx::query_scalar!("SELECT COUNT(*) FROM records WHERE repo_id = $1", user_id) @@ -106,15 +130,15 @@ pub async fn check_account_status( .await .unwrap_or(Some(0)) .unwrap_or(0); - let valid_did = did.starts_with("did:"); + let valid_did = is_valid_did_for_service(&state.db, &did).await; ( StatusCode::OK, Json(CheckAccountStatusOutput { activated: deactivated_at.is_none(), valid_did, repo_commit: repo_commit.clone(), - repo_rev: chrono::Utc::now().timestamp_millis().to_string(), - repo_blocks: 0, + repo_rev, + repo_blocks: block_count as i64, indexed_records: record_count, private_state_values: 0, expected_blobs: blob_count, @@ -124,9 +148,16 @@ pub async fn check_account_status( .into_response() } +async fn is_valid_did_for_service(db: &sqlx::PgPool, did: &str) -> bool { + assert_valid_did_document_for_service(db, did, false) + .await + .is_ok() +} + async fn assert_valid_did_document_for_service( db: &sqlx::PgPool, did: &str, + with_retry: bool, ) -> Result<(), (StatusCode, Json)> { let hostname = std::env::var("PDS_HOSTNAME").unwrap_or_else(|_| "localhost".to_string()); let expected_endpoint = format!("https://{}", hostname); @@ -134,9 +165,10 @@ async fn assert_valid_did_document_for_service( if did.starts_with("did:plc:") { let plc_client = PlcClient::new(None); + let max_attempts = if with_retry { 5 } else { 1 }; let mut last_error = None; let mut doc_data = None; - for attempt in 0..5 { + for attempt in 0..max_attempts { if attempt > 0 { let delay_ms = 500 * (1 << (attempt - 1)); info!( @@ -196,6 +228,28 @@ async fn assert_valid_did_document_for_service( } }; + let server_rotation_key = std::env::var("PLC_ROTATION_KEY").ok(); + if let Some(ref expected_rotation_key) = server_rotation_key { + let rotation_keys = doc_data + .get("rotationKeys") + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|k| k.as_str()) + .collect::>() + }) + .unwrap_or_default(); + if !rotation_keys.contains(&expected_rotation_key.as_str()) { + return Err(( + StatusCode::BAD_REQUEST, + Json(json!({ + "error": "InvalidRequest", + "message": "Server rotation key not included in PLC DID data" + })), + )); + } + } + let doc_signing_key = doc_data .get("verificationMethods") .and_then(|v| v.get("atproto")) @@ -378,7 +432,7 @@ pub async fn activate_account( did ); let did_validation_start = std::time::Instant::now(); - if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did).await { + if let Err((status, json)) = assert_valid_did_document_for_service(&state.db, &did, true).await { info!( "[MIGRATION] activateAccount: DID document validation FAILED for {} (took {:?})", did, @@ -511,7 +565,7 @@ pub struct DeactivateAccountInput { pub async fn deactivate_account( State(state): State, headers: axum::http::HeaderMap, - Json(_input): Json, + Json(input): Json, ) -> Response { let extracted = match crate::auth::extract_auth_token_from_header( headers.get("Authorization").and_then(|h| h.to_str().ok()), @@ -548,6 +602,12 @@ pub async fn deactivate_account( return e; } + let delete_after: Option> = input + .delete_after + .as_ref() + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&chrono::Utc)); + let did = auth_user.did; let handle = sqlx::query_scalar!("SELECT handle FROM users WHERE did = $1", did) .fetch_optional(&state.db) @@ -555,8 +615,9 @@ pub async fn deactivate_account( .ok() .flatten(); let result = sqlx::query!( - "UPDATE users SET deactivated_at = NOW() WHERE did = $1", - did + "UPDATE users SET deactivated_at = NOW(), delete_after = $2 WHERE did = $1", + did, + delete_after ) .execute(&state.db) .await; @@ -693,6 +754,14 @@ pub async fn delete_account( ) .into_response(); } + const OLD_PASSWORD_MAX_LENGTH: usize = 512; + if password.len() > OLD_PASSWORD_MAX_LENGTH { + return ( + StatusCode::BAD_REQUEST, + Json(json!({"error": "InvalidRequest", "message": "Invalid password length."})), + ) + .into_response(); + } if token.is_empty() { return ( StatusCode::BAD_REQUEST, @@ -842,18 +911,35 @@ pub async fn delete_account( ) .into_response(); } - if let Err(e) = crate::api::repo::record::sequence_account_event( + let account_seq = crate::api::repo::record::sequence_account_event( &state, did, false, Some("deleted"), ) - .await - { - warn!( - "Failed to sequence account deletion event for {}: {}", - did, e - ); + .await; + match account_seq { + Ok(seq) => { + if let Err(e) = sqlx::query!( + "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", + did, + seq + ) + .execute(&state.db) + .await + { + warn!( + "Failed to cleanup sequences for deleted account {}: {}", + did, e + ); + } + } + Err(e) => { + warn!( + "Failed to sequence account deletion event for {}: {}", + did, e + ); + } } let _ = state.cache.delete(&format!("handle:{}", handle)).await; info!("Account {} deleted successfully", did); diff --git a/src/api/server/meta.rs b/src/api/server/meta.rs index 4245a79..2305f29 100644 --- a/src/api/server/meta.rs +++ b/src/api/server/meta.rs @@ -32,10 +32,20 @@ pub async fn describe_server() -> impl IntoResponse { let invite_code_required = std::env::var("INVITE_CODE_REQUIRED") .map(|v| v == "true" || v == "1") .unwrap_or(false); + let privacy_policy = std::env::var("PRIVACY_POLICY_URL").ok(); + let terms_of_service = std::env::var("TERMS_OF_SERVICE_URL").ok(); + let contact_email = std::env::var("CONTACT_EMAIL").ok(); Json(json!({ "availableUserDomains": domains, "inviteCodeRequired": invite_code_required, "did": format!("did:web:{}", pds_hostname), + "links": { + "privacyPolicy": privacy_policy, + "termsOfService": terms_of_service + }, + "contact": { + "email": contact_email + }, "version": env!("CARGO_PKG_VERSION"), "availableCommsChannels": get_available_comms_channels() })) diff --git a/src/api/server/passkey_account.rs b/src/api/server/passkey_account.rs index 9b5350b..a164690 100644 --- a/src/api/server/passkey_account.rs +++ b/src/api/server/passkey_account.rs @@ -612,10 +612,12 @@ pub async fn create_passkey_account( } }; let commit_cid_str = commit_cid.to_string(); + let rev_str = rev.as_ref().to_string(); if let Err(e) = sqlx::query!( - "INSERT INTO repos (user_id, repo_root_cid) VALUES ($1, $2)", + "INSERT INTO repos (user_id, repo_root_cid, repo_rev) VALUES ($1, $2, $3)", user_id, - commit_cid_str + commit_cid_str, + rev_str ) .execute(&mut *tx) .await @@ -627,6 +629,26 @@ pub async fn create_passkey_account( ) .into_response(); } + let genesis_block_cids = vec![mst_root.to_bytes(), commit_cid.to_bytes()]; + if let Err(e) = sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user_id, + &genesis_block_cids + ) + .execute(&mut *tx) + .await + { + error!("Error inserting user_blocks: {:?}", e); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!({"error": "InternalError"})), + ) + .into_response(); + } if let Some(ref code) = input.invite_code { let _ = sqlx::query!( diff --git a/src/lib.rs b/src/lib.rs index d61c6f2..6ded3be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,7 @@ pub mod oauth; pub mod plc; pub mod rate_limit; pub mod repo; +pub mod scheduled; pub mod state; pub mod storage; pub mod sync; diff --git a/src/main.rs b/src/main.rs index bf0764c..7c54349 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ use tokio::sync::watch; use tracing::{error, info, warn}; use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; +use tranquil_pds::scheduled::{backfill_repo_rev, backfill_user_blocks, start_scheduled_tasks}; use tranquil_pds::state::AppState; #[tokio::main] @@ -28,6 +29,13 @@ async fn run() -> Result<(), Box> { let (shutdown_tx, shutdown_rx) = watch::channel(false); + let backfill_db = state.db.clone(); + let backfill_block_store = state.block_store.clone(); + tokio::spawn(async move { + backfill_repo_rev(&backfill_db, backfill_block_store.clone()).await; + backfill_user_blocks(&backfill_db, backfill_block_store).await; + }); + let mut comms_service = CommsService::new(state.db.clone()); if let Some(email_sender) = EmailSender::from_env() { @@ -63,13 +71,19 @@ async fn run() -> Result<(), Box> { Some(tokio::spawn(start_crawlers_service( crawlers, firehose_rx, - shutdown_rx, + shutdown_rx.clone(), ))) } else { warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); None }; + let scheduled_handle = tokio::spawn(start_scheduled_tasks( + state.db.clone(), + state.blob_store.clone(), + shutdown_rx, + )); + let app = tranquil_pds::app(state); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); info!("listening on {}", addr); @@ -88,6 +102,8 @@ async fn run() -> Result<(), Box> { handle.await.ok(); } + scheduled_handle.await.ok(); + if let Err(e) = server_result { return Err(format!("Server error: {}", e).into()); } diff --git a/src/scheduled.rs b/src/scheduled.rs new file mode 100644 index 0000000..0f39bd0 --- /dev/null +++ b/src/scheduled.rs @@ -0,0 +1,368 @@ +use cid::Cid; +use jacquard_repo::commit::Commit; +use jacquard_repo::storage::BlockStore; +use ipld_core::ipld::Ipld; +use sqlx::PgPool; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch; +use tokio::time::interval; +use tracing::{debug, error, info, warn}; + +use crate::repo::PostgresBlockStore; +use crate::storage::BlobStorage; + +pub async fn backfill_repo_rev(db: &PgPool, block_store: PostgresBlockStore) { + let repos_missing_rev = match sqlx::query!( + "SELECT user_id, repo_root_cid FROM repos WHERE repo_rev IS NULL" + ) + .fetch_all(db) + .await + { + Ok(rows) => rows, + Err(e) => { + error!("Failed to query repos for backfill: {}", e); + return; + } + }; + + if repos_missing_rev.is_empty() { + debug!("No repos need repo_rev backfill"); + return; + } + + info!( + count = repos_missing_rev.len(), + "Backfilling repo_rev for existing repos" + ); + + let mut success = 0; + let mut failed = 0; + + for repo in repos_missing_rev { + let cid = match Cid::from_str(&repo.repo_root_cid) { + Ok(c) => c, + Err(_) => { + failed += 1; + continue; + } + }; + + let block = match block_store.get(&cid).await { + Ok(Some(b)) => b, + _ => { + failed += 1; + continue; + } + }; + + let commit = match Commit::from_cbor(&block) { + Ok(c) => c, + Err(_) => { + failed += 1; + continue; + } + }; + + let rev = commit.rev().to_string(); + + if let Err(e) = sqlx::query!( + "UPDATE repos SET repo_rev = $1 WHERE user_id = $2", + rev, + repo.user_id + ) + .execute(db) + .await + { + warn!(user_id = %repo.user_id, error = %e, "Failed to update repo_rev"); + failed += 1; + } else { + success += 1; + } + } + + info!(success, failed, "Completed repo_rev backfill"); +} + +pub async fn backfill_user_blocks(db: &PgPool, block_store: PostgresBlockStore) { + let users_without_blocks = match sqlx::query!( + r#" + SELECT u.id as user_id, r.repo_root_cid + FROM users u + JOIN repos r ON r.user_id = u.id + WHERE NOT EXISTS (SELECT 1 FROM user_blocks ub WHERE ub.user_id = u.id) + "# + ) + .fetch_all(db) + .await + { + Ok(rows) => rows, + Err(e) => { + error!("Failed to query users for user_blocks backfill: {}", e); + return; + } + }; + + if users_without_blocks.is_empty() { + debug!("No users need user_blocks backfill"); + return; + } + + info!( + count = users_without_blocks.len(), + "Backfilling user_blocks for existing repos" + ); + + let mut success = 0; + let mut failed = 0; + + for user in users_without_blocks { + let root_cid = match Cid::from_str(&user.repo_root_cid) { + Ok(c) => c, + Err(_) => { + failed += 1; + continue; + } + }; + + let mut block_cids: Vec> = Vec::new(); + let mut to_visit = vec![root_cid]; + let mut visited = std::collections::HashSet::new(); + + while let Some(cid) = to_visit.pop() { + if visited.contains(&cid) { + continue; + } + visited.insert(cid); + block_cids.push(cid.to_bytes()); + + let block = match block_store.get(&cid).await { + Ok(Some(b)) => b, + _ => continue, + }; + + if let Ok(commit) = Commit::from_cbor(&block) { + to_visit.push(commit.data); + if let Some(prev) = commit.prev { + to_visit.push(prev); + } + } else if let Ok(ipld) = serde_ipld_dagcbor::from_slice::(&block) { + if let Ipld::Map(ref obj) = ipld { + if let Some(Ipld::Link(left_cid)) = obj.get("l") { + to_visit.push(*left_cid); + } + if let Some(Ipld::List(entries)) = obj.get("e") { + for entry in entries { + if let Ipld::Map(entry_obj) = entry { + if let Some(Ipld::Link(tree_cid)) = entry_obj.get("t") { + to_visit.push(*tree_cid); + } + if let Some(Ipld::Link(val_cid)) = entry_obj.get("v") { + to_visit.push(*val_cid); + } + } + } + } + } + } + } + + if block_cids.is_empty() { + failed += 1; + continue; + } + + if let Err(e) = sqlx::query!( + r#" + INSERT INTO user_blocks (user_id, block_cid) + SELECT $1, block_cid FROM UNNEST($2::bytea[]) AS t(block_cid) + ON CONFLICT (user_id, block_cid) DO NOTHING + "#, + user.user_id, + &block_cids + ) + .execute(db) + .await + { + warn!(user_id = %user.user_id, error = %e, "Failed to backfill user_blocks"); + failed += 1; + } else { + info!(user_id = %user.user_id, block_count = block_cids.len(), "Backfilled user_blocks"); + success += 1; + } + } + + info!(success, failed, "Completed user_blocks backfill"); +} + +pub async fn start_scheduled_tasks( + db: PgPool, + blob_store: Arc, + mut shutdown_rx: watch::Receiver, +) { + let check_interval = Duration::from_secs( + std::env::var("SCHEDULED_DELETE_CHECK_INTERVAL_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(3600), + ); + + info!( + check_interval_secs = check_interval.as_secs(), + "Starting scheduled tasks service" + ); + + let mut ticker = interval(check_interval); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + _ = shutdown_rx.changed() => { + if *shutdown_rx.borrow() { + info!("Scheduled tasks service shutting down"); + break; + } + } + _ = ticker.tick() => { + if let Err(e) = process_scheduled_deletions(&db, &blob_store).await { + error!("Error processing scheduled deletions: {}", e); + } + } + } + } +} + +async fn process_scheduled_deletions( + db: &PgPool, + blob_store: &Arc, +) -> Result<(), String> { + let accounts_to_delete = sqlx::query!( + r#" + SELECT did, handle + FROM users + WHERE delete_after IS NOT NULL + AND delete_after < NOW() + AND deactivated_at IS NOT NULL + LIMIT 100 + "# + ) + .fetch_all(db) + .await + .map_err(|e| format!("DB error fetching accounts to delete: {}", e))?; + + if accounts_to_delete.is_empty() { + debug!("No accounts scheduled for deletion"); + return Ok(()); + } + + info!( + count = accounts_to_delete.len(), + "Processing scheduled account deletions" + ); + + for account in accounts_to_delete { + if let Err(e) = delete_account_data(db, blob_store, &account.did, &account.handle).await { + warn!( + did = %account.did, + handle = %account.handle, + error = %e, + "Failed to delete scheduled account" + ); + } else { + info!( + did = %account.did, + handle = %account.handle, + "Successfully deleted scheduled account" + ); + } + } + + Ok(()) +} + +async fn delete_account_data( + db: &PgPool, + blob_store: &Arc, + did: &str, + _handle: &str, +) -> Result<(), String> { + let user_id: uuid::Uuid = sqlx::query_scalar!( + "SELECT id FROM users WHERE did = $1", + did + ) + .fetch_one(db) + .await + .map_err(|e| format!("DB error fetching user: {}", e))?; + + let blob_storage_keys: Vec = sqlx::query_scalar!( + r#"SELECT storage_key as "storage_key!" FROM blobs WHERE created_by_user = $1"#, + user_id + ) + .fetch_all(db) + .await + .map_err(|e| format!("DB error fetching blob keys: {}", e))?; + + for storage_key in &blob_storage_keys { + if let Err(e) = blob_store.delete(storage_key).await { + warn!( + storage_key = %storage_key, + error = %e, + "Failed to delete blob from storage (continuing anyway)" + ); + } + } + + let mut tx = db + .begin() + .await + .map_err(|e| format!("Failed to begin transaction: {}", e))?; + + sqlx::query!("DELETE FROM blobs WHERE created_by_user = $1", user_id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to delete blobs: {}", e))?; + + sqlx::query!("DELETE FROM users WHERE id = $1", user_id) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to delete user: {}", e))?; + + let account_seq = sqlx::query_scalar!( + r#" + INSERT INTO repo_seq (did, event_type, active, status) + VALUES ($1, 'account', false, 'deleted') + RETURNING seq + "#, + did + ) + .fetch_one(&mut *tx) + .await + .map_err(|e| format!("Failed to sequence account deletion: {}", e))?; + + sqlx::query!( + "DELETE FROM repo_seq WHERE did = $1 AND seq != $2", + did, + account_seq + ) + .execute(&mut *tx) + .await + .map_err(|e| format!("Failed to cleanup sequences: {}", e))?; + + tx.commit() + .await + .map_err(|e| format!("Failed to commit transaction: {}", e))?; + + sqlx::query(&format!("NOTIFY repo_updates, '{}'", account_seq)) + .execute(db) + .await + .map_err(|e| format!("Failed to notify: {}", e))?; + + info!( + did = %did, + blob_count = blob_storage_keys.len(), + "Deleted account data including blobs from storage" + ); + + Ok(()) +} diff --git a/src/sync/frame.rs b/src/sync/frame.rs index 13176a8..18cf15f 100644 --- a/src/sync/frame.rs +++ b/src/sync/frame.rs @@ -101,6 +101,7 @@ pub struct CommitFrameBuilder { pub ops_json: serde_json::Value, pub blobs: Vec, pub time: chrono::DateTime, + pub rev: Option, } impl CommitFrameBuilder { @@ -122,7 +123,8 @@ impl CommitFrameBuilder { .iter() .filter_map(|s| Cid::from_str(s).ok()) .collect(); - let rev = placeholder_rev(); + let rev = self.rev.unwrap_or_else(placeholder_rev); + let since = self.prev_cid_str.as_ref().map(|_| rev.clone()); Ok(CommitFrame { seq: self.seq, rebase: false, @@ -130,7 +132,7 @@ impl CommitFrameBuilder { repo: self.did, commit: commit_cid, rev, - since: self.prev_cid_str.as_ref().map(|_| placeholder_rev()), + since, blocks: Vec::new(), ops, blobs, @@ -161,6 +163,7 @@ impl TryFrom for CommitFrame { ops_json: event.ops.unwrap_or_default(), blobs: event.blobs.unwrap_or_default(), time: event.created_at, + rev: event.rev, }; builder.build() } diff --git a/tests/account_lifecycle.rs b/tests/account_lifecycle.rs new file mode 100644 index 0000000..187410d --- /dev/null +++ b/tests/account_lifecycle.rs @@ -0,0 +1,279 @@ +mod common; +mod helpers; +use common::*; +use reqwest::StatusCode; +use serde_json::{Value, json}; + +#[tokio::test] +async fn test_check_account_status_returns_correct_block_count() { + let client = client(); + let base = base_url().await; + let (access_jwt, did) = create_account_and_login(&client).await; + + let status1 = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + assert_eq!(status1.status(), StatusCode::OK); + let body1: Value = status1.json().await.unwrap(); + let initial_blocks = body1["repoBlocks"].as_i64().unwrap(); + assert!(initial_blocks >= 2, "New account should have at least 2 blocks (commit + empty MST)"); + + let create_res = client + .post(format!("{}/xrpc/com.atproto.repo.createRecord", base)) + .bearer_auth(&access_jwt) + .json(&json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "record": { + "$type": "app.bsky.feed.post", + "text": "Test post for block counting", + "createdAt": chrono::Utc::now().to_rfc3339() + } + })) + .send() + .await + .unwrap(); + assert_eq!(create_res.status(), StatusCode::OK); + let create_body: Value = create_res.json().await.unwrap(); + let rkey = create_body["uri"].as_str().unwrap().split('/').last().unwrap().to_string(); + + let status2 = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + let body2: Value = status2.json().await.unwrap(); + let after_create_blocks = body2["repoBlocks"].as_i64().unwrap(); + assert!(after_create_blocks > initial_blocks, "Block count should increase after creating a record"); + + let delete_res = client + .post(format!("{}/xrpc/com.atproto.repo.deleteRecord", base)) + .bearer_auth(&access_jwt) + .json(&json!({ + "repo": did, + "collection": "app.bsky.feed.post", + "rkey": rkey + })) + .send() + .await + .unwrap(); + assert_eq!(delete_res.status(), StatusCode::OK); + + let status3 = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + let body3: Value = status3.json().await.unwrap(); + let after_delete_blocks = body3["repoBlocks"].as_i64().unwrap(); + assert!( + after_delete_blocks >= after_create_blocks, + "Block count should not decrease after deleting a record (was {}, now {})", + after_create_blocks, + after_delete_blocks + ); +} + +#[tokio::test] +async fn test_check_account_status_returns_valid_repo_rev() { + let client = client(); + let base = base_url().await; + let (access_jwt, _) = create_account_and_login(&client).await; + + let status = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + assert_eq!(status.status(), StatusCode::OK); + let body: Value = status.json().await.unwrap(); + + let repo_rev = body["repoRev"].as_str().unwrap(); + assert!(!repo_rev.is_empty(), "repoRev should not be empty"); + assert!(repo_rev.chars().all(|c| c.is_alphanumeric()), "repoRev should be alphanumeric TID"); +} + +#[tokio::test] +async fn test_check_account_status_valid_did_is_true_for_active_account() { + let client = client(); + let base = base_url().await; + let (access_jwt, _) = create_account_and_login(&client).await; + + let status = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + assert_eq!(status.status(), StatusCode::OK); + let body: Value = status.json().await.unwrap(); + + assert_eq!(body["validDid"], true, "validDid should be true for active account with correct DID document"); + assert_eq!(body["activated"], true, "activated should be true for active account"); +} + +#[tokio::test] +async fn test_deactivate_account_with_delete_after() { + let client = client(); + let base = base_url().await; + let (access_jwt, _) = create_account_and_login(&client).await; + + let future_time = chrono::Utc::now() + chrono::Duration::hours(24); + let delete_after = future_time.to_rfc3339(); + + let deactivate = client + .post(format!("{}/xrpc/com.atproto.server.deactivateAccount", base)) + .bearer_auth(&access_jwt) + .json(&json!({ + "deleteAfter": delete_after + })) + .send() + .await + .unwrap(); + assert_eq!(deactivate.status(), StatusCode::OK); + + let status = client + .get(format!("{}/xrpc/com.atproto.server.checkAccountStatus", base)) + .bearer_auth(&access_jwt) + .send() + .await + .unwrap(); + assert_eq!(status.status(), StatusCode::OK); + let body: Value = status.json().await.unwrap(); + assert_eq!(body["activated"], false, "Account should be deactivated"); +} + +#[tokio::test] +async fn test_create_account_returns_did_doc() { + let client = client(); + let base = base_url().await; + + let handle = format!("diddoctest-{}", uuid::Uuid::new_v4()); + let payload = json!({ + "handle": handle, + "email": format!("{}@example.com", handle), + "password": "Testpass123!" + }); + + let create_res = client + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) + .json(&payload) + .send() + .await + .unwrap(); + assert_eq!(create_res.status(), StatusCode::OK); + let body: Value = create_res.json().await.unwrap(); + + assert!(body["accessJwt"].is_string(), "accessJwt should always be returned"); + assert!(body["refreshJwt"].is_string(), "refreshJwt should always be returned"); + assert!(body["did"].is_string(), "did should be returned"); + + if body["didDoc"].is_object() { + let did_doc = &body["didDoc"]; + assert!(did_doc["id"].is_string(), "didDoc should have id field"); + } +} + +#[tokio::test] +async fn test_create_account_always_returns_tokens() { + let client = client(); + let base = base_url().await; + + let handle = format!("tokentest-{}", uuid::Uuid::new_v4()); + let payload = json!({ + "handle": handle, + "email": format!("{}@example.com", handle), + "password": "Testpass123!" + }); + + let create_res = client + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) + .json(&payload) + .send() + .await + .unwrap(); + assert_eq!(create_res.status(), StatusCode::OK); + let body: Value = create_res.json().await.unwrap(); + + let access_jwt = body["accessJwt"].as_str().expect("accessJwt should be present"); + let refresh_jwt = body["refreshJwt"].as_str().expect("refreshJwt should be present"); + + assert!(!access_jwt.is_empty(), "accessJwt should not be empty"); + assert!(!refresh_jwt.is_empty(), "refreshJwt should not be empty"); + + let parts: Vec<&str> = access_jwt.split('.').collect(); + assert_eq!(parts.len(), 3, "accessJwt should be a valid JWT with 3 parts"); +} + +#[tokio::test] +async fn test_describe_server_has_links_and_contact() { + let client = client(); + let base = base_url().await; + + let describe = client + .get(format!("{}/xrpc/com.atproto.server.describeServer", base)) + .send() + .await + .unwrap(); + assert_eq!(describe.status(), StatusCode::OK); + let body: Value = describe.json().await.unwrap(); + + assert!(body.get("links").is_some(), "describeServer should include links object"); + assert!(body.get("contact").is_some(), "describeServer should include contact object"); + + let links = &body["links"]; + assert!(links.get("privacyPolicy").is_some() || links["privacyPolicy"].is_null(), + "links should have privacyPolicy field (can be null)"); + assert!(links.get("termsOfService").is_some() || links["termsOfService"].is_null(), + "links should have termsOfService field (can be null)"); + + let contact = &body["contact"]; + assert!(contact.get("email").is_some() || contact["email"].is_null(), + "contact should have email field (can be null)"); +} + +#[tokio::test] +async fn test_delete_account_password_max_length() { + let client = client(); + let base = base_url().await; + + let handle = format!("pwdlentest-{}", uuid::Uuid::new_v4()); + let payload = json!({ + "handle": handle, + "email": format!("{}@example.com", handle), + "password": "Testpass123!" + }); + + let create_res = client + .post(format!("{}/xrpc/com.atproto.server.createAccount", base)) + .json(&payload) + .send() + .await + .unwrap(); + assert_eq!(create_res.status(), StatusCode::OK); + let body: Value = create_res.json().await.unwrap(); + let did = body["did"].as_str().unwrap(); + + let too_long_password = "a".repeat(600); + let delete_res = client + .post(format!("{}/xrpc/com.atproto.server.deleteAccount", base)) + .json(&json!({ + "did": did, + "password": too_long_password, + "token": "fake-token" + })) + .send() + .await + .unwrap(); + + assert_eq!(delete_res.status(), StatusCode::BAD_REQUEST); + let error_body: Value = delete_res.json().await.unwrap(); + assert!(error_body["message"].as_str().unwrap().contains("password length") + || error_body["error"].as_str().unwrap() == "InvalidRequest"); +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index d72eb97..fb01e0c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -466,8 +466,11 @@ async fn create_account_and_login_internal(client: &Client, make_admin: bool) -> .await .expect("Failed to mark user as admin"); } + let verification_required = body["verificationRequired"].as_bool().unwrap_or(true); if let Some(access_jwt) = body["accessJwt"].as_str() { - return (access_jwt.to_string(), did); + if !verification_required { + return (access_jwt.to_string(), did); + } } let body_text: String = sqlx::query_scalar!( "SELECT body FROM comms_queue WHERE user_id = (SELECT id FROM users WHERE did = $1) AND comms_type = 'email_verification' ORDER BY created_at DESC LIMIT 1", diff --git a/tests/sync_blob.rs b/tests/sync_blob.rs index 7ac6168..5c1b9d5 100644 --- a/tests/sync_blob.rs +++ b/tests/sync_blob.rs @@ -8,6 +8,7 @@ use serde_json::Value; async fn test_list_blobs_success() { let client = client(); let (access_jwt, did) = create_account_and_login(&client).await; + let unique_content = format!("test blob content {}", uuid::Uuid::new_v4()); let blob_res = client .post(format!( "{}/xrpc/com.atproto.repo.uploadBlob", @@ -15,7 +16,7 @@ async fn test_list_blobs_success() { )) .header(header::CONTENT_TYPE, "text/plain") .bearer_auth(&access_jwt) - .body("test blob content") + .body(unique_content) .send() .await .expect("Failed to upload blob"); diff --git a/tests/sync_conformance.rs b/tests/sync_conformance.rs index 0ba5590..a1164a5 100644 --- a/tests/sync_conformance.rs +++ b/tests/sync_conformance.rs @@ -162,7 +162,7 @@ async fn test_list_repos_shows_status_field() { let res = client .get(format!( - "{}/xrpc/com.atproto.sync.listRepos", + "{}/xrpc/com.atproto.sync.listRepos?limit=1000", base_url().await )) .send() diff --git a/tests/sync_repo.rs b/tests/sync_repo.rs index 43483f1..9e2167f 100644 --- a/tests/sync_repo.rs +++ b/tests/sync_repo.rs @@ -552,7 +552,8 @@ async fn test_sync_repo_export_lifecycle() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; create_post(&client, &did, &jwt, &format!("Export test post {}", i)).await; } - let blob_data = b"blob data for sync export test"; + let blob_data = format!("blob data for sync export test {}", uuid::Uuid::new_v4()); + let blob_bytes = blob_data.as_bytes().to_vec(); let upload_res = client .post(format!( "{}/xrpc/com.atproto.repo.uploadBlob", @@ -560,7 +561,7 @@ async fn test_sync_repo_export_lifecycle() { )) .header(header::CONTENT_TYPE, "application/octet-stream") .bearer_auth(&jwt) - .body(blob_data.to_vec()) + .body(blob_bytes.clone()) .send() .await .expect("Failed to upload blob"); @@ -631,7 +632,7 @@ async fn test_sync_repo_export_lifecycle() { let retrieved_blob = get_blob_res.bytes().await.unwrap(); assert_eq!( retrieved_blob.as_ref(), - blob_data, + blob_bytes.as_slice(), "Retrieved blob should match uploaded data" ); let latest_commit_res = client