package jetstream

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"atcr.io/pkg/appview/db"
	"atcr.io/pkg/atproto"

	atpdata "github.com/bluesky-social/indigo/atproto/atdata"
	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/lexicon"
)

// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
type Processor struct {
	db         db.DBTX
	userCache  *UserCache  // Optional - enabled for Worker, disabled for Backfill
	statsCache *StatsCache // In-memory cache for per-hold stats aggregation
	useCache   bool        // Mirrors whether userCache was created; gates all cache reads/writes
	catalog    *lexicon.ResolvingCatalog // For debug logging of validation failures
	webhookDispatcher WebhookDispatcher  // Optional - only for live Worker (nil for Backfill)
}

// WebhookDispatcher is an interface for dispatching webhooks on scan completion.
// Only the live Worker sets this; backfill does NOT (avoids spamming old scan results).
type WebhookDispatcher interface {
	DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string)
}

// SetWebhookDispatcher sets the webhook dispatcher for scan processing.
// Only the live Worker should set this — backfill skips webhook dispatch.
func (p *Processor) SetWebhookDispatcher(d WebhookDispatcher) {
	p.webhookDispatcher = d
}

// NewProcessor creates a new shared processor.
// useCache: true for Worker (live streaming), false for Backfill (batch processing).
// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing).
func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
	// Create lexicon catalog for debug validation logging.
	// Resolution goes through the default identity directory.
	dir := identity.DefaultDirectory()
	catalog := lexicon.NewResolvingCatalog()
	catalog.Directory = dir

	p := &Processor{
		db:         database,
		useCache:   useCache,
		statsCache: statsCache,
		catalog:    catalog,
	}
	if useCache {
		p.userCache = &UserCache{
			cache: make(map[string]*db.User),
		}
	}
	return p
}

// EnsureUser resolves and upserts a user by DID.
// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill).
// On cache/DB hit only the last-seen timestamp is refreshed; otherwise the
// identity is resolved, the Bluesky profile fetched (best effort), and the
// row upserted.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	// Check cache first (if enabled)
	if p.useCache && p.userCache != nil {
		// NOTE(review): cache map is accessed without a lock here — assumes
		// single-goroutine event processing in the Worker; confirm if it fans out.
		if _, ok := p.userCache.cache[did]; ok {
			// User in cache - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	} else if !p.useCache {
		// No cache - check if user already exists in DB.
		// Lookup errors are deliberately treated as "not found" and fall through
		// to a full resolve+upsert.
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			// User exists - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	}

	// Resolve DID to get handle and PDS endpoint
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Fetch user's Bluesky profile record from their PDS (including avatar).
	// Failure is non-fatal: we still create the user, just without an avatar.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record",
			"component", "processor",
			"did", resolvedDID,
			"error", err)
		// Continue without avatar
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	// Create user record
	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// Cache if enabled (note: cached before the upsert below is known to succeed)
	if p.useCache {
		p.userCache.cache[did] = user
	}

	// Upsert to database
	// Use UpsertUser if we successfully fetched an avatar (to update existing users)
	// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
	if avatarURL != "" {
		return db.UpsertUser(p.db, user)
	}
	return db.UpsertUserIgnoreAvatar(p.db, user)
}

// EnsureUserExists ensures a user row exists in the database without updating it.
// Used by non-profile collections to avoid unnecessary writes during backfill.
// If the user doesn't exist, resolves identity and inserts with ON CONFLICT DO NOTHING.
func (p *Processor) EnsureUserExists(ctx context.Context, did string) error {
	// Check cache first (if enabled)
	if p.useCache && p.userCache != nil {
		if _, ok := p.userCache.cache[did]; ok {
			return nil // User in cache, nothing to do
		}
	} else if !p.useCache {
		// No cache - check if user already exists in DB
		// (lookup errors fall through to a resolve+insert attempt)
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			return nil // User exists, nothing to do
		}
	}

	// User doesn't exist yet — resolve and insert
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Best-effort avatar fetch; failure just means no avatar on the new row.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record",
			"component", "processor",
			"did", resolvedDID,
			"error", err)
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// Cache if enabled
	if p.useCache {
		p.userCache.cache[did] = user
	}

	// Insert-if-absent: never overwrites an existing row.
	return db.InsertUserIfNotExists(p.db, user)
}

