// Package jetstream ingests ATCR records into the appview database, serving
// both the live Jetstream worker and the backfill (sync) path.
package jetstream
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/atproto"
atpdata "github.com/bluesky-social/indigo/atproto/atdata"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/lexicon"
)
// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
type Processor struct {
	db db.DBTX // Database handle (connection or transaction)
	// userCache memoizes users seen this session so identity resolution and
	// profile fetches can be skipped. Optional - enabled for Worker, disabled
	// for Backfill. NOTE(review): the map is read/written without a lock —
	// assumes single-goroutine access; confirm.
	userCache  *UserCache
	statsCache *StatsCache // In-memory cache for per-hold stats aggregation (nil disables stats)
	useCache   bool        // True for Worker (live streaming), false for Backfill
	catalog    *lexicon.ResolvingCatalog // For debug logging of validation failures (non-blocking)
	// webhookDispatcher fires webhooks on scan results.
	// Optional - only for live Worker (nil for Backfill).
	webhookDispatcher WebhookDispatcher
}
// WebhookDispatcher is an interface for dispatching webhooks on scan completion.
// Only the live Worker sets this; backfill does NOT (avoids spamming old scan results).
type WebhookDispatcher interface {
	// DispatchForScan delivers webhook notifications for a completed scan.
	// previousScan is the prior stored result (used for change detection);
	// userHandle, tag and holdEndpoint provide display/lookup context.
	// NOTE(review): parameter semantics inferred from ProcessScan's call site — confirm.
	DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string)
}
// SetWebhookDispatcher installs the dispatcher used when scan records arrive.
// Only the live Worker should call this — backfill leaves it nil so old scan
// results never trigger webhook deliveries.
func (p *Processor) SetWebhookDispatcher(dispatcher WebhookDispatcher) {
	p.webhookDispatcher = dispatcher
}
// NewProcessor constructs the shared processor used by both ingestion paths.
//
// useCache enables the in-memory user cache (true for the live Worker, false
// for Backfill's batch processing). statsCache is the shared per-hold stats
// aggregator; pass nil to skip stats processing entirely.
func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
	// The lexicon catalog is only consulted for debug-level validation logging.
	cat := lexicon.NewResolvingCatalog()
	cat.Directory = identity.DefaultDirectory()

	proc := &Processor{
		db:         database,
		useCache:   useCache,
		statsCache: statsCache,
		catalog:    cat,
	}
	if useCache {
		proc.userCache = &UserCache{cache: make(map[string]*db.User)}
	}
	return proc
}
// EnsureUser resolves and upserts a user by DID.
// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill).
//
// On a cache/DB hit only the last-seen timestamp is refreshed. Otherwise the
// DID is resolved, the Bluesky profile is fetched (best effort, for the
// avatar), and the full user row is upserted.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	// Check cache first (if enabled)
	if p.useCache && p.userCache != nil {
		if _, ok := p.userCache.cache[did]; ok {
			// User in cache - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	} else if !p.useCache {
		// No cache - check if user already exists in DB
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			// User exists - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	}
	// Resolve DID to get handle and PDS endpoint
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}
	// Fetch user's Bluesky profile record from their PDS (including avatar).
	// A failed fetch is non-fatal: the user is still ingested without an avatar.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
		// Continue without avatar
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}
	// Create user record
	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}
	// Cache if enabled. Keyed by the incoming DID (may differ from resolvedDID).
	// NOTE(review): cached before the upsert below succeeds, so a failed upsert
	// still marks the user as seen for this session — confirm intended.
	if p.useCache {
		p.userCache.cache[did] = user
	}
	// Upsert to database
	// Use UpsertUser if we successfully fetched an avatar (to update existing users)
	// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
	if avatarURL != "" {
		return db.UpsertUser(p.db, user)
	}
	return db.UpsertUserIgnoreAvatar(p.db, user)
}
// EnsureUserExists guarantees a user row is present without updating it.
// Used by non-profile collections to avoid unnecessary writes during backfill.
// Unknown users are resolved and inserted with ON CONFLICT DO NOTHING semantics.
func (p *Processor) EnsureUserExists(ctx context.Context, did string) error {
	if p.useCache && p.userCache != nil {
		if _, hit := p.userCache.cache[did]; hit {
			// Already seen this session — the row is guaranteed to exist.
			return nil
		}
	} else if !p.useCache {
		// Backfill path: consult the database directly.
		if existing, lookupErr := db.GetUserByDID(p.db, did); lookupErr == nil && existing != nil {
			return nil
		}
	}

	// Unknown user — resolve identity and insert a fresh row.
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Best-effort avatar fetch; failure just leaves the avatar empty.
	var avatarURL string
	profile, profErr := atproto.NewClient(pdsEndpoint, "", "").GetProfileRecord(ctx, resolvedDID)
	if profErr != nil {
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", profErr)
	} else if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profile.Avatar.Ref.Link)
	}

	newUser := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}
	// Remember the user for this session when caching is on.
	if p.useCache {
		p.userCache.cache[did] = newUser
	}
	return db.InsertUserIfNotExists(p.db, newUser)
}
// ValidateRecord performs validation on records.
//   - Full lexicon validation is logged for debugging but does NOT block ingestion.
//   - Targeted validation (captain/crew DID checks) DOES block bogus records.
func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
	recordData, err := atpdata.UnmarshalJSON(data)
	if err != nil {
		return fmt.Errorf("invalid JSON: %w", err)
	}

	// Debug-only full lexicon validation: log failures, never block.
	if p.catalog != nil {
		if vErr := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); vErr != nil {
			slog.Debug("Record failed full lexicon validation (ingesting anyway)",
				"component", "processor",
				"collection", collection,
				"error", vErr)
		}
	}

	// Blocking checks for collections that historically carried bogus data:
	// the named field must be a non-empty DID.
	requireDID := func(field, failMsg string) error {
		val, _ := recordData[field].(string)
		if val == "" || !strings.HasPrefix(val, "did:") {
			return fmt.Errorf("%s", failMsg)
		}
		return nil
	}
	switch collection {
	case atproto.CaptainCollection:
		return requireDID("owner", "captain record missing or invalid owner DID")
	case atproto.CrewCollection:
		return requireDID("member", "crew record missing or invalid member DID")
	}
	return nil
}
// ProcessRecord is the unified entry point for processing any ATCR record.
// It handles:
// 1. Schema validation against published lexicons
// 2. User creation for user-activity collections
// 3. Dispatch to the appropriate Process* method
//
// queryCaptainFn is optional - used by backfill for sailor profile processing
func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
	// Skip validation for deletes (no record data)
	if !isDelete && data != nil {
		if err := p.ValidateRecord(ctx, collection, data); err != nil {
			// Invalid records are logged and dropped rather than surfaced as
			// errors, so one bogus record cannot stall the ingestion stream.
			slog.Warn("Record failed schema validation, skipping",
				"component", "processor",
				"collection", collection,
				"did", did,
				"error", err)
			return nil // Skip invalid records silently
		}
	}
	// User-activity collections create/update user entries
	// Skip for deletes - user should already exist, and we don't need to resolve identity
	if !isDelete {
		switch collection {
		case atproto.SailorProfileCollection:
			// Sailor profile is the authoritative source for user data — full upsert
			if err := p.EnsureUser(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user: %w", err)
			}
		case atproto.ManifestCollection,
			atproto.TagCollection,
			atproto.StarCollection,
			atproto.RepoPageCollection:
			// Other user collections just need the row to exist — no update if unchanged
			if err := p.EnsureUserExists(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user exists: %w", err)
			}
			// Hold collections (captain, crew, stats) - don't create user entries
			// These are records FROM holds, not user activity
		}
	}
	// Dispatch to specific handler
	switch collection {
	case atproto.ManifestCollection:
		if isDelete {
			// NOTE(review): repository argument is passed empty — DeleteManifest
			// appears to locate the row by did+rkey alone; confirm.
			return db.DeleteManifest(p.db, did, "", rkey)
		}
		_, err := p.ProcessManifest(ctx, did, data)
		return err
	case atproto.TagCollection:
		if isDelete {
			// The tag rkey encodes repository and tag name
			repo, tag := atproto.RKeyToRepositoryTag(rkey)
			return db.DeleteTag(p.db, did, repo, tag)
		}
		return p.ProcessTag(ctx, did, data)
	case atproto.StarCollection:
		if isDelete {
			// The star rkey encodes the starred repo's owner DID and name
			ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
			if err != nil {
				return err
			}
			return db.DeleteStar(p.db, did, ownerDID, repository)
		}
		return p.ProcessStar(ctx, did, data)
	case atproto.RepoPageCollection:
		return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)
	case atproto.SailorProfileCollection:
		// NOTE(review): deletes reach this handler with nil data and will fail
		// its unmarshal — confirm whether profile deletes should be skipped.
		return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)
	case atproto.ScanCollection:
		return p.ProcessScan(ctx, did, data, isDelete)
	case atproto.StatsCollection:
		return p.ProcessStats(ctx, did, data, isDelete)
	case atproto.CaptainCollection:
		if isDelete {
			return db.DeleteCaptainRecord(p.db, did)
		}
		return p.ProcessCaptain(ctx, did, data)
	case atproto.CrewCollection:
		if isDelete {
			return db.DeleteCrewMemberByRkey(p.db, did, rkey)
		}
		return p.ProcessCrew(ctx, did, rkey, data)
	default:
		return nil // Unknown collection, ignore
	}
}
// ProcessManifest processes a manifest record and stores it in the database.
// Returns the manifest ID for further processing (layers/references).
//
// Handles both single-image manifests (config + layers) and manifest
// lists/indexes (child manifest references), plus legacy records that carry a
// hold URL instead of a hold DID.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	// Unmarshal manifest record
	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}
	// Detect manifest type: a non-empty manifests list marks a manifest list/index
	isManifestList := len(manifestRecord.Manifests) > 0
	// Extract hold DID from manifest (with fallback for legacy manifests)
	// New manifests use holdDid field (DID format)
	// Old manifests use holdEndpoint field (URL format) - convert to DID
	holdDID := manifestRecord.HoldDID
	if holdDID == "" && manifestRecord.HoldEndpoint != "" {
		// Legacy manifest - resolve URL to DID via /.well-known/atproto-did.
		// Resolution failure is non-fatal: the manifest is stored with an
		// empty hold DID rather than dropped.
		if resolved, err := atproto.ResolveHoldDID(ctx, manifestRecord.HoldEndpoint); err != nil {
			slog.Warn("Failed to resolve hold DID from legacy manifest endpoint", "holdEndpoint", manifestRecord.HoldEndpoint, "error", err)
		} else {
			holdDID = resolved
		}
	}
	// Detect artifact type from config media type (image manifests only;
	// manifest lists keep the default)
	artifactType := "container-image"
	if !isManifestList && manifestRecord.Config != nil {
		artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
	}
	// Prepare manifest for insertion (WITHOUT annotation fields)
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		HoldEndpoint:  holdDID, // NOTE(review): field is named "endpoint" but holds a DID here — confirm
		ArtifactType:  artifactType,
		CreatedAt:     manifestRecord.CreatedAt,
		// Annotations removed - stored separately in repository_annotations table
	}
	// Set config fields only for image manifests (not manifest lists)
	if !isManifestList && manifestRecord.Config != nil {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}
	// Insert manifest
	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		// For backfill: if manifest already exists, get its ID.
		// NOTE(review): matches on SQLite's error text — confirm this holds
		// for the configured database driver.
		if strings.Contains(err.Error(), "UNIQUE constraint failed") {
			var existingID int64
			err := p.db.QueryRow(`
SELECT id FROM manifests
WHERE did = ? AND repository = ? AND digest = ?
`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
			if err != nil {
				return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
			}
			manifestID = existingID
		} else {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}
	}
	// Update repository annotations ONLY if manifest has at least one non-empty annotation
	if manifestRecord.Annotations != nil {
		hasData := false
		for _, value := range manifestRecord.Annotations {
			if value != "" {
				hasData = true
				break
			}
		}
		if hasData {
			// Replace all annotations for this repository
			err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
			if err != nil {
				return 0, fmt.Errorf("failed to upsert annotations: %w", err)
			}
		}
	}
	// Insert manifest references or layers
	if isManifestList {
		// Insert manifest references (for manifest lists/indexes)
		for i, ref := range manifestRecord.Manifests {
			// Platform fields are optional per reference
			platformArch := ""
			platformOS := ""
			platformVariant := ""
			platformOSVersion := ""
			if ref.Platform != nil {
				platformArch = ref.Platform.Architecture
				platformOS = ref.Platform.OS
				platformVariant = ref.Platform.Variant
				platformOSVersion = ref.Platform.OSVersion
			}
			// Detect attestation manifests from annotations
			isAttestation := false
			if ref.Annotations != nil {
				if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
					isAttestation = refType == "attestation-manifest"
				}
			}
			if err := db.InsertManifestReference(p.db, &db.ManifestReference{
				ManifestID:           manifestID,
				Digest:               ref.Digest,
				MediaType:            ref.MediaType,
				Size:                 ref.Size,
				PlatformArchitecture: platformArch,
				PlatformOS:           platformOS,
				PlatformVariant:      platformVariant,
				PlatformOSVersion:    platformOSVersion,
				IsAttestation:        isAttestation,
				ReferenceIndex:       i,
			}); err != nil {
				// Continue on error - reference might already exist
				continue
			}
		}
	} else {
		// Insert layers (for image manifests)
		for i, layer := range manifestRecord.Layers {
			if err := db.InsertLayer(p.db, &db.Layer{
				ManifestID:  manifestID,
				Digest:      layer.Digest,
				MediaType:   layer.MediaType,
				Size:        layer.Size,
				LayerIndex:  i,
				Annotations: layer.Annotations,
			}); err != nil {
				// Continue on error - layer might already exist
				continue
			}
		}
	}
	return manifestID, nil
}
// ProcessTag stores a tag record in the database.
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
	var rec atproto.TagRecord
	if err := json.Unmarshal(recordData, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}
	// The digest lives in the manifest field on new records, with a fallback
	// to manifestDigest on older ones.
	digest, err := rec.GetManifestDigest()
	if err != nil {
		return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
	}
	tag := &db.Tag{
		DID:        did,
		Repository: rec.Repository,
		Tag:        rec.Tag,
		Digest:     digest,
		CreatedAt:  rec.UpdatedAt,
	}
	return db.UpsertTag(p.db, tag)
}
// ProcessStar stores a star record in the database.
// Handles both the old object-subject and new AT-URI subject formats.
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
	var rec atproto.StarRecord
	if err := json.Unmarshal(recordData, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}
	// The subject AT URI encodes who owns the starred repository and its name.
	ownerDID, repository, err := rec.GetSubjectDIDAndRepository()
	if err != nil {
		return fmt.Errorf("failed to parse star subject: %w", err)
	}
	// The starrer (did) is already ensured by ProcessRecord, but the repo
	// owner may not have been processed yet during backfill or live events.
	if ensureErr := p.EnsureUserExists(ctx, ownerDID); ensureErr != nil {
		return fmt.Errorf("failed to ensure star subject user: %w", ensureErr)
	}
	// Idempotent upsert; star counts are derived from the stars table on demand.
	return db.UpsertStar(p.db, did, ownerDID, repository, rec.CreatedAt)
}
// ProcessSailorProfile processes a sailor profile record.
// Primarily used by backfill to cache captain records for holds.
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
	var profile atproto.SailorProfileRecord
	if err := json.Unmarshal(recordData, &profile); err != nil {
		return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
	}
	// The OCI client preference is cached unconditionally, even when no
	// default hold is configured.
	if profile.OciClient != "" {
		if err := db.UpdateUserOciClient(p.db, did, profile.OciClient); err != nil {
			slog.Warn("Failed to cache OCI client preference", "component", "processor", "did", did, "ociClient", profile.OciClient, "error", err)
		}
	}
	if profile.DefaultHold == "" {
		// Nothing hold-related to process.
		return nil
	}
	// Normalize the hold reference (URL or DID) to a canonical DID; a bad
	// reference is logged and skipped rather than failing the record.
	holdDID, err := atproto.ResolveHoldDID(ctx, profile.DefaultHold)
	if err != nil {
		slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profile.DefaultHold, "error", err)
		return nil
	}
	if err := db.UpdateUserDefaultHold(p.db, did, holdDID); err != nil {
		slog.Warn("Failed to cache default hold DID", "component", "processor", "did", did, "holdDid", holdDID, "error", err)
	}
	// Captain lookup is delegated to the caller-supplied hook so
	// backfill-specific logic (retries, test mode) stays out of this processor.
	if queryCaptainFn == nil {
		return nil
	}
	return queryCaptainFn(ctx, holdDID)
}
// ProcessRepoPage processes a repository page record from Jetstream
// (create/update/delete events).
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
	// Deletes simply drop the cached page.
	if isDelete {
		return db.DeleteRepoPage(p.db, did, rkey)
	}
	var page atproto.RepoPageRecord
	if err := json.Unmarshal(recordData, &page); err != nil {
		return fmt.Errorf("failed to unmarshal repo page: %w", err)
	}
	// The avatar is optional; store its CID when present.
	var avatarCID string
	if page.Avatar != nil && page.Avatar.Ref.Link != "" {
		avatarCID = page.Avatar.Ref.Link
	}
	return db.UpsertRepoPage(p.db, did, page.Repository, page.Description, avatarCID, page.UserEdited, page.CreatedAt, page.UpdatedAt)
}
// ProcessIdentity handles identity change events (handle updates) from
// Jetstream. The database row is updated and the cached identity is
// invalidated so the next lookup resolves the new handle.
//
// Only processes events for users who already exist in our database (have
// ATCR activity).
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	if user == nil {
		// Not an ATCR user (no manifests, profiles, etc.) — nothing to do.
		return nil
	}
	if updateErr := db.UpdateUserHandle(p.db, did, newHandle); updateErr != nil {
		// Keep going: the cache invalidation below matters even when the
		// database write fails.
		slog.Warn("Failed to update user handle in database",
			"component", "processor",
			"did", did,
			"handle", newHandle,
			"error", updateErr)
	}
	if invErr := atproto.InvalidateIdentity(ctx, did); invErr != nil {
		slog.Warn("Failed to invalidate identity cache",
			"component", "processor",
			"did", did,
			"error", invErr)
		return invErr
	}
	slog.Info("Processed identity change event",
		"component", "processor",
		"did", did,
		"old_handle", user.Handle,
		"new_handle", newHandle)
	return nil
}
// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR
// users, refreshing the cached avatar URL when the profile picture changes.
func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error {
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	if user == nil {
		// No ATCR activity for this DID — ignore the event.
		return nil
	}
	// Only the avatar field of the profile matters here.
	var profile struct {
		Avatar *atproto.ATProtoBlobRef `json:"avatar"`
	}
	if err := json.Unmarshal(recordData, &profile); err != nil {
		return fmt.Errorf("failed to unmarshal profile: %w", err)
	}
	var newAvatar string
	if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
		newAvatar = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}
	if newAvatar == user.Avatar {
		// Unchanged — skip the write.
		return nil
	}
	slog.Info("Updating avatar from profile change",
		"component", "processor",
		"did", did,
		"old_avatar", user.Avatar,
		"new_avatar", newAvatar)
	return db.UpdateUserAvatar(p.db, did, newAvatar)
}
// RefreshUserAvatar fetches the user's current Bluesky profile and updates the
// cached avatar URL if it changed. Called during backfill to keep avatars
// fresh for existing users.
func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error {
	user, err := db.GetUserByDID(p.db, did)
	if err != nil || user == nil {
		// Unknown user — nothing to refresh.
		return nil
	}
	profile, err := atproto.NewClient(pdsEndpoint, "", "").GetProfileRecord(ctx, did)
	if err != nil {
		return fmt.Errorf("failed to fetch profile: %w", err)
	}
	var currentAvatar string
	if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
		currentAvatar = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
	}
	if currentAvatar == user.Avatar {
		// Unchanged — skip the write.
		return nil
	}
	slog.Info("Backfill refreshing avatar",
		"component", "processor",
		"did", did,
		"old_avatar", user.Avatar,
		"new_avatar", currentAvatar)
	return db.UpdateUserAvatar(p.db, did, currentAvatar)
}
// ProcessScan handles scan record events from hold PDSes.
// Caches scan results in the appview DB and dispatches webhooks (if dispatcher is set).
func (p *Processor) ProcessScan(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	if isDelete {
		return nil // Scan deletes are not processed (scans are immutable)
	}
	// Unmarshal scan record
	var scanRecord atproto.ScanRecord
	if err := json.Unmarshal(recordData, &scanRecord); err != nil {
		return fmt.Errorf("failed to unmarshal scan record: %w", err)
	}
	// Extract manifest digest from the scan record's manifest AT-URI: take the
	// last path segment and re-add the "sha256:" prefix.
	// NOTE(review): assumes every digest is sha256 and that the URI's last
	// segment is a bare hex digest — confirm against the scan lexicon.
	manifestDigest := ""
	if parts := strings.Split(scanRecord.Manifest, "/"); len(parts) > 0 {
		manifestDigest = "sha256:" + parts[len(parts)-1]
	}
	// Parse scanned_at timestamp; invalid timestamps fall back to "now"
	scannedAt := time.Now()
	if t, err := time.Parse(time.RFC3339, scanRecord.ScannedAt); err == nil {
		scannedAt = t
	}
	scan := &db.Scan{
		HoldDID:        holdDID,
		ManifestDigest: manifestDigest,
		UserDID:        scanRecord.UserDID,
		Repository:     scanRecord.Repository,
		Critical:       int(scanRecord.Critical),
		High:           int(scanRecord.High),
		Medium:         int(scanRecord.Medium),
		Low:            int(scanRecord.Low),
		Total:          int(scanRecord.Total),
		ScannerVersion: scanRecord.ScannerVersion,
		ScannedAt:      scannedAt,
	}
	// Upsert scan to DB (returns previous scan for change detection)
	previousScan, err := db.UpsertScan(p.db, scan)
	if err != nil {
		return fmt.Errorf("failed to upsert scan: %w", err)
	}
	// Dispatch webhooks if dispatcher is set (live Worker only, not backfill)
	if p.webhookDispatcher != nil {
		// Resolve user handle from the DB; lookup failures leave it empty
		userHandle := ""
		user, userErr := db.GetUserByDID(p.db, scanRecord.UserDID)
		if userErr == nil && user != nil {
			userHandle = user.Handle
		}
		// Resolve tag for the manifest digest (best effort)
		tag := ""
		if tagVal, tagErr := db.GetTagByDigest(p.db, scanRecord.UserDID, scanRecord.Repository, manifestDigest); tagErr == nil {
			tag = tagVal
		}
		// Resolve hold endpoint URL (best effort)
		holdEndpoint := ""
		if holdURL, holdErr := atproto.ResolveHoldURL(ctx, holdDID); holdErr == nil {
			holdEndpoint = holdURL
		}
		p.webhookDispatcher.DispatchForScan(ctx, scan, previousScan, userHandle, tag, holdEndpoint)
	}
	return nil
}
// ProcessStats handles stats record events from hold PDSes.
// This is called when Jetstream receives a stats create/update/delete event from a hold.
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository.
func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	// Skip if no stats cache configured
	if p.statsCache == nil {
		return nil
	}
	// BUG FIX: Jetstream delete events carry no record body, so unmarshaling
	// would always fail below. Without the record we also cannot recover which
	// owner/repository the stats belonged to — skip instead of erroring.
	if isDelete && len(recordData) == 0 {
		slog.Debug("Skipping stats delete without record data",
			"component", "processor",
			"hold_did", holdDID)
		return nil
	}
	// Unmarshal stats record
	var statsRecord atproto.StatsRecord
	if err := json.Unmarshal(recordData, &statsRecord); err != nil {
		return fmt.Errorf("failed to unmarshal stats record: %w", err)
	}
	// Records without an owner/repository cannot be keyed in the cache or DB.
	if statsRecord.OwnerDID == "" || statsRecord.Repository == "" {
		return nil
	}
	// Ensure the owner user exists in the users table (repository_stats has FK to users.did)
	if !isDelete {
		if err := p.EnsureUserExists(ctx, statsRecord.OwnerDID); err != nil {
			return fmt.Errorf("failed to ensure stats owner user exists: %w", err)
		}
	}
	if isDelete {
		// Delete this hold's contribution from the in-memory cache
		p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
	} else {
		// Parse timestamps; unparseable values are treated as absent
		var lastPull, lastPush *time.Time
		if statsRecord.LastPull != "" {
			if t, err := time.Parse(time.RFC3339, statsRecord.LastPull); err == nil {
				lastPull = &t
			}
		}
		if statsRecord.LastPush != "" {
			if t, err := time.Parse(time.RFC3339, statsRecord.LastPush); err == nil {
				lastPush = &t
			}
		}
		// Update in-memory cache with this hold's latest numbers
		p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
			statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
	}
	// Get aggregated stats across all holds and persist them
	totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
		statsRecord.OwnerDID, statsRecord.Repository)
	return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
		DID:        statsRecord.OwnerDID,
		Repository: statsRecord.Repository,
		PullCount:  int(totalPull),
		PushCount:  int(totalPush),
		LastPull:   latestPull,
		LastPush:   latestPush,
	})
}
// ProcessCaptain handles captain record events from hold PDSes. holdDID is
// the hold PDS DID (event.DID); the record carries ownership/visibility info.
func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error {
	var captain atproto.CaptainRecord
	if err := json.Unmarshal(recordData, &captain); err != nil {
		return fmt.Errorf("failed to unmarshal captain record: %w", err)
	}
	// Map the wire record onto the db struct and upsert.
	err := db.UpsertCaptainRecord(p.db, &db.HoldCaptainRecord{
		HoldDID:      holdDID,
		OwnerDID:     captain.Owner,
		Public:       captain.Public,
		AllowAllCrew: captain.AllowAllCrew,
		DeployedAt:   captain.DeployedAt,
		Region:       captain.Region,
		Successor:    captain.Successor,
		UpdatedAt:    time.Now(),
	})
	if err != nil {
		return fmt.Errorf("failed to upsert captain record: %w", err)
	}
	slog.Info("Processed captain record",
		"component", "processor",
		"hold_did", holdDID,
		"owner_did", captain.Owner,
		"public", captain.Public,
		"allow_all_crew", captain.AllowAllCrew)
	return nil
}
// ProcessCrew handles crew record events from hold PDSes. holdDID is the hold
// PDS DID (event.DID); the record carries member info keyed by rkey.
func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error {
	var crew atproto.CrewRecord
	if err := json.Unmarshal(recordData, &crew); err != nil {
		return fmt.Errorf("failed to unmarshal crew record: %w", err)
	}
	// Permissions are persisted as a JSON string; a marshal failure
	// deliberately degrades to an empty string rather than failing the event.
	var permissionsJSON string
	if len(crew.Permissions) > 0 {
		if encoded, marshalErr := json.Marshal(crew.Permissions); marshalErr == nil {
			permissionsJSON = string(encoded)
		}
	}
	member := &db.CrewMember{
		HoldDID:     holdDID,
		MemberDID:   crew.Member,
		Rkey:        rkey,
		Role:        crew.Role,
		Permissions: permissionsJSON,
		Tier:        crew.Tier,
		AddedAt:     crew.AddedAt,
	}
	if err := db.UpsertCrewMember(p.db, member); err != nil {
		return fmt.Errorf("failed to upsert crew member: %w", err)
	}
	return nil
}
// ProcessAccount handles account status events (deactivation/deletion/etc)
// This is called when Jetstream receives an account event indicating status changes.
//
// Status handling:
// - "deleted": Account permanently deleted - remove all cached data
// - "deactivated": Could be PDS migration or permanent - invalidate cache only
// - "takendown": Moderation action - invalidate cache only
// - Other: Ignore
//
// For "deactivated", we don't delete data because it's ambiguous:
// - Could be permanent deactivation (user deleted account)
// - Could be PDS migration (account moves to new PDS)
// Cache invalidation forces re-resolution on next lookup.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
	// Skip active accounts or unknown statuses
	if active {
		return nil
	}
	// Check if user exists in our database - only process if they're an ATCR user
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}
	// Skip if user doesn't exist - they don't have any ATCR activity
	if user == nil {
		return nil
	}
	switch status {
	case "deleted":
		// Account permanently deleted - remove all cached data
		if _, err := db.DeleteUserData(p.db, did); err != nil {
			slog.Error("Failed to delete user data for deleted account",
				"component", "processor",
				"did", did,
				"handle", user.Handle,
				"error", err)
			return err
		}
		// Also invalidate identity cache (best effort — error deliberately ignored;
		// the user data is already gone)
		_ = atproto.InvalidateIdentity(ctx, did)
		slog.Info("Deleted user data for deleted account",
			"component", "processor",
			"did", did,
			"handle", user.Handle)
	case "deactivated", "takendown":
		// Ambiguous status - invalidate cache but keep data
		// For deactivated: could be PDS migration, will resolve on next lookup
		// For takendown: moderation action, keep data in case of appeal
		if err := atproto.InvalidateIdentity(ctx, did); err != nil {
			slog.Warn("Failed to invalidate identity cache",
				"component", "processor",
				"did", did,
				"status", status,
				"error", err)
			return err
		}
		slog.Info("Processed account status event - cache invalidated",
			"component", "processor",
			"did", did,
			"handle", user.Handle,
			"status", status)
	default:
		// Unknown status - ignore
		slog.Debug("Ignoring unknown account status",
			"component", "processor",
			"did", did,
			"status", status)
	}
	return nil
}