package jetstream
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"time"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/appview/readme"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth/oauth"
)
// BackfillWorker uses com.atproto.sync.listReposByCollection to backfill historical data.
// It walks every repo the relay reports for each ATCR collection, pulls records
// straight from each user's PDS, and reconciles the local database against them.
type BackfillWorker struct {
	db             *sql.DB          // AppView database handle used for all reads/writes
	client         *atproto.Client  // Relay client — used only for listReposByCollection
	processor      *Processor       // Shared processor for DB operations
	defaultHoldDID string           // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io")
	testMode       bool             // If true, suppress warnings for external holds
	refresher      *oauth.Refresher // OAuth refresher for PDS writes (optional, can be nil)
}
// BackfillState tracks backfill progress.
// NOTE(review): not referenced anywhere in this file — presumably persisted or
// consumed by callers elsewhere; confirm before removing.
type BackfillState struct {
	Collection       string // Collection NSID being backfilled
	RepoCursor       string // Cursor for listReposByCollection
	CurrentDID       string // Current DID being processed
	RecordCursor     string // Cursor for listRecords within current DID
	ProcessedRepos   int    // Number of repos processed so far
	ProcessedRecords int    // Number of records processed so far
	Completed        bool   // True once the collection has been fully backfilled
}
// NewBackfillWorker creates a backfill worker using the sync API.
// defaultHoldDID should be in format "did:web:hold01.atcr.io".
// To find a hold's DID, visit: https://hold-url/.well-known/did.json
// refresher is optional — when provided, backfill will try to update PDS
// records while fetching README content.
func NewBackfillWorker(database *sql.DB, relayEndpoint, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) (*BackfillWorker, error) {
	// The relay client is used exclusively for listReposByCollection calls.
	relayClient := atproto.NewClient(relayEndpoint, "", "")

	worker := &BackfillWorker{
		db:             database,
		client:         relayClient,
		processor:      NewProcessor(database, false), // batch processing runs without a cache
		defaultHoldDID: defaultHoldDID,
		testMode:       testMode,
		refresher:      refresher,
	}
	return worker, nil
}
// Start runs the backfill for all ATCR collections.
// It first caches the default hold's captain record (best effort), then
// backfills each known collection in order, failing fast on the first
// collection-level error.
func (b *BackfillWorker) Start(ctx context.Context) error {
	slog.Info("Backfill: Starting sync-based backfill...")

	// Query and cache the default hold's captain record up front.
	// Failure here is non-fatal: the backfill proceeds with a warning.
	if b.defaultHoldDID != "" {
		slog.Info("Backfill querying default hold captain record", "hold_did", b.defaultHoldDID)
		if err := b.queryCaptainRecord(ctx, b.defaultHoldDID); err != nil {
			slog.Warn("Backfill failed to query default hold captain record", "error", err)
		}
	}

	for _, collection := range []string{
		atproto.ManifestCollection,      // io.atcr.manifest
		atproto.TagCollection,           // io.atcr.tag
		atproto.StarCollection,          // io.atcr.sailor.star
		atproto.SailorProfileCollection, // io.atcr.sailor.profile
		atproto.RepoPageCollection,      // io.atcr.repo.page
	} {
		slog.Info("Backfill processing collection", "collection", collection)
		if err := b.backfillCollection(ctx, collection); err != nil {
			return fmt.Errorf("failed to backfill collection %s: %w", collection, err)
		}
		slog.Info("Backfill completed collection", "collection", collection)
	}

	slog.Info("Backfill: All collections completed!")
	return nil
}
// backfillCollection backfills a single collection by paginating through
// every repo the relay reports as holding records in that collection.
// Individual repo failures are logged and skipped; only relay listing
// failures abort the collection.
func (b *BackfillWorker) backfillCollection(ctx context.Context, collection string) error {
	var cursor string
	repoCount := 0
	recordTotal := 0

	for {
		// Ask the relay which repos contain records for this collection.
		page, err := b.client.ListReposByCollection(ctx, collection, 1000, cursor)
		if err != nil {
			return fmt.Errorf("failed to list repos: %w", err)
		}
		slog.Info("Backfill found repos with collection", "count", len(page.Repos), "collection", collection, "cursor", cursor)

		for _, repo := range page.Repos {
			n, err := b.backfillRepo(ctx, repo.DID, collection)
			if err != nil {
				slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err)
				continue
			}
			repoCount++
			recordTotal += n
			// Periodic progress logging every 10 repos.
			if repoCount%10 == 0 {
				slog.Info("Backfill progress", "repos", repoCount, "records", recordTotal)
			}
		}

		// An empty cursor signals the final page.
		if page.Cursor == "" {
			break
		}
		cursor = page.Cursor
	}

	slog.Info("Backfill collection complete", "collection", collection, "repos", repoCount, "records", recordTotal)
	return nil
}
// backfillRepo backfills all records for a single repo/DID.
// It ensures the user exists locally, resolves the DID to its PDS, pages
// through every record in the given collection directly from that PDS,
// processes each record into the database, and finally reconciles local
// state against what was found (deleting rows whose PDS records are gone).
// Returns the number of records successfully processed.
func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) {
	// Ensure user exists in database before any record writes reference it.
	if err := b.processor.EnsureUser(ctx, did); err != nil {
		return 0, fmt.Errorf("failed to ensure user: %w", err)
	}
	// Resolve DID to get the user's PDS endpoint.
	pdsEndpoint, err := atproto.ResolveDIDToPDS(ctx, did)
	if err != nil {
		return 0, fmt.Errorf("failed to resolve DID to PDS: %w", err)
	}
	// Create a client for this user's PDS with the user's DID.
	// This allows GetRecord to work properly with the repo parameter.
	pdsClient := atproto.NewClient(pdsEndpoint, did, "")
	var recordCursor string
	recordCount := 0
	// Track which records exist on the PDS for deletion reconciliation below.
	// Records that fail to unmarshal are NOT tracked — see NOTE at reconcile.
	var foundManifestDigests []string
	var foundTags []struct{ Repository, Tag string }
	foundStars := make(map[string]time.Time) // key: "ownerDID/repository", value: createdAt
	// Paginate through all records for this repo.
	for {
		records, cursor, err := pdsClient.ListRecordsForRepo(ctx, did, collection, 100, recordCursor)
		if err != nil {
			return recordCount, fmt.Errorf("failed to list records: %w", err)
		}
		// Process each record.
		for _, record := range records {
			// Track what we found for deletion reconciliation.
			switch collection {
			case atproto.ManifestCollection:
				var manifestRecord atproto.ManifestRecord
				if err := json.Unmarshal(record.Value, &manifestRecord); err == nil {
					foundManifestDigests = append(foundManifestDigests, manifestRecord.Digest)
				}
			case atproto.TagCollection:
				var tagRecord atproto.TagRecord
				if err := json.Unmarshal(record.Value, &tagRecord); err == nil {
					foundTags = append(foundTags, struct{ Repository, Tag string }{
						Repository: tagRecord.Repository,
						Tag:        tagRecord.Tag,
					})
				}
			case atproto.StarCollection:
				var starRecord atproto.StarRecord
				if err := json.Unmarshal(record.Value, &starRecord); err == nil {
					key := fmt.Sprintf("%s/%s", starRecord.Subject.DID, starRecord.Subject.Repository)
					foundStars[key] = starRecord.CreatedAt
				}
			}
			// Per-record failures are logged and skipped, not fatal.
			if err := b.processRecord(ctx, did, collection, &record); err != nil {
				slog.Warn("Backfill failed to process record", "uri", record.URI, "error", err)
				continue
			}
			recordCount++
		}
		// An empty cursor signals the final page.
		if cursor == "" {
			break
		}
		recordCursor = cursor
	}
	// Reconcile deletions — remove records from DB that no longer exist on PDS.
	// NOTE(review): records that failed json.Unmarshal above are absent from the
	// "found" sets and could therefore be deleted here — confirm that is intended.
	if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags, foundStars); err != nil {
		slog.Warn("Backfill failed to reconcile deletions", "did", did, "error", err)
	}
	// After processing manifests, clean up orphaned tags (tags pointing to non-existent manifests).
	if collection == atproto.ManifestCollection {
		if err := db.CleanupOrphanedTags(b.db, did); err != nil {
			slog.Warn("Backfill failed to cleanup orphaned tags", "did", did, "error", err)
		}
		// Reconcile annotations — ensure they come from newest manifest per repository.
		// This fixes out-of-order backfill where older manifests can overwrite newer annotations.
		if err := b.reconcileAnnotations(ctx, did, pdsClient); err != nil {
			slog.Warn("Backfill failed to reconcile annotations", "did", did, "error", err)
		}
	}
	// After processing repo pages, fetch descriptions from external sources if empty.
	if collection == atproto.RepoPageCollection {
		if err := b.reconcileRepoPageDescriptions(ctx, did, pdsEndpoint); err != nil {
			slog.Warn("Backfill failed to reconcile repo page descriptions", "did", did, "error", err)
		}
	}
	return recordCount, nil
}
// reconcileDeletions removes records from the database that no longer exist on the PDS.
// The found* arguments describe what the PDS currently holds for this DID;
// any local rows outside those sets are deleted.
func (b *BackfillWorker) reconcileDeletions(did, collection string, foundManifestDigests []string, foundTags []struct{ Repository, Tag string }, foundStars map[string]time.Time) error {
	switch collection {
	case atproto.ManifestCollection:
		// Snapshot current DB state so we can report how many rows were removed.
		dbDigests, err := db.GetManifestDigestsForDID(b.db, did)
		if err != nil {
			return fmt.Errorf("failed to get DB manifests: %w", err)
		}
		if err := db.DeleteManifestsNotInList(b.db, did, foundManifestDigests); err != nil {
			return fmt.Errorf("failed to delete orphaned manifests: %w", err)
		}
		if removed := len(dbDigests) - len(foundManifestDigests); removed > 0 {
			slog.Info("Backfill deleted orphaned manifests", "count", removed, "did", did)
		}

	case atproto.TagCollection:
		dbTags, err := db.GetTagsForDID(b.db, did)
		if err != nil {
			return fmt.Errorf("failed to get DB tags: %w", err)
		}
		if err := db.DeleteTagsNotInList(b.db, did, foundTags); err != nil {
			return fmt.Errorf("failed to delete orphaned tags: %w", err)
		}
		if removed := len(dbTags) - len(foundTags); removed > 0 {
			slog.Info("Backfill deleted orphaned tags", "count", removed, "did", did)
		}

	case atproto.StarCollection:
		// Star counts are computed on demand from the stars table, so simply
		// deleting stale rows is sufficient here.
		if err := db.DeleteStarsNotInList(b.db, did, foundStars); err != nil {
			return fmt.Errorf("failed to delete orphaned stars: %w", err)
		}
	}
	return nil
}
// processRecord processes a single record and stores it in the database,
// dispatching to the shared processor based on the record's collection.
// Fix: the manifest/tag/star branches previously called the processor with
// context.Background(), silently detaching them from the caller's deadline
// and cancellation; they now propagate ctx like the other branches.
func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error {
	switch collection {
	case atproto.ManifestCollection:
		_, err := b.processor.ProcessManifest(ctx, did, record.Value)
		return err
	case atproto.TagCollection:
		return b.processor.ProcessTag(ctx, did, record.Value)
	case atproto.StarCollection:
		return b.processor.ProcessStar(ctx, did, record.Value)
	case atproto.SailorProfileCollection:
		return b.processor.ProcessSailorProfile(ctx, did, record.Value, b.queryCaptainRecordWrapper)
	case atproto.RepoPageCollection:
		// rkey is extracted from the record URI, but for repo pages we use Repository field
		return b.processor.ProcessRepoPage(ctx, did, record.URI, record.Value, false)
	default:
		return fmt.Errorf("unsupported collection: %s", collection)
	}
}
// queryCaptainRecordWrapper wraps queryCaptainRecord with backfill-specific
// error handling: failures never abort the backfill, and in test mode
// warnings for non-default (external) holds are suppressed entirely.
func (b *BackfillWorker) queryCaptainRecordWrapper(ctx context.Context, holdDID string) error {
	err := b.queryCaptainRecord(ctx, holdDID)
	if err == nil {
		return nil
	}
	// External/production holds may not have captain records yet (dev ahead of
	// prod), so in test mode only the default (local) hold warrants a warning.
	suppress := b.testMode && holdDID != b.defaultHoldDID
	if !suppress {
		slog.Warn("Backfill failed to query captain record for hold", "hold_did", holdDID, "error", err)
	}
	// Never fail the whole backfill over a single hold — just skip it.
	return nil
}
// queryCaptainRecord queries a hold's captain record and caches it in the
// database. A cached record younger than one hour short-circuits the fetch.
// Fixes: the retry delay now honors ctx cancellation (previously a blind
// time.Sleep), and a nil record after a nil-error GetRecord is rejected
// instead of dereferenced.
func (b *BackfillWorker) queryCaptainRecord(ctx context.Context, holdDID string) error {
	// Skip the network round-trip if the cache was refreshed within the last hour.
	existing, err := db.GetCaptainRecord(b.db, holdDID)
	if err == nil && existing != nil {
		if time.Since(existing.UpdatedAt) < 1*time.Hour {
			return nil
		}
	}

	// Resolve hold DID to its URL and build a client for the hold's PDS.
	holdURL := atproto.ResolveHoldURL(holdDID)
	holdClient := atproto.NewClient(holdURL, holdDID, "")

	// Query captain record with retries (the hold service may still be
	// starting, e.g. during Docker compose startup).
	var record *atproto.Record
	const maxRetries = 3
	for attempt := 1; attempt <= maxRetries; attempt++ {
		record, err = holdClient.GetRecord(ctx, "io.atcr.hold.captain", "self")
		if err == nil {
			break
		}
		// Only connection errors are retried; anything else is a hard failure.
		if attempt < maxRetries && strings.Contains(err.Error(), "connection refused") {
			slog.Info("Backfill hold not ready, retrying", "attempt", attempt, "max_retries", maxRetries)
			// Wait between retries, but bail out promptly on cancellation.
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(2 * time.Second):
			}
			continue
		}
		return fmt.Errorf("failed to get captain record: %w", err)
	}
	// Defensive: guard against a client returning (nil, nil).
	if record == nil {
		return fmt.Errorf("failed to get captain record: empty response from hold %s", holdDID)
	}

	// Parse captain record directly into the db struct.
	var captainRecord db.HoldCaptainRecord
	if err := json.Unmarshal(record.Value, &captainRecord); err != nil {
		return fmt.Errorf("failed to parse captain record: %w", err)
	}
	// Fields not carried in the record JSON.
	captainRecord.HoldDID = holdDID
	captainRecord.UpdatedAt = time.Now()
	if err := db.UpsertCaptainRecord(b.db, &captainRecord); err != nil {
		return fmt.Errorf("failed to cache captain record: %w", err)
	}
	slog.Info("Backfill cached captain record for hold", "hold_did", holdDID, "owner_did", captainRecord.OwnerDID)
	return nil
}
// reconcileAnnotations ensures annotations come from the newest manifest in
// each repository. This fixes the out-of-order backfill issue where older
// manifests can overwrite newer annotations. Per-repository failures are
// logged and skipped so one bad repo cannot block the rest.
func (b *BackfillWorker) reconcileAnnotations(ctx context.Context, did string, pdsClient *atproto.Client) error {
	repositories, err := db.GetRepositoriesForDID(b.db, did)
	if err != nil {
		return fmt.Errorf("failed to get repositories: %w", err)
	}

	for _, repository := range repositories {
		// Locate the newest manifest we know about for this repository.
		newest, err := db.GetNewestManifestForRepo(b.db, did, repository)
		if err != nil {
			slog.Warn("Backfill failed to get newest manifest for repo", "did", did, "repository", repository, "error", err)
			continue
		}

		// The manifest's rkey on the PDS is its digest minus the "sha256:" prefix.
		record, err := pdsClient.GetRecord(ctx, atproto.ManifestCollection, strings.TrimPrefix(newest.Digest, "sha256:"))
		if err != nil {
			slog.Warn("Backfill failed to fetch manifest record for repo", "did", did, "repository", repository, "error", err)
			continue
		}

		var manifest atproto.ManifestRecord
		if err := json.Unmarshal(record.Value, &manifest); err != nil {
			slog.Warn("Backfill failed to parse manifest record for repo", "did", did, "repository", repository, "error", err)
			continue
		}

		// Only persist annotations when at least one carries a non-empty value.
		hasData := false
		for _, v := range manifest.Annotations {
			if v != "" {
				hasData = true
				break
			}
		}
		if !hasData {
			continue
		}

		if err := db.UpsertRepositoryAnnotations(b.db, did, repository, manifest.Annotations); err != nil {
			slog.Warn("Backfill failed to reconcile annotations for repo", "did", did, "repository", repository, "error", err)
			continue
		}
		slog.Info("Backfill reconciled annotations for repo from newest manifest", "did", did, "repository", repository, "digest", newest.Digest)
	}
	return nil
}
// reconcileRepoPageDescriptions fetches README content from external sources
// for repo pages that have an empty description.
// If an OAuth refresher is configured, it first tries to update the PDS record
// (the source of truth); regardless of that outcome, the fetched content is
// always written to the local database. Per-page failures are logged and
// skipped so one bad page cannot block the rest.
func (b *BackfillWorker) reconcileRepoPageDescriptions(ctx context.Context, did, pdsEndpoint string) error {
	// Get all repo pages for this DID.
	repoPages, err := db.GetRepoPagesByDID(b.db, did)
	if err != nil {
		return fmt.Errorf("failed to get repo pages: %w", err)
	}
	for _, page := range repoPages {
		// Skip pages that already have a description.
		if page.Description != "" {
			continue
		}
		// Annotations drive README discovery (io.atcr.readme / image.source URLs).
		annotations, err := db.GetRepositoryAnnotations(b.db, did, page.Repository)
		if err != nil {
			slog.Debug("Failed to get annotations for repo page", "did", did, "repository", page.Repository, "error", err)
			continue
		}
		// Try to fetch README content from external sources.
		description := b.fetchReadmeContent(ctx, annotations)
		if description == "" {
			// No README content available, skip.
			continue
		}
		slog.Info("Fetched README for repo page", "did", did, "repository", page.Repository, "descriptionLength", len(description))
		// Try to update the PDS first when an OAuth session is available;
		// a PDS failure only downgrades this to a DB-only update.
		pdsUpdated := false
		if b.refresher != nil {
			if err := b.updateRepoPageInPDS(ctx, did, pdsEndpoint, page.Repository, description, page.AvatarCID); err != nil {
				slog.Debug("Could not update repo page in PDS, falling back to DB-only", "did", did, "repository", page.Repository, "error", err)
			} else {
				pdsUpdated = true
				slog.Info("Updated repo page in PDS with fetched description", "did", did, "repository", page.Repository)
			}
		}
		// Always update the database with the fetched content.
		if err := db.UpsertRepoPage(b.db, did, page.Repository, description, page.AvatarCID, page.CreatedAt, time.Now()); err != nil {
			slog.Warn("Failed to update repo page in database", "did", did, "repository", page.Repository, "error", err)
		} else if !pdsUpdated {
			slog.Info("Updated repo page in database (PDS not updated)", "did", did, "repository", page.Repository)
		}
	}
	return nil
}
// fetchReadmeContent attempts to fetch README content from external sources
// based on the repository's annotations.
// Priority: io.atcr.readme annotation > derived from org.opencontainers.image.source.
// Returns the empty string when nothing could be fetched.
func (b *BackfillWorker) fetchReadmeContent(ctx context.Context, annotations map[string]string) string {
	// Bound the total time spent on README fetching for this call.
	fetchCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Priority 1: an explicit README URL via the io.atcr.readme annotation.
	if directURL := annotations["io.atcr.readme"]; directURL != "" {
		content, err := b.fetchRawReadme(fetchCtx, directURL)
		switch {
		case err != nil:
			slog.Debug("Failed to fetch README from io.atcr.readme annotation", "url", directURL, "error", err)
		case content != "":
			return content
		}
	}

	// Priority 2: derive a raw README URL from the source-repo annotation,
	// probing the "main" branch before falling back to "master".
	sourceURL := annotations["org.opencontainers.image.source"]
	if sourceURL == "" {
		return ""
	}
	for _, branch := range []string{"main", "master"} {
		candidate := readme.DeriveReadmeURL(sourceURL, branch)
		if candidate == "" {
			continue
		}
		content, err := b.fetchRawReadme(fetchCtx, candidate)
		if err != nil {
			// 404s are expected while probing main vs master; stay quiet for those.
			if !readme.Is404(err) {
				slog.Debug("Failed to fetch README from source URL", "url", candidate, "branch", branch, "error", err)
			}
			continue
		}
		if content != "" {
			return content
		}
	}
	return ""
}
// fetchRawReadme fetches raw markdown content from a URL.
// Responses are capped at 100KB and redirect chains at 5 hops; any non-200
// status is returned as an error.
// NOTE(review): a fresh http.Client is built per call, matching the original;
// hoisting it to the worker would enable connection reuse.
func (b *BackfillWorker) fetchRawReadme(ctx context.Context, readmeURL string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", readmeURL, nil)
	if err != nil {
		return "", fmt.Errorf("failed to create request: %w", err)
	}
	req.Header.Set("User-Agent", "ATCR-Backfill-README-Fetcher/1.0")

	httpClient := &http.Client{
		Timeout: 10 * time.Second,
		CheckRedirect: func(_ *http.Request, via []*http.Request) error {
			// Refuse to follow more than 5 redirects.
			if len(via) >= 5 {
				return fmt.Errorf("too many redirects")
			}
			return nil
		},
	}

	resp, err := httpClient.Do(req)
	if err != nil {
		return "", fmt.Errorf("failed to fetch URL: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("status %d", resp.StatusCode)
	}

	// Never read more than 100KB of README content.
	body, err := io.ReadAll(io.LimitReader(resp.Body, 100*1024))
	if err != nil {
		return "", fmt.Errorf("failed to read response body: %w", err)
	}
	return string(body), nil
}
// updateRepoPageInPDS updates the repo page record in the user's PDS using OAuth.
// When a record already exists, its createdAt timestamp and avatar blob ref are
// preserved; otherwise createdAt is set to now.
func (b *BackfillWorker) updateRepoPageInPDS(ctx context.Context, did, pdsEndpoint, repository, description, avatarCID string) error {
	if b.refresher == nil {
		return fmt.Errorf("no OAuth refresher available")
	}

	// Client authenticated through the OAuth session provider.
	pdsClient := atproto.NewClientWithSessionProvider(pdsEndpoint, did, b.refresher)

	// Carry over createdAt and the avatar blob ref from any existing record.
	var createdAt time.Time
	var avatarRef *atproto.ATProtoBlobRef
	if existing, err := pdsClient.GetRecord(ctx, atproto.RepoPageCollection, repository); err == nil && existing != nil {
		var prev atproto.RepoPageRecord
		if json.Unmarshal(existing.Value, &prev) == nil {
			createdAt = prev.CreatedAt
			avatarRef = prev.Avatar
		}
	}
	if createdAt.IsZero() {
		createdAt = time.Now()
	}

	updated := &atproto.RepoPageRecord{
		Type:        atproto.RepoPageCollection,
		Repository:  repository,
		Description: description,
		Avatar:      avatarRef,
		CreatedAt:   createdAt,
		UpdatedAt:   time.Now(),
	}
	// Write to PDS — PutRecord uses DoWithSession internally.
	if _, err := pdsClient.PutRecord(ctx, atproto.RepoPageCollection, repository, updated); err != nil {
		return fmt.Errorf("failed to write to PDS: %w", err)
	}
	return nil
}