mirror of
https://tangled.org/evan.jarrett.net/at-container-registry
synced 2026-04-27 03:35:10 +00:00
416 lines
14 KiB
Go
package jetstream
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"strings"
|
|
"time"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/atproto"
|
|
)
|
|
|
|
// BackfillWorker uses com.atproto.sync.listReposByCollection to backfill historical data
type BackfillWorker struct {
	db             *sql.DB         // AppView database handle shared with the processor
	client         *atproto.Client // Client pointed at the relay; used only for listReposByCollection
	processor      *Processor      // Shared processor for DB operations
	defaultHoldDID string          // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io")
	testMode       bool            // If true, suppress warnings for external holds
}

// BackfillState tracks backfill progress
type BackfillState struct {
	Collection       string // NSID of the collection being backfilled
	RepoCursor       string // Cursor for listReposByCollection
	CurrentDID       string // Current DID being processed
	RecordCursor     string // Cursor for listRecords within current DID
	ProcessedRepos   int    // Number of repos processed so far
	ProcessedRecords int    // Number of records processed so far
	Completed        bool   // True once the collection has been fully backfilled
}
|
|
|
|
// NewBackfillWorker creates a backfill worker using sync API
|
|
// defaultHoldDID should be in format "did:web:hold01.atcr.io"
|
|
// To find a hold's DID, visit: https://hold-url/.well-known/did.json
|
|
func NewBackfillWorker(database *sql.DB, relayEndpoint, defaultHoldDID string, testMode bool) (*BackfillWorker, error) {
|
|
// Create client for relay - used only for listReposByCollection
|
|
client := atproto.NewClient(relayEndpoint, "", "")
|
|
|
|
return &BackfillWorker{
|
|
db: database,
|
|
client: client, // This points to the relay
|
|
processor: NewProcessor(database, false), // No cache for batch processing
|
|
defaultHoldDID: defaultHoldDID,
|
|
testMode: testMode,
|
|
}, nil
|
|
}
|
|
|
|
// Start runs the backfill for all ATCR collections
|
|
func (b *BackfillWorker) Start(ctx context.Context) error {
|
|
slog.Info("Backfill: Starting sync-based backfill...")
|
|
|
|
// First, query and cache the default hold's captain record
|
|
if b.defaultHoldDID != "" {
|
|
slog.Info("Backfill querying default hold captain record", "hold_did", b.defaultHoldDID)
|
|
if err := b.queryCaptainRecord(ctx, b.defaultHoldDID); err != nil {
|
|
slog.Warn("Backfill failed to query default hold captain record", "error", err)
|
|
// Don't fail the whole backfill - just warn
|
|
}
|
|
}
|
|
|
|
collections := []string{
|
|
atproto.ManifestCollection, // io.atcr.manifest
|
|
atproto.TagCollection, // io.atcr.tag
|
|
atproto.StarCollection, // io.atcr.sailor.star
|
|
atproto.SailorProfileCollection, // io.atcr.sailor.profile
|
|
}
|
|
|
|
for _, collection := range collections {
|
|
slog.Info("Backfill processing collection", "collection", collection)
|
|
|
|
if err := b.backfillCollection(ctx, collection); err != nil {
|
|
return fmt.Errorf("failed to backfill collection %s: %w", collection, err)
|
|
}
|
|
|
|
slog.Info("Backfill completed collection", "collection", collection)
|
|
}
|
|
|
|
slog.Info("Backfill: All collections completed!")
|
|
return nil
|
|
}
|
|
|
|
// backfillCollection backfills a single collection
|
|
func (b *BackfillWorker) backfillCollection(ctx context.Context, collection string) error {
|
|
var repoCursor string
|
|
processedRepos := 0
|
|
processedRecords := 0
|
|
|
|
// Paginate through all repos with this collection
|
|
for {
|
|
// List repos that have records in this collection
|
|
result, err := b.client.ListReposByCollection(ctx, collection, 1000, repoCursor)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list repos: %w", err)
|
|
}
|
|
|
|
slog.Info("Backfill found repos with collection", "count", len(result.Repos), "collection", collection, "cursor", repoCursor)
|
|
|
|
// Process each repo (DID)
|
|
for _, repo := range result.Repos {
|
|
recordCount, err := b.backfillRepo(ctx, repo.DID, collection)
|
|
if err != nil {
|
|
slog.Warn("Backfill failed to backfill repo", "did", repo.DID, "error", err)
|
|
continue
|
|
}
|
|
|
|
processedRepos++
|
|
processedRecords += recordCount
|
|
|
|
if processedRepos%10 == 0 {
|
|
slog.Info("Backfill progress", "repos", processedRepos, "records", processedRecords)
|
|
}
|
|
}
|
|
|
|
// Check if there are more pages
|
|
if result.Cursor == "" {
|
|
break
|
|
}
|
|
|
|
repoCursor = result.Cursor
|
|
}
|
|
|
|
slog.Info("Backfill collection complete", "collection", collection, "repos", processedRepos, "records", processedRecords)
|
|
return nil
|
|
}
|
|
|
|
// backfillRepo backfills all records for a single repo/DID.
// It pages through the repo's records for the given collection on the user's
// own PDS, processes each one into the database, and then reconciles
// deletions (and, for manifests, orphaned tags and annotations).
// Returns the number of records successfully processed.
func (b *BackfillWorker) backfillRepo(ctx context.Context, did, collection string) (int, error) {
	// Ensure user exists in database and get their PDS endpoint
	if err := b.processor.EnsureUser(ctx, did); err != nil {
		return 0, fmt.Errorf("failed to ensure user: %w", err)
	}

	// Resolve DID to get user's PDS endpoint
	pdsEndpoint, err := atproto.ResolveDIDToPDS(ctx, did)
	if err != nil {
		return 0, fmt.Errorf("failed to resolve DID to PDS: %w", err)
	}

	// Create a client for this user's PDS with the user's DID
	// This allows GetRecord to work properly with the repo parameter
	pdsClient := atproto.NewClient(pdsEndpoint, did, "")

	var recordCursor string
	recordCount := 0

	// Track which records exist on the PDS for reconciliation.
	// These "found" sets are later compared against the DB to delete rows
	// whose backing record no longer exists on the PDS.
	var foundManifestDigests []string
	var foundTags []struct{ Repository, Tag string }
	foundStars := make(map[string]time.Time) // key: "ownerDID/repository", value: createdAt

	// Paginate through all records for this repo
	for {
		records, cursor, err := pdsClient.ListRecordsForRepo(ctx, did, collection, 100, recordCursor)
		if err != nil {
			// Partial count is returned so callers can still report progress.
			return recordCount, fmt.Errorf("failed to list records: %w", err)
		}

		// Process each record
		for _, record := range records {
			// Track what we found for deletion reconciliation.
			// NOTE(review): a record that fails to unmarshal here is silently
			// omitted from the found-set, which means reconcileDeletions could
			// later delete its DB row — confirm this is intended.
			switch collection {
			case atproto.ManifestCollection:
				var manifestRecord atproto.ManifestRecord
				if err := json.Unmarshal(record.Value, &manifestRecord); err == nil {
					foundManifestDigests = append(foundManifestDigests, manifestRecord.Digest)
				}
			case atproto.TagCollection:
				var tagRecord atproto.TagRecord
				if err := json.Unmarshal(record.Value, &tagRecord); err == nil {
					foundTags = append(foundTags, struct{ Repository, Tag string }{
						Repository: tagRecord.Repository,
						Tag:        tagRecord.Tag,
					})
				}
			case atproto.StarCollection:
				var starRecord atproto.StarRecord
				if err := json.Unmarshal(record.Value, &starRecord); err == nil {
					key := fmt.Sprintf("%s/%s", starRecord.Subject.DID, starRecord.Subject.Repository)
					foundStars[key] = starRecord.CreatedAt
				}
			}

			// A record that fails to process is logged and skipped; it does
			// not count toward recordCount.
			if err := b.processRecord(ctx, did, collection, &record); err != nil {
				slog.Warn("Backfill failed to process record", "uri", record.URI, "error", err)
				continue
			}
			recordCount++
		}

		// Check if there are more pages
		if cursor == "" {
			break
		}

		recordCursor = cursor
	}

	// Reconcile deletions - remove records from DB that no longer exist on PDS.
	// Best effort: a reconciliation failure is logged but does not fail the repo.
	if err := b.reconcileDeletions(did, collection, foundManifestDigests, foundTags, foundStars); err != nil {
		slog.Warn("Backfill failed to reconcile deletions", "did", did, "error", err)
	}

	// After processing manifests, clean up orphaned tags (tags pointing to non-existent manifests)
	if collection == atproto.ManifestCollection {
		if err := db.CleanupOrphanedTags(b.db, did); err != nil {
			slog.Warn("Backfill failed to cleanup orphaned tags", "did", did, "error", err)
		}

		// Reconcile annotations - ensure they come from newest manifest per repository
		// This fixes out-of-order backfill where older manifests can overwrite newer annotations
		if err := b.reconcileAnnotations(ctx, did, pdsClient); err != nil {
			slog.Warn("Backfill failed to reconcile annotations", "did", did, "error", err)
		}
	}

	return recordCount, nil
}
|
|
|
|
// reconcileDeletions removes records from the database that no longer exist on the PDS
|
|
func (b *BackfillWorker) reconcileDeletions(did, collection string, foundManifestDigests []string, foundTags []struct{ Repository, Tag string }, foundStars map[string]time.Time) error {
|
|
switch collection {
|
|
case atproto.ManifestCollection:
|
|
// Get current manifests in DB
|
|
dbDigests, err := db.GetManifestDigestsForDID(b.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get DB manifests: %w", err)
|
|
}
|
|
|
|
// Delete manifests not found on PDS
|
|
if err := db.DeleteManifestsNotInList(b.db, did, foundManifestDigests); err != nil {
|
|
return fmt.Errorf("failed to delete orphaned manifests: %w", err)
|
|
}
|
|
|
|
// Log deletions
|
|
deleted := len(dbDigests) - len(foundManifestDigests)
|
|
if deleted > 0 {
|
|
slog.Info("Backfill deleted orphaned manifests", "count", deleted, "did", did)
|
|
}
|
|
|
|
case atproto.TagCollection:
|
|
// Get current tags in DB
|
|
dbTags, err := db.GetTagsForDID(b.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get DB tags: %w", err)
|
|
}
|
|
|
|
// Delete tags not found on PDS
|
|
if err := db.DeleteTagsNotInList(b.db, did, foundTags); err != nil {
|
|
return fmt.Errorf("failed to delete orphaned tags: %w", err)
|
|
}
|
|
|
|
// Log deletions
|
|
deleted := len(dbTags) - len(foundTags)
|
|
if deleted > 0 {
|
|
slog.Info("Backfill deleted orphaned tags", "count", deleted, "did", did)
|
|
}
|
|
|
|
case atproto.StarCollection:
|
|
// Reconcile stars - delete stars that no longer exist on PDS
|
|
// Star counts will be calculated on demand from the stars table
|
|
if err := db.DeleteStarsNotInList(b.db, did, foundStars); err != nil {
|
|
return fmt.Errorf("failed to delete orphaned stars: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// processRecord processes a single record and stores it in the database
|
|
func (b *BackfillWorker) processRecord(ctx context.Context, did, collection string, record *atproto.Record) error {
|
|
switch collection {
|
|
case atproto.ManifestCollection:
|
|
_, err := b.processor.ProcessManifest(context.Background(), did, record.Value)
|
|
return err
|
|
case atproto.TagCollection:
|
|
return b.processor.ProcessTag(context.Background(), did, record.Value)
|
|
case atproto.StarCollection:
|
|
return b.processor.ProcessStar(context.Background(), did, record.Value)
|
|
case atproto.SailorProfileCollection:
|
|
return b.processor.ProcessSailorProfile(ctx, did, record.Value, b.queryCaptainRecordWrapper)
|
|
default:
|
|
return fmt.Errorf("unsupported collection: %s", collection)
|
|
}
|
|
}
|
|
|
|
// queryCaptainRecordWrapper wraps queryCaptainRecord with backfill-specific logic
|
|
func (b *BackfillWorker) queryCaptainRecordWrapper(ctx context.Context, holdDID string) error {
|
|
if err := b.queryCaptainRecord(ctx, holdDID); err != nil {
|
|
// In test mode, only warn about default hold (local hold)
|
|
// External/production holds may not have captain records yet (dev ahead of prod)
|
|
if b.testMode && holdDID != b.defaultHoldDID {
|
|
// Suppress warning for external holds in test mode
|
|
return nil
|
|
}
|
|
slog.Warn("Backfill failed to query captain record for hold", "hold_did", holdDID, "error", err)
|
|
// Don't fail the whole backfill - just skip this hold
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// queryCaptainRecord queries a hold's captain record and caches it in the database
|
|
func (b *BackfillWorker) queryCaptainRecord(ctx context.Context, holdDID string) error {
|
|
// Check if we already have it cached (skip if recently updated)
|
|
existing, err := db.GetCaptainRecord(b.db, holdDID)
|
|
if err == nil && existing != nil {
|
|
// If cached within last hour, skip refresh
|
|
if time.Since(existing.UpdatedAt) < 1*time.Hour {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Resolve hold DID to URL
|
|
holdURL := atproto.ResolveHoldURL(holdDID)
|
|
|
|
// Create client for hold's PDS
|
|
holdClient := atproto.NewClient(holdURL, holdDID, "")
|
|
|
|
// Query captain record with retries (for Docker startup timing)
|
|
var record *atproto.Record
|
|
maxRetries := 3
|
|
for attempt := 1; attempt <= maxRetries; attempt++ {
|
|
record, err = holdClient.GetRecord(ctx, "io.atcr.hold.captain", "self")
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
// Retry on connection errors (hold service might still be starting)
|
|
if attempt < maxRetries && strings.Contains(err.Error(), "connection refused") {
|
|
slog.Info("Backfill hold not ready, retrying", "attempt", attempt, "max_retries", maxRetries)
|
|
time.Sleep(2 * time.Second)
|
|
continue
|
|
}
|
|
|
|
return fmt.Errorf("failed to get captain record: %w", err)
|
|
}
|
|
|
|
// Parse captain record directly into db struct
|
|
var captainRecord db.HoldCaptainRecord
|
|
if err := json.Unmarshal(record.Value, &captainRecord); err != nil {
|
|
return fmt.Errorf("failed to parse captain record: %w", err)
|
|
}
|
|
|
|
// Set fields not from JSON
|
|
captainRecord.HoldDID = holdDID
|
|
captainRecord.UpdatedAt = time.Now()
|
|
|
|
if err := db.UpsertCaptainRecord(b.db, &captainRecord); err != nil {
|
|
return fmt.Errorf("failed to cache captain record: %w", err)
|
|
}
|
|
|
|
slog.Info("Backfill cached captain record for hold", "hold_did", holdDID, "owner_did", captainRecord.OwnerDID)
|
|
return nil
|
|
}
|
|
|
|
// reconcileAnnotations ensures annotations come from the newest manifest in each repository
|
|
// This fixes the out-of-order backfill issue where older manifests can overwrite newer annotations
|
|
func (b *BackfillWorker) reconcileAnnotations(ctx context.Context, did string, pdsClient *atproto.Client) error {
|
|
// Get all repositories for this DID
|
|
repositories, err := db.GetRepositoriesForDID(b.db, did)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get repositories: %w", err)
|
|
}
|
|
|
|
for _, repo := range repositories {
|
|
// Find newest manifest for this repository
|
|
newestManifest, err := db.GetNewestManifestForRepo(b.db, did, repo)
|
|
if err != nil {
|
|
slog.Warn("Backfill failed to get newest manifest for repo", "did", did, "repository", repo, "error", err)
|
|
continue // Skip on error
|
|
}
|
|
|
|
// Fetch the full manifest record from PDS using the digest as rkey
|
|
rkey := strings.TrimPrefix(newestManifest.Digest, "sha256:")
|
|
record, err := pdsClient.GetRecord(ctx, atproto.ManifestCollection, rkey)
|
|
if err != nil {
|
|
slog.Warn("Backfill failed to fetch manifest record for repo", "did", did, "repository", repo, "error", err)
|
|
continue // Skip on error
|
|
}
|
|
|
|
// Parse manifest record
|
|
var manifestRecord atproto.ManifestRecord
|
|
if err := json.Unmarshal(record.Value, &manifestRecord); err != nil {
|
|
slog.Warn("Backfill failed to parse manifest record for repo", "did", did, "repository", repo, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Update annotations from newest manifest only
|
|
if len(manifestRecord.Annotations) > 0 {
|
|
// Filter out empty annotations
|
|
hasData := false
|
|
for _, value := range manifestRecord.Annotations {
|
|
if value != "" {
|
|
hasData = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if hasData {
|
|
err = db.UpsertRepositoryAnnotations(b.db, did, repo, manifestRecord.Annotations)
|
|
if err != nil {
|
|
slog.Warn("Backfill failed to reconcile annotations for repo", "did", did, "repository", repo, "error", err)
|
|
} else {
|
|
slog.Info("Backfill reconciled annotations for repo from newest manifest", "did", did, "repository", repo, "digest", newestManifest.Digest)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|