981 lines
32 KiB
Go
981 lines
32 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/atproto"
|
|
atpdata "github.com/bluesky-social/indigo/atproto/atdata"
|
|
"github.com/bluesky-social/indigo/atproto/identity"
|
|
"github.com/bluesky-social/indigo/atproto/lexicon"
|
|
)
|
|
|
|
// Processor handles shared database operations for both Worker (live) and Backfill (sync)
// This eliminates code duplication between the two data ingestion paths
type Processor struct {
	db        db.DBTX    // database handle (connection or transaction abstraction)
	userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
	// In-memory cache for per-hold stats aggregation; nil disables stats
	// processing entirely (see ProcessStats).
	statsCache *StatsCache
	useCache   bool                      // whether userCache is consulted/populated
	catalog    *lexicon.ResolvingCatalog // For debug logging of validation failures
	webhookDispatcher WebhookDispatcher  // Optional - only for live Worker (nil for Backfill)
}
|
|
|
|
// WebhookDispatcher is an interface for dispatching webhooks on scan completion.
// Only the live Worker sets this; backfill does NOT (avoids spamming old scan results).
type WebhookDispatcher interface {
	// DispatchForScan notifies subscribers about a completed scan.
	// previousScan is the prior scan row returned by db.UpsertScan for
	// change detection (presumably nil when none existed — confirm against
	// the db layer). userHandle, tag and holdEndpoint are pre-resolved
	// display metadata and may be empty strings when resolution failed.
	DispatchForScan(ctx context.Context, scan, previousScan *db.Scan, userHandle, tag, holdEndpoint string)
}
|
|
|
|
// SetWebhookDispatcher sets the webhook dispatcher for scan processing.
|
|
// Only the live Worker should set this — backfill skips webhook dispatch.
|
|
func (p *Processor) SetWebhookDispatcher(d WebhookDispatcher) {
|
|
p.webhookDispatcher = d
|
|
}
|
|
|
|
// NewProcessor creates a new shared processor
|
|
// useCache: true for Worker (live streaming), false for Backfill (batch processing)
|
|
// statsCache: shared stats cache for aggregating across holds (nil to skip stats processing)
|
|
func NewProcessor(database db.DBTX, useCache bool, statsCache *StatsCache) *Processor {
|
|
// Create lexicon catalog for debug validation logging
|
|
dir := identity.DefaultDirectory()
|
|
catalog := lexicon.NewResolvingCatalog()
|
|
catalog.Directory = dir
|
|
|
|
p := &Processor{
|
|
db: database,
|
|
useCache: useCache,
|
|
statsCache: statsCache,
|
|
catalog: catalog,
|
|
}
|
|
|
|
if useCache {
|
|
p.userCache = &UserCache{
|
|
cache: make(map[string]*db.User),
|
|
}
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// EnsureUser resolves and upserts a user by DID.
// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill).
//
// For an already-known user only the last-seen timestamp is refreshed.
// Otherwise the DID is resolved to a handle + PDS endpoint, the Bluesky
// profile is fetched best-effort for the avatar, and the row is upserted.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	// Fast path: known user just gets a last-seen refresh.
	if p.useCache && p.userCache != nil {
		if _, ok := p.userCache.cache[did]; ok {
			// User in cache - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	} else if !p.useCache {
		// No cache - check if user already exists in DB.
		// The lookup error is deliberately ignored: a failed lookup falls
		// through to the full resolve+upsert path below.
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			// User exists - just update last seen timestamp
			return db.UpdateUserLastSeen(p.db, did)
		}
	}

	// Resolve DID to get handle and PDS endpoint
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Fetch user's Bluesky profile record from their PDS (including avatar).
	// Fetch failure is non-fatal: the user is still upserted, just without
	// an avatar URL.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
		// Continue without avatar
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	// Create user record from the resolved identity
	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// Cache if enabled.
	// NOTE(review): the cache key is the caller-supplied did while the row
	// stores resolvedDID — confirm these are always identical, otherwise
	// entries could be cached under a non-canonical key.
	if p.useCache {
		p.userCache.cache[did] = user
	}

	// Upsert to database
	// Use UpsertUser if we successfully fetched an avatar (to update existing users)
	// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
	if avatarURL != "" {
		return db.UpsertUser(p.db, user)
	}
	return db.UpsertUserIgnoreAvatar(p.db, user)
}
|
|
|
|
// EnsureUserExists ensures a user row exists in the database without updating it.
// Used by non-profile collections to avoid unnecessary writes during backfill.
// If the user doesn't exist, resolves identity and inserts with ON CONFLICT DO NOTHING.
func (p *Processor) EnsureUserExists(ctx context.Context, did string) error {
	// Fast path: nothing to do for users we have already seen.
	if p.useCache && p.userCache != nil {
		if _, ok := p.userCache.cache[did]; ok {
			return nil // User in cache, nothing to do
		}
	} else if !p.useCache {
		// No cache - check if user already exists in DB.
		// Lookup errors are ignored; they fall through to resolve+insert.
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			return nil // User exists, nothing to do
		}
	}

	// User doesn't exist yet — resolve and insert
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return err
	}

	// Best-effort avatar fetch; failure just leaves the avatar empty.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// Cache if enabled.
	// NOTE(review): keyed by the caller-supplied did, not resolvedDID —
	// confirm these are always identical.
	if p.useCache {
		p.userCache.cache[did] = user
	}

	// Insert only if absent — an existing row is left untouched.
	return db.InsertUserIfNotExists(p.db, user)
}
|
|
|
|
// ValidateRecord performs validation on records.
|
|
// - Full lexicon validation is logged for debugging but does NOT block ingestion
|
|
// - Targeted validation (captain/crew DID checks) DOES block bogus records
|
|
func (p *Processor) ValidateRecord(ctx context.Context, collection string, data []byte) error {
|
|
recordData, err := atpdata.UnmarshalJSON(data)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid JSON: %w", err)
|
|
}
|
|
|
|
// Debug: Full lexicon validation (log only, don't block)
|
|
if p.catalog != nil {
|
|
if err := lexicon.ValidateRecord(p.catalog, recordData, collection, 0); err != nil {
|
|
slog.Debug("Record failed full lexicon validation (ingesting anyway)",
|
|
"component", "processor",
|
|
"collection", collection,
|
|
"error", err)
|
|
}
|
|
}
|
|
|
|
// Targeted validation for collections that had bogus data issues
|
|
// These DO block ingestion
|
|
switch collection {
|
|
case atproto.CaptainCollection:
|
|
// Captain must have non-empty owner DID
|
|
owner, _ := recordData["owner"].(string)
|
|
if owner == "" || !strings.HasPrefix(owner, "did:") {
|
|
return fmt.Errorf("captain record missing or invalid owner DID")
|
|
}
|
|
|
|
case atproto.CrewCollection:
|
|
// Crew must have non-empty member DID
|
|
member, _ := recordData["member"].(string)
|
|
if member == "" || !strings.HasPrefix(member, "did:") {
|
|
return fmt.Errorf("crew record missing or invalid member DID")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessRecord is the unified entry point for processing any ATCR record.
// It handles:
// 1. Schema validation against published lexicons
// 2. User creation for user-activity collections
// 3. Dispatch to the appropriate Process* method
//
// queryCaptainFn is optional - used by backfill for sailor profile processing
func (p *Processor) ProcessRecord(ctx context.Context, did, collection, rkey string, data []byte, isDelete bool, queryCaptainFn func(context.Context, string) error) error {
	// Skip validation for deletes (no record data)
	if !isDelete && data != nil {
		if err := p.ValidateRecord(ctx, collection, data); err != nil {
			// Invalid records are logged and dropped (nil return) so one
			// bad record cannot stall the ingestion stream.
			slog.Warn("Record failed schema validation, skipping",
				"component", "processor",
				"collection", collection,
				"did", did,
				"error", err)
			return nil // Skip invalid records silently
		}
	}

	// User-activity collections create/update user entries
	// Skip for deletes - user should already exist, and we don't need to resolve identity
	if !isDelete {
		switch collection {
		case atproto.SailorProfileCollection:
			// Sailor profile is the authoritative source for user data — full upsert
			if err := p.EnsureUser(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user: %w", err)
			}
		case atproto.ManifestCollection,
			atproto.TagCollection,
			atproto.StarCollection,
			atproto.RepoPageCollection:
			// Other user collections just need the row to exist — no update if unchanged
			if err := p.EnsureUserExists(ctx, did); err != nil {
				return fmt.Errorf("failed to ensure user exists: %w", err)
			}
			// Hold collections (captain, crew, stats) - don't create user entries
			// These are records FROM holds, not user activity
		}
	}

	// Dispatch to specific handler
	switch collection {
	case atproto.ManifestCollection:
		if isDelete {
			// NOTE(review): repository is passed as "" — presumably
			// DeleteManifest can key off rkey alone; confirm in db layer.
			return db.DeleteManifest(p.db, did, "", rkey)
		}
		_, err := p.ProcessManifest(ctx, did, data)
		return err

	case atproto.TagCollection:
		if isDelete {
			// Recover repository and tag from the record key for the delete.
			repo, tag := atproto.RKeyToRepositoryTag(rkey)
			return db.DeleteTag(p.db, did, repo, tag)
		}
		return p.ProcessTag(ctx, did, data)

	case atproto.StarCollection:
		if isDelete {
			// Star rkeys encode the starred repo's owner and name.
			ownerDID, repository, err := atproto.ParseStarRecordKey(rkey)
			if err != nil {
				return err
			}
			return db.DeleteStar(p.db, did, ownerDID, repository)
		}
		return p.ProcessStar(ctx, did, data)

	case atproto.RepoPageCollection:
		return p.ProcessRepoPage(ctx, did, rkey, data, isDelete)

	case atproto.SailorProfileCollection:
		return p.ProcessSailorProfile(ctx, did, data, queryCaptainFn)

	case atproto.ScanCollection:
		return p.ProcessScan(ctx, did, data, isDelete)

	case atproto.StatsCollection:
		return p.ProcessStats(ctx, did, data, isDelete)

	case atproto.CaptainCollection:
		if isDelete {
			return db.DeleteCaptainRecord(p.db, did)
		}
		return p.ProcessCaptain(ctx, did, data)

	case atproto.CrewCollection:
		if isDelete {
			return db.DeleteCrewMemberByRkey(p.db, did, rkey)
		}
		return p.ProcessCrew(ctx, did, rkey, data)

	default:
		return nil // Unknown collection, ignore
	}
}
|
|
|
|
// ProcessManifest processes a manifest record and stores it in the database.
// Returns the manifest ID for further processing (layers/references).
//
// Handles both image manifests (config + layers) and manifest lists/indexes
// (child manifest references). Layer/reference insertion is best-effort:
// rows that fail to insert (likely already present) are skipped.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	// Unmarshal manifest record
	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}
	// Detect manifest type: a non-empty Manifests list marks a list/index
	isManifestList := len(manifestRecord.Manifests) > 0

	// Extract hold DID from manifest (with fallback for legacy manifests)
	// New manifests use holdDid field (DID format)
	// Old manifests use holdEndpoint field (URL format) - convert to DID
	holdDID := manifestRecord.HoldDID
	if holdDID == "" && manifestRecord.HoldEndpoint != "" {
		// Legacy manifest - resolve URL to DID via /.well-known/atproto-did.
		// Resolution failure is non-fatal: holdDID simply stays empty.
		if resolved, err := atproto.ResolveHoldDID(ctx, manifestRecord.HoldEndpoint); err != nil {
			slog.Warn("Failed to resolve hold DID from legacy manifest endpoint", "holdEndpoint", manifestRecord.HoldEndpoint, "error", err)
		} else {
			holdDID = resolved
		}
	}

	// Detect artifact type from config media type (image manifests only);
	// manifest lists keep the default
	artifactType := "container-image"
	if !isManifestList && manifestRecord.Config != nil {
		artifactType = db.GetArtifactType(manifestRecord.Config.MediaType)
	}

	// Prepare manifest for insertion (WITHOUT annotation fields)
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		// NOTE(review): field is named HoldEndpoint but now carries a DID.
		HoldEndpoint: holdDID,
		ArtifactType: artifactType,
		CreatedAt:    manifestRecord.CreatedAt,
		// Annotations removed - stored separately in repository_annotations table
	}

	// Set config fields only for image manifests (not manifest lists)
	if !isManifestList && manifestRecord.Config != nil {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}

	// Insert manifest
	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		// For backfill: if manifest already exists, get its ID.
		// NOTE(review): matching on the SQLite error text is fragile —
		// prefer driver error codes / errors.Is if available.
		if strings.Contains(err.Error(), "UNIQUE constraint failed") {
			var existingID int64
			err := p.db.QueryRow(`
				SELECT id FROM manifests
				WHERE did = ? AND repository = ? AND digest = ?
			`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)

			if err != nil {
				return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
			}
			manifestID = existingID
		} else {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}
	}

	// Update repository annotations ONLY if manifest has at least one non-empty annotation
	if manifestRecord.Annotations != nil {
		hasData := false
		for _, value := range manifestRecord.Annotations {
			if value != "" {
				hasData = true
				break
			}
		}

		if hasData {
			// Replace all annotations for this repository
			err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
			if err != nil {
				return 0, fmt.Errorf("failed to upsert annotations: %w", err)
			}
		}
	}

	// Insert manifest references or layers
	if isManifestList {
		// Insert manifest references (for manifest lists/indexes)
		for i, ref := range manifestRecord.Manifests {
			// Flatten optional platform info into individual columns
			platformArch := ""
			platformOS := ""
			platformVariant := ""
			platformOSVersion := ""

			if ref.Platform != nil {
				platformArch = ref.Platform.Architecture
				platformOS = ref.Platform.OS
				platformVariant = ref.Platform.Variant
				platformOSVersion = ref.Platform.OSVersion
			}

			// Detect attestation manifests from annotations
			isAttestation := false
			if ref.Annotations != nil {
				if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
					isAttestation = refType == "attestation-manifest"
				}
			}

			if err := db.InsertManifestReference(p.db, &db.ManifestReference{
				ManifestID:           manifestID,
				Digest:               ref.Digest,
				MediaType:            ref.MediaType,
				Size:                 ref.Size,
				PlatformArchitecture: platformArch,
				PlatformOS:           platformOS,
				PlatformVariant:      platformVariant,
				PlatformOSVersion:    platformOSVersion,
				IsAttestation:        isAttestation,
				ReferenceIndex:       i,
			}); err != nil {
				// Continue on error - reference might already exist
				continue
			}
		}
	} else {
		// Insert layers (for image manifests)
		for i, layer := range manifestRecord.Layers {
			if err := db.InsertLayer(p.db, &db.Layer{
				ManifestID:  manifestID,
				Digest:      layer.Digest,
				MediaType:   layer.MediaType,
				Size:        layer.Size,
				LayerIndex:  i,
				Annotations: layer.Annotations,
			}); err != nil {
				// Continue on error - layer might already exist
				continue
			}
		}
	}

	return manifestID, nil
}
|
|
|
|
// ProcessTag processes a tag record and stores it in the database
|
|
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
|
|
// Unmarshal tag record
|
|
var tagRecord atproto.TagRecord
|
|
if err := json.Unmarshal(recordData, &tagRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal tag: %w", err)
|
|
}
|
|
// Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
|
|
manifestDigest, err := tagRecord.GetManifestDigest()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
|
|
}
|
|
|
|
// Insert or update tag
|
|
return db.UpsertTag(p.db, &db.Tag{
|
|
DID: did,
|
|
Repository: tagRecord.Repository,
|
|
Tag: tagRecord.Tag,
|
|
Digest: manifestDigest,
|
|
CreatedAt: tagRecord.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
// ProcessStar processes a star record and stores it in the database
|
|
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
|
|
// Unmarshal star record (handles both old object and new AT URI subject formats)
|
|
var starRecord atproto.StarRecord
|
|
if err := json.Unmarshal(recordData, &starRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal star: %w", err)
|
|
}
|
|
|
|
// Extract owner DID and repository from subject AT URI
|
|
ownerDID, repository, err := starRecord.GetSubjectDIDAndRepository()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse star subject: %w", err)
|
|
}
|
|
|
|
// Ensure the starred repository's owner exists in the users table
|
|
// (the starrer is already ensured by ProcessRecord, but the owner
|
|
// may not have been processed yet during backfill or live events)
|
|
if err := p.EnsureUserExists(ctx, ownerDID); err != nil {
|
|
return fmt.Errorf("failed to ensure star subject user: %w", err)
|
|
}
|
|
|
|
// Upsert the star record (idempotent - won't duplicate)
|
|
// The DID here is the starrer (user who starred)
|
|
// Star count will be calculated on demand from the stars table
|
|
return db.UpsertStar(p.db, did, ownerDID, repository, starRecord.CreatedAt)
|
|
}
|
|
|
|
// ProcessSailorProfile processes a sailor profile record
|
|
// This is primarily used by backfill to cache captain records for holds
|
|
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
|
|
// Unmarshal sailor profile record
|
|
var profileRecord atproto.SailorProfileRecord
|
|
if err := json.Unmarshal(recordData, &profileRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
|
|
}
|
|
|
|
// Cache OCI client preference (always, even if no default hold)
|
|
if profileRecord.OciClient != "" {
|
|
if err := db.UpdateUserOciClient(p.db, did, profileRecord.OciClient); err != nil {
|
|
slog.Warn("Failed to cache OCI client preference", "component", "processor", "did", did, "ociClient", profileRecord.OciClient, "error", err)
|
|
}
|
|
}
|
|
|
|
// Skip hold processing if no default hold set
|
|
if profileRecord.DefaultHold == "" {
|
|
return nil
|
|
}
|
|
|
|
// Convert hold URL/DID to canonical DID
|
|
holdDID, err := atproto.ResolveHoldDID(ctx, profileRecord.DefaultHold)
|
|
if err != nil {
|
|
slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold, "error", err)
|
|
return nil
|
|
}
|
|
|
|
// Cache default hold DID on the user record
|
|
if err := db.UpdateUserDefaultHold(p.db, did, holdDID); err != nil {
|
|
slog.Warn("Failed to cache default hold DID", "component", "processor", "did", did, "holdDid", holdDID, "error", err)
|
|
}
|
|
|
|
// Query and cache the captain record using provided function
|
|
// This allows backfill-specific logic (retries, test mode handling) without duplicating it here
|
|
if queryCaptainFn != nil {
|
|
return queryCaptainFn(ctx, holdDID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessRepoPage processes a repository page record
|
|
// This is called when Jetstream receives a repo page create/update event
|
|
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
|
|
if isDelete {
|
|
// Delete the repo page from our cache
|
|
return db.DeleteRepoPage(p.db, did, rkey)
|
|
}
|
|
|
|
// Unmarshal repo page record
|
|
var pageRecord atproto.RepoPageRecord
|
|
if err := json.Unmarshal(recordData, &pageRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal repo page: %w", err)
|
|
}
|
|
|
|
// Extract avatar CID if present
|
|
avatarCID := ""
|
|
if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
|
|
avatarCID = pageRecord.Avatar.Ref.Link
|
|
}
|
|
|
|
// Upsert to database
|
|
return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.UserEdited, pageRecord.CreatedAt, pageRecord.UpdatedAt)
|
|
}
|
|
|
|
// ProcessIdentity handles identity change events (handle updates)
|
|
// This is called when Jetstream receives an identity event indicating a handle change.
|
|
// The identity cache is invalidated to ensure the next lookup uses the new handle,
|
|
// and the database is updated to reflect the change in the UI.
|
|
//
|
|
// Only processes events for users who already exist in our database (have ATCR activity).
|
|
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
|
|
// Check if user exists in our database - only update if they're an ATCR user
|
|
user, err := db.GetUserByDID(p.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check user existence: %w", err)
|
|
}
|
|
|
|
// Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
|
|
if user == nil {
|
|
return nil
|
|
}
|
|
|
|
// Update handle in database
|
|
if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
|
|
slog.Warn("Failed to update user handle in database",
|
|
"component", "processor",
|
|
"did", did,
|
|
"handle", newHandle,
|
|
"error", err)
|
|
// Continue to invalidate cache even if DB update fails
|
|
}
|
|
|
|
// Invalidate cached identity data to force re-resolution on next lookup
|
|
if err := atproto.InvalidateIdentity(ctx, did); err != nil {
|
|
slog.Warn("Failed to invalidate identity cache",
|
|
"component", "processor",
|
|
"did", did,
|
|
"error", err)
|
|
return err
|
|
}
|
|
|
|
slog.Info("Processed identity change event",
|
|
"component", "processor",
|
|
"did", did,
|
|
"old_handle", user.Handle,
|
|
"new_handle", newHandle)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessProfileUpdate handles app.bsky.actor.profile updates for known ATCR users
|
|
// This refreshes the cached avatar URL when a user changes their Bluesky profile picture
|
|
func (p *Processor) ProcessProfileUpdate(ctx context.Context, did string, recordData []byte) error {
|
|
// Check if user exists in our database - only update if they're an ATCR user
|
|
user, err := db.GetUserByDID(p.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check user existence: %w", err)
|
|
}
|
|
|
|
// Skip if user doesn't exist - they don't have any ATCR activity
|
|
if user == nil {
|
|
return nil
|
|
}
|
|
|
|
// Parse the profile record to extract avatar
|
|
var profile struct {
|
|
Avatar *atproto.ATProtoBlobRef `json:"avatar"`
|
|
}
|
|
if err := json.Unmarshal(recordData, &profile); err != nil {
|
|
return fmt.Errorf("failed to unmarshal profile: %w", err)
|
|
}
|
|
|
|
// Build new avatar URL
|
|
avatarURL := ""
|
|
if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
|
|
avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
|
|
}
|
|
|
|
// Update if changed
|
|
if avatarURL != user.Avatar {
|
|
slog.Info("Updating avatar from profile change",
|
|
"component", "processor",
|
|
"did", did,
|
|
"old_avatar", user.Avatar,
|
|
"new_avatar", avatarURL)
|
|
return db.UpdateUserAvatar(p.db, did, avatarURL)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RefreshUserAvatar fetches the user's current Bluesky profile and updates their cached avatar
|
|
// This is called during backfill to ensure avatars stay fresh for existing users
|
|
func (p *Processor) RefreshUserAvatar(ctx context.Context, did, pdsEndpoint string) error {
|
|
// Get user from database to compare avatar
|
|
user, err := db.GetUserByDID(p.db, did)
|
|
if err != nil || user == nil {
|
|
return nil // User doesn't exist, skip
|
|
}
|
|
|
|
// Fetch profile from PDS
|
|
client := atproto.NewClient(pdsEndpoint, "", "")
|
|
profile, err := client.GetProfileRecord(ctx, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to fetch profile: %w", err)
|
|
}
|
|
|
|
// Build avatar URL
|
|
avatarURL := ""
|
|
if profile.Avatar != nil && profile.Avatar.Ref.Link != "" {
|
|
avatarURL = atproto.BlobCDNURL(did, profile.Avatar.Ref.Link)
|
|
}
|
|
|
|
// Update if changed
|
|
if avatarURL != user.Avatar {
|
|
slog.Info("Backfill refreshing avatar",
|
|
"component", "processor",
|
|
"did", did,
|
|
"old_avatar", user.Avatar,
|
|
"new_avatar", avatarURL)
|
|
return db.UpdateUserAvatar(p.db, did, avatarURL)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessScan handles scan record events from hold PDSes.
|
|
// Caches scan results in the appview DB and dispatches webhooks (if dispatcher is set).
|
|
func (p *Processor) ProcessScan(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
|
|
if isDelete {
|
|
return nil // Scan deletes are not processed (scans are immutable)
|
|
}
|
|
|
|
// Unmarshal scan record
|
|
var scanRecord atproto.ScanRecord
|
|
if err := json.Unmarshal(recordData, &scanRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal scan record: %w", err)
|
|
}
|
|
|
|
// Extract manifest digest from the scan record's manifest AT-URI
|
|
manifestDigest := ""
|
|
if parts := strings.Split(scanRecord.Manifest, "/"); len(parts) > 0 {
|
|
manifestDigest = "sha256:" + parts[len(parts)-1]
|
|
}
|
|
|
|
// Parse scanned_at timestamp
|
|
scannedAt := time.Now()
|
|
if t, err := time.Parse(time.RFC3339, scanRecord.ScannedAt); err == nil {
|
|
scannedAt = t
|
|
}
|
|
|
|
scan := &db.Scan{
|
|
HoldDID: holdDID,
|
|
ManifestDigest: manifestDigest,
|
|
UserDID: scanRecord.UserDID,
|
|
Repository: scanRecord.Repository,
|
|
Critical: int(scanRecord.Critical),
|
|
High: int(scanRecord.High),
|
|
Medium: int(scanRecord.Medium),
|
|
Low: int(scanRecord.Low),
|
|
Total: int(scanRecord.Total),
|
|
ScannerVersion: scanRecord.ScannerVersion,
|
|
ScannedAt: scannedAt,
|
|
}
|
|
|
|
// Upsert scan to DB (returns previous scan for change detection)
|
|
previousScan, err := db.UpsertScan(p.db, scan)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to upsert scan: %w", err)
|
|
}
|
|
|
|
// Dispatch webhooks if dispatcher is set (live Worker only, not backfill)
|
|
if p.webhookDispatcher != nil {
|
|
// Resolve user handle from cache or DB
|
|
userHandle := ""
|
|
user, userErr := db.GetUserByDID(p.db, scanRecord.UserDID)
|
|
if userErr == nil && user != nil {
|
|
userHandle = user.Handle
|
|
}
|
|
|
|
// Resolve tag for the manifest digest
|
|
tag := ""
|
|
if tagVal, tagErr := db.GetTagByDigest(p.db, scanRecord.UserDID, scanRecord.Repository, manifestDigest); tagErr == nil {
|
|
tag = tagVal
|
|
}
|
|
|
|
// Resolve hold endpoint URL
|
|
holdEndpoint := ""
|
|
if holdURL, holdErr := atproto.ResolveHoldURL(ctx, holdDID); holdErr == nil {
|
|
holdEndpoint = holdURL
|
|
}
|
|
|
|
p.webhookDispatcher.DispatchForScan(ctx, scan, previousScan, userHandle, tag, holdEndpoint)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessStats handles stats record events from hold PDSes
// This is called when Jetstream receives a stats create/update/delete event from a hold
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownerDID + repository
//
// Flow: update the in-memory per-hold stats cache, then write the totals
// aggregated across all holds to repository_stats.
func (p *Processor) ProcessStats(ctx context.Context, holdDID string, recordData []byte, isDelete bool) error {
	// Skip if no stats cache configured (stats processing disabled)
	if p.statsCache == nil {
		return nil
	}

	// Unmarshal stats record.
	// NOTE(review): delete events typically carry no record body; if
	// recordData is nil/empty this returns an unmarshal error before the
	// delete branch below can run — confirm whether stats deletes ever
	// arrive here with usable data.
	var statsRecord atproto.StatsRecord
	if err := json.Unmarshal(recordData, &statsRecord); err != nil {
		return fmt.Errorf("failed to unmarshal stats record: %w", err)
	}

	// Ensure the owner user exists in the users table (repository_stats has FK to users.did)
	if !isDelete && statsRecord.OwnerDID != "" {
		if err := p.EnsureUserExists(ctx, statsRecord.OwnerDID); err != nil {
			return fmt.Errorf("failed to ensure stats owner user exists: %w", err)
		}
	}

	if isDelete {
		// Delete this hold's contribution from the in-memory cache
		p.statsCache.Delete(holdDID, statsRecord.OwnerDID, statsRecord.Repository)
	} else {
		// Parse timestamps; unparseable values are silently dropped (left nil)
		var lastPull, lastPush *time.Time
		if statsRecord.LastPull != "" {
			t, err := time.Parse(time.RFC3339, statsRecord.LastPull)
			if err == nil {
				lastPull = &t
			}
		}
		if statsRecord.LastPush != "" {
			t, err := time.Parse(time.RFC3339, statsRecord.LastPush)
			if err == nil {
				lastPush = &t
			}
		}

		// Update in-memory cache
		p.statsCache.Update(holdDID, statsRecord.OwnerDID, statsRecord.Repository,
			statsRecord.PullCount, statsRecord.PushCount, lastPull, lastPush)
	}

	// Get aggregated stats across all holds
	totalPull, totalPush, latestPull, latestPush := p.statsCache.GetAggregated(
		statsRecord.OwnerDID, statsRecord.Repository)

	// Upsert aggregated stats to repository_stats
	return db.UpsertRepositoryStats(p.db, &db.RepositoryStats{
		DID:        statsRecord.OwnerDID,
		Repository: statsRecord.Repository,
		PullCount:  int(totalPull),
		PushCount:  int(totalPush),
		LastPull:   latestPull,
		LastPush:   latestPush,
	})
}
|
|
|
|
// ProcessCaptain handles captain record events from hold PDSes
|
|
// This is called when Jetstream receives a captain create/update/delete event from a hold
|
|
// The holdDID is the DID of the hold PDS (event.DID), and the record contains ownership info
|
|
func (p *Processor) ProcessCaptain(ctx context.Context, holdDID string, recordData []byte) error {
|
|
// Unmarshal captain record
|
|
var captainRecord atproto.CaptainRecord
|
|
if err := json.Unmarshal(recordData, &captainRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal captain record: %w", err)
|
|
}
|
|
|
|
// Convert to db struct and upsert
|
|
record := &db.HoldCaptainRecord{
|
|
HoldDID: holdDID,
|
|
OwnerDID: captainRecord.Owner,
|
|
Public: captainRecord.Public,
|
|
AllowAllCrew: captainRecord.AllowAllCrew,
|
|
DeployedAt: captainRecord.DeployedAt,
|
|
Region: captainRecord.Region,
|
|
Successor: captainRecord.Successor,
|
|
UpdatedAt: time.Now(),
|
|
}
|
|
|
|
if err := db.UpsertCaptainRecord(p.db, record); err != nil {
|
|
return fmt.Errorf("failed to upsert captain record: %w", err)
|
|
}
|
|
|
|
slog.Info("Processed captain record",
|
|
"component", "processor",
|
|
"hold_did", holdDID,
|
|
"owner_did", captainRecord.Owner,
|
|
"public", captainRecord.Public,
|
|
"allow_all_crew", captainRecord.AllowAllCrew)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessCrew handles crew record events from hold PDSes
|
|
// This is called when Jetstream receives a crew create/update/delete event from a hold
|
|
// The holdDID is the DID of the hold PDS (event.DID), and the record contains member info
|
|
func (p *Processor) ProcessCrew(ctx context.Context, holdDID string, rkey string, recordData []byte) error {
|
|
// Unmarshal crew record
|
|
var crewRecord atproto.CrewRecord
|
|
if err := json.Unmarshal(recordData, &crewRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal crew record: %w", err)
|
|
}
|
|
|
|
// Marshal permissions to JSON string
|
|
permissionsJSON := ""
|
|
if len(crewRecord.Permissions) > 0 {
|
|
if jsonBytes, err := json.Marshal(crewRecord.Permissions); err == nil {
|
|
permissionsJSON = string(jsonBytes)
|
|
}
|
|
}
|
|
|
|
// Convert to db struct and upsert
|
|
member := &db.CrewMember{
|
|
HoldDID: holdDID,
|
|
MemberDID: crewRecord.Member,
|
|
Rkey: rkey,
|
|
Role: crewRecord.Role,
|
|
Permissions: permissionsJSON,
|
|
Tier: crewRecord.Tier,
|
|
AddedAt: crewRecord.AddedAt,
|
|
}
|
|
|
|
if err := db.UpsertCrewMember(p.db, member); err != nil {
|
|
return fmt.Errorf("failed to upsert crew member: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessAccount handles account status events (deactivation/deletion/etc)
// This is called when Jetstream receives an account event indicating status changes.
//
// Status handling:
// - "deleted": Account permanently deleted - remove all cached data
// - "deactivated": Could be PDS migration or permanent - invalidate cache only
// - "takendown": Moderation action - invalidate cache only
// - Other: Ignore
//
// For "deactivated", we don't delete data because it's ambiguous:
// - Could be permanent deactivation (user deleted account)
// - Could be PDS migration (account moves to new PDS)
// Cache invalidation forces re-resolution on next lookup.
//
// Only processes events for users who already exist in our database (have ATCR activity).
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
	// Skip active accounts or unknown statuses
	if active {
		return nil
	}

	// Check if user exists in our database - only process if they're an ATCR user
	user, err := db.GetUserByDID(p.db, did)
	if err != nil {
		return fmt.Errorf("failed to check user existence: %w", err)
	}

	// Skip if user doesn't exist - they don't have any ATCR activity
	if user == nil {
		return nil
	}

	switch status {
	case "deleted":
		// Account permanently deleted - remove all cached data
		if _, err := db.DeleteUserData(p.db, did); err != nil {
			slog.Error("Failed to delete user data for deleted account",
				"component", "processor",
				"did", did,
				"handle", user.Handle,
				"error", err)
			return err
		}

		// Also invalidate identity cache (best-effort; error deliberately ignored)
		_ = atproto.InvalidateIdentity(ctx, did)

		slog.Info("Deleted user data for deleted account",
			"component", "processor",
			"did", did,
			"handle", user.Handle)

	case "deactivated", "takendown":
		// Ambiguous status - invalidate cache but keep data
		// For deactivated: could be PDS migration, will resolve on next lookup
		// For takendown: moderation action, keep data in case of appeal
		if err := atproto.InvalidateIdentity(ctx, did); err != nil {
			slog.Warn("Failed to invalidate identity cache",
				"component", "processor",
				"did", did,
				"status", status,
				"error", err)
			return err
		}

		slog.Info("Processed account status event - cache invalidated",
			"component", "processor",
			"did", did,
			"handle", user.Handle,
			"status", status)

	default:
		// Unknown status - ignore
		slog.Debug("Ignoring unknown account status",
			"component", "processor",
			"did", did,
			"status", status)
	}

	return nil
}
|