Files
at-container-registry/pkg/appview/jetstream/worker.go
2026-04-19 18:01:17 -05:00

679 lines
20 KiB
Go

// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
// It connects to the Bluesky Jetstream WebSocket, processes repository events,
// indexes manifests and tags, and populates the AppView database for the web UI.
package jetstream
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"net/url"
"sync"
"time"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/atproto"
"github.com/gorilla/websocket"
"github.com/klauspost/compress/zstd"
)
// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups.
//
// NOTE(review): no methods on UserCache are visible in this file, and the
// map has no accompanying mutex — confirm at the usage site that access is
// single-goroutine or externally synchronized.
type UserCache struct {
	cache map[string]*db.User // keyed by DID
}
// EventCallback is called for each processed event with the event's
// Jetstream timestamp (time_us, Unix microseconds). It is invoked
// synchronously from the WebSocket read loop (see processMessage), so
// implementations must return quickly to avoid stalling event intake.
type EventCallback func(timeUS int64)
// Worker consumes Jetstream events and populates the UI database.
// A Worker owns one WebSocket connection at a time; StartWithFailover
// reconnects on failure and rotates across endpoints.
type Worker struct {
	db                   *sql.DB
	jetstreamURL         string           // current endpoint URL (updated by StartWithFailover)
	endpoints            *EndpointRotator // rotates across configured Jetstream endpoints
	startCursor          int64            // Unix microseconds to resume from (0 = start from now)
	wantedCollections    []string         // collection filters appended to the subscribe URL
	debugCollectionCount int              // limits debug logging to the first few collections seen
	processor            *Processor       // Shared processor for DB operations
	statsCache           *StatsCache      // In-memory cache for stats aggregation across holds
	eventCallback        EventCallback    // optional; invoked synchronously per event by processMessage
	connStartTime        time.Time        // Track when connection started for debugging

	// Ping/pong tracking for connection health
	pingsSent     int64
	pongsReceived int64
	lastPongTime  time.Time
	pongMutex     sync.Mutex // guards pingsSent, pongsReceived, lastPongTime

	// In-memory cursor tracking for reconnects
	lastCursor  int64
	cursorMutex sync.RWMutex // guards lastCursor (and startCursor rewinds)

	// Cursor persistence: a single-slot channel carries the most recent
	// cursor to a background saver goroutine. The saver writes to
	// jetstream_cursor every tick, dropping any older value that has not
	// yet been flushed so the WS read loop is never blocked on DB I/O.
	cursorSave chan int64
}
// NewWorker creates a new Jetstream worker.
// startCursor is a Unix-microseconds timestamp to start from (0 = start from now).
// When urls is empty, a default public Jetstream endpoint is used.
func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker {
	if len(urls) == 0 {
		urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"}
	}
	endpoints := NewEndpointRotator(urls)

	// One stats cache is shared between the worker and its processor so
	// aggregation carries across holds.
	cache := NewStatsCache()

	w := &Worker{
		db:          database,
		endpoints:   endpoints,
		startCursor: startCursor,
		statsCache:  cache,
		cursorSave:  make(chan int64, 1),
	}
	w.jetstreamURL = endpoints.Current()
	w.wantedCollections = []string{
		"io.atcr.*",              // Subscribe to all ATCR collections
		"app.bsky.actor.profile", // Subscribe to Bluesky profile updates for avatar sync
	}
	w.processor = NewProcessor(database, true, cache) // Use cache for live streaming
	return w
}
// Start begins consuming Jetstream events.
// This is a blocking function that runs until the context is cancelled or the
// connection fails; it returns the connect/read error so the caller
// (StartWithFailover) can decide whether to retry or rotate endpoints.
func (w *Worker) Start(ctx context.Context) error {
	// Build connection URL with filters
	u, err := url.Parse(w.jetstreamURL)
	if err != nil {
		return fmt.Errorf("invalid jetstream URL: %w", err)
	}
	q := u.Query()
	for _, collection := range w.wantedCollections {
		q.Add("wantedCollections", collection)
	}
	// Add cursor if specified (for backfilling historical data or reconnects)
	if w.startCursor > 0 {
		q.Set("cursor", fmt.Sprintf("%d", w.startCursor))
		// Calculate lag (cursor is in microseconds)
		now := time.Now().UnixMicro()
		lagSeconds := float64(now-w.startCursor) / 1_000_000.0
		slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
	}
	// Disable compression for now to debug
	// q.Set("compress", "true")
	u.RawQuery = q.Encode()
	slog.Info("Connecting to Jetstream", "url", u.String())
	// Connect to Jetstream
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to jetstream: %w", err)
	}
	defer conn.Close()
	// Track connection start time for debugging
	w.connStartTime = time.Now()
	// Reset ping/pong counters for this connection
	w.pongMutex.Lock()
	w.pingsSent = 0
	w.pongsReceived = 0
	w.lastPongTime = time.Now()
	w.pongMutex.Unlock()
	// Set up pong handler - called when server responds to our ping
	conn.SetPongHandler(func(appData string) error {
		w.pongMutex.Lock()
		w.pongsReceived++
		w.lastPongTime = time.Now()
		w.pongMutex.Unlock()
		// Reset read deadline - we know connection is alive
		// Allow 90 seconds for next pong (3x ping interval)
		return conn.SetReadDeadline(time.Now().Add(90 * time.Second))
	})
	// Set initial read deadline
	if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil {
		return fmt.Errorf("failed to set read deadline: %w", err)
	}
	// Create zstd decoder for decompressing messages
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()
	slog.Info("Connected to Jetstream, listening for events...")
	// Start heartbeat ticker to show Jetstream is alive
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()
	// Start ping ticker for keepalive
	pingTicker := time.NewTicker(30 * time.Second)
	defer pingTicker.Stop()
	// Start ping sender goroutine. It exits on ctx cancellation, when
	// pingDone is closed (this function returning), or after it closes a
	// connection it has judged dead — closing the conn also unblocks the
	// main ReadMessage loop below.
	pingDone := make(chan struct{})
	defer close(pingDone)
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-pingDone:
				return
			case <-pingTicker.C:
				// Check if we've received a pong recently
				w.pongMutex.Lock()
				timeSinceLastPong := time.Since(w.lastPongTime)
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				w.pongMutex.Unlock()
				// If no pong for 60 seconds, connection is likely dead
				if timeSinceLastPong > 60*time.Second {
					slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
					conn.Close()
					return
				}
				// Send ping with write deadline
				if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
					slog.Warn("Jetstream failed to set write deadline", "error", err)
					conn.Close()
					return
				}
				if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					slog.Warn("Jetstream failed to send ping", "error", err)
					conn.Close()
					return
				}
				w.pongMutex.Lock()
				w.pingsSent++
				w.pongMutex.Unlock()
			}
		}
	}()
	eventCount := 0
	lastHeartbeat := time.Now()
	// Read messages
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-heartbeatTicker.C:
			elapsed := time.Since(lastHeartbeat)
			slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
			eventCount = 0
			lastHeartbeat = time.Now()
		default:
			// NOTE(review): ReadMessage blocks until a frame arrives or the
			// read deadline (90s, refreshed on pong) expires, so ctx.Done and
			// the heartbeat tick are only observed between messages.
			_, message, err := conn.ReadMessage()
			if err != nil {
				// Calculate connection duration and idle time for debugging.
				// NOTE(review): lastHeartbeat resets on each 30s heartbeat
				// tick, not per event, so "time_since_last_event" in the log
				// below is really time since the last heartbeat reset.
				connDuration := time.Since(w.connStartTime)
				timeSinceLastEvent := time.Since(lastHeartbeat)
				// Get ping/pong stats
				w.pongMutex.Lock()
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				timeSinceLastPong := time.Since(w.lastPongTime)
				w.pongMutex.Unlock()
				// Calculate ping/pong success rate
				var pongRate float64
				if pingsTotal > 0 {
					pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
				}
				// Determine diagnosis
				var diagnosis string
				if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
					diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
				} else if timeSinceLastPong > 60*time.Second {
					diagnosis = "Connection died (no pong for >60s), network issue detected"
				} else if pongRate < 80 {
					diagnosis = "Connection unstable (low pong rate), network quality issues"
				} else {
					diagnosis = "Connection closed unexpectedly"
				}
				// Log detailed context about the failure
				slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)
				return fmt.Errorf("failed to read message: %w", err)
			}
			// For now, process uncompressed messages
			// TODO: Re-enable compression once debugging is complete
			_ = decoder // Keep decoder to avoid unused variable error
			if err := w.processMessageResilient(ctx, message); err != nil {
				slog.Error("ERROR processing message", "error", err)
				// Continue processing other messages
			} else {
				eventCount++
			}
		}
	}
}
// StartWithFailover runs the Jetstream worker with automatic failover across endpoints.
// On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s).
// If all retries fail, it fails over to the next endpoint and rewinds the cursor
// 30 seconds to avoid missing events (events are idempotent DB upserts).
// Cycles through all endpoints indefinitely and never gives up; it returns only
// when ctx is cancelled.
func (w *Worker) StartWithFailover(ctx context.Context) {
	// Bootstrap from the persisted cursor the first time we run. If the DB
	// has a saved cursor we resume from it (minus a small safety rewind so
	// any gap from the previous shutdown is covered). Events are idempotent
	// UPSERTs, so re-processing a handful is harmless.
	if w.startCursor == 0 {
		if cursor, err := db.GetJetstreamCursor(w.db); err != nil {
			slog.Warn("Jetstream failed to load persisted cursor", "error", err)
		} else if cursor > 0 {
			const rewind = int64(30 * 1_000_000) // 30s safety rewind, same units as cursor
			resume := cursor - rewind
			if resume < 0 {
				resume = 0
			}
			w.cursorMutex.Lock()
			w.startCursor = resume
			w.lastCursor = resume
			w.cursorMutex.Unlock()
			slog.Info("Jetstream resuming from persisted cursor",
				"persisted_cursor", cursor,
				"resume_cursor", resume)
		}
	}
	// Launch the background cursor saver. It runs for the lifetime of this
	// call and exits on ctx.Done with a final flush; we wait for it so the
	// final cursor write completes before returning.
	saverDone := make(chan struct{})
	go w.runCursorSaver(ctx, saverDone)
	defer func() {
		<-saverDone
	}()
	retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second}
	for {
		currentURL := w.endpoints.Current()
		w.jetstreamURL = currentURL
		slog.Info("Jetstream connecting", "url", currentURL)
		err := w.Start(ctx)
		if ctx.Err() != nil {
			return // Context cancelled, clean shutdown
		}
		// Capture cursor at disconnect time for rewind calculation
		disconnectCursor := w.GetLastCursor()
		// Retry same endpoint with escalating delays
		recovered := false
		for i, delay := range retryDelays {
			slog.Warn("Jetstream disconnected, retrying same endpoint",
				"url", currentURL,
				"attempt", i+1,
				"delay", delay,
				"error", err)
			// Context-aware wait: a plain time.Sleep would delay shutdown by
			// up to 10s after cancellation.
			if !sleepCtx(ctx, delay) {
				return
			}
			w.jetstreamURL = currentURL
			err = w.Start(ctx)
			if ctx.Err() != nil {
				return
			}
			if err == nil {
				recovered = true
				break
			}
			// Update disconnect cursor if we got further
			if latest := w.GetLastCursor(); latest > disconnectCursor {
				disconnectCursor = latest
			}
		}
		if recovered {
			continue
		}
		// All retries failed — failover to next endpoint
		failedURL := currentURL
		nextURL := w.endpoints.Next()
		// Rewind cursor 30 seconds (30M microseconds) to avoid gaps
		if disconnectCursor > 0 {
			rewound := disconnectCursor - 30_000_000
			if rewound < 0 {
				rewound = 0
			}
			w.cursorMutex.Lock()
			w.lastCursor = rewound
			w.startCursor = rewound
			w.cursorMutex.Unlock()
			slog.Warn("Jetstream failing over to next endpoint",
				"failed_url", failedURL,
				"next_url", nextURL,
				"cursor_rewound_by", "30s")
		} else {
			slog.Warn("Jetstream failing over to next endpoint",
				"failed_url", failedURL,
				"next_url", nextURL)
		}
	}
}