// ValidateRecord performs validation on records.
//   - Full lexicon validation is logged for debugging but does NOT block ingestion
//   - Targeted validation (captain/crew DID checks) DOES block bogus records
func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
	recordData, err := atpdata.UnmarshalJSON(data)
	if err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}

	// Debug: Full lexicon validation (log only, don't block)
	if p.catalog != nil {
		if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil {
			slog.Debug("Record failed full lexicon validation (ingesting anyway)",
				"component", "processor",
				"collection", collection,
				"error", err)
		}
	}

	// Targeted validation for collections that had bogus data issues
	// These DO block ingestion
	switch collection {
	case atproto.CaptainCollection:
		// Captain must have non-empty owner DID
		owner, _ := recordData["owner"].(string)
		if owner == "" || !strings.HasPrefix(owner, "did:") {
			return fmt.Errorf("captain record missing or invalid owner DID")
		}
	case atproto.CrewCollection:
		// Crew must have non-empty member DID
		member, _ := recordData["member"].(string)
		if member == "" || !strings.HasPrefix(member, "did:") {
			return fmt.Errorf("crew record missing or invalid member DID")
		}
	}
	return nil
}

// ProcessRecord is the unified entry point for processing any ATCR record.
// It handles:
//  1. Schema validation against published lexicons
//  2. User creation for user-activity collections
//  3. Dispatch to the appropriate Process* method
//
// queryCaptainFn is optional - used by backfill for sailor profile processing.
// Records failing validation are logged and silently skipped (returns nil);
// unknown collections are ignored.
func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
	// Skip validation for deletes (no record data)
	if !isDelete && data != nil {
		if err := p.ValidateRecord(ctx, collection, data); err != nil {
			slog.Warn("Record failed schema validation, skipping",
				"component", "processor",
				"collection", collection,
				"did", did,
				"error", err)
			return nil // Skip invalid records silently
		}
	}

	// User-activity collections create/update user entries.
	// Skip for deletes - user should already exist, and we don't need to resolve identity.
	if !isDelete {
		switch collection {
		case atproto.SailorProfileCollection:
			// Sailor profile is the authoritative source for user data — full upsert
			if err := p.EnsureUser(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user: %w", err)
			}
		case atproto.ManifestCollection, atproto.TagCollection, atproto.StarCollection, atproto.RepoPageCollection:
			// Other user collections just need the row to exist — no update if unchanged
			if err := p.EnsureUserExists(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user exists: %w", err)
			}
			// Hold collections (captain, crew, stats) - don't create user entries.
			// These are records FROM holds, not user activity.
		}
	}

	// Dispatch to specific handler
	switch collection {
	case atproto.ManifestCollection:
		if isDelete {
			// NOTE(review): repository argument is passed empty on delete —
			// confirm DeleteManifest matches by (did, rkey) in that case.
			return db.DeleteManifest(p.db, did, "", rkey)
		}
		_, err := p.ProcessManifest(ctx, did, data)
		return err
	case atproto.TagCollection:
		if isDelete {
			// The rkey encodes both repository and tag
			repo, tag := atproto.RKeyToRepositoryTag(rkey)
			return db.DeleteTag(p.db, did, repo, tag)
		}
		return p.ProcessTag(ctx, did, data)
	case atproto.StarCollection:
		if isDelete {
			// The rkey encodes the starred owner DID and repository
			ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
			if err != nil {
				return err
			}
			return db.DeleteStar(p.db, did, ownerDID, repository)
		}
		return p.ProcessStar(ctx, did, data)
	case atproto.RepoPageCollection:
		return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)
	case atproto.SailorProfileCollection:
		return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)
	case atproto.ScanCollection:
		return p.ProcessScan(ctx, did, data, isDelete)
	case atproto.StatsCollection:
		return p.ProcessStats(ctx, did, data, isDelete)
	case atproto.CaptainCollection:
		if isDelete {
			return db.DeleteCaptainRecord(p.db, did)
		}
		return p.ProcessCaptain(ctx, did, data)
	case atproto.CrewCollection:
		if isDelete {
			return db.DeleteCrewMemberByRkey(p.db, did, rkey)
		}
		return p.ProcessCrew(ctx, did, rkey, data)
	default:
		return nil // Unknown collection, ignore
	}
}

