Files
at-container-registry/pkg/appview/server.go

728 lines
25 KiB
Go

package appview
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/handlers"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/appview/holdhealth"
"atcr.io/pkg/appview/jetstream"
"atcr.io/pkg/appview/middleware"
"atcr.io/pkg/appview/readme"
"atcr.io/pkg/appview/routes"
"atcr.io/pkg/appview/storage"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
"atcr.io/pkg/logging"
indigooauth "github.com/bluesky-social/indigo/atproto/auth/oauth"
)
// OAuthPostAuthHook is a consumer callback invoked at the end of the OAuth
// post-auth callback installed by NewAppViewServer, after the default logic
// (profile creation, avatar fetch, user upsert) has run and crew registration
// has been started in the background.
//
// It receives the callback's context together with the did, handle,
// pdsEndpoint and sessionID values the callback itself was given. A returned
// error is logged as a warning and otherwise ignored: it neither fails the
// callback nor prevents later hooks from running.
//
// Hooks are not reached when the default logic returns early, which happens
// when storing the user in the database or loading the profile fails.
//
// The registered hooks are read each time the callback runs, so hooks added
// after NewAppViewServer but before the first request work correctly.
type OAuthPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error
// TokenPostAuthHook is a consumer callback invoked at the end of the token
// post-auth callback installed by NewAppViewServer, after the default logic
// (ensuring the profile exists) has been attempted.
//
// It receives the callback's context together with the did, handle,
// pdsEndpoint and accessToken values the callback itself was given. A returned
// error is logged as a warning and otherwise ignored: it neither fails the
// callback nor prevents later hooks from running.
//
// The token callback is only installed when a token issuer is configured. The
// registered hooks are read each time the callback runs, so hooks added after
// NewAppViewServer but before the first request work correctly.
type TokenPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error
// AppViewServer is the AppView service with an exposed router for extensibility.
// Consumers can add routes to Router and hooks before calling Serve().
//
// Instances are expected to come from NewAppViewServer, which populates the
// exported dependencies below and the private lifecycle fields.
type AppViewServer struct {
// Router is the chi router. Add routes before calling Serve().
Router chi.Router
// Config is the AppView configuration.
Config *Config
// Database is the read-write SQLite database.
Database *sql.DB
// ReadOnlyDB is the read-only SQLite database connection.
ReadOnlyDB *sql.DB
// SessionStore manages web UI sessions.
SessionStore *db.SessionStore
// DeviceStore manages device authorization flows.
DeviceStore *db.DeviceStore
// OAuthStore manages OAuth session persistence.
OAuthStore *db.OAuthStore
// OAuthServer handles OAuth authorization and callback endpoints.
OAuthServer *oauth.Server
// OAuthClientApp is the indigo OAuth client application.
OAuthClientApp *indigooauth.ClientApp
// Refresher manages OAuth session refresh and caching.
Refresher *oauth.Refresher
// Templates are the parsed HTML templates.
Templates *template.Template
// HealthChecker checks hold service health.
HealthChecker *holdhealth.Checker
// ReadmeFetcher fetches README content for repository pages.
ReadmeFetcher *readme.Fetcher
// TokenIssuer issues registry JWTs (nil if auth is not configured).
TokenIssuer *token.Issuer
// HoldAuthorizer checks hold access permissions.
HoldAuthorizer auth.HoldAuthorizer
// Private fields for lifecycle management
// oauthHooks are the consumer hooks appended by AddOAuthPostAuthHook and run,
// in order, at the end of the OAuth post-auth callback.
oauthHooks []OAuthPostAuthHook
// tokenHooks are the consumer hooks appended by AddTokenPostAuthHook and run,
// in order, at the end of the token post-auth callback.
tokenHooks []TokenPostAuthHook
// httpServer is created by Serve and shut down when Serve receives a signal.
httpServer *http.Server
// healthWorker is the background hold health worker started by
// NewAppViewServer and stopped by Serve.
healthWorker *holdhealth.Worker
// workerCancel cancels the context that healthWorker was started with.
workerCancel context.CancelFunc
// branding holds the overrides passed to NewAppViewServer (nil for defaults).
branding *BrandingOverrides
}
// AddOAuthPostAuthHook registers a hook that runs after the default OAuth
// post-auth logic. Multiple hooks run in registration order.
//
// A nil hook is ignored (with a warning) instead of being stored: the OAuth
// post-auth callback invokes every registered hook unconditionally, so a nil
// entry would panic on each login.
//
// The method performs no locking; register hooks before the server starts
// handling requests.
func (s *AppViewServer) AddOAuthPostAuthHook(hook OAuthPostAuthHook) {
	if hook == nil {
		slog.Warn("Ignoring nil OAuth post-auth hook", "component", "appview/callback")
		return
	}
	s.oauthHooks = append(s.oauthHooks, hook)
}
// AddTokenPostAuthHook registers a hook that runs after the default token
// post-auth logic. Multiple hooks run in registration order.
//
// A nil hook is ignored (with a warning) instead of being stored: the token
// post-auth callback invokes every registered hook unconditionally, so a nil
// entry would panic on each token request.
//
// The method performs no locking; register hooks before the server starts
// handling requests.
func (s *AppViewServer) AddTokenPostAuthHook(hook TokenPostAuthHook) {
	if hook == nil {
		slog.Warn("Ignoring nil token post-auth hook", "component", "appview/callback")
		return
	}
	s.tokenHooks = append(s.tokenHooks, hook)
}
// NewAppViewServer creates a fully-initialized AppView server ready for the
// consumer to add routes and hooks before calling Serve(). Pass nil for
// branding to use default atcr.io assets and templates.
//
// Initialization proceeds in this order: logging, the UI database and its
// stores, the hold health checker and its background worker, the OAuth client
// and refresher, the package-level middleware globals (refresher, hold DID
// database, authorizer), the Jetstream workers, and finally the chi router
// with UI, OAuth, token, registry (/v2/), static asset and health routes.
//
// It returns an error when the UI database cannot be initialized, the OAuth
// client app cannot be created, the templates fail to load, or (when token
// auth is configured) the token issuer cannot be created. Background
// goroutines started before such a failure are not stopped.
func NewAppViewServer(cfg *Config, branding *BrandingOverrides) (*AppViewServer, error) {
	// Initialize structured logging with optional remote shipping
	logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
		Backend:       cfg.LogShipper.Backend,
		URL:           cfg.LogShipper.URL,
		BatchSize:     cfg.LogShipper.BatchSize,
		FlushInterval: cfg.LogShipper.FlushInterval,
		Service:       "appview",
		Username:      cfg.LogShipper.Username,
		Password:      cfg.LogShipper.Password,
	})
	slog.Info("Configuration loaded successfully from environment")

	s := &AppViewServer{
		Config:   cfg,
		branding: branding,
	}

	// Initialize UI database (required for all stores)
	slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
	libsqlCfg := db.LibsqlConfig{
		SyncURL:      cfg.UI.LibsqlSyncURL,
		AuthToken:    cfg.UI.LibsqlAuthToken,
		SyncInterval: cfg.UI.LibsqlSyncInterval,
	}
	s.Database, s.ReadOnlyDB, s.SessionStore = db.InitializeDatabase(cfg.UI.DatabasePath, libsqlCfg)
	if s.Database == nil {
		return nil, fmt.Errorf("failed to initialize UI database - required for session storage")
	}

	// Initialize hold health checker
	slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
	s.HealthChecker = holdhealth.NewChecker(cfg.Health.CacheTTL)

	// Initialize README fetcher for rendering repo page descriptions
	s.ReadmeFetcher = readme.NewFetcher()

	// Start background health check worker
	startupDelay := 5 * time.Second
	dbAdapter := holdhealth.NewDBAdapter(s.Database)
	s.healthWorker = holdhealth.NewWorkerWithStartupDelay(s.HealthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)
	workerCtx, workerCancel := context.WithCancel(context.Background())
	s.workerCancel = workerCancel
	s.healthWorker.Start(workerCtx)
	slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)

	// Initialize OAuth components
	slog.Info("Initializing OAuth components")
	s.OAuthStore = db.NewOAuthStore(s.Database)
	slog.Info("Using SQLite for OAuth session storage")
	s.DeviceStore = db.NewDeviceStore(s.Database)
	slog.Info("Using SQLite for device storage")

	baseURL := cfg.Server.BaseURL
	defaultHoldDID := cfg.Server.DefaultHoldDID
	testMode := cfg.Server.TestMode
	slog.Debug("Base URL for OAuth", "base_url", baseURL)
	if testMode {
		slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution")
	}

	// Create OAuth client app
	desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
	var err error
	s.OAuthClientApp, err = oauth.NewClientApp(baseURL, s.OAuthStore, desiredScopes, cfg.Server.OAuthKeyPath, cfg.Server.ClientName)
	if err != nil {
		return nil, fmt.Errorf("failed to create OAuth client app: %w", err)
	}

	// Invalidate sessions with mismatched scopes on startup.
	// A failure here is non-fatal: it is logged and startup continues.
	invalidatedCount, err := s.OAuthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
	if err != nil {
		slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
	} else if invalidatedCount > 0 {
		slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
	}

	// Create oauth token refresher
	s.Refresher = oauth.NewRefresher(s.OAuthClientApp)

	// Wire up UI session store to refresher
	if s.SessionStore != nil {
		s.Refresher.SetUISessionStore(s.SessionStore)
	}

	// Set global refresher for middleware
	middleware.SetGlobalRefresher(s.Refresher)

	// Set global database for hold DID lookups
	holdDIDDB := db.NewHoldDIDDB(s.Database)
	middleware.SetGlobalDatabase(holdDIDDB)

	// Create RemoteHoldAuthorizer for hold authorization with caching
	s.HoldAuthorizer = auth.NewRemoteHoldAuthorizer(s.Database, testMode)
	middleware.SetGlobalAuthorizer(s.HoldAuthorizer)
	slog.Info("Hold authorizer initialized with database caching")

	// Clear all denial caches on startup for a clean slate.
	// Done in the background so startup is not blocked on it.
	if remote, ok := s.HoldAuthorizer.(*auth.RemoteHoldAuthorizer); ok {
		go func() {
			if err := remote.ClearAllDenials(); err != nil {
				slog.Warn("Failed to clear denial caches on startup", "error", err)
			}
		}()
	}

	// Initialize Jetstream workers (needs s.Database and s.Refresher, both set above)
	s.initializeJetstream()

	// Create main chi router
	mainRouter := chi.NewRouter()
	mainRouter.Use(chimiddleware.Logger)
	mainRouter.Use(chimiddleware.Recoverer)
	mainRouter.Use(chimiddleware.GetHead)
	mainRouter.Use(routes.CORSMiddleware())

	// Domain routing middleware
	if len(cfg.Server.RegistryDomains) > 0 {
		mainRouter.Use(DomainRoutingMiddleware(cfg.Server.RegistryDomains, cfg.Server.BaseURL))
		slog.Info("Domain routing middleware enabled",
			"registry_domains", cfg.Server.RegistryDomains,
			"ui_base_url", cfg.Server.BaseURL)
	}

	// Load templates
	ComputeAssetHashes(branding)
	s.Templates, err = Templates(branding)
	if err != nil {
		return nil, fmt.Errorf("failed to load UI templates: %w", err)
	}

	// Register UI routes
	routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
		Database:        s.Database,
		ReadOnlyDB:      s.ReadOnlyDB,
		SessionStore:    s.SessionStore,
		OAuthClientApp:  s.OAuthClientApp,
		OAuthStore:      s.OAuthStore,
		Refresher:       s.Refresher,
		BaseURL:         baseURL,
		RegistryDomain:  primaryRegistryDomain(cfg.Server.RegistryDomains),
		DeviceStore:     s.DeviceStore,
		HealthChecker:   s.HealthChecker,
		ReadmeFetcher:   s.ReadmeFetcher,
		Templates:       s.Templates,
		DefaultHoldDID:  defaultHoldDID,
		ClientName:      cfg.Server.ClientName,
		ClientShortName: cfg.Server.ClientShortName,
		LegalConfig: routes.LegalConfig{
			CompanyName:  cfg.Legal.CompanyName,
			Jurisdiction: cfg.Legal.Jurisdiction,
		},
	})

	// Create OAuth server
	s.OAuthServer = oauth.NewServer(s.OAuthClientApp)
	s.OAuthServer.SetRefresher(s.Refresher)
	if s.SessionStore != nil {
		s.OAuthServer.SetUISessionStore(s.SessionStore)
	}

	// Register OAuth post-auth callback (closure captures s for hook dispatch).
	// Every failure inside the callback is logged and swallowed; it always
	// returns nil so a post-auth problem never fails the login itself.
	s.OAuthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
		slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)

		// Create ATProto client with session provider
		client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, s.Refresher)

		// Ensure sailor profile exists
		slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
		if err := storage.EnsureProfile(ctx, client, defaultHoldDID); err != nil {
			slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
		} else {
			slog.Debug("Profile ensured", "component", "appview/callback", "did", did)
		}

		// Fetch user's profile record from PDS
		profileRecord, err := client.GetProfileRecord(ctx, did)
		if err != nil {
			slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
			profileRecord = nil
		}

		// Construct avatar URL from blob CID
		avatarURL := ""
		if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
			avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
			slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
		}

		// Store user in database. Without an avatar URL the "ignore avatar"
		// upsert variant is used instead of the regular one.
		if avatarURL != "" {
			err = db.UpsertUser(s.Database, &db.User{
				DID:         did,
				Handle:      handle,
				PDSEndpoint: pdsEndpoint,
				Avatar:      avatarURL,
				LastSeen:    time.Now(),
			})
		} else {
			err = db.UpsertUserIgnoreAvatar(s.Database, &db.User{
				DID:         did,
				Handle:      handle,
				PDSEndpoint: pdsEndpoint,
				Avatar:      avatarURL,
				LastSeen:    time.Now(),
			})
		}
		if err != nil {
			slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
			return nil
		}
		slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")

		// Migrate profile URL→DID if needed
		profile, err := storage.GetProfile(ctx, client)
		if err != nil {
			slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
			return nil
		}

		var holdDID string
		if profile != nil && profile.DefaultHold != "" {
			if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
				slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)
				// Plain assignment (not :=) so the resolved DID lands in the
				// outer holdDID that crew registration below reads.
				holdDID = atproto.ResolveHoldDIDFromURL(profile.DefaultHold)
				profile.DefaultHold = holdDID
				if err := storage.UpdateProfile(ctx, client, profile); err != nil {
					slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
				} else {
					slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
				}
			} else {
				holdDID = profile.DefaultHold
			}

			// Register crew in background, detached from the request context
			slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID)
			go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) {
				ctx := context.Background()
				storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer)
			}(client, s.Refresher, holdDID, s.HoldAuthorizer)
		}

		// Migrate old-format star records to AT URI format in background
		go func(client *atproto.Client, did string) {
			ctx := context.Background()
			migrated, err := atproto.MigrateStarRecords(ctx, client)
			if err != nil {
				slog.Warn("Star record migration failed", "component", "appview/callback", "did", did, "error", err, "migrated", migrated)
			} else if migrated > 0 {
				slog.Info("Migrated star records to AT URI format", "component", "appview/callback", "did", did, "count", migrated)
			}
		}(client, did)

		// Drain manifests from old hold to successor in background (bounded to 2 minutes)
		go func(client *atproto.Client, did string) {
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
			defer cancel()
			storage.MigrateManifestsForSuccessor(ctx, client, s.HoldAuthorizer, db.NewHoldDIDDB(s.Database), did)
		}(client, did)

		// Run consumer hooks; a failing hook is logged and the rest still run
		for _, hook := range s.oauthHooks {
			if err := hook(ctx, did, handle, pdsEndpoint, sessionID); err != nil {
				slog.Warn("OAuth post-auth hook error", "component", "appview/callback", "error", err)
			}
		}
		return nil
	})

	// Create token issuer (only when the distribution config has a "token" auth section)
	if cfg.Distribution.Auth["token"] != nil {
		s.TokenIssuer, err = s.createTokenIssuer()
		if err != nil {
			return nil, fmt.Errorf("failed to create token issuer: %w", err)
		}
		slog.Info("Auth keys initialized", "path", cfg.Auth.KeyPath)
	}

	// Create registry app (distribution library handler)
	ctx := context.Background()
	app := handlers.NewApp(ctx, cfg.Distribution)

	// Wrap with auth method extraction middleware
	wrappedApp := middleware.ExtractAuthMethod(app)

	// Mount registry at /v2/
	mainRouter.Handle("/v2/*", wrappedApp)

	// Mount static files
	if s.SessionStore != nil && s.Templates != nil {
		publicHandler := CacheMiddleware(PublicHandler(branding), 31536000)
		rootFiles, err := PublicRootFiles(branding)
		if err != nil {
			slog.Warn("Failed to scan static root files", "error", err)
		} else {
			for _, filename := range rootFiles {
				// Per-iteration copy so each handler closure serves its own file
				file := filename
				mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
					r.URL.Path = "/" + file
					publicHandler.ServeHTTP(w, r)
				})
			}
			slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
		}
		mainRouter.Handle("/css/*", CacheMiddleware(http.StripPrefix("/css/", PublicSubdir("css", branding)), 31536000))
		mainRouter.Handle("/js/*", CacheMiddleware(http.StripPrefix("/js/", PublicSubdir("js", branding)), 31536000))
		mainRouter.Handle("/static/*", CacheMiddleware(http.StripPrefix("/static/", PublicSubdir("static", branding)), 31536000))
		slog.Info("UI enabled", "home", "/", "settings", "/settings")
	}

	// Mount OAuth endpoints
	mainRouter.Get("/auth/oauth/authorize", s.OAuthServer.ServeAuthorize)
	mainRouter.Get("/auth/oauth/callback", s.OAuthServer.ServeCallback)

	// OAuth client metadata endpoint
	mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
		config := s.OAuthClientApp.Config
		logoURI := cfg.Server.BaseURL + "/web-app-manifest-192x192.png"
		policyURI := cfg.Server.BaseURL + "/privacy"
		tosURI := cfg.Server.BaseURL + "/terms"

		metadata := config.ClientMetadata()
		metadata.ClientName = &cfg.Server.ClientName
		metadata.ClientURI = &cfg.Server.BaseURL
		metadata.LogoURI = &logoURI
		metadata.PolicyURI = &policyURI
		metadata.TosURI = &tosURI
		if config.IsConfidential() && metadata.JWKS == nil {
			jwks := config.PublicJWKS()
			metadata.JWKS = &jwks
		}

		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Cache-Control", "public, max-age=300")
		if err := json.NewEncoder(w).Encode(metadata); err != nil {
			http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
		}
	})

	// Mount auth endpoints
	if s.TokenIssuer != nil {
		tokenHandler := token.NewHandler(s.TokenIssuer, s.DeviceStore)
		tokenHandler.SetOAuthSessionValidator(s.Refresher)

		// Token post-auth callback (closure captures s for hook dispatch)
		tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
			slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
			atprotoClient := atproto.NewClient(pdsEndpoint, did, accessToken)
			if err := storage.EnsureProfile(ctx, atprotoClient, defaultHoldDID); err != nil {
				slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
			} else {
				slog.Debug("Profile ensured with default hold", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
			}

			// Run consumer hooks; a failing hook is logged and the rest still run
			for _, hook := range s.tokenHooks {
				if err := hook(ctx, did, handle, pdsEndpoint, accessToken); err != nil {
					slog.Warn("Token post-auth hook error", "component", "appview/callback", "error", err)
				}
			}
			return nil
		})
		mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)

		// Device authorization endpoints (public)
		routes.RegisterDeviceEndpoints(mainRouter, s.DeviceStore, baseURL)
		slog.Info("Auth endpoints enabled",
			"basic_auth", "/auth/token",
			"device_code", "/auth/device/code",
			"device_token", "/auth/device/token",
			"oauth_authorize", "/auth/oauth/authorize",
			"oauth_callback", "/auth/oauth/callback",
			"oauth_metadata", "/oauth-client-metadata.json")
	}

	// Health check endpoint (for Docker health checks / load balancers)
	mainRouter.Get("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		if err := json.NewEncoder(w).Encode(map[string]string{"status": "ok"}); err != nil {
			// The 200 status is already committed, so the response can no
			// longer be turned into an error; just record the failure.
			slog.Warn("Failed to write health response", "error", err)
		}
	})

	// Register credential helper version API (public endpoint)
	routes.RegisterCredentialHelperEndpoint(mainRouter, cfg.CredentialHelper.TangledRepo)

	s.Router = mainRouter
	return s, nil
}
// Serve starts the HTTP server on Config.Server.Addr and blocks until either
// SIGINT/SIGTERM is received or the server stops with an error.
//
// On a signal it stops the hold health worker, cancels the worker context and
// gives in-flight requests up to 10 seconds to finish via http.Server.Shutdown.
// It returns nil after a clean shutdown, or a wrapped error if the listener
// cannot be created, the server fails while serving, or shutdown fails.
// Once the listener exists, logging.Shutdown is called on every return path.
func (s *AppViewServer) Serve() error {
	listener, err := net.Listen("tcp", s.Config.Server.Addr)
	if err != nil {
		return fmt.Errorf("failed to create listener: %w", err)
	}
	// From here on, every exit path must shut logging down.
	defer logging.Shutdown()

	s.httpServer = &http.Server{
		Handler: s.Router,
		// Bound how long a client may take to send request headers so idle
		// half-open connections cannot pile up. ReadTimeout/WriteTimeout are
		// deliberately left unset so long-running request bodies and
		// responses are not cut off.
		ReadHeaderTimeout: 30 * time.Second,
	}

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	// Unregister on return so later signals get default handling again
	// instead of being queued on a channel nobody reads.
	defer signal.Stop(stop)

	errChan := make(chan error, 1)
	go func() {
		slog.Info("Starting registry server", "addr", s.Config.Server.Addr)
		if err := s.httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
			errChan <- err
		}
	}()

	// stopHealthWorker stops the background health worker and cancels its context.
	stopHealthWorker := func() {
		if s.healthWorker != nil {
			s.healthWorker.Stop()
		}
		if s.workerCancel != nil {
			s.workerCancel()
		}
	}

	select {
	case <-stop:
		slog.Info("Shutting down registry server")
		if s.Config.Server.TestMode {
			// NOTE(review): test mode closes the listener eagerly before the
			// graceful shutdown; presumably to free the port quickly — confirm.
			_ = listener.Close()
		}
		slog.Info("Stopping hold health worker")
		stopHealthWorker()

		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := s.httpServer.Shutdown(shutdownCtx); err != nil && err != http.ErrServerClosed {
			return fmt.Errorf("server shutdown error: %w", err)
		}
		return nil
	case err := <-errChan:
		stopHealthWorker()
		return fmt.Errorf("server error: %w", err)
	}
}
// createTokenIssuer builds the token issuer used by the auth handlers from
// the Auth section of the server configuration. The error from
// token.NewIssuer is returned unchanged.
func (s *AppViewServer) createTokenIssuer() (*token.Issuer, error) {
	cfg := s.Config
	// The configured service name is passed for both the second and third
	// arguments (presumably issuer and service — confirm against token.NewIssuer).
	serviceName := cfg.Auth.ServiceName
	return token.NewIssuer(cfg.Auth.KeyPath, serviceName, serviceName, cfg.Auth.TokenExpiration)
}
// DomainRoutingMiddleware enforces three-tier domain routing:
//
//  1. UI domain (BaseURL hostname): serves web UI, auth, and static assets.
//     Blocks /v2/* with an OCI UNSUPPORTED error — registry API lives on
//     the dedicated registry domain(s).
//  2. Registry domains: allows /v2/* for Docker clients. Redirects everything
//     else to the UI domain with 307 Temporary Redirect.
//  3. Unknown domains (CDN origins, IPs, etc.): redirects all requests to the
//     UI domain with 307, except /health for load balancer probes.
//
// Host matching ignores any port on the request's Host header and is
// case-insensitive. Bare and bracketed IPv6 literals are handled. Redirect
// targets are built from uiBaseURL (with trailing slashes removed) plus the
// original request URI, so path and query are preserved.
//
// If uiBaseURL cannot be parsed, no request matches the UI tier.
func DomainRoutingMiddleware(registryDomains []string, uiBaseURL string) func(http.Handler) http.Handler {
	regDomains := make(map[string]bool, len(registryDomains))
	for _, d := range registryDomains {
		regDomains[strings.ToLower(d)] = true
	}

	// Extract UI hostname from BaseURL (e.g., "https://seamark.dev" -> "seamark.dev")
	var uiHost string
	if parsed, err := url.Parse(uiBaseURL); err == nil {
		uiHost = strings.ToLower(parsed.Hostname())
	}

	primaryReg := primaryRegistryDomain(registryDomains)

	// RequestURI always starts with "/", so drop trailing slashes from the base
	// to avoid emitting "https://host//path".
	redirectBase := strings.TrimRight(uiBaseURL, "/")

	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			host := hostnameFromHostHeader(r.Host)
			path := r.URL.Path
			isV2 := path == "/v2" || strings.HasPrefix(path, "/v2/")

			switch {
			case host == uiHost:
				// UI domain: block /v2/*, serve everything else
				if isV2 {
					if err := errcode.ServeJSON(w, errcode.ErrorCodeUnsupported.WithMessage(
						fmt.Sprintf("registry API is not available on this domain, use %s", primaryReg),
					)); err != nil {
						slog.Error("failed to write OCI error response", "error", err)
					}
					return
				}
				next.ServeHTTP(w, r)
			case regDomains[host]:
				// Registry domain: allow /v2/*, redirect everything else
				if isV2 {
					next.ServeHTTP(w, r)
					return
				}
				http.Redirect(w, r, redirectBase+r.URL.RequestURI(), http.StatusTemporaryRedirect)
			default:
				// Unknown domain: allow /health, redirect everything else
				if path == "/health" {
					next.ServeHTTP(w, r)
					return
				}
				http.Redirect(w, r, redirectBase+r.URL.RequestURI(), http.StatusTemporaryRedirect)
			}
		})
	}
}

