561 lines
20 KiB
Go
561 lines
20 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"html/template"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/distribution/distribution/v3/registry"
|
|
"github.com/distribution/distribution/v3/registry/handlers"
|
|
"github.com/spf13/cobra"
|
|
|
|
"atcr.io/pkg/appview/middleware"
|
|
"atcr.io/pkg/appview/storage"
|
|
"atcr.io/pkg/atproto"
|
|
"atcr.io/pkg/auth"
|
|
"atcr.io/pkg/auth/oauth"
|
|
"atcr.io/pkg/auth/token"
|
|
"atcr.io/pkg/logging"
|
|
|
|
// UI components
|
|
"atcr.io/pkg/appview"
|
|
"atcr.io/pkg/appview/db"
|
|
uihandlers "atcr.io/pkg/appview/handlers"
|
|
"atcr.io/pkg/appview/holdhealth"
|
|
"atcr.io/pkg/appview/jetstream"
|
|
"atcr.io/pkg/appview/readme"
|
|
"atcr.io/pkg/appview/routes"
|
|
"github.com/go-chi/chi/v5"
|
|
chimiddleware "github.com/go-chi/chi/v5/middleware"
|
|
)
|
|
|
|
var serveCmd = &cobra.Command{
|
|
Use: "serve",
|
|
Short: "Start the ATCR registry server",
|
|
Long: `Start the ATCR registry server with authentication endpoints.
|
|
|
|
Configuration is loaded from environment variables.
|
|
See .env.appview.example for available environment variables.`,
|
|
Args: cobra.NoArgs,
|
|
RunE: serveRegistry,
|
|
}
|
|
|
|
func init() {
|
|
// Replace the default serve command with our custom one
|
|
for i, cmd := range registry.RootCmd.Commands() {
|
|
if cmd.Name() == "serve" {
|
|
registry.RootCmd.Commands()[i] = serveCmd
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func serveRegistry(cmd *cobra.Command, args []string) error {
|
|
// Load configuration from environment variables
|
|
cfg, err := appview.LoadConfigFromEnv()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load config from environment: %w", err)
|
|
}
|
|
|
|
// Initialize structured logging
|
|
logging.InitLogger(cfg.LogLevel)
|
|
|
|
slog.Info("Configuration loaded successfully from environment")
|
|
|
|
// Initialize UI database first (required for all stores)
|
|
slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
|
|
uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.Enabled, cfg.UI.DatabasePath, cfg.UI.SkipDBMigrations)
|
|
if uiDatabase == nil {
|
|
return fmt.Errorf("failed to initialize UI database - required for session storage")
|
|
}
|
|
|
|
// Initialize hold health checker
|
|
slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
|
|
healthChecker := holdhealth.NewChecker(cfg.Health.CacheTTL)
|
|
|
|
// Initialize README fetcher for rendering repo page descriptions
|
|
readmeFetcher := readme.NewFetcher()
|
|
|
|
// Start background health check worker
|
|
startupDelay := 5 * time.Second // Wait for hold services to start (Docker compose)
|
|
dbAdapter := holdhealth.NewDBAdapter(uiDatabase)
|
|
healthWorker := holdhealth.NewWorkerWithStartupDelay(healthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)
|
|
|
|
// Create context for worker lifecycle management
|
|
workerCtx, workerCancel := context.WithCancel(context.Background())
|
|
defer workerCancel() // Ensure context is cancelled on all exit paths
|
|
healthWorker.Start(workerCtx)
|
|
slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)
|
|
|
|
// Initialize OAuth components
|
|
slog.Info("Initializing OAuth components")
|
|
|
|
// Create OAuth session storage (SQLite-backed)
|
|
oauthStore := db.NewOAuthStore(uiDatabase)
|
|
slog.Info("Using SQLite for OAuth session storage")
|
|
|
|
// Create device store (SQLite-backed)
|
|
deviceStore := db.NewDeviceStore(uiDatabase)
|
|
slog.Info("Using SQLite for device storage")
|
|
|
|
// Get base URL and default hold DID from config
|
|
baseURL := cfg.Server.BaseURL
|
|
defaultHoldDID := cfg.Server.DefaultHoldDID
|
|
testMode := cfg.Server.TestMode
|
|
|
|
slog.Debug("Base URL for OAuth", "base_url", baseURL)
|
|
if testMode {
|
|
slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution and transition:generic scope")
|
|
}
|
|
|
|
// Create OAuth client app (automatically configures confidential client for production)
|
|
desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
|
|
oauthClientApp, err := oauth.NewClientApp(baseURL, oauthStore, desiredScopes, cfg.Server.OAuthKeyPath, cfg.Server.ClientName)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create OAuth client app: %w", err)
|
|
}
|
|
if testMode {
|
|
slog.Info("Using OAuth scopes with transition:generic (test mode)")
|
|
} else {
|
|
slog.Info("Using OAuth scopes with RPC scope (production mode)")
|
|
}
|
|
|
|
// Invalidate sessions with mismatched scopes on startup
|
|
// This ensures all users have the latest required scopes after deployment
|
|
invalidatedCount, err := oauthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
|
|
if err != nil {
|
|
slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
|
|
} else if invalidatedCount > 0 {
|
|
slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
|
|
}
|
|
|
|
// Create oauth token refresher
|
|
refresher := oauth.NewRefresher(oauthClientApp)
|
|
|
|
// Wire up UI session store to refresher so it can invalidate UI sessions on OAuth failures
|
|
if uiSessionStore != nil {
|
|
refresher.SetUISessionStore(uiSessionStore)
|
|
}
|
|
|
|
// Set global refresher for middleware
|
|
middleware.SetGlobalRefresher(refresher)
|
|
|
|
// Set global database for pull/push metrics tracking
|
|
middleware.SetGlobalDatabase(uiDatabase)
|
|
|
|
// Create RemoteHoldAuthorizer for hold authorization with caching
|
|
holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase, testMode)
|
|
middleware.SetGlobalAuthorizer(holdAuthorizer)
|
|
slog.Info("Hold authorizer initialized with database caching")
|
|
|
|
// Initialize Jetstream workers (background services before HTTP routes)
|
|
initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher)
|
|
|
|
// Create main chi router
|
|
mainRouter := chi.NewRouter()
|
|
|
|
// Add core middleware
|
|
mainRouter.Use(chimiddleware.Logger)
|
|
mainRouter.Use(chimiddleware.Recoverer)
|
|
mainRouter.Use(chimiddleware.GetHead) // Automatically handle HEAD requests for GET routes
|
|
mainRouter.Use(routes.CORSMiddleware())
|
|
|
|
// Load templates if UI is enabled
|
|
var uiTemplates *template.Template
|
|
if cfg.UI.Enabled {
|
|
var err error
|
|
uiTemplates, err = appview.Templates()
|
|
if err != nil {
|
|
slog.Warn("Failed to load UI templates", "error", err)
|
|
} else {
|
|
// Register UI routes with dependencies
|
|
routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
|
|
Database: uiDatabase,
|
|
ReadOnlyDB: uiReadOnlyDB,
|
|
SessionStore: uiSessionStore,
|
|
OAuthClientApp: oauthClientApp,
|
|
OAuthStore: oauthStore,
|
|
Refresher: refresher,
|
|
BaseURL: baseURL,
|
|
DeviceStore: deviceStore,
|
|
HealthChecker: healthChecker,
|
|
ReadmeFetcher: readmeFetcher,
|
|
Templates: uiTemplates,
|
|
DefaultHoldDID: defaultHoldDID,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Create OAuth server
|
|
oauthServer := oauth.NewServer(oauthClientApp)
|
|
// Connect server to refresher for cache invalidation
|
|
oauthServer.SetRefresher(refresher)
|
|
// Connect UI session store for web login
|
|
if uiSessionStore != nil {
|
|
oauthServer.SetUISessionStore(uiSessionStore)
|
|
}
|
|
|
|
// Register OAuth post-auth callback for AppView business logic
|
|
// This decouples the OAuth package from AppView-specific dependencies
|
|
oauthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
|
|
slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)
|
|
|
|
// Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety)
|
|
client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, refresher)
|
|
|
|
// Note: Profile and crew setup now happen automatically via UserContext.EnsureUserSetup()
|
|
|
|
// Fetch user's profile record from PDS (contains blob references)
|
|
profileRecord, err := client.GetProfileRecord(ctx, did)
|
|
if err != nil {
|
|
slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
|
|
// Continue without avatar - set profileRecord to nil to skip avatar extraction
|
|
profileRecord = nil
|
|
}
|
|
|
|
// Construct avatar URL from blob CID using imgs.blue CDN (if profile record was fetched successfully)
|
|
avatarURL := ""
|
|
if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
|
|
avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
|
|
slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
|
|
}
|
|
|
|
// Store user in database (with or without avatar)
|
|
// Use UpsertUser if we successfully fetched an avatar (to update existing users)
|
|
// Use UpsertUserIgnoreAvatar if fetch failed (to preserve existing avatars)
|
|
if avatarURL != "" {
|
|
err = db.UpsertUser(uiDatabase, &db.User{
|
|
DID: did,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatarURL,
|
|
LastSeen: time.Now(),
|
|
})
|
|
} else {
|
|
err = db.UpsertUserIgnoreAvatar(uiDatabase, &db.User{
|
|
DID: did,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatarURL,
|
|
LastSeen: time.Now(),
|
|
})
|
|
}
|
|
if err != nil {
|
|
slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
|
|
return nil // Non-fatal
|
|
}
|
|
|
|
slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")
|
|
|
|
// Migrate profile URL→DID if needed
|
|
profile, err := storage.GetProfile(ctx, client)
|
|
if err != nil {
|
|
slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
|
|
return nil // Non-fatal
|
|
}
|
|
|
|
// Migrate profile URL→DID if needed (legacy migration, crew registration now handled by UserContext)
|
|
if profile != nil && profile.DefaultHold != "" {
|
|
// Check if defaultHold is a URL (needs migration)
|
|
if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
|
|
slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)
|
|
|
|
// Resolve URL to DID
|
|
holdDID := atproto.ResolveHoldDIDFromURL(profile.DefaultHold)
|
|
|
|
// Update profile with DID
|
|
profile.DefaultHold = holdDID
|
|
if err := storage.UpdateProfile(ctx, client, profile); err != nil {
|
|
slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
|
|
} else {
|
|
slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil // All errors are non-fatal, logged for debugging
|
|
})
|
|
|
|
// Create token issuer (also initializes auth keys if needed)
|
|
var issuer *token.Issuer
|
|
if cfg.Distribution.Auth["token"] != nil {
|
|
issuer, err = createTokenIssuer(cfg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create token issuer: %w", err)
|
|
}
|
|
|
|
// Log successful initialization
|
|
slog.Info("Auth keys initialized", "path", cfg.Auth.KeyPath)
|
|
}
|
|
|
|
// Create registry app (returns http.Handler)
|
|
ctx := context.Background()
|
|
app := handlers.NewApp(ctx, cfg.Distribution)
|
|
|
|
// Wrap registry app with middleware chain:
|
|
// 1. ExtractAuthMethod - extracts auth method from JWT and stores in context
|
|
// 2. UserContextMiddleware - builds UserContext with identity, permissions, service tokens
|
|
wrappedApp := middleware.ExtractAuthMethod(app)
|
|
|
|
// Create dependencies for UserContextMiddleware
|
|
userContextDeps := &auth.Dependencies{
|
|
Refresher: refresher,
|
|
Authorizer: holdAuthorizer,
|
|
DefaultHoldDID: defaultHoldDID,
|
|
}
|
|
wrappedApp = middleware.UserContextMiddleware(userContextDeps)(wrappedApp)
|
|
|
|
// Mount registry at /v2/
|
|
mainRouter.Handle("/v2/*", wrappedApp)
|
|
|
|
// Mount static files if UI is enabled
|
|
if uiSessionStore != nil && uiTemplates != nil {
|
|
// Register dynamic routes for root-level files (favicons, manifests, etc.)
|
|
staticHandler := appview.StaticHandler()
|
|
rootFiles, err := appview.StaticRootFiles()
|
|
if err != nil {
|
|
slog.Warn("Failed to scan static root files", "error", err)
|
|
} else {
|
|
for _, filename := range rootFiles {
|
|
// Create a closure to capture the filename
|
|
file := filename
|
|
mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
|
|
// Serve the specific file from static root
|
|
r.URL.Path = "/" + file
|
|
staticHandler.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
|
|
}
|
|
|
|
// Mount subdirectory routes with clean paths
|
|
mainRouter.Handle("/css/*", http.StripPrefix("/css/", appview.StaticSubdir("css")))
|
|
mainRouter.Handle("/js/*", http.StripPrefix("/js/", appview.StaticSubdir("js")))
|
|
mainRouter.Handle("/static/*", http.StripPrefix("/static/", appview.StaticSubdir("static")))
|
|
|
|
slog.Info("UI enabled", "home", "/", "settings", "/settings")
|
|
}
|
|
|
|
// Mount OAuth endpoints
|
|
mainRouter.Get("/auth/oauth/authorize", oauthServer.ServeAuthorize)
|
|
mainRouter.Get("/auth/oauth/callback", oauthServer.ServeCallback)
|
|
|
|
// OAuth client metadata endpoint
|
|
mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
|
|
config := oauthClientApp.Config
|
|
metadata := config.ClientMetadata()
|
|
|
|
// For confidential clients, ensure JWKS is included
|
|
// The indigo library should populate this automatically, but we explicitly set it here
|
|
// to be defensive and ensure it's always present for confidential clients
|
|
if config.IsConfidential() && metadata.JWKS == nil {
|
|
jwks := config.PublicJWKS()
|
|
metadata.JWKS = &jwks
|
|
}
|
|
|
|
// Convert indigo's metadata to map so we can add custom fields
|
|
metadataBytes, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
http.Error(w, "Failed to marshal metadata", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
var metadataMap map[string]interface{}
|
|
if err := json.Unmarshal(metadataBytes, &metadataMap); err != nil {
|
|
http.Error(w, "Failed to unmarshal metadata", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
// Add custom fields
|
|
metadataMap["client_name"] = cfg.Server.ClientName
|
|
metadataMap["client_uri"] = cfg.Server.BaseURL
|
|
metadataMap["logo_uri"] = cfg.Server.BaseURL + "/web-app-manifest-192x192.png"
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
// Limit caching to allow scope changes to propagate quickly
|
|
// PDS servers cache client metadata, so short max-age helps with updates
|
|
w.Header().Set("Cache-Control", "public, max-age=300")
|
|
if err := json.NewEncoder(w).Encode(metadataMap); err != nil {
|
|
http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
|
|
}
|
|
})
|
|
|
|
// Note: Indigo handles OAuth state cleanup internally via its store
|
|
|
|
// Mount auth endpoints if enabled
|
|
if issuer != nil {
|
|
// Basic Auth token endpoint (supports device secrets and app passwords)
|
|
tokenHandler := token.NewHandler(issuer, deviceStore)
|
|
|
|
// Register OAuth session validator for device auth validation
|
|
// This validates OAuth sessions are usable (not just exist) before issuing tokens
|
|
// Prevents the flood of errors when a stale session is discovered during push
|
|
tokenHandler.SetOAuthSessionValidator(refresher)
|
|
|
|
// Register token post-auth callback
|
|
// Note: Profile and crew setup now happen automatically via UserContext.EnsureUserSetup()
|
|
tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
|
|
slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
|
|
return nil
|
|
})
|
|
|
|
mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)
|
|
|
|
// Device authorization endpoints (public)
|
|
mainRouter.Handle("/auth/device/code", &uihandlers.DeviceCodeHandler{
|
|
Store: deviceStore,
|
|
AppViewBaseURL: baseURL,
|
|
})
|
|
mainRouter.Handle("/auth/device/token", &uihandlers.DeviceTokenHandler{
|
|
Store: deviceStore,
|
|
})
|
|
|
|
slog.Info("Auth endpoints enabled",
|
|
"basic_auth", "/auth/token",
|
|
"device_code", "/auth/device/code",
|
|
"device_token", "/auth/device/token",
|
|
"oauth_authorize", "/auth/oauth/authorize",
|
|
"oauth_callback", "/auth/oauth/callback",
|
|
"oauth_metadata", "/client-metadata.json")
|
|
}
|
|
|
|
// Register credential helper version API (public endpoint)
|
|
mainRouter.Handle("/api/credential-helper/version", &uihandlers.CredentialHelperVersionHandler{
|
|
Version: cfg.CredentialHelper.Version,
|
|
TangledRepo: cfg.CredentialHelper.TangledRepo,
|
|
Checksums: cfg.CredentialHelper.Checksums,
|
|
})
|
|
if cfg.CredentialHelper.Version != "" {
|
|
slog.Info("Credential helper version API enabled",
|
|
"endpoint", "/api/credential-helper/version",
|
|
"version", cfg.CredentialHelper.Version)
|
|
}
|
|
|
|
// Create HTTP server
|
|
server := &http.Server{
|
|
Addr: cfg.Server.Addr,
|
|
Handler: mainRouter,
|
|
}
|
|
|
|
// Handle graceful shutdown
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
|
|
|
// Start server in goroutine
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
slog.Info("Starting registry server", "addr", cfg.Server.Addr)
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
errChan <- err
|
|
}
|
|
}()
|
|
|
|
// Wait for shutdown signal or error
|
|
select {
|
|
case <-stop:
|
|
slog.Info("Shutting down registry server")
|
|
|
|
// Stop health worker first
|
|
slog.Info("Stopping hold health worker")
|
|
healthWorker.Stop()
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := server.Shutdown(shutdownCtx); err != nil {
|
|
return fmt.Errorf("server shutdown error: %w", err)
|
|
}
|
|
case err := <-errChan:
|
|
// Stop health worker on error (workerCancel called by defer)
|
|
healthWorker.Stop()
|
|
return fmt.Errorf("server error: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createTokenIssuer creates a token issuer for auth handlers
|
|
func createTokenIssuer(cfg *appview.Config) (*token.Issuer, error) {
|
|
return token.NewIssuer(
|
|
cfg.Auth.KeyPath,
|
|
cfg.Auth.ServiceName, // issuer
|
|
cfg.Auth.ServiceName, // service
|
|
cfg.Auth.TokenExpiration,
|
|
)
|
|
}
|
|
|
|
// initializeJetstream initializes the Jetstream workers for real-time events and backfill
|
|
func initializeJetstream(database *sql.DB, jetstreamCfg *appview.JetstreamConfig, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) {
|
|
// Start Jetstream worker
|
|
jetstreamURL := jetstreamCfg.URL
|
|
|
|
// Start real-time Jetstream worker with cursor tracking for reconnects
|
|
go func() {
|
|
var lastCursor int64 = 0 // Start from now on first connect
|
|
for {
|
|
worker := jetstream.NewWorker(database, jetstreamURL, lastCursor)
|
|
if err := worker.Start(context.Background()); err != nil {
|
|
// Save cursor from this connection for next reconnect
|
|
lastCursor = worker.GetLastCursor()
|
|
slog.Warn("Jetstream real-time worker error, reconnecting", "component", "jetstream", "error", err, "reconnect_delay", "10s")
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
}
|
|
}()
|
|
slog.Info("Jetstream real-time worker started", "component", "jetstream")
|
|
|
|
// Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable)
|
|
if jetstreamCfg.BackfillEnabled {
|
|
// Get relay endpoint for sync API (defaults to Bluesky's relay)
|
|
relayEndpoint := jetstreamCfg.RelayEndpoint
|
|
|
|
backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint, defaultHoldDID, testMode, refresher)
|
|
if err != nil {
|
|
slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
// Run initial backfill with startup delay for Docker compose
|
|
go func() {
|
|
// Wait for hold service to be ready (Docker startup race condition)
|
|
startupDelay := 5 * time.Second
|
|
slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", startupDelay)
|
|
time.Sleep(startupDelay)
|
|
|
|
slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoint", relayEndpoint)
|
|
if err := backfillWorker.Start(context.Background()); err != nil {
|
|
slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
|
|
}
|
|
}()
|
|
|
|
// Start periodic backfill scheduler
|
|
interval := jetstreamCfg.BackfillInterval
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", interval)
|
|
if err := backfillWorker.Start(context.Background()); err != nil {
|
|
slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
|
|
}
|
|
}
|
|
}()
|
|
slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", interval)
|
|
}
|
|
}
|
|
}
|