// ProcessManifest processes a manifest record and stores it in the database.
// Returns the manifest ID for further processing (layers/references).
// Handles both single-image manifests and manifest lists/indexes, and is
// idempotent: a UNIQUE-constraint conflict resolves to the existing row's ID.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	// Unmarshal manifest record
	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}

	// Detect manifest type: a non-empty Manifests slice means a list/index
	isManifestList := len(manifestRecord.Manifests) > 0

	// Extract hold DID from manifest (with fallback for legacy manifests)
	// New manifests use holdDid field (DID format)
	// Old manifests use holdEndpoint field (URL format) - convert to DID
	holdDID := manifestRecord.HoldDID
	if holdDID == "" && manifestRecord.HoldEndpoint != "" {
		// Legacy manifest - resolve URL to DID via /.well-known/atproto-did.
		// Resolution failure is non-fatal; the manifest is stored with an empty hold DID.
		if resolved, err := atproto.ResolveHoldDID(ctx, manifestRecord.HoldEndpoint); err != nil {
			slog.Warn("Failed to resolve hold DID from legacy manifest endpoint",
				"holdEndpoint", manifestRecord.HoldEndpoint,
				"error", err)
		} else {
			holdDID = resolved
		}
	}

	// Detect artifact type from config media type (image manifests only)
	artifactType := "container-image"
	if !isManifestList && manifestRecord.Config != nil {
		artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
	}

	// Prepare manifest for insertion (WITHOUT annotation fields)
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		HoldEndpoint:  holdDID,
		ArtifactType:  artifactType,
		CreatedAt:     manifestRecord.CreatedAt,
		// Annotations removed - stored separately in repository_annotations table
	}

	// Set config fields only for image manifests (not manifest lists)
	if !isManifestList && manifestRecord.Config != nil {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}

	// Insert manifest
	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		// For backfill: if manifest already exists, get its ID
		if strings.Contains(err.Error(), "UNIQUE constraint failed") {
			var existingID int64
			err := p.db.QueryRow(`
				SELECT id FROM manifests
				WHERE did = ? AND repository = ? AND digest = ?
			`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
			if err != nil {
				return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
			}
			manifestID = existingID
		} else {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}
	}

	// Update repository annotations ONLY if manifest has at least one non-empty annotation
	if manifestRecord.Annotations != nil {
		hasData := false
		for _, value := range manifestRecord.Annotations {
			if value != "" {
				hasData = true
				break
			}
		}
		if hasData {
			// Replace all annotations for this repository
			err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
			if err != nil {
				return 0, fmt.Errorf("failed to upsert annotations: %w", err)
			}
		}
	}

	// Insert manifest references or layers
	if isManifestList {
		// Insert manifest references (for manifest lists/indexes)
		for i, ref := range manifestRecord.Manifests {
			platformArch := ""
			platformOS := ""
			platformVariant := ""
			platformOSVersion := ""
			if ref.Platform != nil {
				platformArch = ref.Platform.Architecture
				platformOS = ref.Platform.OS
				platformVariant = ref.Platform.Variant
				platformOSVersion = ref.Platform.OSVersion
			}

			// Detect attestation manifests from annotations
			isAttestation := false
			if ref.Annotations != nil {
				if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
					isAttestation = refType == "attestation-manifest"
				}
			}

			if err := db.InsertManifestReference(p.db, &db.ManifestReference{
				ManifestID:           manifestID,
				Digest:               ref.Digest,
				MediaType:            ref.MediaType,
				Size:                 ref.Size,
				PlatformArchitecture: platformArch,
				PlatformOS:           platformOS,
				PlatformVariant:      platformVariant,
				PlatformOSVersion:    platformOSVersion,
				IsAttestation:        isAttestation,
				ReferenceIndex:       i,
			}); err != nil {
				// Continue on error - reference might already exist
				continue
			}
		}
	} else {
		// Insert layers (for image manifests)
		for i, layer := range manifestRecord.Layers {
			if err := db.InsertLayer(p.db, &db.Layer{
				ManifestID:  manifestID,
				Digest:      layer.Digest,
				MediaType:   layer.MediaType,
				Size:        layer.Size,
				LayerIndex:  i,
				Annotations: layer.Annotations,
			}); err != nil {
				// Continue on error - layer might already exist
				continue
			}
		}
	}

	return manifestID, nil
}

