Files

422 lines
14 KiB
Go

package jetstream
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/atproto"
)
// Processor handles the database operations shared by the Worker (live
// streaming) and Backfill (batch sync) ingestion paths, so that the two paths
// do not duplicate that logic.
type Processor struct {
// db is the database handle handed to the db package helpers and used for
// direct queries.
db *sql.DB
// userCache is the optional in-memory user cache. NewProcessor allocates it
// only when useCache is true (Worker); it is left nil for Backfill.
userCache *UserCache
// useCache selects the cache-backed lookup path in EnsureUser.
useCache bool
}
// NewProcessor builds a Processor around the given database handle.
//
// Pass useCache=true for the Worker (live streaming): an empty in-memory user
// cache is allocated. Pass useCache=false for Backfill (batch processing): no
// cache is allocated and the userCache field stays nil.
func NewProcessor(database *sql.DB, useCache bool) *Processor {
	var cache *UserCache
	if useCache {
		cache = &UserCache{cache: make(map[string]*db.User)}
	}

	return &Processor{
		db:        database,
		userCache: cache,
		useCache:  useCache,
	}
}
// EnsureUser makes sure a user row exists for the given DID.
//
// Already-known users only get their last-seen timestamp refreshed:
//   - cache enabled (Worker): "known" means the DID is present in the
//     in-memory cache;
//   - cache disabled (Backfill): "known" means db.GetUserByDID returns a user.
//
// Otherwise the DID is resolved to its handle and PDS endpoint, the profile
// record is fetched from that PDS to obtain an avatar, and the user is
// upserted. A failed profile fetch is logged and tolerated: the user is then
// written with UpsertUserIgnoreAvatar so an existing avatar is preserved.
//
// The cache entry is written only after the upsert succeeds. Caching first
// would let a failed write be masked by later cache hits, which only refresh
// the last-seen timestamp and never retry the insert.
//
// Returns the (wrapped) identity resolution error or the error from the
// database write.
//
// NOTE(review): the cache map is read and written here without any locking
// visible in this method - confirm callers are single-goroutine or that
// UserCache is guarded elsewhere.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	cacheEnabled := p.useCache && p.userCache != nil

	switch {
	case cacheEnabled:
		if _, ok := p.userCache.cache[did]; ok {
			// Cache hit - the row already exists, just bump last seen.
			return db.UpdateUserLastSeen(p.db, did)
		}
	case !p.useCache:
		// No cache - ask the database whether the user already exists.
		// A lookup error is treated like "not found" and falls through to
		// the resolve-and-upsert path below.
		existingUser, err := db.GetUserByDID(p.db, did)
		if err == nil && existingUser != nil {
			return db.UpdateUserLastSeen(p.db, did)
		}
	}

	// Resolve DID to get handle and PDS endpoint.
	resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
	if err != nil {
		return fmt.Errorf("failed to resolve identity for %s: %w", did, err)
	}

	// Fetch the user's profile record from their PDS to pick up the avatar.
	avatarURL := ""
	client := atproto.NewClient(pdsEndpoint, "", "")
	profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
	if err != nil {
		// Best effort: continue without an avatar.
		slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
	} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
		avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
	}

	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatarURL,
		LastSeen:    time.Now(),
	}

	// With an avatar in hand, overwrite whatever is stored; without one, use
	// the variant that leaves an existing avatar untouched.
	var upsertErr error
	if avatarURL != "" {
		upsertErr = db.UpsertUser(p.db, user)
	} else {
		upsertErr = db.UpsertUserIgnoreAvatar(p.db, user)
	}
	if upsertErr != nil {
		return upsertErr
	}

	// Only remember the user once the row is known to be written.
	if cacheEnabled {
		p.userCache.cache[did] = user
	}
	return nil
}
// ProcessManifest decodes a manifest record, stores it, and stores its
// children (layers for an image manifest, references for a manifest list).
//
// Steps:
//  1. Unmarshal recordData into an atproto.ManifestRecord.
//  2. Determine the hold DID: the holdDid field when set, otherwise the legacy
//     holdEndpoint URL converted via atproto.ResolveHoldDIDFromURL.
//  3. Insert the manifest row. If the insert fails with a uniqueness
//     violation (expected during backfill), the existing row's ID is looked up
//     by (did, repository, digest) instead.
//  4. If the record carries at least one non-empty annotation, replace the
//     repository's annotations.
//  5. Insert manifest references (manifest list) or layers (image manifest).
//     These inserts are best effort: duplicates are skipped quietly, any other
//     failure is logged as a warning, and processing continues either way.
//
// Returns the manifest ID (new or pre-existing). An error is returned when the
// record cannot be decoded, the manifest cannot be inserted or found, or the
// annotation upsert fails.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	// Substring of the driver error reported when a row violates a
	// uniqueness constraint.
	const uniqueViolation = "UNIQUE constraint failed"

	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}

	// A record that lists child manifests is a manifest list / index.
	isManifestList := len(manifestRecord.Manifests) > 0

	// New manifests use holdDid (DID format); legacy manifests only carry
	// holdEndpoint (URL format), which is converted to a DID.
	holdDID := manifestRecord.HoldDID
	if holdDID == "" && manifestRecord.HoldEndpoint != "" {
		holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
	}

	// Annotations are intentionally not part of the manifest row; they are
	// stored per repository further down.
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		HoldEndpoint:  holdDID,
		CreatedAt:     manifestRecord.CreatedAt,
	}

	// Config fields only apply to image manifests, not manifest lists.
	if !isManifestList && manifestRecord.Config != nil {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}

	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		if !strings.Contains(err.Error(), uniqueViolation) {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}
		// For backfill: the manifest already exists, so fetch its ID.
		lookupErr := p.db.QueryRow(`
			SELECT id FROM manifests
			WHERE did = ? AND repository = ? AND digest = ?
		`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&manifestID)
		if lookupErr != nil {
			return 0, fmt.Errorf("failed to get existing manifest ID: %w", lookupErr)
		}
	}

	// Update repository annotations ONLY if the manifest has at least one
	// non-empty annotation, so an annotation-less manifest never wipes
	// existing data. Ranging over a nil collection is a no-op.
	hasAnnotationData := false
	for _, value := range manifestRecord.Annotations {
		if value != "" {
			hasAnnotationData = true
			break
		}
	}
	if hasAnnotationData {
		if err := db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations); err != nil {
			return 0, fmt.Errorf("failed to upsert annotations: %w", err)
		}
	}

	if isManifestList {
		// Manifest list / index: store one reference per child manifest.
		for i, ref := range manifestRecord.Manifests {
			reference := &db.ManifestReference{
				ManifestID: manifestID,
				Digest:     ref.Digest,
				MediaType:  ref.MediaType,
				Size:       ref.Size,
				// Attestation manifests are marked through this annotation;
				// a missing key yields the zero value and therefore false.
				IsAttestation:  ref.Annotations["vnd.docker.reference.type"] == "attestation-manifest",
				ReferenceIndex: i,
			}
			if ref.Platform != nil {
				reference.PlatformArchitecture = ref.Platform.Architecture
				reference.PlatformOS = ref.Platform.OS
				reference.PlatformVariant = ref.Platform.Variant
				reference.PlatformOSVersion = ref.Platform.OSVersion
			}

			// Best effort: an already-existing reference is expected and
			// skipped silently; anything else is surfaced in the log.
			if err := db.InsertManifestReference(p.db, reference); err != nil && !strings.Contains(err.Error(), uniqueViolation) {
				slog.Warn("Failed to insert manifest reference",
					"component", "processor",
					"did", did,
					"manifest_id", manifestID,
					"digest", ref.Digest,
					"error", err)
			}
		}
		return manifestID, nil
	}

	// Image manifest: store its layers in order.
	for i, layer := range manifestRecord.Layers {
		err := db.InsertLayer(p.db, &db.Layer{
			ManifestID: manifestID,
			Digest:     layer.Digest,
			MediaType:  layer.MediaType,
			Size:       layer.Size,
			LayerIndex: i,
		})
		// Best effort: an already-existing layer is expected and skipped
		// silently; anything else is surfaced in the log.
		if err != nil && !strings.Contains(err.Error(), uniqueViolation) {
			slog.Warn("Failed to insert layer",
				"component", "processor",
				"did", did,
				"manifest_id", manifestID,
				"digest", layer.Digest,
				"error", err)
		}
	}
	return manifestID, nil
}
// ProcessTag decodes a tag record and upserts it for the given DID.
//
// The manifest digest comes from TagRecord.GetManifestDigest (which, per the
// original note, tries the manifest field first and falls back to
// manifestDigest). The record's UpdatedAt is stored as the tag's CreatedAt.
//
// Returns an error if the record cannot be decoded, carries no usable digest,
// or the database upsert fails.
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
	var rec atproto.TagRecord
	if err := json.Unmarshal(recordData, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}

	digest, err := rec.GetManifestDigest()
	if err != nil {
		return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
	}

	tag := db.Tag{
		DID:        did,
		Repository: rec.Repository,
		Tag:        rec.Tag,
		Digest:     digest,
		CreatedAt:  rec.UpdatedAt,
	}
	return db.UpsertTag(p.db, &tag)
}
// ProcessStar decodes a star record and upserts it.
//
// did is the starrer (the user who starred); the record's subject names the
// owner DID and repository being starred. The upsert is idempotent, and star
// counts are computed on demand from the stars table rather than stored here.
//
// Returns an error if the record cannot be decoded or the upsert fails.
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
	var rec atproto.StarRecord
	err := json.Unmarshal(recordData, &rec)
	if err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}

	subject := rec.Subject
	return db.UpsertStar(p.db, did, subject.DID, subject.Repository, rec.CreatedAt)
}
// ProcessSailorProfile decodes a sailor profile record and, when it names a
// default hold, hands the hold's DID to queryCaptainFn.
//
// This is primarily used by backfill to cache captain records for holds. The
// callback keeps caller-specific behavior (retries, test-mode handling) out of
// this shared code.
//
// Nothing is done (nil is returned) when the profile has no default hold, when
// the hold reference cannot be converted to a DID (a warning is logged), or
// when queryCaptainFn is nil. Otherwise the callback's result is returned.
// A record that cannot be decoded yields an error.
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
	var rec atproto.SailorProfileRecord
	if err := json.Unmarshal(recordData, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
	}

	defaultHold := rec.DefaultHold
	if defaultHold == "" {
		// No default hold configured - nothing to look up.
		return nil
	}

	// The default hold may be given as a URL or a DID; normalize to a DID.
	holdDID := atproto.ResolveHoldDIDFromURL(defaultHold)

	switch {
	case holdDID == "":
		slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", defaultHold)
		return nil
	case queryCaptainFn == nil:
		return nil
	default:
		return queryCaptainFn(ctx, holdDID)
	}
}
// ProcessRepoPage applies a repository page event to the local database.
//
// When isDelete is true, the page identified by (did, rkey) is deleted and
// recordData is not read. Otherwise recordData is decoded and the page is
// upserted with its repository, description, avatar CID (empty when the record
// has no avatar) and timestamps.
//
// Returns an error if the record cannot be decoded or the database call fails.
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
	if isDelete {
		return db.DeleteRepoPage(p.db, did, rkey)
	}

	var page atproto.RepoPageRecord
	if err := json.Unmarshal(recordData, &page); err != nil {
		return fmt.Errorf("failed to unmarshal repo page: %w", err)
	}

	// An absent avatar and an avatar with an empty link both yield "".
	var avatarCID string
	if avatar := page.Avatar; avatar != nil {
		avatarCID = avatar.Ref.Link
	}

	return db.UpsertRepoPage(
		p.db,
		did,
		page.Repository,
		page.Description,
		avatarCID,
		page.CreatedAt,
		page.UpdatedAt,
	)
}
// ProcessIdentity handles an identity change event (a handle update) for did.
//
// Events are ignored for DIDs that are not in our users table, i.e. accounts
// with no ATCR activity. For known users the stored handle is updated so the
// UI reflects the change, and the cached identity is invalidated so the next
// lookup re-resolves it.
//
// A failed handle update is logged and does not stop the cache invalidation;
// it is not returned to the caller. Errors from the user lookup (wrapped) and
// from the cache invalidation are returned.
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
	user, err := db.GetUserByDID(p.db, did)
	switch {
	case err != nil:
		return fmt.Errorf("failed to check user existence: %w", err)
	case user == nil:
		// Not an ATCR user (no manifests, profiles, etc.) - nothing to do.
		return nil
	}

	// Best effort: even if the handle cannot be stored, the stale cached
	// identity must still be dropped below.
	updateErr := db.UpdateUserHandle(p.db, did, newHandle)
	if updateErr != nil {
		slog.Warn("Failed to update user handle in database",
			"component", "processor",
			"did", did,
			"handle", newHandle,
			"error", updateErr)
	}

	// Drop the cached identity so the next lookup sees the new handle.
	invalidateErr := atproto.InvalidateIdentity(ctx, did)
	if invalidateErr != nil {
		slog.Warn("Failed to invalidate identity cache",
			"component", "processor",
			"did", did,
			"error", invalidateErr)
		return invalidateErr
	}

	slog.Info("Processed identity change event",
		"component", "processor",
		"did", did,
		"old_handle", user.Handle,
		"new_handle", newHandle)
	return nil
}
// ProcessAccount handles an account status event for did. Only deactivations
// (active == false and status == "deactivated") are acted upon; every other
// event returns nil immediately.
//
// IMPORTANT: a deactivation event is ambiguous. It can mean either:
//  1. a permanent deactivation (the user deleted the account), or
//  2. a PDS migration (deactivated at the old PDS, reactivated at the new one).
//
// For that reason user data is NOT deleted here. Only the cached identity is
// invalidated, and the next resolution attempt sorts the two cases out:
//   - migrated: resolution finds the new PDS and the database is updated;
//   - truly deactivated: resolution fails and the user stops appearing in new
//     queries.
//
// This avoids data loss on PDS migrations while still handling deactivations.
//
// Events for DIDs that are not in our users table (no ATCR activity) are
// ignored. Errors from the user lookup (wrapped) and from the cache
// invalidation are returned.
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
	isDeactivation := !active && status == "deactivated"
	if !isDeactivation {
		return nil
	}

	user, err := db.GetUserByDID(p.db, did)
	switch {
	case err != nil:
		return fmt.Errorf("failed to check user existence: %w", err)
	case user == nil:
		// Not an ATCR user - nothing cached that would need invalidating.
		return nil
	}

	// Force re-resolution on the next lookup; that is what reveals whether
	// the account moved to a new PDS or is really gone.
	invalidateErr := atproto.InvalidateIdentity(ctx, did)
	if invalidateErr != nil {
		slog.Warn("Failed to invalidate identity cache for deactivated account",
			"component", "processor",
			"did", did,
			"error", invalidateErr)
		return invalidateErr
	}

	slog.Info("Processed account deactivation event - cache invalidated",
		"component", "processor",
		"did", did,
		"handle", user.Handle,
		"status", status)
	return nil
}