// hostnameFromHostHeader returns the lower-cased hostname of an HTTP Host
// header value with any port and IPv6 brackets removed, matching the form
// produced by url.URL.Hostname.
func hostnameFromHostHeader(hostport string) string {
	host, _, err := net.SplitHostPort(hostport)
	if err != nil {
		// No (valid) port present: the value is a bare host, possibly a
		// bracketed IPv6 literal such as "[::1]".
		host = strings.TrimSuffix(strings.TrimPrefix(hostport, "["), "]")
	}
	return strings.ToLower(host)
}
// primaryRegistryDomain picks the primary registry domain, defined as the
// first entry of domains. A nil or empty slice yields the empty string.
func primaryRegistryDomain(domains []string) string {
	if len(domains) == 0 {
		return ""
	}
	return domains[0]
}
// initializeJetstream launches the Jetstream background work: the real-time
// worker is always started, and — when Config.Jetstream.BackfillEnabled is
// set — a one-off backfill after a 5 second delay plus an hourly repeat.
//
// Everything runs in detached goroutines using context.Background(); failures
// are logged as warnings and nothing is returned to the caller.
func (s *AppViewServer) initializeJetstream() {
	cfg := s.Config
	endpoints := cfg.Jetstream.URLs

	// Real-time event worker.
	go func() {
		rt := jetstream.NewWorker(s.Database, endpoints, 0)
		rt.StartWithFailover(context.Background())
	}()
	slog.Info("Jetstream real-time worker started", "component", "jetstream", "endpoints", len(endpoints))

	if !cfg.Jetstream.BackfillEnabled {
		return
	}

	relays := cfg.Jetstream.RelayEndpoints
	backfill, err := jetstream.NewBackfillWorker(s.Database, relays, cfg.Server.DefaultHoldDID, cfg.Server.TestMode, s.Refresher)
	if err != nil {
		slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err)
		return
	}

	// Initial backfill, delayed so the other services have time to come up.
	go func() {
		wait := 5 * time.Second
		slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", wait)
		time.Sleep(wait)
		slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoints", relays)
		runErr := backfill.Start(context.Background())
		if runErr == nil {
			slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
			return
		}
		slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", runErr)
	}()

	// Periodic backfill on a fixed ticker.
	every := 1 * time.Hour
	go func() {
		tick := time.NewTicker(every)
		defer tick.Stop()
		for range tick.C {
			slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", every)
			runErr := backfill.Start(context.Background())
			if runErr == nil {
				slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
				continue
			}
			slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", runErr)
		}
	}()
	slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", every)
}