// ProcessTag processes a tag record and stores it in the database.
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
	// Unmarshal tag record
	var tagRecord atproto.TagRecord
	if err := json.Unmarshal(recordData, &tagRecord); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}

	// Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
	manifestDigest, err := tagRecord.GetManifestDigest()
	if err != nil {
		return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
	}

	// Insert or update tag
	return db.UpsertTag(p.db, &db.Tag{
		DID:        did,
		Repository: tagRecord.Repository,
		Tag:        tagRecord.Tag,
		Digest:     manifestDigest,
		// NOTE(review): the record's UpdatedAt is stored in CreatedAt — confirm intended.
		CreatedAt: tagRecord.UpdatedAt,
	})
}

// ProcessStar processes a star record and stores it in the database.
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
	// Unmarshal star record (handles both old object and new AT URI subject formats)
	var starRecord atproto.StarRecord
	if err := json.Unmarshal(recordData, &starRecord); err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}

	// Extract owner DID and repository from subject AT URI
	ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository()
	if err != nil {
		return fmt.Errorf("failed to parse star subject: %w", err)
	}

	// Ensure the starred repository's owner exists in the users table
	// (the starrer is already ensured by ProcessRecord, but the owner
	// may not have been processed yet during backfill or live events)
	if err := p.EnsureUserExists(ctx, ownerDID); err != nil {
		return fmt.Errorf("failed to ensure star subject user: %w", err)
	}

	// Upsert the star record (idempotent - won't duplicate)
	// The DID here is the starrer (user who starred)
	// Star count will be calculated on demand from the stars table
	return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt)
}

// ProcessSailorProfile processes a sailor profile record.
// This is primarily used by backfill to cache captain records for holds.
// Caching failures are logged but non-fatal; only an invalid record payload
// or a failing queryCaptainFn produces an error.
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
	// Unmarshal sailor profile record
	var profileRecord atproto.SailorProfileRecord
	if err := json.Unmarshal(recordData, &profileRecord); err != nil {
		return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
	}

	// Cache OCI client preference (always, even if no default hold)
	if profileRecord.OciClient != "" {
		if err := db.UpdateUserOciClient(p.db, did, profileRecord.OciClient); err != nil {
			slog.Warn("Failed to cache OCI client preference",
				"component", "processor",
				"did", did,
				"ociClient", profileRecord.OciClient,
				"error", err)
		}
	}

	// Skip hold processing if no default hold set
	if profileRecord.DefaultHold == "" {
		return nil
	}

	// Convert hold URL/DID to canonical DID; a bad reference is logged and skipped
	holdDID, err := atproto.ResolveHoldDID(ctx, profileRecord.DefaultHold)
	if err != nil {
		slog.Warn("Invalid hold reference in profile",
			"component", "processor",
			"did", did,
			"default_hold", profileRecord.DefaultHold,
			"error", err)
		return nil
	}

	// Cache default hold DID on the user record
	if err := db.UpdateUserDefaultHold(p.db, did, holdDID); err != nil {
		slog.Warn("Failed to cache default hold DID",
			"component", "processor",
			"did", did,
			"holdDid", holdDID,
			"error", err)
	}

	// Query and cache the captain record using provided function
	// This allows backfill-specific logic (retries, test mode handling) without duplicating it here
	if queryCaptainFn != nil {
		return queryCaptainFn(ctx, holdDID)
	}
	return nil
}

