Files
at-container-registry/pkg/appview/jetstream/worker.go

605 lines
18 KiB
Go

package jetstream
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/url"
"sync"
"time"
"github.com/bluesky-social/indigo/atproto/identity"
"github.com/bluesky-social/indigo/atproto/syntax"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/atproto"
"github.com/gorilla/websocket"
"github.com/klauspost/compress/zstd"
)
// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups
// Entries are written by ensureUser and hold the handle, PDS endpoint, avatar
// and LastSeen that were last upserted for a DID. The map is a plain Go map
// with no locking of its own; concurrent access would need external
// synchronization.
type UserCache struct {
// cache is keyed by the DID string exactly as it appeared in the event,
// which may differ from the resolved DID stored in the *db.User value.
cache map[string]*db.User
}
// EventCallback is called for each processed event
// It receives the event's time_us value (Unix microseconds). processMessage
// invokes it for every successfully decoded event, including non-commit
// events, before the event is dispatched to a collection handler.
type EventCallback func(timeUS int64)
// Worker consumes Jetstream events and populates the UI database
// Start reads and processes messages on its calling goroutine. The ping/pong
// fields are shared with the keepalive goroutine and the pong handler, so
// they are guarded by pongMutex. lastCursor is guarded by cursorMutex;
// presumably GetLastCursor is called from another goroutine (TODO confirm
// against callers).
type Worker struct {
// Database written through the db package helpers.
db *sql.DB
// Websocket subscribe endpoint; NewWorker substitutes a default when empty.
jetstreamURL string
// Cursor (Unix microseconds) sent as ?cursor= when > 0; 0 starts from live.
startCursor int64
// Collections sent as repeated wantedCollections query parameters.
wantedCollections []string
// Number of "Jetstream DEBUG" lines printed so far (processMessage caps it at 5).
debugCollectionCount int
// DID -> user cache used by ensureUser.
userCache *UserCache
// Resolves DIDs to handle and PDS endpoint.
directory identity.Directory
// Optional hook invoked with each decoded event's time_us; not locked.
eventCallback EventCallback
connStartTime time.Time // Track when connection started for debugging
// Ping/pong tracking for connection health
pingsSent int64
pongsReceived int64
lastPongTime time.Time
pongMutex sync.Mutex
// In-memory cursor tracking for reconnects
lastCursor int64
cursorMutex sync.RWMutex
}
// NewWorker returns a Worker that writes into database and subscribes to the
// manifest, tag and star collections.
//
// An empty jetstreamURL selects the public us-west Jetstream instance.
// startCursor is a Unix-microsecond timestamp to resume from; 0 means "start
// from the live tip". DIDs are resolved with identity.DefaultDirectory().
func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker {
	endpoint := jetstreamURL
	if endpoint == "" {
		endpoint = "wss://jetstream2.us-west.bsky.network/subscribe"
	}

	collections := []string{
		atproto.ManifestCollection, // io.atcr.manifest
		atproto.TagCollection,      // io.atcr.tag
		atproto.StarCollection,     // io.atcr.sailor.star
	}

	cache := &UserCache{cache: make(map[string]*db.User)}

	return &Worker{
		db:                database,
		jetstreamURL:      endpoint,
		startCursor:       startCursor,
		wantedCollections: collections,
		userCache:         cache,
		directory:         identity.DefaultDirectory(),
	}
}
// Start connects to Jetstream and consumes events until the context is
// cancelled or the connection fails. It blocks for the lifetime of a single
// connection and does not reconnect by itself.
//
// The subscription is filtered to w.wantedCollections. When w.startCursor > 0
// it resumes from that cursor (Unix microseconds). While connected, a
// keepalive goroutine sends a websocket ping every 30s and closes the
// connection if no pong has been seen for more than 60s.
//
// Start returns ctx.Err() when the context is cancelled. Otherwise it returns
// a wrapped dial/read error, after printing connection diagnostics.
func (w *Worker) Start(ctx context.Context) error {
	// Build connection URL with filters
	u, err := url.Parse(w.jetstreamURL)
	if err != nil {
		return fmt.Errorf("invalid jetstream URL: %w", err)
	}
	q := u.Query()
	for _, collection := range w.wantedCollections {
		q.Add("wantedCollections", collection)
	}
	// Add cursor if specified (for backfilling historical data or reconnects)
	if w.startCursor > 0 {
		q.Set("cursor", fmt.Sprintf("%d", w.startCursor))
		// Calculate lag (cursor is in microseconds)
		now := time.Now().UnixMicro()
		lagSeconds := float64(now-w.startCursor) / 1_000_000.0
		fmt.Printf("Jetstream: Starting from cursor %d (%.1f seconds behind live)\n", w.startCursor, lagSeconds)
	}
	// Compression is disabled for now to debug; messages are processed as
	// plain JSON below.
	// q.Set("compress", "true")
	u.RawQuery = q.Encode()
	fmt.Printf("Connecting to Jetstream: %s\n", u.String())

	// Connect to Jetstream
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to jetstream: %w", err)
	}
	defer conn.Close()

	// Track connection start time for debugging
	w.connStartTime = time.Now()

	// Reset ping/pong counters for this connection
	w.pongMutex.Lock()
	w.pingsSent = 0
	w.pongsReceived = 0
	w.lastPongTime = time.Now()
	w.pongMutex.Unlock()

	// The pong handler runs on the reading goroutine (inside ReadMessage)
	// whenever the server answers one of our pings.
	conn.SetPongHandler(func(appData string) error {
		w.pongMutex.Lock()
		w.pongsReceived++
		w.lastPongTime = time.Now()
		w.pongMutex.Unlock()
		// Connection is alive: allow 90 seconds (3x ping interval) for the
		// next pong or message before the read times out.
		conn.SetReadDeadline(time.Now().Add(90 * time.Second))
		return nil
	})
	// Set initial read deadline
	conn.SetReadDeadline(time.Now().Add(90 * time.Second))

	// Create zstd decoder for decompressing messages. It is unused while
	// compression is disabled, but kept so re-enabling is a local change.
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()

	fmt.Println("Connected to Jetstream, listening for events...")

	// Heartbeat ticker to show Jetstream is alive
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()

	// Ping ticker for keepalive
	pingTicker := time.NewTicker(30 * time.Second)
	defer pingTicker.Stop()

	// Keepalive goroutine. It exits when Start returns (pingDone), when ctx is
	// cancelled, or after closing a connection it considers dead.
	pingDone := make(chan struct{})
	defer close(pingDone)
	go func() {
		for {
			select {
			case <-ctx.Done():
				// ReadMessage does not observe ctx, so closing the connection
				// is what unblocks the read loop and lets Start return.
				conn.Close()
				return
			case <-pingDone:
				return
			case <-pingTicker.C:
				// Check if we've received a pong recently
				w.pongMutex.Lock()
				timeSinceLastPong := time.Since(w.lastPongTime)
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				w.pongMutex.Unlock()

				// If no pong for 60 seconds, connection is likely dead
				if timeSinceLastPong > 60*time.Second {
					fmt.Printf("Jetstream: No pong received for %s (sent %d pings, got %d pongs), closing connection\n",
						timeSinceLastPong, pingsTotal, pongsTotal)
					conn.Close()
					return
				}

				// WriteControl takes its own deadline and may be called
				// concurrently with the reader, unlike WriteMessage.
				deadline := time.Now().Add(10 * time.Second)
				if err := conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
					fmt.Printf("Jetstream: Failed to send ping: %v\n", err)
					conn.Close()
					return
				}
				w.pongMutex.Lock()
				w.pingsSent++
				w.pongMutex.Unlock()
			}
		}
	}()

	eventCount := 0
	lastHeartbeat := time.Now()
	// Time the most recent message was received; starts at connect time.
	lastEventTime := w.connStartTime

	// Read messages
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-heartbeatTicker.C:
			// Only observed between messages: ReadMessage below blocks, so on
			// an idle stream the heartbeat is printed when the next message
			// arrives.
			elapsed := time.Since(lastHeartbeat)
			fmt.Printf("Jetstream: Alive (processed %d events in last %.0fs)\n", eventCount, elapsed.Seconds())
			eventCount = 0
			lastHeartbeat = time.Now()
		default:
			_, message, err := conn.ReadMessage()
			if err != nil {
				// The keepalive goroutine closes the connection on
				// cancellation; report that as cancellation, not a failure.
				if ctxErr := ctx.Err(); ctxErr != nil {
					return ctxErr
				}

				// Calculate connection duration and idle time for debugging
				connDuration := time.Since(w.connStartTime)
				timeSinceLastEvent := time.Since(lastEventTime)

				// Get ping/pong stats
				w.pongMutex.Lock()
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				timeSinceLastPong := time.Since(w.lastPongTime)
				w.pongMutex.Unlock()

				// Calculate ping/pong success rate
				var pongRate float64
				if pingsTotal > 0 {
					pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
				}

				// Determine diagnosis
				var diagnosis string
				switch {
				case pingsTotal == 0:
					// No ping has gone out yet, so a 0% pong rate says
					// nothing about connection quality.
					diagnosis = "Connection closed before any pings were sent, too early to judge health"
				case pongRate >= 95 && timeSinceLastPong < 60*time.Second:
					diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
				case timeSinceLastPong > 60*time.Second:
					diagnosis = "Connection died (no pong for >60s), network issue detected"
				case pongRate < 80:
					diagnosis = "Connection unstable (low pong rate), network quality issues"
				default:
					diagnosis = "Connection closed unexpectedly"
				}

				// Log detailed context about the failure
				fmt.Printf("Jetstream: Connection closed after %s\n", connDuration)
				fmt.Printf(" - Events in last 30s: %d\n", eventCount)
				fmt.Printf(" - Time since last event: %s\n", timeSinceLastEvent)
				fmt.Printf(" - Ping/Pong: %d/%d (%.1f%% success)\n", pongsTotal, pingsTotal, pongRate)
				fmt.Printf(" - Last pong: %s ago\n", timeSinceLastPong)
				fmt.Printf(" - Error: %v\n", err)
				fmt.Printf(" - Diagnosis: %s\n", diagnosis)
				return fmt.Errorf("failed to read message: %w", err)
			}
			lastEventTime = time.Now()

			// For now, process uncompressed messages
			// TODO: Re-enable compression once debugging is complete
			if err := w.processMessage(message); err != nil {
				fmt.Printf("ERROR processing message: %v\n", err)
				// Continue processing other messages
			} else {
				eventCount++
			}
		}
	}
}
// SetEventCallback installs cb, which processMessage invokes with the time_us
// of every decoded event. Passing nil disables the callback. The field is
// assigned without locking, so it is presumably meant to be set before Start
// runs rather than while events are being consumed.
func (w *Worker) SetEventCallback(cb EventCallback) {
	w.eventCallback = cb
}
// GetLastCursor reports the time_us of the most recently decoded event, or 0
// if none has been seen. It can be fed back as a start cursor when
// reconnecting. The read is guarded by cursorMutex.
func (w *Worker) GetLastCursor() int64 {
	w.cursorMutex.RLock()
	cursor := w.lastCursor
	w.cursorMutex.RUnlock()
	return cursor
}
// processMessage decodes one raw Jetstream message and routes commit events
// for the watched collections to their handlers.
//
// For every successfully decoded event, whatever its kind, it first records
// event.TimeUS as the reconnect cursor and then invokes the event callback.
// This happens even if the handler later fails. Non-commit events, commits
// without a payload, and commits for other collections are ignored. The
// returned error is either a JSON decoding error or the handler's error.
func (w *Worker) processMessage(message []byte) error {
	var evt JetstreamEvent
	if err := json.Unmarshal(message, &evt); err != nil {
		return fmt.Errorf("failed to unmarshal event: %w", err)
	}

	// Cursor first, so a failing handler does not cause a replay on reconnect.
	w.cursorMutex.Lock()
	w.lastCursor = evt.TimeUS
	w.cursorMutex.Unlock()

	if cb := w.eventCallback; cb != nil {
		cb(evt.TimeUS)
	}

	if evt.Kind != "commit" || evt.Commit == nil {
		return nil
	}
	c := evt.Commit
	// The commit payload does not carry the DID; copy it from the envelope.
	c.DID = evt.DID

	// Print the first few collections seen, to show what is coming through.
	if w.debugCollectionCount < 5 {
		w.debugCollectionCount++
		fmt.Printf("Jetstream DEBUG: Received collection=%s, did=%s\n", c.Collection, c.DID)
	}

	switch c.Collection {
	case atproto.StarCollection:
		fmt.Printf("Jetstream: Processing star event: did=%s, operation=%s, rkey=%s\n",
			c.DID, c.Operation, c.RKey)
		return w.processStar(c)
	case atproto.TagCollection:
		fmt.Printf("Jetstream: Processing tag event: did=%s, operation=%s, rkey=%s\n",
			c.DID, c.Operation, c.RKey)
		return w.processTag(c)
	case atproto.ManifestCollection:
		fmt.Printf("Jetstream: Processing manifest event: did=%s, operation=%s, rkey=%s\n",
			c.DID, c.Operation, c.RKey)
		return w.processManifest(c)
	}
	// Any other collection is not ours to handle.
	return nil
}
// ensureUser makes sure a user row exists for did and refreshes its LastSeen.
//
// On a cache hit it only updates LastSeen and upserts the cached user. On a
// miss it does the following, then caches the result under did and upserts it:
//   - resolves the DID to a handle and PDS endpoint via w.directory;
//   - fetches the avatar from the public Bluesky AppView.
//
// Failure handling:
//   - A malformed or unresolvable DID is stored and cached as a fallback user
//     whose handle is the DID itself and whose PDS is https://bsky.social.
//     Because it is cached, it is not retried for the lifetime of this worker.
//   - A failed profile fetch only drops the avatar.
//
// Each remote call is bounded by a timeout derived from ctx. Events are
// handled on the goroutine that reads the Jetstream socket, and the callers
// in this file pass context.Background(), so an unbounded lookup could stall
// the stream until keepalive tears the connection down.
//
// The returned error comes from db.UpsertUser.
func (w *Worker) ensureUser(ctx context.Context, did string) error {
	const (
		// PDS endpoint recorded when none can be determined.
		fallbackPDSEndpoint = "https://bsky.social"
		// Upper bound for each remote lookup (DID resolution, profile fetch).
		lookupTimeout = 10 * time.Second
	)

	// Check cache first; a hit only refreshes LastSeen.
	if cached, ok := w.userCache.cache[did]; ok {
		cached.LastSeen = time.Now()
		return db.UpsertUser(w.db, cached)
	}

	// storeFallback caches and upserts a placeholder user that uses the DID
	// as its handle.
	storeFallback := func() error {
		user := &db.User{
			DID:         did,
			Handle:      did,
			PDSEndpoint: fallbackPDSEndpoint,
			LastSeen:    time.Now(),
		}
		w.userCache.cache[did] = user
		return db.UpsertUser(w.db, user)
	}

	didParsed, err := syntax.ParseDID(did)
	if err != nil {
		fmt.Printf("WARNING: Invalid DID %s: %v (using DID as handle)\n", did, err)
		return storeFallback()
	}

	lookupCtx, cancelLookup := context.WithTimeout(ctx, lookupTimeout)
	ident, err := w.directory.LookupDID(lookupCtx, didParsed)
	cancelLookup()
	if err != nil {
		fmt.Printf("WARNING: Failed to resolve DID %s: %v (using DID as handle)\n", did, err)
		return storeFallback()
	}

	resolvedDID := ident.DID.String()
	handle := ident.Handle.String()
	pdsEndpoint := ident.PDSEndpoint()
	// If handle is invalid or PDS is missing, use defaults
	if handle == "handle.invalid" || handle == "" {
		handle = resolvedDID
	}
	if pdsEndpoint == "" {
		pdsEndpoint = fallbackPDSEndpoint
	}

	// Fetch the avatar from the public Bluesky AppView API (no auth needed
	// for public profiles). Failure is non-fatal: continue without avatar.
	avatar := ""
	publicClient := atproto.NewClient("https://public.api.bsky.app", "", "")
	profileCtx, cancelProfile := context.WithTimeout(ctx, lookupTimeout)
	profile, err := publicClient.GetActorProfile(profileCtx, resolvedDID)
	cancelProfile()
	if err != nil {
		fmt.Printf("WARNING [worker]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err)
	} else {
		avatar = profile.Avatar
	}

	user := &db.User{
		DID:         resolvedDID,
		Handle:      handle,
		PDSEndpoint: pdsEndpoint,
		Avatar:      avatar,
		LastSeen:    time.Now(),
	}
	// Cached under the DID as it appeared in the event, which may differ
	// from resolvedDID.
	w.userCache.cache[did] = user

	// Upsert to database
	return db.UpsertUser(w.db, user)
}
// processManifest applies an io.atcr.manifest commit to the database.
//
// The committing user is always resolved and upserted first. Then:
//   - "delete": removes the manifest identified by the rkey (the digest) and
//     prunes tags that no longer point at an existing manifest.
//   - any other operation: decodes the record into an
//     atproto.ManifestRecord and inserts the manifest (with its OCI
//     annotations) and its layers.
//
// A commit without a record is a no-op. Layer insert errors are deliberately
// ignored.
func (w *Worker) processManifest(commit *CommitEvent) error {
	if err := w.ensureUser(context.Background(), commit.DID); err != nil {
		return fmt.Errorf("failed to ensure user: %w", err)
	}

	if commit.Operation == "delete" {
		// The rkey is the digest alone; the repository is not encoded in it.
		if err := db.DeleteManifest(w.db, commit.DID, "", commit.RKey); err != nil {
			return err
		}
		return db.CleanupOrphanedTags(w.db, commit.DID)
	}

	if commit.Record == nil {
		// No record data, nothing to index.
		return nil
	}

	// Round-trip the generic JSON map into the typed record.
	raw, err := json.Marshal(commit.Record)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	var rec atproto.ManifestRecord
	if err := json.Unmarshal(raw, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal manifest: %w", err)
	}

	// Looking up a key in a nil map yields "", so no nil guard is needed.
	ann := rec.Annotations
	manifestID, err := db.InsertManifest(w.db, &db.Manifest{
		DID:              commit.DID,
		Repository:       rec.Repository,
		Digest:           rec.Digest,
		MediaType:        rec.MediaType,
		SchemaVersion:    rec.SchemaVersion,
		ConfigDigest:     rec.Config.Digest,
		ConfigSize:       rec.Config.Size,
		HoldEndpoint:     rec.HoldEndpoint,
		CreatedAt:        rec.CreatedAt,
		Title:            ann["org.opencontainers.image.title"],
		Description:      ann["org.opencontainers.image.description"],
		SourceURL:        ann["org.opencontainers.image.source"],
		DocumentationURL: ann["org.opencontainers.image.documentation"],
		Licenses:         ann["org.opencontainers.image.licenses"],
		IconURL:          ann["io.atcr.icon"],
	})
	if err != nil {
		return fmt.Errorf("failed to insert manifest: %w", err)
	}

	for idx, layer := range rec.Layers {
		// Best effort: the layer might already exist, so errors are ignored.
		_ = db.InsertLayer(w.db, &db.Layer{
			ManifestID: manifestID,
			Digest:     layer.Digest,
			MediaType:  layer.MediaType,
			Size:       layer.Size,
			LayerIndex: idx,
		})
	}
	return nil
}
// processTag applies an io.atcr.tag commit to the database.
//
// The committing user is resolved and upserted first. A "delete" decodes the
// rkey back into (repository, tag) and deletes that tag. Any other operation
// decodes the record into an atproto.TagRecord and upserts it, storing the
// record's UpdatedAt as the tag's CreatedAt. A commit without a record is a
// no-op.
func (w *Worker) processTag(commit *CommitEvent) error {
	if err := w.ensureUser(context.Background(), commit.DID); err != nil {
		return fmt.Errorf("failed to ensure user: %w", err)
	}

	switch {
	case commit.Operation == "delete":
		repo, tag := atproto.RKeyToRepositoryTag(commit.RKey)
		return db.DeleteTag(w.db, commit.DID, repo, tag)
	case commit.Record == nil:
		return nil
	}

	// Round-trip the generic JSON map into the typed record.
	raw, err := json.Marshal(commit.Record)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	var rec atproto.TagRecord
	if err := json.Unmarshal(raw, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal tag: %w", err)
	}

	row := &db.Tag{
		DID:        commit.DID,
		Repository: rec.Repository,
		Tag:        rec.Tag,
		Digest:     rec.ManifestDigest,
		CreatedAt:  rec.UpdatedAt,
	}
	return db.UpsertTag(w.db, row)
}
// processStar applies an io.atcr.sailor.star commit to the database.
//
// The starring user (the commit's DID) is resolved and upserted first.
//   - "delete": the event carries no record, so the starred owner DID and
//     repository are recovered from the rkey and that star is removed.
//   - any other operation: decodes the record into an atproto.StarRecord and
//     upserts the star.
//
// A commit without a record is a no-op.
func (w *Worker) processStar(commit *CommitEvent) error {
	if err := w.ensureUser(context.Background(), commit.DID); err != nil {
		return fmt.Errorf("failed to ensure user: %w", err)
	}

	switch {
	case commit.Operation == "delete":
		ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey)
		if err != nil {
			return fmt.Errorf("failed to parse star rkey: %w", err)
		}
		return db.DeleteStar(w.db, commit.DID, ownerDID, repository)
	case commit.Record == nil:
		return nil
	}

	// Round-trip the generic JSON map into the typed record.
	raw, err := json.Marshal(commit.Record)
	if err != nil {
		return fmt.Errorf("failed to marshal record: %w", err)
	}
	var rec atproto.StarRecord
	if err := json.Unmarshal(raw, &rec); err != nil {
		return fmt.Errorf("failed to unmarshal star: %w", err)
	}

	// Idempotent upsert; star counts are derived on demand elsewhere.
	subject := rec.Subject
	return db.UpsertStar(w.db, commit.DID, subject.DID, subject.Repository, rec.CreatedAt)
}
// JetstreamEvent represents a Jetstream event
// It is the top-level JSON object decoded from each websocket message by
// processMessage. TimeUS (Unix microseconds) is recorded as the reconnect
// cursor. Only events with Kind == "commit" and a non-nil Commit are acted on.
// Identity and Account are decoded but not read by processMessage.
type JetstreamEvent struct {
DID string `json:"did"`
TimeUS int64 `json:"time_us"`
Kind string `json:"kind"` // "commit", "identity", "account"
Commit *CommitEvent `json:"commit,omitempty"`
Identity *IdentityInfo `json:"identity,omitempty"`
Account *AccountInfo `json:"account,omitempty"`
}
// CommitEvent represents a commit event (create/update/delete)
// Record holds the record body as generic JSON; the collection handlers
// re-marshal it into the typed atproto record. Handlers treat a nil Record as
// "nothing to do", and the star handler notes that delete events don't include
// the full record.
type CommitEvent struct {
Rev string `json:"rev"`
Operation string `json:"operation"` // "create", "update", "delete"
Collection string `json:"collection"`
RKey string `json:"rkey"`
Record map[string]any `json:"record,omitempty"`
CID string `json:"cid,omitempty"`
// DID is not part of the commit JSON (tag "-"); processMessage copies it
// from the enclosing JetstreamEvent before dispatching.
DID string `json:"-"` // Set from parent event
}
// IdentityInfo represents an identity event
// It is decoded as the "identity" payload of a JetstreamEvent. processMessage
// ignores non-commit events, so these fields are not currently acted on.
// NOTE(review): Time is kept as the raw string; its format is not parsed here.
type IdentityInfo struct {
DID string `json:"did"`
Handle string `json:"handle"`
Seq int64 `json:"seq"`
Time string `json:"time"`
}
// AccountInfo represents an account status event
// It is decoded as the "account" payload of a JetstreamEvent. processMessage
// ignores non-commit events, so these fields are not currently acted on.
// Status is optional in the JSON (omitempty); its possible values are not
// enumerated here.
type AccountInfo struct {
Active bool `json:"active"`
DID string `json:"did"`
Seq int64 `json:"seq"`
Time string `json:"time"`
Status string `json:"status,omitempty"`
}