// sleepCtx blocks for d or until ctx is cancelled, whichever comes first.
// It returns false when the wait was cut short by cancellation.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-t.C:
		return true
	}
}
// SetEventCallback sets a callback to be called for each event.
//
// NOTE(review): eventCallback is read without a lock by the read loop
// (processMessage), so call this before Start to avoid a data race.
func (w *Worker) SetEventCallback(cb EventCallback) {
	w.eventCallback = cb
}
// Processor returns the worker's processor for configuration (e.g., setting
// webhook dispatcher). The same instance is used by the live event loop.
func (w *Worker) Processor() *Processor {
	return w.processor
}
// runCursorSaver is a long-running goroutine that persists the most recent
// Jetstream cursor to SQLite. It writes at most once every cursorSaveInterval
// so we never hit the DB faster than it can keep up, and always flushes a
// final value on shutdown so the next Start resumes from the right place.
//
// The goroutine intentionally uses w.db directly (not ExecResilient) because
// the INSERT ... ON CONFLICT statement is a single round-trip that cannot
// trigger the poisoned-tx cascade.
func (w *Worker) runCursorSaver(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	const cursorSaveInterval = 5 * time.Second
	ticker := time.NewTicker(cursorSaveInterval)
	defer ticker.Stop()
	// pending holds the newest unflushed cursor; 0 means nothing to write.
	var pending int64
	flush := func() {
		if pending == 0 {
			return
		}
		if err := db.SaveJetstreamCursor(w.db, pending); err != nil {
			// pending is kept on failure so the next tick retries the write.
			slog.Warn("Jetstream failed to persist cursor", "cursor", pending, "error", err)
			return
		}
		pending = 0
	}
	for {
		select {
		case <-ctx.Done():
			// Final flush so the next run resumes close to where we stopped.
			flush()
			return
		case c := <-w.cursorSave:
			// Keep only the newest value; the ticker decides when to flush.
			if c > pending {
				pending = c
			}
		case <-ticker.C:
			flush()
		}
	}
}
// GetLastCursor returns the last processed cursor (time_us) for reconnects.
func (w *Worker) GetLastCursor() int64 {
	w.cursorMutex.RLock()
	cursor := w.lastCursor
	w.cursorMutex.RUnlock()
	return cursor
}
// processMessageResilient runs processMessage and, if the underlying DB
// connection was poisoned by a remote tx timeout (common with Bunny Database
// after a backfill chunk exceeded the server-side transaction limit), drains
// the poisoned connections from the pool and retries once. A second failure
// returns the error so the caller's Error log line fires — replacing silent
// data loss with a loud, attributable one.
func (w *Worker) processMessageResilient(ctx context.Context, message []byte) error {
	firstErr := w.processMessage(message)
	if firstErr == nil {
		return nil
	}
	if !db.IsPoisonedTxErr(firstErr) {
		return firstErr
	}
	slog.Warn("Jetstream poisoned connection detected, draining pool and retrying",
		"error", firstErr)
	drainPool(ctx, w.db)
	// Brief pause to let the pool settle before the single retry.
	time.Sleep(100 * time.Millisecond)
	return w.processMessage(message)
}
// drainPool borrows each idle connection from the pool in turn and runs a
// trivial probe. A poisoned connection fails the probe, and db.ExecResilient
// evicts it via driver.ErrBadConn. Loops up to the pool's open-connection
// limit so a single call can clear every bad conn.
func drainPool(ctx context.Context, database *sql.DB) {
	// MaxOpenConns is 8 (see pkg/appview/db/schema.go). We probe one more time
	// than that to ensure we cycle through every conn if any were mid-use.
	const maxProbes = 10
	probe := func() error {
		return db.ExecResilient(ctx, database, func(conn *sql.Conn) error {
			_, err := conn.ExecContext(ctx, "SELECT 1")
			return err
		})
	}
	for attempt := 0; attempt < maxProbes; attempt++ {
		if probe() == nil {
			// Got a healthy conn; no need to probe further — any remaining
			// poisoned conns will be evicted on their next use.
			return
		}
		if ctx.Err() != nil {
			return
		}
	}
}
// processMessage processes a single Jetstream event: it decodes the JSON
// envelope, records the cursor, notifies the optional callback, and then
// dispatches by event kind. It returns an error only for malformed JSON or
// processor failures; unknown kinds and irrelevant collections return nil.
func (w *Worker) processMessage(message []byte) error {
	var event JetstreamEvent
	if err := json.Unmarshal(message, &event); err != nil {
		return fmt.Errorf("failed to unmarshal event: %w", err)
	}
	// Update cursor for reconnects (do this first, even if processing fails)
	w.cursorMutex.Lock()
	w.lastCursor = event.TimeUS
	w.cursorMutex.Unlock()
	// Offer the cursor to the async saver. Non-blocking: if the saver is
	// still writing the previous value, we drop-and-replace so the DB always
	// converges on the freshest cursor without ever stalling the read loop.
	// (The drain-then-resend dance is safe because this read loop is the
	// only sender on cursorSave.)
	if w.cursorSave != nil {
		select {
		case w.cursorSave <- event.TimeUS:
		default:
			// Drain any stale value and try once more — if that still fails
			// we just skip this tick; the saver's timer will catch up.
			select {
			case <-w.cursorSave:
			default:
			}
			select {
			case w.cursorSave <- event.TimeUS:
			default:
			}
		}
	}
	// Call callback if set
	if w.eventCallback != nil {
		w.eventCallback(event.TimeUS)
	}
	// Process based on event kind
	switch event.Kind {
	case "commit":
		commit := event.Commit
		if commit == nil {
			return nil
		}
		// Set DID on commit from parent event (the envelope carries the DID;
		// the commit payload itself does not).
		commit.DID = event.DID
		// Debug: log first few collections we see to understand what's coming through
		if w.debugCollectionCount < 5 {
			slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
			w.debugCollectionCount++
		}
		// Check if this is a collection we care about
		if !isRelevantCollection(commit.Collection) {
			return nil // Ignore irrelevant collections
		}
		// Marshal record to bytes for processing
		var recordBytes []byte
		if commit.Record != nil {
			var err error
			recordBytes, err = json.Marshal(commit.Record)
			if err != nil {
				return fmt.Errorf("failed to marshal record: %w", err)
			}
		}
		// Handle Bluesky profile updates separately (for avatar sync)
		if commit.Collection == BlueskyProfileCollection {
			// Only process creates/updates, not deletes (we don't clear avatars on profile delete)
			if commit.Operation == "delete" {
				return nil
			}
			return w.processor.ProcessProfileUpdate(context.Background(), commit.DID, recordBytes)
		}
		// Log ATCR events (but not Bluesky profile events - too noisy)
		slog.Info("Jetstream processing event",
			"collection", commit.Collection,
			"did", commit.DID,
			"operation", commit.Operation,
			"rkey", commit.RKey)
		isDelete := commit.Operation == "delete"
		// NOTE(review): uses context.Background() rather than a caller ctx,
		// so processor work is not cancelled on shutdown — confirm intended.
		return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil)
	case "identity":
		if event.Identity == nil {
			return nil
		}
		return w.processIdentity(&event)
	case "account":
		if event.Account == nil {
			return nil
		}
		return w.processAccount(&event)
	default:
		// Ignore unknown event kinds
		return nil
	}
}
// processIdentity processes an identity event (handle change) by delegating
// to the shared processor.
func (w *Worker) processIdentity(event *JetstreamEvent) error {
	id := event.Identity
	if id == nil {
		return nil
	}
	// Only ATCR users are logged at Info level by the processor.
	return w.processor.ProcessIdentity(context.Background(), id.DID, id.Handle)
}
// processAccount processes an account event (status change) by delegating
// to the shared processor.
func (w *Worker) processAccount(event *JetstreamEvent) error {
	acct := event.Account
	if acct == nil {
		return nil
	}
	// Only ATCR users are logged at Info level by the processor.
	return w.processor.ProcessAccount(context.Background(), acct.DID, acct.Active, acct.Status)
}
// BlueskyProfileCollection is the collection for Bluesky actor profiles.
// Profile commits from this collection are consumed for avatar sync.
const BlueskyProfileCollection = "app.bsky.actor.profile"
// isRelevantCollection returns true if the collection is one we process
func isRelevantCollection(collection string) bool {
switch collection {
case atproto.ManifestCollection,
atproto.TagCollection,
atproto.StarCollection,
atproto.RepoPageCollection,
atproto.SailorProfileCollection,
atproto.StatsCollection,
atproto.CaptainCollection,
atproto.CrewCollection,
atproto.ScanCollection,
BlueskyProfileCollection: // For avatar sync
return true
default:
return false
}
}
// JetstreamEvent represents a Jetstream event — the top-level wire envelope.
// Depending on Kind, the matching pointer field (Commit/Identity/Account)
// is populated; the dispatch code nil-checks each before use.
type JetstreamEvent struct {
	DID      string        `json:"did"`
	TimeUS   int64         `json:"time_us"` // Unix microseconds; doubles as the resume cursor
	Kind     string        `json:"kind"`    // "commit", "identity", "account"
	Commit   *CommitEvent  `json:"commit,omitempty"`
	Identity *IdentityInfo `json:"identity,omitempty"`
	Account  *AccountInfo  `json:"account,omitempty"`
}
// CommitEvent represents a commit event (create/update/delete) on a record
// within a repository collection.
type CommitEvent struct {
	Rev        string         `json:"rev"`
	Operation  string         `json:"operation"` // "create", "update", "delete"
	Collection string         `json:"collection"`
	RKey       string         `json:"rkey"`
	Record     map[string]any `json:"record,omitempty"` // absent for deletes
	CID        string         `json:"cid,omitempty"`
	DID        string         `json:"-"` // Set from parent event
}
// IdentityInfo represents an identity event (e.g., a handle change for a DID).
type IdentityInfo struct {
	DID    string `json:"did"`
	Handle string `json:"handle"`
	Seq    int64  `json:"seq"`
	Time   string `json:"time"`
}
// AccountInfo represents an account status event (activation/deactivation).
type AccountInfo struct {
	Active bool   `json:"active"`
	DID    string `json:"did"`
	Seq    int64  `json:"seq"`
	Time   string `json:"time"`
	Status string `json:"status,omitempty"` // set when the account is not active
}