mirror of
https://tangled.org/evan.jarrett.net/at-container-registry
synced 2026-06-08 00:02:34 +00:00
505 lines
15 KiB
Go
505 lines
15 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/bluesky-social/indigo/atproto/identity"
|
|
"github.com/bluesky-social/indigo/atproto/syntax"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/atproto"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/klauspost/compress/zstd"
|
|
)
|
|
|
|
// UserCache caches DID -> handle/PDS mappings to avoid repeated lookups
|
|
type UserCache struct {
|
|
cache map[string]*db.User
|
|
}
|
|
|
|
// EventCallback is called for each processed event
|
|
type EventCallback func(timeUS int64)
|
|
|
|
// Worker consumes Jetstream events and populates the UI database
|
|
type Worker struct {
|
|
db *sql.DB
|
|
jetstreamURL string
|
|
startCursor int64
|
|
wantedCollections []string
|
|
debugCollectionCount int
|
|
userCache *UserCache
|
|
directory identity.Directory
|
|
eventCallback EventCallback
|
|
connStartTime time.Time // Track when connection started for debugging
|
|
}
|
|
|
|
// NewWorker creates a new Jetstream worker
|
|
// startCursor: Unix microseconds timestamp to start from (0 = start from now)
|
|
func NewWorker(database *sql.DB, jetstreamURL string, startCursor int64) *Worker {
|
|
if jetstreamURL == "" {
|
|
jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
|
|
}
|
|
|
|
return &Worker{
|
|
db: database,
|
|
jetstreamURL: jetstreamURL,
|
|
startCursor: startCursor,
|
|
wantedCollections: []string{
|
|
atproto.ManifestCollection, // io.atcr.manifest
|
|
atproto.TagCollection, // io.atcr.tag
|
|
atproto.StarCollection, // io.atcr.sailor.star
|
|
},
|
|
userCache: &UserCache{
|
|
cache: make(map[string]*db.User),
|
|
},
|
|
directory: identity.DefaultDirectory(),
|
|
}
|
|
}
|
|
|
|
// Start begins consuming Jetstream events
|
|
// This is a blocking function that runs until the context is cancelled
|
|
func (w *Worker) Start(ctx context.Context) error {
|
|
// Build connection URL with filters
|
|
u, err := url.Parse(w.jetstreamURL)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid jetstream URL: %w", err)
|
|
}
|
|
|
|
q := u.Query()
|
|
for _, collection := range w.wantedCollections {
|
|
q.Add("wantedCollections", collection)
|
|
}
|
|
|
|
// Add cursor if specified (for backfilling historical data)
|
|
if w.startCursor > 0 {
|
|
q.Set("cursor", fmt.Sprintf("%d", w.startCursor))
|
|
fmt.Printf("Starting from cursor: %d (replaying historical events)\n", w.startCursor)
|
|
}
|
|
|
|
// Disable compression for now to debug
|
|
// q.Set("compress", "true")
|
|
u.RawQuery = q.Encode()
|
|
|
|
fmt.Printf("Connecting to Jetstream: %s\n", u.String())
|
|
|
|
// Connect to Jetstream
|
|
conn, _, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to jetstream: %w", err)
|
|
}
|
|
defer conn.Close()
|
|
|
|
// Track connection start time for debugging
|
|
w.connStartTime = time.Now()
|
|
|
|
// Create zstd decoder for decompressing messages
|
|
decoder, err := zstd.NewReader(nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create zstd decoder: %w", err)
|
|
}
|
|
defer decoder.Close()
|
|
|
|
fmt.Println("Connected to Jetstream, listening for events...")
|
|
|
|
// Start heartbeat ticker to show Jetstream is alive
|
|
heartbeatTicker := time.NewTicker(30 * time.Second)
|
|
defer heartbeatTicker.Stop()
|
|
|
|
eventCount := 0
|
|
lastHeartbeat := time.Now()
|
|
|
|
// Read messages
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-heartbeatTicker.C:
|
|
elapsed := time.Since(lastHeartbeat)
|
|
fmt.Printf("Jetstream: Alive (processed %d events in last %.0fs)\n", eventCount, elapsed.Seconds())
|
|
eventCount = 0
|
|
lastHeartbeat = time.Now()
|
|
default:
|
|
_, message, err := conn.ReadMessage()
|
|
if err != nil {
|
|
// Calculate connection duration and idle time for debugging
|
|
connDuration := time.Since(w.connStartTime)
|
|
timeSinceLastEvent := time.Since(lastHeartbeat)
|
|
|
|
// Log detailed context about the failure
|
|
fmt.Printf("Jetstream: Connection closed after %s\n", connDuration)
|
|
fmt.Printf(" - Events in last 30s: %d\n", eventCount)
|
|
fmt.Printf(" - Time since last event: %s\n", timeSinceLastEvent)
|
|
fmt.Printf(" - Error: %v\n", err)
|
|
|
|
return fmt.Errorf("failed to read message: %w", err)
|
|
}
|
|
|
|
// For now, process uncompressed messages
|
|
// TODO: Re-enable compression once debugging is complete
|
|
_ = decoder // Keep decoder to avoid unused variable error
|
|
|
|
if err := w.processMessage(message); err != nil {
|
|
fmt.Printf("ERROR processing message: %v\n", err)
|
|
// Continue processing other messages
|
|
} else {
|
|
eventCount++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SetEventCallback sets a callback to be called for each event
|
|
func (w *Worker) SetEventCallback(cb EventCallback) {
|
|
w.eventCallback = cb
|
|
}
|
|
|
|
// processMessage processes a single Jetstream event
|
|
func (w *Worker) processMessage(message []byte) error {
|
|
var event JetstreamEvent
|
|
if err := json.Unmarshal(message, &event); err != nil {
|
|
return fmt.Errorf("failed to unmarshal event: %w", err)
|
|
}
|
|
|
|
// Call callback if set
|
|
if w.eventCallback != nil {
|
|
w.eventCallback(event.TimeUS)
|
|
}
|
|
|
|
// Only process commit events
|
|
if event.Kind != "commit" {
|
|
return nil
|
|
}
|
|
|
|
commit := event.Commit
|
|
if commit == nil {
|
|
return nil
|
|
}
|
|
|
|
// Set DID on commit from parent event
|
|
commit.DID = event.DID
|
|
|
|
// Debug: log first few collections we see to understand what's coming through
|
|
if w.debugCollectionCount < 5 {
|
|
fmt.Printf("Jetstream DEBUG: Received collection=%s, did=%s\n", commit.Collection, commit.DID)
|
|
w.debugCollectionCount++
|
|
}
|
|
|
|
// Process based on collection
|
|
switch commit.Collection {
|
|
case atproto.ManifestCollection:
|
|
fmt.Printf("Jetstream: Processing manifest event: did=%s, operation=%s, rkey=%s\n",
|
|
commit.DID, commit.Operation, commit.RKey)
|
|
return w.processManifest(commit)
|
|
case atproto.TagCollection:
|
|
fmt.Printf("Jetstream: Processing tag event: did=%s, operation=%s, rkey=%s\n",
|
|
commit.DID, commit.Operation, commit.RKey)
|
|
return w.processTag(commit)
|
|
case atproto.StarCollection:
|
|
fmt.Printf("Jetstream: Processing star event: did=%s, operation=%s, rkey=%s\n",
|
|
commit.DID, commit.Operation, commit.RKey)
|
|
return w.processStar(commit)
|
|
default:
|
|
// Ignore other collections
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ensureUser resolves and upserts a user by DID
|
|
func (w *Worker) ensureUser(ctx context.Context, did string) error {
|
|
// Check cache first
|
|
if user, ok := w.userCache.cache[did]; ok {
|
|
// Update last seen
|
|
user.LastSeen = time.Now()
|
|
return db.UpsertUser(w.db, user)
|
|
}
|
|
|
|
// Resolve DID to get handle and PDS endpoint
|
|
didParsed, err := syntax.ParseDID(did)
|
|
if err != nil {
|
|
fmt.Printf("WARNING: Invalid DID %s: %v (using DID as handle)\n", did, err)
|
|
// Fallback: use DID as handle
|
|
user := &db.User{
|
|
DID: did,
|
|
Handle: did,
|
|
PDSEndpoint: "https://bsky.social", // Default PDS endpoint as fallback
|
|
LastSeen: time.Now(),
|
|
}
|
|
w.userCache.cache[did] = user
|
|
return db.UpsertUser(w.db, user)
|
|
}
|
|
|
|
ident, err := w.directory.LookupDID(ctx, didParsed)
|
|
if err != nil {
|
|
fmt.Printf("WARNING: Failed to resolve DID %s: %v (using DID as handle)\n", did, err)
|
|
// Fallback: use DID as handle
|
|
user := &db.User{
|
|
DID: did,
|
|
Handle: did,
|
|
PDSEndpoint: "https://bsky.social", // Default PDS endpoint as fallback
|
|
LastSeen: time.Now(),
|
|
}
|
|
w.userCache.cache[did] = user
|
|
return db.UpsertUser(w.db, user)
|
|
}
|
|
|
|
resolvedDID := ident.DID.String()
|
|
handle := ident.Handle.String()
|
|
pdsEndpoint := ident.PDSEndpoint()
|
|
|
|
// If handle is invalid or PDS is missing, use defaults
|
|
if handle == "handle.invalid" || handle == "" {
|
|
handle = resolvedDID
|
|
}
|
|
if pdsEndpoint == "" {
|
|
pdsEndpoint = "https://bsky.social"
|
|
}
|
|
|
|
// Fetch user's Bluesky profile (including avatar)
|
|
// Use public Bluesky AppView API (doesn't require auth for public profiles)
|
|
avatar := ""
|
|
publicClient := atproto.NewClient("https://public.api.bsky.app", "", "")
|
|
profile, err := publicClient.GetActorProfile(ctx, resolvedDID)
|
|
if err != nil {
|
|
fmt.Printf("WARNING [worker]: Failed to fetch profile for DID %s: %v\n", resolvedDID, err)
|
|
// Continue without avatar
|
|
} else {
|
|
avatar = profile.Avatar
|
|
}
|
|
|
|
// Cache the user
|
|
user := &db.User{
|
|
DID: resolvedDID,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatar,
|
|
LastSeen: time.Now(),
|
|
}
|
|
w.userCache.cache[did] = user
|
|
|
|
// Upsert to database
|
|
return db.UpsertUser(w.db, user)
|
|
}
|
|
|
|
// processManifest processes a manifest commit event
|
|
func (w *Worker) processManifest(commit *CommitEvent) error {
|
|
// Resolve and upsert user with handle/PDS endpoint
|
|
if err := w.ensureUser(context.Background(), commit.DID); err != nil {
|
|
return fmt.Errorf("failed to ensure user: %w", err)
|
|
}
|
|
|
|
if commit.Operation == "delete" {
|
|
// Delete manifest
|
|
repo := extractRepoFromRKey(commit.RKey)
|
|
digest := commit.RKey
|
|
return db.DeleteManifest(w.db, commit.DID, repo, digest)
|
|
}
|
|
|
|
// Parse manifest record
|
|
var manifestRecord atproto.ManifestRecord
|
|
if commit.Record != nil {
|
|
recordBytes, err := json.Marshal(commit.Record)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal record: %w", err)
|
|
}
|
|
if err := json.Unmarshal(recordBytes, &manifestRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal manifest: %w", err)
|
|
}
|
|
} else {
|
|
// No record data, can't process
|
|
return nil
|
|
}
|
|
|
|
// Extract OCI annotations from manifest
|
|
var title, description, sourceURL, documentationURL, licenses, iconURL string
|
|
if manifestRecord.Annotations != nil {
|
|
title = manifestRecord.Annotations["org.opencontainers.image.title"]
|
|
description = manifestRecord.Annotations["org.opencontainers.image.description"]
|
|
sourceURL = manifestRecord.Annotations["org.opencontainers.image.source"]
|
|
documentationURL = manifestRecord.Annotations["org.opencontainers.image.documentation"]
|
|
licenses = manifestRecord.Annotations["org.opencontainers.image.licenses"]
|
|
iconURL = manifestRecord.Annotations["io.atcr.icon"]
|
|
}
|
|
|
|
// Insert manifest
|
|
manifestID, err := db.InsertManifest(w.db, &db.Manifest{
|
|
DID: commit.DID,
|
|
Repository: manifestRecord.Repository,
|
|
Digest: manifestRecord.Digest,
|
|
MediaType: manifestRecord.MediaType,
|
|
SchemaVersion: manifestRecord.SchemaVersion,
|
|
ConfigDigest: manifestRecord.Config.Digest,
|
|
ConfigSize: manifestRecord.Config.Size,
|
|
HoldEndpoint: manifestRecord.HoldEndpoint,
|
|
CreatedAt: manifestRecord.CreatedAt,
|
|
Title: title,
|
|
Description: description,
|
|
SourceURL: sourceURL,
|
|
DocumentationURL: documentationURL,
|
|
Licenses: licenses,
|
|
IconURL: iconURL,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert manifest: %w", err)
|
|
}
|
|
|
|
// Insert layers
|
|
for i, layer := range manifestRecord.Layers {
|
|
if err := db.InsertLayer(w.db, &db.Layer{
|
|
ManifestID: manifestID,
|
|
Digest: layer.Digest,
|
|
MediaType: layer.MediaType,
|
|
Size: layer.Size,
|
|
LayerIndex: i,
|
|
}); err != nil {
|
|
// Continue on error - layer might already exist
|
|
continue
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processTag processes a tag commit event
|
|
func (w *Worker) processTag(commit *CommitEvent) error {
|
|
// Resolve and upsert user with handle/PDS endpoint
|
|
if err := w.ensureUser(context.Background(), commit.DID); err != nil {
|
|
return fmt.Errorf("failed to ensure user: %w", err)
|
|
}
|
|
|
|
if commit.Operation == "delete" {
|
|
// Delete tag
|
|
parts := strings.Split(commit.RKey, "/")
|
|
if len(parts) < 2 {
|
|
return fmt.Errorf("invalid tag rkey: %s", commit.RKey)
|
|
}
|
|
repo := strings.Join(parts[:len(parts)-1], "/")
|
|
tag := parts[len(parts)-1]
|
|
return db.DeleteTag(w.db, commit.DID, repo, tag)
|
|
}
|
|
|
|
// Parse tag record
|
|
var tagRecord atproto.TagRecord
|
|
if commit.Record != nil {
|
|
recordBytes, err := json.Marshal(commit.Record)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal record: %w", err)
|
|
}
|
|
if err := json.Unmarshal(recordBytes, &tagRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal tag: %w", err)
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
|
|
// Insert or update tag
|
|
return db.UpsertTag(w.db, &db.Tag{
|
|
DID: commit.DID,
|
|
Repository: tagRecord.Repository,
|
|
Tag: tagRecord.Tag,
|
|
Digest: tagRecord.ManifestDigest,
|
|
CreatedAt: tagRecord.UpdatedAt,
|
|
})
|
|
}
|
|
|
|
// processStar processes a star commit event
|
|
func (w *Worker) processStar(commit *CommitEvent) error {
|
|
// Resolve and upsert the user who starred (starrer)
|
|
if err := w.ensureUser(context.Background(), commit.DID); err != nil {
|
|
return fmt.Errorf("failed to ensure user: %w", err)
|
|
}
|
|
|
|
if commit.Operation == "delete" {
|
|
// Unstar - parse the rkey to get the subject (owner DID and repository)
|
|
// Delete events don't include the full record, but the rkey contains the info we need
|
|
ownerDID, repository, err := atproto.ParseStarRecordKey(commit.RKey)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse star rkey: %w", err)
|
|
}
|
|
|
|
// Delete the star record
|
|
return db.DeleteStar(w.db, commit.DID, ownerDID, repository)
|
|
}
|
|
|
|
// Parse star record
|
|
var starRecord atproto.StarRecord
|
|
if commit.Record != nil {
|
|
recordBytes, err := json.Marshal(commit.Record)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal record: %w", err)
|
|
}
|
|
if err := json.Unmarshal(recordBytes, &starRecord); err != nil {
|
|
return fmt.Errorf("failed to unmarshal star: %w", err)
|
|
}
|
|
} else {
|
|
return nil
|
|
}
|
|
|
|
// Upsert the star record (idempotent - star count will be calculated on demand)
|
|
return db.UpsertStar(w.db, commit.DID, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt)
|
|
}
|
|
|
|
type (
	// JetstreamEvent is the JSON envelope of a single Jetstream message.
	// Exactly which of Commit/Identity/Account is populated depends on Kind.
	JetstreamEvent struct {
		DID      string         `json:"did"`
		TimeUS   int64          `json:"time_us"`
		Kind     string         `json:"kind"` // "commit", "identity", "account"
		Commit   *CommitEvent   `json:"commit,omitempty"`
		Identity *IdentityInfo  `json:"identity,omitempty"`
		Account  *AccountInfo   `json:"account,omitempty"`
	}

	// CommitEvent is the payload of a "commit" event (create/update/delete).
	// DID is not part of the payload; processMessage copies it from the
	// enclosing JetstreamEvent.
	CommitEvent struct {
		Rev        string         `json:"rev"`
		Operation  string         `json:"operation"` // "create", "update", "delete"
		Collection string         `json:"collection"`
		RKey       string         `json:"rkey"`
		Record     map[string]any `json:"record,omitempty"`
		CID        string         `json:"cid,omitempty"`
		DID        string         `json:"-"` // set from the parent event, never (de)serialized
	}

	// IdentityInfo is the payload of an "identity" event.
	IdentityInfo struct {
		DID    string `json:"did"`
		Handle string `json:"handle"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
	}

	// AccountInfo is the payload of an "account" status event.
	AccountInfo struct {
		Active bool   `json:"active"`
		DID    string `json:"did"`
		Seq    int64  `json:"seq"`
		Time   string `json:"time"`
		Status string `json:"status,omitempty"`
	}
)
|
|
|
|
// Helper functions
|
|
|
|
// extractRepoFromRKey returns the repository portion of a "<repo>/<digest>"
// record key, i.e. everything before the final "/". A bare digest key (no "/")
// yields "".
//
// Repository names may themselves contain "/" (e.g. "org/app"), so only the
// last separator marks the repo/digest boundary. This matches how processTag
// splits tag record keys.
func extractRepoFromRKey(rkey string) string {
	sep := strings.LastIndex(rkey, "/")
	if sep < 0 {
		return ""
	}
	return rkey[:sep]
}
|
|
|
|
func calculateManifestSize(manifest *atproto.ManifestRecord) int64 {
|
|
var total int64
|
|
total += manifest.Config.Size
|
|
for _, layer := range manifest.Layers {
|
|
total += layer.Size
|
|
}
|
|
return total
|
|
}
|