422 lines
14 KiB
Go
422 lines
14 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/atproto"
|
|
)
|
|
|
|
// Processor handles shared database operations for both Worker (live) and Backfill (sync)
|
|
// This eliminates code duplication between the two data ingestion paths
|
|
type Processor struct {
|
|
db *sql.DB
|
|
userCache *UserCache // Optional - enabled for Worker, disabled for Backfill
|
|
useCache bool
|
|
}
|
|
|
|
// NewProcessor creates a new shared processor
|
|
// useCache: true for Worker (live streaming), false for Backfill (batch processing)
|
|
func NewProcessor(database *sql.DB, useCache bool) *Processor {
|
|
p := &Processor{
|
|
db: database,
|
|
useCache: useCache,
|
|
}
|
|
|
|
if useCache {
|
|
p.userCache = &UserCache{
|
|
cache: make(map[string]*db.User),
|
|
}
|
|
}
|
|
|
|
return p
|
|
}
|
|
|
|
// EnsureUser resolves and upserts a user by DID
|
|
// Uses cache if enabled (Worker), queries DB if cache disabled (Backfill)
|
|
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
|
|
// Check cache first (if enabled)
|
|
if p.useCache && p.userCache != nil {
|
|
if _, ok := p.userCache.cache[did]; ok {
|
|
// User in cache - just update last seen timestamp
|
|
return db.UpdateUserLastSeen(p.db, did)
|
|
}
|
|
} else if !p.useCache {
|
|
// No cache - check if user already exists in DB
|
|
existingUser, err := db.GetUserByDID(p.db, did)
|
|
if err == nil && existingUser != nil {
|
|
// User exists - just update last seen timestamp
|
|
return db.UpdateUserLastSeen(p.db, did)
|
|
}
|
|
}
|
|
|
|
// Resolve DID to get handle and PDS endpoint
|
|
resolvedDID, handle, pdsEndpoint, err := atproto.ResolveIdentity(ctx, did)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Fetch user's Bluesky profile record from their PDS (including avatar)
|
|
avatarURL := ""
|
|
client := atproto.NewClient(pdsEndpoint, "", "")
|
|
profileRecord, err := client.GetProfileRecord(ctx, resolvedDID)
|
|
if err != nil {
|
|
slog.Warn("Failed to fetch profile record", "component", "processor", "did", resolvedDID, "error", err)
|
|
// Continue without avatar
|
|
} else if profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
|
|
avatarURL = atproto.BlobCDNURL(resolvedDID, profileRecord.Avatar.Ref.Link)
|
|
}
|
|
|
|
// Create user record
|
|
user := &db.User{
|
|
DID: resolvedDID,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatarURL,
|
|
LastSeen: time.Now(),
|
|
}
|
|
|
|
// Cache if enabled
|
|
if p.useCache {
|
|
p.userCache.cache[did] = user
|
|
}
|
|
|
|
// Upsert to database
|
|
// Use UpsertUser if we successfully fetched an avatar (to update existing users)
|
|
// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
|
|
if avatarURL != "" {
|
|
return db.UpsertUser(p.db, user)
|
|
}
|
|
return db.UpsertUserIgnoreAvatar(p.db, user)
|
|
}
|
|
|
|
// ProcessManifest processes a manifest record and stores it in the database
|
|
// Returns the manifest ID for further processing (layers/references)
|
|
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
|
|
// Unmarshal manifest record
|
|
var manifestRecord atproto.ManifestRecord
|
|
if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
|
|
return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
|
|
}
|
|
// Detect manifest type
|
|
isManifestList := len(manifestRecord.Manifests) > 0
|
|
|
|
// Extract hold DID from manifest (with fallback for legacy manifests)
|
|
// New manifests use holdDid field (DID format)
|
|
// Old manifests use holdEndpoint field (URL format) - convert to DID
|
|
holdDID := manifestRecord.HoldDID
|
|
if holdDID == "" && manifestRecord.HoldEndpoint != "" {
|
|
// Legacy manifest - convert URL to DID
|
|
holdDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint)
|
|
}
|
|
|
|
// Prepare manifest for insertion (WITHOUT annotation fields)
|
|
manifest := &db.Manifest{
|
|
DID: did,
|
|
Repository: manifestRecord.Repository,
|
|
Digest: manifestRecord.Digest,
|
|
MediaType: manifestRecord.MediaType,
|
|
SchemaVersion: manifestRecord.SchemaVersion,
|
|
HoldEndpoint: holdDID,
|
|
CreatedAt: manifestRecord.CreatedAt,
|
|
// Annotations removed - stored separately in repository_annotations table
|
|
}
|
|
|
|
// Set config fields only for image manifests (not manifest lists)
|
|
if !isManifestList && manifestRecord.Config != nil {
|
|
manifest.ConfigDigest = manifestRecord.Config.Digest
|
|
manifest.ConfigSize = manifestRecord.Config.Size
|
|
}
|
|
|
|
// Insert manifest
|
|
manifestID, err := db.InsertManifest(p.db, manifest)
|
|
if err != nil {
|
|
// For backfill: if manifest already exists, get its ID
|
|
if strings.Contains(err.Error(), "UNIQUE constraint failed") {
|
|
var existingID int64
|
|
err := p.db.QueryRow(`
|
|
SELECT id FROM manifests
|
|
WHERE did = ? AND repository = ? AND digest = ?
|
|
`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&existingID)
|
|
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
|
|
}
|
|
manifestID = existingID
|
|
} else {
|
|
return 0, fmt.Errorf("failed to insert manifest: %w", err)
|
|
}
|
|
}
|
|
|
|
// Update repository annotations ONLY if manifest has at least one non-empty annotation
|
|
if manifestRecord.Annotations != nil {
|
|
hasData := false
|
|
for _, value := range manifestRecord.Annotations {
|
|
if value != "" {
|
|
hasData = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if hasData {
|
|
// Replace all annotations for this repository
|
|
err = db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("failed to upsert annotations: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Insert manifest references or layers
|
|
if isManifestList {
|
|
// Insert manifest references (for manifest lists/indexes)
|
|
for i, ref := range manifestRecord.Manifests {
|
|
platformArch := ""
|
|
platformOS := ""
|
|
platformVariant := ""
|
|
platformOSVersion := ""
|
|
|
|
if ref.Platform != nil {
|
|
platformArch = ref.Platform.Architecture
|
|
platformOS = ref.Platform.OS
|
|
platformVariant = ref.Platform.Variant
|
|
platformOSVersion = ref.Platform.OSVersion
|
|
}
|
|
|
|
// Detect attestation manifests from annotations
|
|
isAttestation := false
|
|
if ref.Annotations != nil {
|
|
if refType, ok := ref.Annotations["vnd.docker.reference.type"]; ok {
|
|
isAttestation = refType == "attestation-manifest"
|
|
}
|
|
}
|
|
|
|
if err := db.InsertManifestReference(p.db, &db.ManifestReference{
|
|
ManifestID: manifestID,
|
|
Digest: ref.Digest,
|
|
MediaType: ref.MediaType,
|
|
Size: ref.Size,
|
|
PlatformArchitecture: platformArch,
|
|
PlatformOS: platformOS,
|
|
PlatformVariant: platformVariant,
|
|
PlatformOSVersion: platformOSVersion,
|
|
IsAttestation: isAttestation,
|
|
ReferenceIndex: i,
|
|
}); err != nil {
|
|
// Continue on error - reference might already exist
|
|
continue
|
|
}
|
|
}
|
|
} else {
|
|
// Insert layers (for image manifests)
|
|
for i, layer := range manifestRecord.Layers {
|
|
if err := db.InsertLayer(p.db, &db.Layer{
|
|
ManifestID: manifestID,
|
|
Digest: layer.Digest,
|
|
MediaType: layer.MediaType,
|
|
Size: layer.Size,
|
|
LayerIndex: i,
|
|
}); err != nil {
|
|
// Continue on error - layer might already exist
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
return manifestID, nil
|
|
}
|
|
|
|
// ProcessTag processes a tag record and stores it in the database
|
|
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
|
|
// Unmarshal tag record
|
|
var tagRecord atproto.TagRecord
|
|
if err := json.Unmarshal(recordData, &tagRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal tag: %w", err)
|
|
}
|
|
// Extract digest from tag record (tries manifest field first, falls back to manifestDigest)
|
|
manifestDigest, err := tagRecord.GetManifestDigest()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
|
|
}
|
|
|
|
// Insert or update tag
|
|
return db.UpsertTag(p.db, &db.Tag{
|
|
DID: did,
|
|
Repository: tagRecord.Repository,
|
|
Tag: tagRecord.Tag,
|
|
Digest: manifestDigest,
|
|
CreatedAt: tagRecord.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
// ProcessStar processes a star record and stores it in the database
|
|
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
|
|
// Unmarshal star record
|
|
var starRecord atproto.StarRecord
|
|
if err := json.Unmarshal(recordData, &starRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal star: %w", err)
|
|
}
|
|
// Upsert the star record (idempotent - won't duplicate)
|
|
// The DID here is the starrer (user who starred)
|
|
// The subject contains the owner DID and repository
|
|
// Star count will be calculated on demand from the stars table
|
|
return db.UpsertStar(p.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
|
|
}
|
|
|
|
// ProcessSailorProfile processes a sailor profile record
|
|
// This is primarily used by backfill to cache captain records for holds
|
|
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
|
|
// Unmarshal sailor profile record
|
|
var profileRecord atproto.SailorProfileRecord
|
|
if err := json.Unmarshal(recordData, &profileRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
|
|
}
|
|
|
|
// Skip if no default hold set
|
|
if profileRecord.DefaultHold == "" {
|
|
return nil
|
|
}
|
|
|
|
// Convert hold URL/DID to canonical DID
|
|
holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold)
|
|
if holdDID == "" {
|
|
slog.Warn("Invalid hold reference in profile", "component", "processor", "did", did, "default_hold", profileRecord.DefaultHold)
|
|
return nil
|
|
}
|
|
|
|
// Query and cache the captain record using provided function
|
|
// This allows backfill-specific logic (retries, test mode handling) without duplicating it here
|
|
if queryCaptainFn != nil {
|
|
return queryCaptainFn(ctx, holdDID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessRepoPage processes a repository page record
|
|
// This is called when Jetstream receives a repo page create/update event
|
|
func (p *Processor) ProcessRepoPage(ctx context.Context, did string, rkey string, recordData []byte, isDelete bool) error {
|
|
if isDelete {
|
|
// Delete the repo page from our cache
|
|
return db.DeleteRepoPage(p.db, did, rkey)
|
|
}
|
|
|
|
// Unmarshal repo page record
|
|
var pageRecord atproto.RepoPageRecord
|
|
if err := json.Unmarshal(recordData, &pageRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal repo page: %w", err)
|
|
}
|
|
|
|
// Extract avatar CID if present
|
|
avatarCID := ""
|
|
if pageRecord.Avatar != nil && pageRecord.Avatar.Ref.Link != "" {
|
|
avatarCID = pageRecord.Avatar.Ref.Link
|
|
}
|
|
|
|
// Upsert to database
|
|
return db.UpsertRepoPage(p.db, did, pageRecord.Repository, pageRecord.Description, avatarCID, pageRecord.CreatedAt, pageRecord.UpdatedAt)
|
|
}
|
|
|
|
// ProcessIdentity handles identity change events (handle updates)
|
|
// This is called when Jetstream receives an identity event indicating a handle change.
|
|
// The identity cache is invalidated to ensure the next lookup uses the new handle,
|
|
// and the database is updated to reflect the change in the UI.
|
|
//
|
|
// Only processes events for users who already exist in our database (have ATCR activity).
|
|
func (p *Processor) ProcessIdentity(ctx context.Context, did string, newHandle string) error {
|
|
// Check if user exists in our database - only update if they're an ATCR user
|
|
user, err := db.GetUserByDID(p.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check user existence: %w", err)
|
|
}
|
|
|
|
// Skip if user doesn't exist - they don't have any ATCR activity (manifests, profiles, etc.)
|
|
if user == nil {
|
|
return nil
|
|
}
|
|
|
|
// Update handle in database
|
|
if err := db.UpdateUserHandle(p.db, did, newHandle); err != nil {
|
|
slog.Warn("Failed to update user handle in database",
|
|
"component", "processor",
|
|
"did", did,
|
|
"handle", newHandle,
|
|
"error", err)
|
|
// Continue to invalidate cache even if DB update fails
|
|
}
|
|
|
|
// Invalidate cached identity data to force re-resolution on next lookup
|
|
if err := atproto.InvalidateIdentity(ctx, did); err != nil {
|
|
slog.Warn("Failed to invalidate identity cache",
|
|
"component", "processor",
|
|
"did", did,
|
|
"error", err)
|
|
return err
|
|
}
|
|
|
|
slog.Info("Processed identity change event",
|
|
"component", "processor",
|
|
"did", did,
|
|
"old_handle", user.Handle,
|
|
"new_handle", newHandle)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ProcessAccount handles account status events (deactivation/reactivation)
|
|
// This is called when Jetstream receives an account event indicating status changes.
|
|
//
|
|
// IMPORTANT: Deactivation events are ambiguous - they could indicate:
|
|
// 1. Permanent account deactivation (user deleted account)
|
|
// 2. PDS migration (account deactivated at old PDS, reactivated at new PDS)
|
|
//
|
|
// We DO NOT delete user data on deactivation events. Instead, we invalidate the
|
|
// identity cache. On the next resolution attempt:
|
|
// - If migrated: Resolution finds the new PDS and updates the database automatically
|
|
// - If truly deactivated: Resolution fails and user won't appear in new queries
|
|
//
|
|
// This approach prevents data loss from PDS migrations while still handling deactivations.
|
|
//
|
|
// Only processes events for users who already exist in our database (have ATCR activity).
|
|
func (p *Processor) ProcessAccount(ctx context.Context, did string, active bool, status string) error {
|
|
// Only process deactivation events
|
|
if active || status != "deactivated" {
|
|
return nil
|
|
}
|
|
|
|
// Check if user exists in our database - only update if they're an ATCR user
|
|
user, err := db.GetUserByDID(p.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check user existence: %w", err)
|
|
}
|
|
|
|
// Skip if user doesn't exist - they don't have any ATCR activity
|
|
if user == nil {
|
|
return nil
|
|
}
|
|
|
|
// Invalidate cached identity data to force re-resolution on next lookup
|
|
// This will discover if the account was migrated (new PDS) or truly deactivated (resolution fails)
|
|
if err := atproto.InvalidateIdentity(ctx, did); err != nil {
|
|
slog.Warn("Failed to invalidate identity cache for deactivated account",
|
|
"component", "processor",
|
|
"did", did,
|
|
"error", err)
|
|
return err
|
|
}
|
|
|
|
slog.Info("Processed account deactivation event - cache invalidated",
|
|
"component", "processor",
|
|
"did", did,
|
|
"handle", user.Handle,
|
|
"status", status)
|
|
|
|
return nil
|
|
}
|