clean up logs, delete cached data when atproto account is deleted

This commit is contained in:
Evan Jarrett
2025-12-31 12:21:17 -06:00
parent f19dfa2716
commit 1df1bb57a4
9 changed files with 236 additions and 76 deletions

View File

@@ -165,11 +165,7 @@ func serveRegistry(cmd *cobra.Command, args []string) error {
go func() {
// Wait for services to be ready (Docker startup race condition)
time.Sleep(10 * time.Second)
// Create service token getter callback that uses auth.GetOrFetchServiceToken
getServiceToken := func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error) {
return auth.GetOrFetchServiceToken(ctx, refresher, userDID, holdDID, pdsEndpoint)
}
if err := db.MigrateStatsToHolds(context.Background(), uiDatabase, getServiceToken); err != nil {
if err := db.MigrateStatsToHolds(context.Background(), uiDatabase); err != nil {
slog.Warn("Stats migration failed", "error", err)
}
}()

View File

@@ -717,6 +717,27 @@ func DeleteManifest(db *sql.DB, did, repository, digest string) error {
return err
}
// DeleteUserData deletes all cached data for a user.
// This is used when an account is permanently deleted or when we discover
// the account no longer exists (e.g., RepoNotFound during backfill).
//
// Due to ON DELETE CASCADE in the schema, deleting from users will automatically
// cascade to: manifests, tags, layers, references, annotations, stars, repo_pages, etc.
func DeleteUserData(db *sql.DB, did string) error {
result, err := db.Exec(`DELETE FROM users WHERE did = ?`, did)
if err != nil {
return fmt.Errorf("failed to delete user: %w", err)
}
rowsAffected, _ := result.RowsAffected()
if rowsAffected == 0 {
// User didn't exist, nothing to delete
return nil
}
return nil
}
// GetManifest fetches a single manifest by digest
// Note: Annotations are stored separately in repository_annotations table
func GetManifest(db *sql.DB, digest string) (*Manifest, error) {

View File

@@ -1199,3 +1199,111 @@ func TestParseTimestamp(t *testing.T) {
})
}
}
func TestDeleteUserData(t *testing.T) {
db, err := InitDB(":memory:", true)
if err != nil {
t.Fatalf("Failed to init database: %v", err)
}
defer db.Close()
// Create test user with related data
testUser := &User{
DID: "did:plc:deleteme",
Handle: "deleteme.bsky.social",
PDSEndpoint: "https://test.pds.example.com",
LastSeen: time.Now(),
}
if err := UpsertUser(db, testUser); err != nil {
t.Fatalf("Failed to insert user: %v", err)
}
// Add manifest
manifest := &Manifest{
DID: testUser.DID,
Repository: "myapp",
Digest: "sha256:abc123",
HoldEndpoint: "did:web:hold.example.com",
SchemaVersion: 2,
MediaType: "application/vnd.oci.image.manifest.v1+json",
CreatedAt: time.Now(),
}
manifestID, err := InsertManifest(db, manifest)
if err != nil {
t.Fatalf("Failed to insert manifest: %v", err)
}
// Add layer
layer := &Layer{
ManifestID: manifestID,
LayerIndex: 0,
Digest: "sha256:layer1",
Size: 1000,
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
}
if err := InsertLayer(db, layer); err != nil {
t.Fatalf("Failed to insert layer: %v", err)
}
// Add tag
tag := &Tag{
DID: testUser.DID,
Repository: "myapp",
Tag: "latest",
Digest: "sha256:abc123",
CreatedAt: time.Now(),
}
if err := UpsertTag(db, tag); err != nil {
t.Fatalf("Failed to insert tag: %v", err)
}
// Add annotations
if err := UpsertRepositoryAnnotations(db, testUser.DID, "myapp", map[string]string{
"org.opencontainers.image.title": "My App",
}); err != nil {
t.Fatalf("Failed to insert annotations: %v", err)
}
// Verify data exists
var count int
db.QueryRow(`SELECT COUNT(*) FROM manifests WHERE did = ?`, testUser.DID).Scan(&count)
if count != 1 {
t.Fatalf("Expected 1 manifest, got %d", count)
}
db.QueryRow(`SELECT COUNT(*) FROM tags WHERE did = ?`, testUser.DID).Scan(&count)
if count != 1 {
t.Fatalf("Expected 1 tag, got %d", count)
}
db.QueryRow(`SELECT COUNT(*) FROM layers WHERE manifest_id = ?`, manifestID).Scan(&count)
if count != 1 {
t.Fatalf("Expected 1 layer, got %d", count)
}
// Delete user data
if err := DeleteUserData(db, testUser.DID); err != nil {
t.Fatalf("Failed to delete user data: %v", err)
}
// Verify all data was cascade deleted
db.QueryRow(`SELECT COUNT(*) FROM users WHERE did = ?`, testUser.DID).Scan(&count)
if count != 0 {
t.Errorf("Expected 0 users, got %d", count)
}
db.QueryRow(`SELECT COUNT(*) FROM manifests WHERE did = ?`, testUser.DID).Scan(&count)
if count != 0 {
t.Errorf("Expected 0 manifests after cascade delete, got %d", count)
}
db.QueryRow(`SELECT COUNT(*) FROM tags WHERE did = ?`, testUser.DID).Scan(&count)
if count != 0 {
t.Errorf("Expected 0 tags after cascade delete, got %d", count)
}
db.QueryRow(`SELECT COUNT(*) FROM layers WHERE manifest_id = ?`, manifestID).Scan(&count)
if count != 0 {
t.Errorf("Expected 0 layers after cascade delete, got %d", count)
}
// Test idempotency - deleting non-existent user should not error
if err := DeleteUserData(db, testUser.DID); err != nil {
t.Errorf("Deleting non-existent user should not error, got: %v", err)
}
}