// ProcessRepoPage processes a repository page record.
// This is called when Jetstream receives a repo page create/update event.
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
	if isDelete {
		// Delete the repo page from our cache
		return db.DeleteRepoPage(p.db, did, rkey)
	}

	// Unmarshal repo page record
	var pageRecord atproto.RepoPageRecord
	if err := json.Unmarshal(recordData, &pageRecord); err != nil {
		return fmt.Errorf("failed to unmarshal repo page: %w", err)
	}

	// Extract avatar CID if present
	avatarCID := ""
	if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
		avatarCID = pageRecord.Avatar.Ref.Link
	}

	// Upsert to database
	return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.UserEdited, pageRecord.CreatedAt, pageRecord.UpdatedAt)
}

// ProcessIdentity handles identity change events (handle updates).
// This is called when Jetstream receives an identity event indicating a handle change.
// The identity cache is invalidated to ensure the next lookup uses the new handle,
// and the database is updated to reflect the change in the UI.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
	// Check if user exists in our database - only update if they're an ATCR user
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}

	// Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
	if user == nil {
		return nil
	}

	// Update handle in database
	if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
		slog.Warn("Failed to update user handle in database",
			"component", "processor",
			"did", did,
			"handle", newHandle,
			"error", err)
		// Continue to invalidate cache even if DB update fails
	}

	// Invalidate cached identity data to force re-resolution on next lookup
	if err := atproto.InvalidateIdentity(ctx, did); err != nil {
		slog.Warn("Failed to invalidate identity cache",
			"component", "processor",
			"did", did,
			"error", err)
		return err
	}

	slog.Info("Processed identity change event",
		"component", "processor",
		"did", did,
		"old_handle", user.Handle,
		"new_handle", newHandle)
	return nil
}

// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users.
// This refreshes the cached avatar URL when a user changes their Bluesky profile picture.
func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error {
	// Check if user exists in our database - only update if they're an ATCR user
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}

	// Skip if user doesn't exist - they don't have any ATCR activity
	if user == nil {
		return nil
	}

	// Parse the profile record to extract avatar
	var profile struct {
		Avatar *atproto.ATProtoBlobRef `json:"avatar"`
	}
	if err := json.Unmarshal(recordData, &profile); err != nil {
		return fmt.Errorf("failed to unmarshal profile: %w", err)
	}

	// Build new avatar URL (empty when the profile has no avatar)
	avatarURL := ""
	if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}

	// Update if changed
	if avatarURL != user.Avatar {
		slog.Info("Updating avatar from profile change",
			"component", "processor",
			"did", did,
			"old_avatar", user.Avatar,
			"new_avatar", avatarURL)
		return db.UpdateUserAvatar(p.db, did, avatarURL)
	}
	return nil
}

// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar.
// This is called during backfill to ensure avatars stay fresh for existing users.
func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error {
	// Get user from database to compare avatar
	user, err := db.GetUserByDID(p.db, did)
	if err != nil || user == nil {
		return nil // User doesn't exist, skip
	}

	// Fetch profile from PDS
	client := atproto.NewClient(pdsEndpoint, "", "")
	profile, err := client.GetProfileRecord(ctx, did)
	if err != nil {
		return fmt.Errorf("failed to fetch profile: %w", err)
	}

	// Build avatar URL
	avatarURL := ""
	if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}

	// Update if changed
	if avatarURL != user.Avatar {
		slog.Info("Backfill refreshing avatar",
			"component", "processor",
			"did", did,
			"old_avatar", user.Avatar,
			"new_avatar", avatarURL)
		return db.UpdateUserAvatar(p.db, did, avatarURL)
	}
	return nil
}

