Files
at-container-registry/pkg/appview/jetstream/processor.go

322 lines
9.6 KiB
Go

package jetstream
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/atproto"
)
// Processor handles shared database operations for both Worker (live) and Backfill (sync).
// This eliminates code duplication between the two data ingestion paths.
type Processor struct {
	// db is the database that EnsureUser and the Process* methods read from and write to.
	db *sql.DB
	// directory resolves DIDs to a handle and PDS endpoint in EnsureUser.
	directory identity.Directory
	// userCache holds users keyed by DID for EnsureUser.
	// Optional - enabled for Worker, disabled for Backfill
	userCache *UserCache
	// useCache selects EnsureUser's lookup strategy: the in-memory userCache
	// (true) or a database lookup by DID (false). NewProcessor allocates
	// userCache only when this is true.
	// NOTE(review): the cache map is read and written without any locking in
	// this file; confirm a caching Processor is only driven by one goroutine.
	useCache bool
}
// NewProcessor builds a Processor over database that resolves identities with
// the default identity directory.
//
// Pass useCache=true for the Worker (live streaming): EnsureUser then keeps an
// in-memory user cache. Pass false for Backfill (batch processing): EnsureUser
// then checks the database for an existing user instead.
func NewProcessor(database *sql.DB, useCache bool) *Processor {
	var cache *UserCache
	if useCache {
		cache = &UserCache{cache: make(map[string]*db.User)}
	}

	return &Processor{
		db:        database,
		directory: identity.DefaultDirectory(),
		userCache: cache,
		useCache:  useCache,
	}
}
// EnsureUser resolves and upserts a user by DID.
//
// Fast paths (no network calls):
//   - Cache enabled (Worker): a user already cached under did gets LastSeen
//     set to now and is upserted.
//   - Cache disabled (Backfill): a user already stored in the database under
//     did gets LastSeen set to now and is upserted. A failed database lookup
//     is treated like a miss.
//
// Otherwise, the DID is resolved through the identity directory to obtain the
// handle and PDS endpoint. The public Bluesky AppView is then queried for the
// profile avatar; a failed profile fetch is logged and the avatar stays empty.
//
// If the DID cannot be parsed or resolved, the failure is logged and a
// placeholder user is stored. Its handle is the DID itself and its PDS
// endpoint is https://bsky.social.
//
// The new user is cached under did (when caching is active) and upserted.
// The returned error is the one from the database upsert.
func (p *Processor) EnsureUser(ctx context.Context, did string) error {
	const defaultPDSEndpoint = "https://bsky.social"

	// Every cache access is gated on this single flag. A Processor whose
	// useCache is set but whose userCache was never allocated (i.e. not built
	// by NewProcessor) therefore cannot nil-dereference on the write paths.
	cacheEnabled := p.useCache && p.userCache != nil

	switch {
	case cacheEnabled:
		if user, ok := p.userCache.cache[did]; ok {
			user.LastSeen = time.Now()
			return db.UpsertUser(p.db, user)
		}
	case !p.useCache:
		// No cache: refresh the existing row if there is one; any lookup
		// error falls through to fresh resolution below.
		if existing, err := db.GetUserByDID(p.db, did); err == nil && existing != nil {
			existing.LastSeen = time.Now()
			return db.UpsertUser(p.db, existing)
		}
	}

	// save caches (when enabled) and persists a freshly built user.
	save := func(user *db.User) error {
		if cacheEnabled {
			p.userCache.cache[did] = user
		}
		return db.UpsertUser(p.db, user)
	}

	// saveFallback stores a placeholder user when the DID cannot be resolved.
	saveFallback := func(reason error) error {
		fmt.Printf("WARNING [processor]: Failed to resolve DID %s, using DID as handle: %v\n", did, reason)
		return save(&db.User{
			DID:         did,
			Handle:      did,
			PDSEndpoint: defaultPDSEndpoint,
			LastSeen:    time.Now(),
		})
	}

	didParsed, err := syntax.ParseDID(did)
	if err != nil {
		return saveFallback(err)
	}
	ident, err := p.directory.LookupDID(ctx, didParsed)
	if err != nil {
		return saveFallback(err)
	}

	resolvedDID := ident.DID.String()

	// An empty handle or the "handle.invalid" placeholder is unusable; show
	// the DID instead.
	handle := ident.Handle.String()
	if handle == "" || handle == "handle.invalid" {
		handle = resolvedDID
	}

	pdsEndpoint := ident.PDSEndpoint()
	if pdsEndpoint == "" {
		pdsEndpoint = defaultPDSEndpoint
	}

	// The avatar comes from the public Bluesky AppView, which serves public
	// profiles without authentication. A failure here is non-fatal.
	avatar := ""
	publicClient := atproto.NewClient("https://public.api.bsky.app", "", "")
	if profile, err := publicClient.GetActorProfile(ctx, resolvedDID); err != nil {
		fmt.Printf("WARNING [processor]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err)
	} else {
		avatar = profile.Avatar
	}

	return save(&db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatar,
		LastSeen:    time.Now(),
	})
}
// ProcessManifest decodes a manifest record and stores it in the database.
//
// Processing happens in three steps:
//   - The manifest row is inserted. If a row already exists for the same DID,
//     repository and digest, its ID is looked up instead.
//   - When the record carries at least one non-empty annotation, the
//     repository's annotations are replaced with the record's.
//   - Either manifest references (manifest lists / indexes) or layers (image
//     manifests) are inserted.
//
// It returns the manifest ID for further processing.
//
// Reference and layer rows that already exist are skipped. Any other per-row
// insert failure is logged and processing continues with the next row.
func (p *Processor) ProcessManifest(ctx context.Context, did string, recordData []byte) (int64, error) {
	var manifestRecord atproto.ManifestRecord
	if err := json.Unmarshal(recordData, &manifestRecord); err != nil {
		return 0, fmt.Errorf("failed to unmarshal manifest: %w", err)
	}

	// isDuplicate reports whether err is a unique-constraint violation. That
	// is expected when backfill re-processes records that were already
	// ingested.
	// NOTE(review): this matches SQLite's error text only.
	isDuplicate := func(err error) bool {
		return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed")
	}

	// A record that lists child manifests is a manifest list / index.
	isManifestList := len(manifestRecord.Manifests) > 0

	// Annotations are deliberately not stored on the manifest row. They live
	// in the repository_annotations table and are handled further down.
	manifest := &db.Manifest{
		DID:           did,
		Repository:    manifestRecord.Repository,
		Digest:        manifestRecord.Digest,
		MediaType:     manifestRecord.MediaType,
		SchemaVersion: manifestRecord.SchemaVersion,
		HoldEndpoint:  manifestRecord.HoldEndpoint,
		CreatedAt:     manifestRecord.CreatedAt,
	}
	// Config only applies to image manifests, not manifest lists.
	if !isManifestList && manifestRecord.Config != nil {
		manifest.ConfigDigest = manifestRecord.Config.Digest
		manifest.ConfigSize = manifestRecord.Config.Size
	}

	manifestID, err := db.InsertManifest(p.db, manifest)
	if err != nil {
		if !isDuplicate(err) {
			return 0, fmt.Errorf("failed to insert manifest: %w", err)
		}
		// The row is already stored (e.g. by an earlier backfill pass), so
		// reuse its ID.
		if err := p.db.QueryRowContext(ctx, `
			SELECT id FROM manifests
			WHERE did = ? AND repository = ? AND digest = ?
		`, manifest.DID, manifest.Repository, manifest.Digest).Scan(&manifestID); err != nil {
			return 0, fmt.Errorf("failed to get existing manifest ID: %w", err)
		}
	}

	// Only touch repository annotations when the record carries real data.
	// Otherwise a manifest without annotations would wipe the existing ones.
	hasAnnotationData := false
	for _, value := range manifestRecord.Annotations {
		if value != "" {
			hasAnnotationData = true
			break
		}
	}
	if hasAnnotationData {
		// Replaces all annotations for this repository.
		if err := db.UpsertRepositoryAnnotations(p.db, did, manifestRecord.Repository, manifestRecord.Annotations); err != nil {
			return 0, fmt.Errorf("failed to upsert annotations: %w", err)
		}
	}

	if isManifestList {
		// Manifest references, for manifest lists / indexes.
		for i, ref := range manifestRecord.Manifests {
			row := &db.ManifestReference{
				ManifestID:     manifestID,
				Digest:         ref.Digest,
				MediaType:      ref.MediaType,
				Size:           ref.Size,
				ReferenceIndex: i,
			}
			if ref.Platform != nil {
				row.PlatformArchitecture = ref.Platform.Architecture
				row.PlatformOS = ref.Platform.OS
				row.PlatformVariant = ref.Platform.Variant
				row.PlatformOSVersion = ref.Platform.OSVersion
			}
			if err := db.InsertManifestReference(p.db, row); err != nil && !isDuplicate(err) {
				fmt.Printf("WARNING [processor]: Failed to insert manifest reference %s (index %d) for manifest %d: %v\n", ref.Digest, i, manifestID, err)
			}
		}
	} else {
		// Layers, for image manifests.
		for i, layer := range manifestRecord.Layers {
			if err := db.InsertLayer(p.db, &db.Layer{
				ManifestID: manifestID,
				Digest:     layer.Digest,
				MediaType:  layer.MediaType,
				Size:       layer.Size,
				LayerIndex: i,
			}); err != nil && !isDuplicate(err) {
				fmt.Printf("WARNING [processor]: Failed to insert layer %s (index %d) for manifest %d: %v\n", layer.Digest, i, manifestID, err)
			}
		}
	}

	return manifestID, nil
}
// ProcessTag decodes a tag record and inserts or updates the matching tag row.
//
// The digest the tag points at comes from the record's GetManifestDigest. That
// method tries the manifest field first and falls back to manifestDigest. The
// record's UpdatedAt is stored as the tag's CreatedAt.
func (p *Processor) ProcessTag(ctx context.Context, did string, recordData []byte) error {
	rec := atproto.TagRecord{}
	if err := json.Unmarshal(recordData, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}

	digest, err := rec.GetManifestDigest()
	if err != nil {
		return fmt.Errorf("failed to get manifest digest from tag record: %w", err)
	}

	row := &db.Tag{
		DID:        did,
		Repository: rec.Repository,
		Tag:        rec.Tag,
		Digest:     digest,
		CreatedAt:  rec.UpdatedAt,
	}
	return db.UpsertTag(p.db, row)
}
// ProcessStar decodes a star record and upserts it into the stars table.
//
// did is the user who created the star. The record's subject names the
// starred repository's owner DID and repository name. Because the write is an
// upsert, re-processing the same star does not create a duplicate. Star counts
// are not maintained here; they are calculated on demand from the stars table.
func (p *Processor) ProcessStar(ctx context.Context, did string, recordData []byte) error {
	star := atproto.StarRecord{}
	if err := json.Unmarshal(recordData, &star); err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}

	subject := star.Subject
	return db.UpsertStar(p.db, did, subject.DID, subject.Repository, star.CreatedAt)
}
// ProcessSailorProfile decodes a sailor profile record and, when it names a
// default hold, hands that hold's DID to queryCaptainFn.
//
// It is primarily used by backfill to cache captain records for holds.
// Supplying queryCaptainFn lets the caller keep backfill-specific behaviour
// (retries, test-mode handling) out of this shared code.
//
// Return values:
//   - nil when the profile has no default hold.
//   - nil, after logging a warning, when the hold reference cannot be
//     converted to a DID.
//   - nil when queryCaptainFn is nil.
//   - Otherwise, whatever queryCaptainFn returns.
func (p *Processor) ProcessSailorProfile(ctx context.Context, did string, recordData []byte, queryCaptainFn func(context.Context, string) error) error {
	var profile atproto.SailorProfileRecord
	if err := json.Unmarshal(recordData, &profile); err != nil {
		return fmt.Errorf("failed to unmarshal sailor profile: %w", err)
	}

	hold := profile.DefaultHold
	if hold == "" {
		return nil
	}

	// The default hold may be given as a URL or a DID; normalise it to a DID.
	holdDID := atproto.ResolveHoldDIDFromURL(hold)
	switch {
	case holdDID == "":
		fmt.Printf("WARNING [processor]: Invalid hold reference in profile for %s: %s\n", did, hold)
		return nil
	case queryCaptainFn == nil:
		return nil
	default:
		return queryCaptainFn(ctx, holdDID)
	}
}