View File

@@ -14,10 +14,6 @@ import (
"atcr.io/pkg/atproto"
)
// ServiceTokenGetter is a function type for getting service tokens.
// This avoids importing auth from db (which would create import cycles with tests).
type ServiceTokenGetter func(ctx context.Context, userDID, holdDID, pdsEndpoint string) (string, error)
// MigrateStatsToHolds migrates existing repository_stats data to hold services.
// This is a one-time migration that runs on startup.
//
@@ -25,14 +21,12 @@ type ServiceTokenGetter func(ctx context.Context, userDID, holdDID, pdsEndpoint
// 1. Checks if migration has already completed
// 2. Reads all repository_stats entries
// 3. For each entry, looks up the hold DID from manifests table
// 4. Gets a service token for the user and calls the hold's setStats endpoint
// 4. Calls the hold's setStats endpoint (no auth required - temporary migration endpoint)
// 5. Marks migration complete after all entries are processed
//
// If a hold is offline, the migration logs a warning and continues.
// The hold will receive real-time stats updates via Jetstream once online.
//
// The getServiceToken parameter is a callback to avoid import cycles with pkg/auth.
func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken ServiceTokenGetter) error {
func MigrateStatsToHolds(ctx context.Context, db *sql.DB) error {
// Check if migration already done
var migrationDone bool
err := db.QueryRowContext(ctx, `
@@ -121,24 +115,6 @@ func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken Servic
continue
}
// Get user's PDS endpoint
user, err := GetUserByDID(db, stat.DID)
if err != nil || user == nil {
slog.Debug("User not found in database, skipping", "component", "migration",
"did", stat.DID, "repository", stat.Repository)
skipCount++
continue
}
// Get service token for the user
serviceToken, err := getServiceToken(ctx, stat.DID, holdDID, user.PDSEndpoint)
if err != nil {
slog.Warn("Failed to get service token, skipping", "component", "migration",
"did", stat.DID, "repository", stat.Repository, "error", err)
errorCount++
continue
}
// Resolve hold DID to HTTP URL
holdURL := atproto.ResolveHoldURL(holdDID)
if holdURL == "" {
@@ -148,8 +124,8 @@ func MigrateStatsToHolds(ctx context.Context, db *sql.DB, getServiceToken Servic
continue
}
// Call hold's setStats endpoint
err = callSetStats(ctx, holdURL, serviceToken, stat.DID, stat.Repository,
// Call hold's setStats endpoint (no auth required for migration)
err = callSetStats(ctx, holdURL, stat.DID, stat.Repository,
stat.PullCount, stat.PushCount, stat.LastPull.String, stat.LastPush.String)
if err != nil {
slog.Warn("Failed to migrate stats to hold, continuing", "component", "migration",
@@ -185,7 +161,8 @@ func markMigrationComplete(db *sql.DB) error {
}
// callSetStats calls the hold's io.atcr.hold.setStats endpoint
func callSetStats(ctx context.Context, holdURL, serviceToken, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error {
// No authentication required - this is a temporary migration endpoint
func callSetStats(ctx context.Context, holdURL, ownerDID, repository string, pullCount, pushCount int64, lastPull, lastPush string) error {
// Build request
reqBody := map[string]any{
"ownerDid": ownerDID,
@@ -212,7 +189,6 @@ func callSetStats(ctx context.Context, holdURL, serviceToken, ownerDID, reposito
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+serviceToken)
// Send request with timeout
client := &http.Client{Timeout: 10 * time.Second}

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
@@ -111,7 +112,17 @@ func (b *BackfillWorker) backfillCollection(ctx context.Context, collection stri
for _, repo := range result.Repos {
recordCount, err := b.backfillRepo(ctx, repo.DID, collection)
if err != nil {
slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err)
// RepoNotFound means account was deleted/deactivated
// Clean up our cached data since the source is gone
if strings.Contains(err.Error(), "RepoNotFound") {
if delErr := db.DeleteUserData(b.db, repo.DID); delErr != nil {
slog.Warn("Backfill failed to delete data for removed repo", "did", repo.DID, "error", delErr)
} else {
slog.Info("Backfill cleaned up data for deleted/deactivated repo", "did", repo.DID)
}
} else {
slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err)
}
continue
}
@@ -583,6 +594,11 @@ func (b *BackfillWorker) updateRepoPageInPDS(ctx context.Context, did, pdsEndpoi
var createdAt time.Time
var avatarRef *atproto.ATProtoBlobRef
if err != nil && !errors.Is(err, atproto.ErrRecordNotFound) {
// Non-404 error (e.g., no OAuth session) - fail fast instead of trying PutRecord
return fmt.Errorf("failed to check existing record: %w", err)
}
if err == nil && existingRecord != nil {
// Parse existing record
var existingPage atproto.RepoPageRecord

View File

@@ -426,28 +426,28 @@ func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData
})
}
// ProcessAccount handles account status events (deactivation/reactivation)
// ProcessAccount handles account status events (deactivation/deletion/etc)
// This is called when Jetstream receives an account event indicating status changes.
//
// IMPORTANT: Deactivation events are ambiguous - they could indicate:
// 1. Permanent account deactivation (user deleted account)
// 2. PDS migration (account deactivated at old PDS, reactivated at new PDS)
// Status handling:
// - "deleted": Account permanently deleted - remove all cached data
// - "deactivated": Could be PDS migration or permanent - invalidate cache only
// - "takendown": Moderation action - invalidate cache only
// - Other: Ignore
//
// We DO NOT delete user data on deactivation events. Instead, we invalidate the
// identity cache. On the next resolution attempt:
// - If migrated: Resolution finds the new PDS and updates the database automatically
// - If truly deactivated: Resolution fails and user won't appear in new queries
//
// This approach prevents data loss from PDS migrations while still handling deactivations.
// For "deactivated", we don't delete data because it's ambiguous:
// - Could be permanent deactivation (user deleted account)
// - Could be PDS migration (account moves to new PDS)
// Cache invalidation forces re-resolution on next lookup.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
// Only process deactivation events
if active || status != "deactivated" {
// Skip active accounts or unknown statuses
if active {
return nil
}
// Check if user exists in our database - only update if they're an ATCR user
// Check if user exists in our database - only process if they're an ATCR user
user, err := db.GetUserByDID(p.db, did)
if err != nil {
return fmt.Errorf("failed to check user existence: %w", err)
@@ -458,21 +458,52 @@ func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool,
return nil
}
// Invalidate cached identity data to force re-resolution on next lookup
// This will discover if the account was migrated (new PDS) or truly deactivated (resolution fails)
if err := atproto.InvalidateIdentity(ctx, did); err != nil {
slog.Warn("Failed to invalidate identity cache for deactivated account",
switch status {
case "deleted":
// Account permanently deleted - remove all cached data
if err := db.DeleteUserData(p.db, did); err != nil {
slog.Error("Failed to delete user data for deleted account",
"component", "processor",
"did", did,
"handle", user.Handle,
"error", err)
return err
}
// Also invalidate identity cache
_ = atproto.InvalidateIdentity(ctx, did)
slog.Info("Deleted user data for deleted account",
"component", "processor",
"did", did,
"error", err)
return err
}
"handle", user.Handle)
slog.Info("Processed account deactivation event - cache invalidated",
"component", "processor",
"did", did,
"handle", user.Handle,
"status", status)
case "deactivated", "takendown":
// Ambiguous status - invalidate cache but keep data
// For deactivated: could be PDS migration, will resolve on next lookup
// For takendown: moderation action, keep data in case of appeal
if err := atproto.InvalidateIdentity(ctx, did); err != nil {
slog.Warn("Failed to invalidate identity cache",
"component", "processor",
"did", did,
"status", status,
"error", err)
return err
}
slog.Info("Processed account status event - cache invalidated",
"component", "processor",
"did", did,
"handle", user.Handle,
"status", status)
default:
// Unknown status - ignore
slog.Debug("Ignoring unknown account status",
"component", "processor",
"did", did,
"status", status)
}
return nil
}

View File

@@ -691,4 +691,27 @@ func TestProcessAccount(t *testing.T) {
if !exists {
t.Error("User should still exist after multiple deactivation events")
}
// Test 6: Process account deletion - should delete user data
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
if err != nil {
t.Logf("Cache invalidation error during deletion (expected): %v", err)
}
// User should be deleted after "deleted" status
err = db.QueryRow(`
SELECT EXISTS(SELECT 1 FROM users WHERE did = ?)
`, testDID).Scan(&exists)
if err != nil {
t.Fatalf("Failed to check if user exists after deletion: %v", err)
}
if exists {
t.Error("User should NOT exist after deletion event")
}
// Test 7: Process deletion for already-deleted user (idempotent)
err = processor.ProcessAccount(context.Background(), testDID, false, "deleted")
if err != nil {
t.Errorf("Deletion of non-existent user should not error, got: %v", err)
}
}

View File

@@ -203,7 +203,7 @@ func (w *Worker) Start(ctx context.Context) error {
return ctx.Err()
case <-heartbeatTicker.C:
elapsed := time.Since(lastHeartbeat)
slog.Info("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
eventCount = 0
lastHeartbeat = time.Now()
default:

View File

@@ -381,17 +381,12 @@ func (h *XRPCHandler) HandleNotifyManifest(w http.ResponseWriter, r *http.Reques
}
// HandleSetStats sets absolute stats values for a repository (used by migration)
// This is a migration-only endpoint that allows AppView to sync existing stats to holds
// This is a temporary migration-only endpoint that allows AppView to sync existing stats to holds.
// No authentication required - this endpoint will be removed after migration is complete.
// TODO: Remove this endpoint after stats migration is complete
func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Validate service token (same auth as blob:write endpoints)
validatedUser, err := pds.ValidateBlobWriteAccess(r, h.pds, h.httpClient)
if err != nil {
RespondError(w, http.StatusForbidden, fmt.Sprintf("authorization failed: %v", err))
return
}
// Parse request
var req struct {
OwnerDID string `json:"ownerDid"`
@@ -407,12 +402,6 @@ func (h *XRPCHandler) HandleSetStats(w http.ResponseWriter, r *http.Request) {
return
}
// Verify user DID matches token (user can only set stats for their own repos)
if req.OwnerDID != validatedUser.DID {
RespondError(w, http.StatusForbidden, "owner DID mismatch")
return
}
// Validate required fields
if req.OwnerDID == "" || req.Repository == "" {
RespondError(w, http.StatusBadRequest, "ownerDid and repository are required")