// ProcessScan handles scan record events from hold PDSes.
// Caches scan results in the appview DB and dispatches webhooks (if dispatcher is set).
func (p *Processor) ProcessScan(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	if isDelete {
		return nil // Scan deletes are not processed (scans are immutable)
	}

	// Unmarshal scan record
	var scanRecord atproto.ScanRecord
	if err := json.Unmarshal(recordData, &scanRecord); err != nil {
		return fmt.Errorf("failed to unmarshal scan record: %w", err)
	}

	// Extract manifest digest from the scan record's manifest AT-URI.
	// Assumes the final path segment is a bare sha256 hex digest — TODO confirm
	// against the hold's record format.
	manifestDigest := ""
	if parts := strings.Split(scanRecord.Manifest, "/"); len(parts) > 0 {
		manifestDigest = "sha256:" + parts[len(parts)-1]
	}

	// Parse scanned_at timestamp; falls back to "now" on parse failure
	scannedAt := time.Now()
	if t, err := time.Parse(time.RFC3339, scanRecord.ScannedAt); err == nil {
		scannedAt = t
	}

	scan := &db.Scan{
		HoldDID:        holdDID,
		ManifestDigest: manifestDigest,
		UserDID:        scanRecord.UserDID,
		Repository:     scanRecord.Repository,
		Critical:       int(scanRecord.Critical),
		High:           int(scanRecord.High),
		Medium:         int(scanRecord.Medium),
		Low:            int(scanRecord.Low),
		Total:          int(scanRecord.Total),
		ScannerVersion: scanRecord.ScannerVersion,
		ScannedAt:      scannedAt,
	}

	// Upsert scan to DB (returns previous scan for change detection)
	previousScan, err := db.UpsertScan(p.db, scan)
	if err != nil {
		return fmt.Errorf("failed to upsert scan: %w", err)
	}

	// Dispatch webhooks if dispatcher is set (live Worker only, not backfill).
	// All three lookups below are best-effort: a failure leaves the field empty.
	if p.webhookDispatcher != nil {
		// Resolve user handle from cache or DB
		userHandle := ""
		user, userErr := db.GetUserByDID(p.db, scanRecord.UserDID)
		if userErr == nil && user != nil {
			userHandle = user.Handle
		}

		// Resolve tag for the manifest digest
		tag := ""
		if tagVal, tagErr := db.GetTagByDigest(p.db, scanRecord.UserDID, scanRecord.Repository, manifestDigest); tagErr == nil {
			tag = tagVal
		}

		// Resolve hold endpoint URL
		holdEndpoint := ""
		if holdURL, holdErr := atproto.ResolveHoldURL(ctx, holdDID); holdErr == nil {
			holdEndpoint = holdURL
		}

		p.webhookDispatcher.DispatchForScan(ctx, scan, previousScan, userHandle, tag, holdEndpoint)
	}

	return nil
}

// ProcessStats handles stats record events from hold PDSes.
// This is called when Jetstream receives a stats create/update/delete event from a hold.
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository.
func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	// Skip if no stats cache configured
	if p.statsCache == nil {
		return nil
	}

	// Unmarshal stats record
	var statsRecord atproto.StatsRecord
	if err := json.Unmarshal(recordData, &statsRecord); err != nil {
		return fmt.Errorf("failed to unmarshal stats record: %w", err)
	}

	// Ensure the owner user exists in the users table (repository_stats has FK to users.did)
	if !isDelete && statsRecord.OwnerDID != "" {
		if err := p.EnsureUserExists(ctx, statsRecord.OwnerDID); err != nil {
			return fmt.Errorf("failed to ensure stats owner user exists: %w", err)
		}
	}

	if isDelete {
		// Delete from in-memory cache
		p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
	} else {
		// Parse timestamps; unparseable values are silently left nil
		var lastPull, lastPush *time.Time
		if statsRecord.LastPull != "" {
			t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
			if err == nil {
				lastPull = &t
			}
		}
		if statsRecord.LastPush != "" {
			t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
			if err == nil {
				lastPush = &t
			}
		}

		// Update in-memory cache
		p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository, statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
	}

	// Get aggregated stats across all holds
	totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(statsRecord.OwnerDID, statsRecord.Repository)

	// Upsert aggregated stats to repository_stats
	return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
		DID:        statsRecord.OwnerDID,
		Repository: statsRecord.Repository,
		PullCount:  int(totalPull),
		PushCount:  int(totalPush),
		LastPull:   latestPull,
		LastPush:   latestPush,
	})
}

