diff --git a/cmd/appview/serve.go b/cmd/appview/serve.go index bb267ac..963232f 100644 --- a/cmd/appview/serve.go +++ b/cmd/appview/serve.go @@ -165,11 +165,7 @@ func serveRegistry(cmd *cobra.Command, args []string) error { go func() { // Wait for services to be ready (Docker startup race condition) time.Sleep(10 * time.Second) - // Create service token getter callback that uses auth.GetOrFetchServiceToken - getServiceToken := func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error) { - return auth.GetOrFetchServiceToken(ctx, refresher, userDID, holdDID, pdsEndpoint) - } - if err := db.MigrateStatsToHolds(context.Background(), uiDatabase, getServiceToken); err != nil { + if err := db.MigrateStatsToHolds(context.Background(), uiDatabase); err != nil { slog.Warn("Stats migration failed", "error", err) } }() diff --git a/pkg/appview/db/queries.go b/pkg/appview/db/queries.go index 5609d9d..ea97c9b 100644 --- a/pkg/appview/db/queries.go +++ b/pkg/appview/db/queries.go @@ -717,6 +717,27 @@ func DeleteManifest(db *sql.DB, did, repository, digest string) error { return err } +// DeleteUserData deletes all cached data for a user. +// This is used when an account is permanently deleted or when we discover +// the account no longer exists (e.g., RepoNotFound during backfill). +// +// Due to ON DELETE CASCADE in the schema, deleting from users will automatically +// cascade to: manifests, tags, layers, references, annotations, stars, repo_pages, etc. +func DeleteUserData(db *sql.DB, did string) error { + result, err := db.Exec(`DELETE FROM users WHERE did = ?`, did) + if err != nil { + return fmt.Errorf("failed to delete user: %w", err) + } + + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + // User didn't exist, nothing to delete + return nil + } + + return nil +} + // GetManifest fetches a single manifest by digest // Note: Annotations are stored separately in repository_annotations table func GetManifest(db *sql.DB, digest string) (*Manifest, error) { diff --git a/pkg/appview/db/queries_test.go b/pkg/appview/db/queries_test.go index 3cbb7bd..d329972 100644 --- a/pkg/appview/db/queries_test.go +++ b/pkg/appview/db/queries_test.go @@ -1199,3 +1199,111 @@ func TestParseTimestamp(t *testing.T) { }) } } + +func TestDeleteUserData(t *testing.T) { + db, err := InitDB(":memory:", true) + if err != nil { + t.Fatalf("Failed to init database: %v", err) + } + defer db.Close() + + // Create test user with related data + testUser := &User{ + DID: "did:plc:deleteme", + Handle: "deleteme.bsky.social", + PDSEndpoint: "https://test.pds.example.com", + LastSeen: time.Now(), + } + if err := UpsertUser(db, testUser); err != nil { + t.Fatalf("Failed to insert user: %v", err) + } + + // Add manifest + manifest := &Manifest{ + DID: testUser.DID, + Repository: "myapp", + Digest: "sha256:abc123", + HoldEndpoint: "did:web:hold.example.com", + SchemaVersion: 2, + MediaType: "application/vnd.oci.image.manifest.v1+json", + CreatedAt: time.Now(), + } + manifestID, err := InsertManifest(db, manifest) + if err != nil { + t.Fatalf("Failed to insert manifest: %v", err) + } + + // Add layer + layer := &Layer{ + ManifestID: manifestID, + LayerIndex: 0, + Digest: "sha256:layer1", + Size: 1000, + MediaType: "application/vnd.oci.image.layer.v1.tar+gzip", + } + if err := InsertLayer(db, layer); err != nil { + t.Fatalf("Failed to insert layer: %v", err) + } + + // Add tag + tag := &Tag{ + DID: testUser.DID, + Repository: "myapp", + Tag: "latest", + Digest: "sha256:abc123", + CreatedAt: time.Now(), + } + if err := UpsertTag(db, tag); err != nil { + t.Fatalf("Failed to insert tag: %v", err) + } + + // Add annotations + if err := UpsertRepositoryAnnotations(db, testUser.DID, "myapp", map[string]string{ + "org.opencontainers.image.title": "My App", + }); err != nil { + t.Fatalf("Failed to insert annotations: %v", err) + } + + // Verify data exists + var count int + db.QueryRow(`SELECT COUNT(*) FROM manifests WHERE did = ?`, testUser.DID).Scan(&count) + if count != 1 { + t.Fatalf("Expected 1 manifest, got %d", count) + } + db.QueryRow(`SELECT COUNT(*) FROM tags WHERE did = ?`, testUser.DID).Scan(&count) + if count != 1 { + t.Fatalf("Expected 1 tag, got %d", count) + } + db.QueryRow(`SELECT COUNT(*) FROM layers WHERE manifest_id = ?`, manifestID).Scan(&count) + if count != 1 { + t.Fatalf("Expected 1 layer, got %d", count) + } + + // Delete user data + if err := DeleteUserData(db, testUser.DID); err != nil { + t.Fatalf("Failed to delete user data: %v", err) + } + + // Verify all data was cascade deleted + db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, testUser.DID).Scan(&count) + if count != 0 { + t.Errorf("Expected 0 users, got %d", count) + } + db.QueryRow(`SELECT COUNT(*) FROM manifests WHERE did = ?`, testUser.DID).Scan(&count) + if count != 0 { + t.Errorf("Expected 0 manifests after cascade delete, got %d", count) + } + db.QueryRow(`SELECT COUNT(*) FROM tags WHERE did = ?`, testUser.DID).Scan(&count) + if count != 0 { + t.Errorf("Expected 0 tags after cascade delete, got %d", count) + } + db.QueryRow(`SELECT COUNT(*) FROM layers WHERE manifest_id = ?`, manifestID).Scan(&count) + if count != 0 { + t.Errorf("Expected 0 layers after cascade delete, got %d", count) + } + + // Test idempotency - deleting non-existent user should not error + if err := DeleteUserData(db, testUser.DID); err != nil { + t.Errorf("Deleting non-existent user should not error, got: %v", err) + } +} diff --git a/pkg/appview/db/stats_migration.go b/pkg/appview/db/stats_migration.go index 58b121e..e3ae730 100644 --- a/pkg/appview/db/stats_migration.go +++ b/pkg/appview/db/stats_migration.go @@ -14,10 +14,6 @@ import ( "atcr.io/pkg/atproto" ) -// ServiceTokenGetter is a function type for getting service tokens. -// This avoids importing auth from db (which would create import cycles with tests). -type ServiceTokenGetter func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error) - // MigrateStatsToHolds migrates existing repository_stats data to hold services. // This is a one-time migration that runs on startup. // @@ -25,14 +21,12 @@ type ServiceTokenGetter func(ctx context.Context, userDID, holdDID, pdsEndpoint // 1. Checks if migration has already completed // 2. Reads all repository_stats entries // 3. For each entry, looks up the hold DID from manifests table -// 4. Gets a service token for the user and calls the hold's setStats endpoint +// 4. Calls the hold's setStats endpoint (no auth required - temporary migration endpoint) // 5. Marks migration complete after all entries are processed // // If a hold is offline, the migration logs a warning and continues. // The hold will receive real-time stats updates via Jetstream once online. -// -// The getServiceToken parameter is a callback to avoid import cycles with pkg/auth. -func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken ServiceTokenGetter) error { +func MigrateStatsToHolds(ctx context.Context, db *sql.DB) error { // Check if migration already done var migrationDone bool err := db.QueryRowContext(ctx, ` @@ -121,24 +115,6 @@ func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken Servic continue } - // Get user's PDS endpoint - user, err := GetUserByDID(db, stat.DID) - if err != nil || user == nil { - slog.Debug("User not found in database, skipping", "component", "migration", - "did", stat.DID, "repository", stat.Repository) - skipCount++ - continue - } - - // Get service token for the user - serviceToken, err := getServiceToken(ctx, stat.DID, holdDID, user.PDSEndpoint) - if err != nil { - slog.Warn("Failed to get service token, skipping", "component", "migration", - "did", stat.DID, "repository", stat.Repository, "error", err) - errorCount++ - continue - } - // Resolve hold DID to HTTP URL holdURL := atproto.ResolveHoldURL(holdDID) if holdURL == "" { @@ -148,8 +124,8 @@ func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken Servic continue } - // Call hold's setStats endpoint - err = callSetStats(ctx, holdURL, serviceToken, stat.DID, stat.Repository, + // Call hold's setStats endpoint (no auth required for migration) + err = callSetStats(ctx, holdURL, stat.DID, stat.Repository, stat.PullCount, stat.PushCount, stat.LastPull.String, stat.LastPush.String) if err != nil { slog.Warn("Failed to migrate stats to hold, continuing", "component", "migration", @@ -185,7 +161,8 @@ func markMigrationComplete(db *sql.DB) error { } // callSetStats calls the hold's io.atcr.hold.setStats endpoint -func callSetStats(ctx context.Context, holdURL, serviceToken, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error { +// No authentication required - this is a temporary migration endpoint +func callSetStats(ctx context.Context, holdURL, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error { // Build request reqBody := map[string]any{ "ownerDid": ownerDID, @@ -212,7 +189,6 @@ func callSetStats(ctx context.Context, holdURL, serviceToken, ownerDID, reposito } req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+serviceToken) // Send request with timeout client := &http.Client{Timeout: 10 * time.Second} diff --git a/pkg/appview/jetstream/backfill.go b/pkg/appview/jetstream/backfill.go index 80b0574..a44517a 100644 --- a/pkg/appview/jetstream/backfill.go +++ b/pkg/appview/jetstream/backfill.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "io" "log/slog" @@ -111,7 +112,17 @@ func (b *BackfillWorker) backfillCollection(ctx context.Context, collection stri for _, repo := range result.Repos { recordCount, err := b.backfillRepo(ctx, repo.DID, collection) if err != nil { - slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err) + // RepoNotFound means account was deleted/deactivated + // Clean up our cached data since the source is gone + if strings.Contains(err.Error(), "RepoNotFound") { + if delErr := db.DeleteUserData(b.db, repo.DID); delErr != nil { + slog.Warn("Backfill failed to delete data for removed repo", "did", repo.DID, "error", delErr) + } else { + slog.Info("Backfill cleaned up data for deleted/deactivated repo", "did", repo.DID) + } + } else { + slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err) + } continue } @@ -583,6 +594,11 @@ func (b *BackfillWorker) updateRepoPageInPDS(ctx context.Context, did, pdsEndpoi var createdAt time.Time var avatarRef *atproto.ATProtoBlobRef + if err != nil && !errors.Is(err, atproto.ErrRecordNotFound) { + // Non-404 error (e.g., no OAuth session) - fail fast instead of trying PutRecord + return fmt.Errorf("failed to check existing record: %w", err) + } + if err == nil && existingRecord != nil { // Parse existing record var existingPage atproto.RepoPageRecord diff --git a/pkg/appview/jetstream/processor.go b/pkg/appview/jetstream/processor.go index ab728c4..c4527d1 100644 --- a/pkg/appview/jetstream/processor.go +++ b/pkg/appview/jetstream/processor.go @@ -426,28 +426,28 @@ func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData }) } -// ProcessAccount handles account status events (deactivation/reactivation) +// ProcessAccount handles account status events (deactivation/deletion/etc) // This is called when Jetstream receives an account event indicating status changes. // -// IMPORTANT: Deactivation events are ambiguous - they could indicate: -// 1. Permanent account deactivation (user deleted account) -// 2. PDS migration (account deactivated at old PDS, reactivated at new PDS) +// Status handling: +// - "deleted": Account permanently deleted - remove all cached data +// - "deactivated": Could be PDS migration or permanent - invalidate cache only +// - "takendown": Moderation action - invalidate cache only +// - Other: Ignore // -// We DO NOT delete user data on deactivation events. Instead, we invalidate the -// identity cache. On the next resolution attempt: -// - If migrated: Resolution finds the new PDS and updates the database automatically -// - If truly deactivated: Resolution fails and user won't appear in new queries -// -// This approach prevents data loss from PDS migrations while still handling deactivations. +// For "deactivated", we don't delete data because it's ambiguous: +// - Could be permanent deactivation (user deleted account) +// - Could be PDS migration (account moves to new PDS) +// Cache invalidation forces re-resolution on next lookup. // // Only processes events for users who already exist in our database (have ATCR activity). func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { - // Only process deactivation events - if active || status != "deactivated" { + // Skip active accounts or unknown statuses + if active { return nil } - // Check if user exists in our database - only update if they're an ATCR user + // Check if user exists in our database - only process if they're an ATCR user user, err := db.GetUserByDID(p.db, did) if err != nil { return fmt.Errorf("failed to check user existence: %w", err) @@ -458,21 +458,52 @@ func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, return nil } - // Invalidate cached identity data to force re-resolution on next lookup - // This will discover if the account was migrated (new PDS) or truly deactivated (resolution fails) - if err := atproto.InvalidateIdentity(ctx, did); err != nil { - slog.Warn("Failed to invalidate identity cache for deactivated account", + switch status { + case "deleted": + // Account permanently deleted - remove all cached data + if err := db.DeleteUserData(p.db, did); err != nil { + slog.Error("Failed to delete user data for deleted account", + "component", "processor", + "did", did, + "handle", user.Handle, + "error", err) + return err + } + + // Also invalidate identity cache + _ = atproto.InvalidateIdentity(ctx, did) + + slog.Info("Deleted user data for deleted account", "component", "processor", "did", did, - "error", err) - return err - } + "handle", user.Handle) - slog.Info("Processed account deactivation event - cache invalidated", - "component", "processor", - "did", did, - "handle", user.Handle, - "status", status) + case "deactivated", "takendown": + // Ambiguous status - invalidate cache but keep data + // For deactivated: could be PDS migration, will resolve on next lookup + // For takendown: moderation action, keep data in case of appeal + if err := atproto.InvalidateIdentity(ctx, did); err != nil { + slog.Warn("Failed to invalidate identity cache", + "component", "processor", + "did", did, + "status", status, + "error", err) + return err + } + + slog.Info("Processed account status event - cache invalidated", + "component", "processor", + "did", did, + "handle", user.Handle, + "status", status) + + default: + // Unknown status - ignore + slog.Debug("Ignoring unknown account status", + "component", "processor", + "did", did, + "status", status) + } return nil } diff --git a/pkg/appview/jetstream/processor_test.go b/pkg/appview/jetstream/processor_test.go index 81ef82a..78fcafe 100644 --- a/pkg/appview/jetstream/processor_test.go +++ b/pkg/appview/jetstream/processor_test.go @@ -691,4 +691,27 @@ func TestProcessAccount(t *testing.T) { if !exists { t.Error("User should still exist after multiple deactivation events") } + + // Test 6: Process account deletion - should delete user data + err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") + if err != nil { + t.Logf("Cache invalidation error during deletion (expected): %v", err) + } + + // User should be deleted after "deleted" status + err = db.QueryRow(` + SELECT EXISTS(SELECT 1 FROM users WHERE did = ?) + `, testDID).Scan(&exists) + if err != nil { + t.Fatalf("Failed to check if user exists after deletion: %v", err) + } + if exists { + t.Error("User should NOT exist after deletion event") + } + + // Test 7: Process deletion for already-deleted user (idempotent) + err = processor.ProcessAccount(context.Background(), testDID, false, "deleted") + if err != nil { + t.Errorf("Deletion of non-existent user should not error, got: %v", err) + } } diff --git a/pkg/appview/jetstream/worker.go b/pkg/appview/jetstream/worker.go index 1a280d2..00647b1 100644 --- a/pkg/appview/jetstream/worker.go +++ b/pkg/appview/jetstream/worker.go @@ -203,7 +203,7 @@ func (w *Worker) Start(ctx context.Context) error { return ctx.Err() case <-heartbeatTicker.C: elapsed := time.Since(lastHeartbeat) - slog.Info("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds()) + slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds()) eventCount = 0 lastHeartbeat = time.Now() default: diff --git a/pkg/hold/oci/xrpc.go b/pkg/hold/oci/xrpc.go index 8ca5d52..0c25c2d 100644 --- a/pkg/hold/oci/xrpc.go +++ b/pkg/hold/oci/xrpc.go @@ -381,17 +381,12 @@ func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Reques } // HandleSetStats sets absolute stats values for a repository (used by migration) -// This is a migration-only endpoint that allows AppView to sync existing stats to holds +// This is a temporary migration-only endpoint that allows AppView to sync existing stats to holds. +// No authentication required - this endpoint will be removed after migration is complete. +// TODO: Remove this endpoint after stats migration is complete func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // Validate service token (same auth as blob:write endpoints) - validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient) - if err != nil { - RespondError(w, http.StatusForbidden, fmt.Sprintf("authorization failed: %v", err)) - return - } - // Parse request var req struct { OwnerDID string `json:"ownerDid"` @@ -407,12 +402,6 @@ func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) { return } - // Verify user DID matches token (user can only set stats for their own repos) - if req.OwnerDID != validatedUser.DID { - RespondError(w, http.StatusForbidden, "owner DID mismatch") - return - } - // Validate required fields if req.OwnerDID == "" || req.Repository == "" { RespondError(w, http.StatusBadRequest, "ownerDid and repository are required")