// Package jetstream provides an ATProto Jetstream consumer for real-time updates.
|
|
// It connects to the Bluesky Jetstream WebSocket, processes repository events,
|
|
// indexes manifests and tags, and populates the AppView database for the web UI.
|
|
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/url"
|
|
"sync"
|
|
"time"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/atproto"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/klauspost/compress/zstd"
|
|
)
|
|
|
|
// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups.
// NOTE(review): the map has no mutex — assumes single-goroutine access;
// confirm callers before sharing across goroutines.
type UserCache struct {
	// cache maps a DID string to its resolved user record.
	cache map[string]*db.User
}
|
|
|
|
// EventCallback is called for each processed event with the event's
// Jetstream timestamp (time_us, Unix microseconds).
type EventCallback func(timeUS int64)
|
|
|
|
// Worker consumes Jetstream events and populates the UI database.
type Worker struct {
	db                   *sql.DB          // AppView database handle
	jetstreamURL         string           // Current endpoint URL (updated on failover)
	endpoints            *EndpointRotator // Rotation over configured endpoint URLs
	startCursor          int64            // Unix microseconds to start streaming from (0 = start from now)
	wantedCollections    []string         // Collection filters sent as wantedCollections query params
	debugCollectionCount int              // Caps debug logging of observed collections to the first few
	processor            *Processor       // Shared processor for DB operations
	statsCache           *StatsCache      // In-memory cache for stats aggregation across holds
	eventCallback        EventCallback    // Optional per-event hook; receives the event's time_us
	connStartTime        time.Time        // Track when connection started for debugging

	// Ping/pong tracking for connection health.
	// pongMutex guards pingsSent, pongsReceived, and lastPongTime.
	pingsSent     int64
	pongsReceived int64
	lastPongTime  time.Time
	pongMutex     sync.Mutex

	// In-memory cursor tracking for reconnects.
	// cursorMutex guards lastCursor.
	lastCursor  int64
	cursorMutex sync.RWMutex
}
|
|
|
|
// NewWorker creates a new Jetstream worker
|
|
// startCursor: Unix microseconds timestamp to start from (0 = start from now)
|
|
func NewWorker(database *sql.DB, urls []string, startCursor int64) *Worker {
|
|
if len(urls) == 0 {
|
|
urls = []string{"wss://jetstream2.us-west.bsky.network/subscribe"}
|
|
}
|
|
|
|
rotator := NewEndpointRotator(urls)
|
|
|
|
// Create shared stats cache for aggregating across holds
|
|
statsCache := NewStatsCache()
|
|
|
|
return &Worker{
|
|
db: database,
|
|
jetstreamURL: rotator.Current(),
|
|
endpoints: rotator,
|
|
startCursor: startCursor,
|
|
wantedCollections: []string{
|
|
"io.atcr.*", // Subscribe to all ATCR collections
|
|
"app.bsky.actor.profile", // Subscribe to Bluesky profile updates for avatar sync
|
|
},
|
|
statsCache: statsCache,
|
|
processor: NewProcessor(database, true, statsCache), // Use cache for live streaming
|
|
}
|
|
}
|
|
|
|
// Start begins consuming Jetstream events.
// This is a blocking function that runs until the context is cancelled,
// the read deadline expires, or the connection drops (returning an error).
func (w *Worker) Start(ctx context.Context) error {
	// Build connection URL with filters
	u, err := url.Parse(w.jetstreamURL)
	if err != nil {
		return fmt.Errorf("invalid jetstream URL: %w", err)
	}

	q := u.Query()
	for _, collection := range w.wantedCollections {
		q.Add("wantedCollections", collection)
	}

	// Add cursor if specified (for backfilling historical data or reconnects)
	if w.startCursor > 0 {
		q.Set("cursor", fmt.Sprintf("%d", w.startCursor))

		// Calculate lag (cursor is in microseconds)
		now := time.Now().UnixMicro()
		lagSeconds := float64(now-w.startCursor) / 1_000_000.0
		slog.Info("Jetstream starting from cursor", "cursor", w.startCursor, "lag_seconds", lagSeconds)
	}

	// Disable compression for now to debug
	// q.Set("compress", "true")
	u.RawQuery = q.Encode()

	slog.Info("Connecting to Jetstream", "url", u.String())

	// Connect to Jetstream
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		return fmt.Errorf("failed to connect to jetstream: %w", err)
	}
	defer conn.Close()

	// Track connection start time for debugging
	w.connStartTime = time.Now()

	// Reset ping/pong counters for this connection
	w.pongMutex.Lock()
	w.pingsSent = 0
	w.pongsReceived = 0
	w.lastPongTime = time.Now()
	w.pongMutex.Unlock()

	// Set up pong handler - called when server responds to our ping.
	// The handler runs inside ReadMessage on the reader side.
	conn.SetPongHandler(func(appData string) error {
		w.pongMutex.Lock()
		w.pongsReceived++
		w.lastPongTime = time.Now()
		w.pongMutex.Unlock()

		// Reset read deadline - we know connection is alive
		// Allow 90 seconds for next pong (3x ping interval)
		return conn.SetReadDeadline(time.Now().Add(90 * time.Second))
	})

	// Set initial read deadline so a silent connection eventually errors out
	if err := conn.SetReadDeadline(time.Now().Add(90 * time.Second)); err != nil {
		return fmt.Errorf("failed to set read deadline: %w", err)
	}

	// Create zstd decoder for decompressing messages
	decoder, err := zstd.NewReader(nil)
	if err != nil {
		return fmt.Errorf("failed to create zstd decoder: %w", err)
	}
	defer decoder.Close()

	slog.Info("Connected to Jetstream, listening for events...")

	// Start heartbeat ticker to show Jetstream is alive
	heartbeatTicker := time.NewTicker(30 * time.Second)
	defer heartbeatTicker.Stop()

	// Start ping ticker for keepalive
	pingTicker := time.NewTicker(30 * time.Second)
	defer pingTicker.Stop()

	// pingDone stops the ping sender goroutine when Start returns,
	// bounding the goroutine's lifetime to this function.
	pingDone := make(chan struct{})
	defer close(pingDone)

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-pingDone:
				return
			case <-pingTicker.C:
				// Check if we've received a pong recently
				w.pongMutex.Lock()
				timeSinceLastPong := time.Since(w.lastPongTime)
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				w.pongMutex.Unlock()

				// If no pong for 60 seconds, connection is likely dead.
				// Closing conn forces the blocked ReadMessage below to fail,
				// which unwinds Start with an error.
				if timeSinceLastPong > 60*time.Second {
					slog.Info("Jetstream no pong received, closing connection", "time_since_last_pong", timeSinceLastPong, "pings_sent", pingsTotal, "pongs_received", pongsTotal)
					conn.Close()
					return
				}

				// Send ping with write deadline
				if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
					slog.Warn("Jetstream failed to set write deadline", "error", err)
					conn.Close()
					return
				}
				if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					slog.Warn("Jetstream failed to send ping", "error", err)
					conn.Close()
					return
				}

				w.pongMutex.Lock()
				w.pingsSent++
				w.pongMutex.Unlock()
			}
		}
	}()

	eventCount := 0
	lastHeartbeat := time.Now()

	// Read messages.
	// NOTE(review): the default branch blocks inside ReadMessage, so ctx
	// cancellation and heartbeat ticks are only observed between messages
	// (or when the read deadline fires) — confirm this is acceptable.
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-heartbeatTicker.C:
			elapsed := time.Since(lastHeartbeat)
			slog.Debug("Jetstream alive", "events_processed", eventCount, "elapsed_seconds", elapsed.Seconds())
			eventCount = 0
			lastHeartbeat = time.Now()
		default:
			_, message, err := conn.ReadMessage()
			if err != nil {
				// Calculate connection duration and idle time for debugging
				connDuration := time.Since(w.connStartTime)
				timeSinceLastEvent := time.Since(lastHeartbeat)

				// Get ping/pong stats
				w.pongMutex.Lock()
				pingsTotal := w.pingsSent
				pongsTotal := w.pongsReceived
				timeSinceLastPong := time.Since(w.lastPongTime)
				w.pongMutex.Unlock()

				// Calculate ping/pong success rate
				var pongRate float64
				if pingsTotal > 0 {
					pongRate = float64(pongsTotal) / float64(pingsTotal) * 100
				}

				// Determine a human-readable diagnosis for the log line
				var diagnosis string
				if pongRate >= 95 && timeSinceLastPong < 60*time.Second {
					diagnosis = "Connection was healthy (good ping/pong), likely server-side timeout or network interruption"
				} else if timeSinceLastPong > 60*time.Second {
					diagnosis = "Connection died (no pong for >60s), network issue detected"
				} else if pongRate < 80 {
					diagnosis = "Connection unstable (low pong rate), network quality issues"
				} else {
					diagnosis = "Connection closed unexpectedly"
				}

				// Log detailed context about the failure
				slog.Info("Jetstream connection closed", "duration", connDuration, "events_in_last_30s", eventCount, "time_since_last_event", timeSinceLastEvent, "pongs_received", pongsTotal, "pings_sent", pingsTotal, "pong_rate_pct", pongRate, "time_since_last_pong", timeSinceLastPong, "error", err, "diagnosis", diagnosis)

				return fmt.Errorf("failed to read message: %w", err)
			}

			// For now, process uncompressed messages
			// TODO: Re-enable compression once debugging is complete
			_ = decoder // Keep decoder to avoid unused variable error

			if err := w.processMessage(message); err != nil {
				slog.Error("ERROR processing message", "error", err)
				// Continue processing other messages
			} else {
				eventCount++
			}
		}
	}
}
|
|
|
|
// StartWithFailover runs the Jetstream worker with automatic failover across endpoints.
|
|
// On disconnect it retries the same endpoint with escalating delays (1s, 5s, 10s).
|
|
// If all retries fail, it fails over to the next endpoint and rewinds the cursor
|
|
// 30 seconds to avoid missing events (events are idempotent DB upserts).
|
|
// Cycles through all endpoints indefinitely and never gives up.
|
|
func (w *Worker) StartWithFailover(ctx context.Context) {
|
|
retryDelays := []time.Duration{1 * time.Second, 5 * time.Second, 10 * time.Second}
|
|
|
|
for {
|
|
currentURL := w.endpoints.Current()
|
|
w.jetstreamURL = currentURL
|
|
|
|
slog.Info("Jetstream connecting", "url", currentURL)
|
|
err := w.Start(ctx)
|
|
if ctx.Err() != nil {
|
|
return // Context cancelled, clean shutdown
|
|
}
|
|
|
|
// Capture cursor at disconnect time for rewind calculation
|
|
disconnectCursor := w.GetLastCursor()
|
|
|
|
// Retry same endpoint with escalating delays
|
|
recovered := false
|
|
for i, delay := range retryDelays {
|
|
slog.Warn("Jetstream disconnected, retrying same endpoint",
|
|
"url", currentURL,
|
|
"attempt", i+1,
|
|
"delay", delay,
|
|
"error", err)
|
|
time.Sleep(delay)
|
|
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
w.jetstreamURL = currentURL
|
|
err = w.Start(ctx)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
if err == nil {
|
|
recovered = true
|
|
break
|
|
}
|
|
// Update disconnect cursor if we got further
|
|
if latest := w.GetLastCursor(); latest > disconnectCursor {
|
|
disconnectCursor = latest
|
|
}
|
|
}
|
|
|
|
if recovered {
|
|
continue
|
|
}
|
|
|
|
// All retries failed — failover to next endpoint
|
|
failedURL := currentURL
|
|
nextURL := w.endpoints.Next()
|
|
|
|
// Rewind cursor 30 seconds (30M microseconds) to avoid gaps
|
|
if disconnectCursor > 0 {
|
|
rewound := disconnectCursor - 30_000_000
|
|
if rewound < 0 {
|
|
rewound = 0
|
|
}
|
|
w.cursorMutex.Lock()
|
|
w.lastCursor = rewound
|
|
w.startCursor = rewound
|
|
w.cursorMutex.Unlock()
|
|
slog.Warn("Jetstream failing over to next endpoint",
|
|
"failed_url", failedURL,
|
|
"next_url", nextURL,
|
|
"cursor_rewound_by", "30s")
|
|
} else {
|
|
slog.Warn("Jetstream failing over to next endpoint",
|
|
"failed_url", failedURL,
|
|
"next_url", nextURL)
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetEventCallback sets a callback to be called for each event.
// NOTE(review): the field is written without synchronization while
// processMessage reads it — assumes this is called once before Start;
// confirm callers.
func (w *Worker) SetEventCallback(cb EventCallback) {
	w.eventCallback = cb
}
|
|
|
|
// Processor returns the worker's shared processor so callers can configure
// it (e.g., setting a webhook dispatcher) before streaming starts.
func (w *Worker) Processor() *Processor {
	return w.processor
}
|
|
|
|
// GetLastCursor returns the last processed cursor (time_us) for reconnects
|
|
func (w *Worker) GetLastCursor() int64 {
|
|
w.cursorMutex.RLock()
|
|
defer w.cursorMutex.RUnlock()
|
|
return w.lastCursor
|
|
}
|
|
|
|
// processMessage processes a single Jetstream event: it records the event's
// cursor, invokes the optional callback, and dispatches on event kind
// (commit / identity / account). Unknown kinds are ignored.
func (w *Worker) processMessage(message []byte) error {
	var event JetstreamEvent
	if err := json.Unmarshal(message, &event); err != nil {
		return fmt.Errorf("failed to unmarshal event: %w", err)
	}

	// Update cursor for reconnects (do this first, even if processing fails)
	w.cursorMutex.Lock()
	w.lastCursor = event.TimeUS
	w.cursorMutex.Unlock()

	// Call callback if set.
	// NOTE(review): eventCallback is read without synchronization; assumes
	// SetEventCallback runs before Start — confirm callers.
	if w.eventCallback != nil {
		w.eventCallback(event.TimeUS)
	}

	// Process based on event kind
	switch event.Kind {
	case "commit":
		commit := event.Commit
		if commit == nil {
			return nil
		}

		// Set DID on commit from parent event (the wire format carries the
		// DID on the envelope, not in the commit payload)
		commit.DID = event.DID

		// Debug: log first few collections we see to understand what's coming through
		if w.debugCollectionCount < 5 {
			slog.Debug("Jetstream received collection", "collection", commit.Collection, "did", commit.DID)
			w.debugCollectionCount++
		}

		// Check if this is a collection we care about
		if !isRelevantCollection(commit.Collection) {
			return nil // Ignore irrelevant collections
		}

		// Marshal record to bytes for processing; deletes arrive with no
		// record, leaving recordBytes nil
		var recordBytes []byte
		if commit.Record != nil {
			var err error
			recordBytes, err = json.Marshal(commit.Record)
			if err != nil {
				return fmt.Errorf("failed to marshal record: %w", err)
			}
		}

		// Handle Bluesky profile updates separately (for avatar sync)
		if commit.Collection == BlueskyProfileCollection {
			// Only process creates/updates, not deletes (we don't clear avatars on profile delete)
			if commit.Operation == "delete" {
				return nil
			}
			return w.processor.ProcessProfileUpdate(context.Background(), commit.DID, recordBytes)
		}

		// Log ATCR events (but not Bluesky profile events - too noisy)
		slog.Info("Jetstream processing event",
			"collection", commit.Collection,
			"did", commit.DID,
			"operation", commit.Operation,
			"rkey", commit.RKey)

		isDelete := commit.Operation == "delete"
		return w.processor.ProcessRecord(context.Background(), commit.DID, commit.Collection, commit.RKey, recordBytes, isDelete, nil)

	case "identity":
		if event.Identity == nil {
			return nil
		}
		return w.processIdentity(&event)

	case "account":
		if event.Account == nil {
			return nil
		}
		return w.processAccount(&event)

	default:
		// Ignore unknown event kinds
		return nil
	}
}
|
|
|
|
// processIdentity processes an identity event (handle change)
|
|
func (w *Worker) processIdentity(event *JetstreamEvent) error {
|
|
if event.Identity == nil {
|
|
return nil
|
|
}
|
|
|
|
identity := event.Identity
|
|
// Process via shared processor (only ATCR users will be logged at Info level)
|
|
return w.processor.ProcessIdentity(context.Background(), identity.DID, identity.Handle)
|
|
}
|
|
|
|
// processAccount processes an account event (status change)
|
|
func (w *Worker) processAccount(event *JetstreamEvent) error {
|
|
if event.Account == nil {
|
|
return nil
|
|
}
|
|
|
|
account := event.Account
|
|
// Process via shared processor (only ATCR users will be logged at Info level)
|
|
return w.processor.ProcessAccount(context.Background(), account.DID, account.Active, account.Status)
|
|
}
|
|
|
|
// BlueskyProfileCollection is the collection NSID for Bluesky actor
// profiles, which the worker indexes so avatars stay in sync.
const BlueskyProfileCollection = "app.bsky.actor.profile"
|
|
|
|
// isRelevantCollection returns true if the collection is one we process
|
|
func isRelevantCollection(collection string) bool {
|
|
switch collection {
|
|
case atproto.ManifestCollection,
|
|
atproto.TagCollection,
|
|
atproto.StarCollection,
|
|
atproto.RepoPageCollection,
|
|
atproto.SailorProfileCollection,
|
|
atproto.StatsCollection,
|
|
atproto.CaptainCollection,
|
|
atproto.CrewCollection,
|
|
atproto.ScanCollection,
|
|
BlueskyProfileCollection: // For avatar sync
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// JetstreamEvent represents a Jetstream event envelope. Per Kind, exactly
// one of Commit/Identity/Account is expected to be populated.
type JetstreamEvent struct {
	DID      string        `json:"did"`     // Repo DID the event concerns
	TimeUS   int64         `json:"time_us"` // Unix microseconds; doubles as the resume cursor
	Kind     string        `json:"kind"`    // "commit", "identity", "account"
	Commit   *CommitEvent  `json:"commit,omitempty"`
	Identity *IdentityInfo `json:"identity,omitempty"`
	Account  *AccountInfo  `json:"account,omitempty"`
}
|
|
|
|
// CommitEvent represents a commit event (create/update/delete) touching a
// single record in a repository.
type CommitEvent struct {
	Rev        string         `json:"rev"`              // Repo revision for this commit
	Operation  string         `json:"operation"`        // "create", "update", "delete"
	Collection string         `json:"collection"`       // NSID of the record's collection
	RKey       string         `json:"rkey"`             // Record key within the collection
	Record     map[string]any `json:"record,omitempty"` // Record body; may be nil (e.g. for deletes)
	CID        string         `json:"cid,omitempty"`    // Record CID, when provided
	DID        string         `json:"-"`                // Set from parent event (not on the wire here)
}
|
|
|
|
// IdentityInfo represents an identity event (handle change).
type IdentityInfo struct {
	DID    string `json:"did"`
	Handle string `json:"handle"` // New handle for the DID
	Seq    int64  `json:"seq"`    // Upstream sequence number
	Time   string `json:"time"`   // Event timestamp string
}
|
|
|
|
// AccountInfo represents an account status event.
type AccountInfo struct {
	Active bool   `json:"active"` // Whether the account is currently active
	DID    string `json:"did"`
	Seq    int64  `json:"seq"`              // Upstream sequence number
	Time   string `json:"time"`             // Event timestamp string
	Status string `json:"status,omitempty"` // Reason when inactive, if provided — presumably e.g. "takendown"; verify against Jetstream docs
}
|