// ProcessCaptain handles captain record events from hold PDSes.
// This is called when Jetstream receives a captain create/update/delete event from a hold.
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info.
func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error {
	// Unmarshal captain record
	var captainRecord atproto.CaptainRecord
	if err := json.Unmarshal(recordData, &captainRecord); err != nil {
		return fmt.Errorf("failed to unmarshal captain record: %w", err)
	}

	// Convert to db struct and upsert
	record := &db.HoldCaptainRecord{
		HoldDID:      holdDID,
		OwnerDID:     captainRecord.Owner,
		Public:       captainRecord.Public,
		AllowAllCrew: captainRecord.AllowAllCrew,
		DeployedAt:   captainRecord.DeployedAt,
		Region:       captainRecord.Region,
		Successor:    captainRecord.Successor,
		UpdatedAt:    time.Now(),
	}
	if err := db.UpsertCaptainRecord(p.db, record); err != nil {
		return fmt.Errorf("failed to upsert captain record: %w", err)
	}

	slog.Info("Processed captain record",
		"component", "processor",
		"hold_did", holdDID,
		"owner_did", captainRecord.Owner,
		"public", captainRecord.Public,
		"allow_all_crew", captainRecord.AllowAllCrew)
	return nil
}

// ProcessCrew handles crew record events from hold PDSes.
// This is called when Jetstream receives a crew create/update/delete event from a hold.
// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info.
func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error {
	// Unmarshal crew record
	var crewRecord atproto.CrewRecord
	if err := json.Unmarshal(recordData, &crewRecord); err != nil {
		return fmt.Errorf("failed to unmarshal crew record: %w", err)
	}

	// Marshal permissions to JSON string; marshal failure leaves it empty
	permissionsJSON := ""
	if len(crewRecord.Permissions) > 0 {
		if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil {
			permissionsJSON = string(jsonBytes)
		}
	}

	// Convert to db struct and upsert
	member := &db.CrewMember{
		HoldDID:     holdDID,
		MemberDID:   crewRecord.Member,
		Rkey:        rkey,
		Role:        crewRecord.Role,
		Permissions: permissionsJSON,
		Tier:        crewRecord.Tier,
		AddedAt:     crewRecord.AddedAt,
	}
	if err := db.UpsertCrewMember(p.db, member); err != nil {
		return fmt.Errorf("failed to upsert crew member: %w", err)
	}
	return nil
}

// ProcessAccount handles account status events (deactivation/deletion/etc)
// This is called when Jetstream receives an account event indicating status changes.
//
// Status handling:
//   - "deleted": Account permanently deleted - remove all cached data
//   - "deactivated": Could be PDS migration or permanent - invalidate cache only
//   - "takendown": Moderation action - invalidate cache only
//   - Other: Ignore
//
// For "deactivated", we don't delete data because it's ambiguous:
//   - Could be permanent deactivation (user deleted account)
//   - Could be PDS migration (account moves to new PDS)
//
// Cache invalidation forces re-resolution on next lookup.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error { // Skip active accounts or unknown statuses if active { return nil } // Check if user exists in our database - only process if they're an ATCR user user, err := db.GetUserByDID(p.db, did) if err != nil { return fmt.Errorf("failed to check user existence: %w", err) } // Skip if user doesn't exist - they don't have any ATCR activity if user == nil { return nil } switch status { case "deleted": // Account permanently deleted - remove all cached data if _, err := db.DeleteUserData(p.db, did); err != nil { slog.Error("Failed to delete user data for deleted account", "component", "processor", "did", did, "handle", user.Handle, "error", err) return err } // Also invalidate identity cache _ = atproto.InvalidateIdentity(ctx, did) slog.Info("Deleted user data for deleted account", "component", "processor", "did", did, "handle", user.Handle) case "deactivated", "takendown": // Ambiguous status - invalidate cache but keep data // For deactivated: could be PDS migration, will resolve on next lookup // For takendown: moderation action, keep data in case of appeal if err := atproto.InvalidateIdentity(ctx, did); err != nil { slog.Warn("Failed to invalidate identity cache", "component", "processor", "did", did, "status", status, "error", err) return err } slog.Info("Processed account status event - cache invalidated", "component", "processor", "did", did, "handle", user.Handle, "status", status) default: // Unknown status - ignore slog.Debug("Ignoring unknown account status", "component", "processor", "did", did, "status", status) } return nil }