Files
2025-12-29 17:02:07 -06:00

561 lines
20 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/distribution/distribution/v3/registry"
"github.com/distribution/distribution/v3/registry/handlers"
"github.com/spf13/cobra"
"atcr.io/pkg/appview/middleware"
"atcr.io/pkg/appview/storage"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
"atcr.io/pkg/logging"
// UI components
"atcr.io/pkg/appview"
"atcr.io/pkg/appview/db"
uihandlers "atcr.io/pkg/appview/handlers"
"atcr.io/pkg/appview/holdhealth"
"atcr.io/pkg/appview/jetstream"
"atcr.io/pkg/appview/readme"
"atcr.io/pkg/appview/routes"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
)
// serveCmd is the cobra command for "serve". It accepts no positional
// arguments (cobra.NoArgs) and delegates execution to serveRegistry. The
// init function in this file installs it in place of the "serve" subcommand
// found on registry.RootCmd.
var serveCmd = &cobra.Command{
Use: "serve",
Short: "Start the ATCR registry server",
Long: `Start the ATCR registry server with authentication endpoints.
Configuration is loaded from environment variables.
See .env.appview.example for available environment variables.`,
Args: cobra.NoArgs,
RunE: serveRegistry,
}
// init looks through the subcommands of registry.RootCmd for the one named
// "serve" and overwrites that slot with serveCmd. Only the first match is
// replaced; if no subcommand has that name nothing happens.
//
// NOTE(review): this writes into the slice returned by Commands(), so it
// relies on cobra handing back its own backing slice rather than a copy —
// confirm against the cobra version in use.
func init() {
	commands := registry.RootCmd.Commands()
	for idx := range commands {
		if commands[idx].Name() != "serve" {
			continue
		}
		commands[idx] = serveCmd
		break
	}
}
// serveRegistry is the RunE implementation of the "serve" command.
//
// It loads configuration from the environment, initializes the UI database,
// the hold health worker, the OAuth client/server, the Jetstream workers and
// (when token auth is configured) the token issuer, mounts everything on a
// single chi router (registry under /v2/, OAuth and auth endpoints, static
// and UI routes, credential helper version API) and then serves HTTP on
// cfg.Server.Addr until SIGINT/SIGTERM is received or the listener fails.
//
// cmd and args are unused; the command is declared with cobra.NoArgs.
//
// It returns an error when configuration loading, database initialization,
// OAuth client creation or token issuer creation fails, when the HTTP
// listener fails, or when graceful shutdown does not complete. It returns
// nil after a clean shutdown.
func serveRegistry(cmd *cobra.Command, args []string) error {
	// Load configuration from environment variables
	cfg, err := appview.LoadConfigFromEnv()
	if err != nil {
		return fmt.Errorf("failed to load config from environment: %w", err)
	}

	// Initialize structured logging
	logging.InitLogger(cfg.LogLevel)
	slog.Info("Configuration loaded successfully from environment")

	// Initialize UI database first (required for all stores)
	slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
	uiDatabase, uiReadOnlyDB, uiSessionStore := db.InitializeDatabase(cfg.UI.Enabled, cfg.UI.DatabasePath, cfg.UI.SkipDBMigrations)
	if uiDatabase == nil {
		return fmt.Errorf("failed to initialize UI database - required for session storage")
	}

	// Initialize hold health checker
	slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
	healthChecker := holdhealth.NewChecker(cfg.Health.CacheTTL)

	// Initialize README fetcher for rendering repo page descriptions
	readmeFetcher := readme.NewFetcher()

	// Start background health check worker
	startupDelay := 5 * time.Second // Wait for hold services to start (Docker compose)
	dbAdapter := holdhealth.NewDBAdapter(uiDatabase)
	healthWorker := holdhealth.NewWorkerWithStartupDelay(healthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)

	// Create context for worker lifecycle management
	workerCtx, workerCancel := context.WithCancel(context.Background())
	defer workerCancel() // Ensure context is cancelled on all exit paths
	healthWorker.Start(workerCtx)
	slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)

	// Initialize OAuth components
	slog.Info("Initializing OAuth components")

	// Create OAuth session storage (SQLite-backed)
	oauthStore := db.NewOAuthStore(uiDatabase)
	slog.Info("Using SQLite for OAuth session storage")

	// Create device store (SQLite-backed)
	deviceStore := db.NewDeviceStore(uiDatabase)
	slog.Info("Using SQLite for device storage")

	// Get base URL and default hold DID from config
	baseURL := cfg.Server.BaseURL
	defaultHoldDID := cfg.Server.DefaultHoldDID
	testMode := cfg.Server.TestMode
	slog.Debug("Base URL for OAuth", "base_url", baseURL)
	if testMode {
		slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution and transition:generic scope")
	}

	// Create OAuth client app (automatically configures confidential client for production)
	desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
	oauthClientApp, err := oauth.NewClientApp(baseURL, oauthStore, desiredScopes, cfg.Server.OAuthKeyPath, cfg.Server.ClientName)
	if err != nil {
		return fmt.Errorf("failed to create OAuth client app: %w", err)
	}
	if testMode {
		slog.Info("Using OAuth scopes with transition:generic (test mode)")
	} else {
		slog.Info("Using OAuth scopes with RPC scope (production mode)")
	}

	// Invalidate sessions with mismatched scopes on startup.
	// This ensures all users have the latest required scopes after deployment.
	// A failure here is logged and startup continues.
	invalidatedCount, err := oauthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
	if err != nil {
		slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
	} else if invalidatedCount > 0 {
		slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
	}

	// Create oauth token refresher
	refresher := oauth.NewRefresher(oauthClientApp)

	// Wire up UI session store to refresher so it can invalidate UI sessions on OAuth failures
	if uiSessionStore != nil {
		refresher.SetUISessionStore(uiSessionStore)
	}

	// Set global refresher for middleware
	middleware.SetGlobalRefresher(refresher)

	// Set global database for pull/push metrics tracking
	middleware.SetGlobalDatabase(uiDatabase)

	// Create RemoteHoldAuthorizer for hold authorization with caching
	holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase, testMode)
	middleware.SetGlobalAuthorizer(holdAuthorizer)
	slog.Info("Hold authorizer initialized with database caching")

	// Initialize Jetstream workers (background services before HTTP routes)
	initializeJetstream(uiDatabase, &cfg.Jetstream, defaultHoldDID, testMode, refresher)

	// Create main chi router
	mainRouter := chi.NewRouter()

	// Add core middleware
	mainRouter.Use(chimiddleware.Logger)
	mainRouter.Use(chimiddleware.Recoverer)
	mainRouter.Use(chimiddleware.GetHead) // Automatically handle HEAD requests for GET routes
	mainRouter.Use(routes.CORSMiddleware())

	// Load templates if UI is enabled. A template load failure only disables
	// the UI routes (uiTemplates stays nil); the registry itself still starts.
	var uiTemplates *template.Template
	if cfg.UI.Enabled {
		var err error
		uiTemplates, err = appview.Templates()
		if err != nil {
			slog.Warn("Failed to load UI templates", "error", err)
		} else {
			// Register UI routes with dependencies
			routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
				Database:       uiDatabase,
				ReadOnlyDB:     uiReadOnlyDB,
				SessionStore:   uiSessionStore,
				OAuthClientApp: oauthClientApp,
				OAuthStore:     oauthStore,
				Refresher:      refresher,
				BaseURL:        baseURL,
				DeviceStore:    deviceStore,
				HealthChecker:  healthChecker,
				ReadmeFetcher:  readmeFetcher,
				Templates:      uiTemplates,
				DefaultHoldDID: defaultHoldDID,
			})
		}
	}

	// Create OAuth server
	oauthServer := oauth.NewServer(oauthClientApp)

	// Connect server to refresher for cache invalidation
	oauthServer.SetRefresher(refresher)

	// Connect UI session store for web login
	if uiSessionStore != nil {
		oauthServer.SetUISessionStore(uiSessionStore)
	}

	// Register OAuth post-auth callback for AppView business logic.
	// This decouples the OAuth package from AppView-specific dependencies.
	// Every failure inside the callback is logged and swallowed: the callback
	// always returns nil.
	oauthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
		slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)

		// Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety)
		client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, refresher)

		// Note: Profile and crew setup now happen automatically via UserContext.EnsureUserSetup()

		// Fetch user's profile record from PDS (contains blob references)
		profileRecord, err := client.GetProfileRecord(ctx, did)
		if err != nil {
			slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
			// Continue without avatar - set profileRecord to nil to skip avatar extraction
			profileRecord = nil
		}

		// Construct avatar URL from blob CID using imgs.blue CDN (if profile record was fetched successfully)
		avatarURL := ""
		if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
			avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
			slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
		}

		// Store user in database (with or without avatar).
		// Use UpsertUser if we have an avatar URL (to update existing users);
		// use UpsertUserIgnoreAvatar otherwise (to preserve existing avatars).
		user := &db.User{
			DID:         did,
			Handle:      handle,
			PDSEndpoint: pdsEndpoint,
			Avatar:      avatarURL,
			LastSeen:    time.Now(),
		}
		if avatarURL != "" {
			err = db.UpsertUser(uiDatabase, user)
		} else {
			err = db.UpsertUserIgnoreAvatar(uiDatabase, user)
		}
		if err != nil {
			slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
			return nil // Non-fatal
		}
		slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")

		// Legacy migration of profile defaultHold from URL to DID
		// (crew registration is now handled by UserContext)
		profile, err := storage.GetProfile(ctx, client)
		if err != nil {
			slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
			return nil // Non-fatal
		}
		if profile != nil && profile.DefaultHold != "" {
			// Check if defaultHold is a URL (needs migration)
			if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
				slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)

				// Resolve URL to DID
				holdDID := atproto.ResolveHoldDIDFromURL(profile.DefaultHold)

				// Update profile with DID
				profile.DefaultHold = holdDID
				if err := storage.UpdateProfile(ctx, client, profile); err != nil {
					slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
				} else {
					slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
				}
			}
		}
		return nil // All errors are non-fatal, logged for debugging
	})

	// Create token issuer only when token auth is present in the distribution
	// config; issuer stays nil otherwise and the auth endpoints are skipped.
	var issuer *token.Issuer
	if cfg.Distribution.Auth["token"] != nil {
		issuer, err = createTokenIssuer(cfg)
		if err != nil {
			return fmt.Errorf("failed to create token issuer: %w", err)
		}
		// Log successful initialization
		slog.Info("Auth keys initialized", "path", cfg.Auth.KeyPath)
	}

	// Create registry app (returns http.Handler)
	ctx := context.Background()
	app := handlers.NewApp(ctx, cfg.Distribution)

	// Wrap registry app with middleware chain:
	// 1. ExtractAuthMethod - extracts auth method from JWT and stores in context
	// 2. UserContextMiddleware - builds UserContext with identity, permissions, service tokens
	wrappedApp := middleware.ExtractAuthMethod(app)

	// Create dependencies for UserContextMiddleware
	userContextDeps := &auth.Dependencies{
		Refresher:      refresher,
		Authorizer:     holdAuthorizer,
		DefaultHoldDID: defaultHoldDID,
	}
	wrappedApp = middleware.UserContextMiddleware(userContextDeps)(wrappedApp)

	// Mount registry at /v2/
	mainRouter.Handle("/v2/*", wrappedApp)

	// Mount static files only when both a UI session store and templates are available
	if uiSessionStore != nil && uiTemplates != nil {
		// Register dynamic routes for root-level files (favicons, manifests, etc.)
		staticHandler := appview.StaticHandler()
		rootFiles, err := appview.StaticRootFiles()
		if err != nil {
			slog.Warn("Failed to scan static root files", "error", err)
		} else {
			for _, filename := range rootFiles {
				// Per-iteration copy so each handler closure captures its own filename
				file := filename
				mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
					// Serve the specific file from static root
					r.URL.Path = "/" + file
					staticHandler.ServeHTTP(w, r)
				})
			}
			slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
		}

		// Mount subdirectory routes with clean paths
		mainRouter.Handle("/css/*", http.StripPrefix("/css/", appview.StaticSubdir("css")))
		mainRouter.Handle("/js/*", http.StripPrefix("/js/", appview.StaticSubdir("js")))
		mainRouter.Handle("/static/*", http.StripPrefix("/static/", appview.StaticSubdir("static")))
		slog.Info("UI enabled", "home", "/", "settings", "/settings")
	}

	// Mount OAuth endpoints
	mainRouter.Get("/auth/oauth/authorize", oauthServer.ServeAuthorize)
	mainRouter.Get("/auth/oauth/callback", oauthServer.ServeCallback)

	// OAuth client metadata endpoint
	mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
		config := oauthClientApp.Config
		metadata := config.ClientMetadata()

		// For confidential clients, ensure JWKS is included.
		// The indigo library should populate this automatically, but we explicitly set it here
		// to be defensive and ensure it's always present for confidential clients.
		if config.IsConfidential() && metadata.JWKS == nil {
			jwks := config.PublicJWKS()
			metadata.JWKS = &jwks
		}

		// Round-trip indigo's metadata through JSON into a map so custom fields can be added
		metadataBytes, err := json.Marshal(metadata)
		if err != nil {
			http.Error(w, "Failed to marshal metadata", http.StatusInternalServerError)
			return
		}
		var metadataMap map[string]any
		if err := json.Unmarshal(metadataBytes, &metadataMap); err != nil {
			http.Error(w, "Failed to unmarshal metadata", http.StatusInternalServerError)
			return
		}

		// Add custom fields
		metadataMap["client_name"] = cfg.Server.ClientName
		metadataMap["client_uri"] = cfg.Server.BaseURL
		metadataMap["logo_uri"] = cfg.Server.BaseURL + "/web-app-manifest-192x192.png"

		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		// Limit caching to allow scope changes to propagate quickly.
		// PDS servers cache client metadata, so short max-age helps with updates.
		w.Header().Set("Cache-Control", "public, max-age=300")
		if err := json.NewEncoder(w).Encode(metadataMap); err != nil {
			// The response has already been started by the encoder, so the
			// status code can no longer be changed and writing an error body
			// would only corrupt the partial JSON. Record the failure instead.
			slog.Warn("Failed to encode OAuth client metadata", "error", err)
		}
	})

	// Note: Indigo handles OAuth state cleanup internally via its store

	// Mount auth endpoints if enabled
	if issuer != nil {
		// Basic Auth token endpoint (supports device secrets and app passwords)
		tokenHandler := token.NewHandler(issuer, deviceStore)

		// Register OAuth session validator for device auth validation.
		// This validates OAuth sessions are usable (not just exist) before issuing tokens.
		// Prevents the flood of errors when a stale session is discovered during push.
		tokenHandler.SetOAuthSessionValidator(refresher)

		// Register token post-auth callback.
		// Note: Profile and crew setup now happen automatically via UserContext.EnsureUserSetup()
		tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
			slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
			return nil
		})
		mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)

		// Device authorization endpoints (public)
		mainRouter.Handle("/auth/device/code", &uihandlers.DeviceCodeHandler{
			Store:          deviceStore,
			AppViewBaseURL: baseURL,
		})
		mainRouter.Handle("/auth/device/token", &uihandlers.DeviceTokenHandler{
			Store: deviceStore,
		})

		// The metadata path logged here must match the route registered above.
		slog.Info("Auth endpoints enabled",
			"basic_auth", "/auth/token",
			"device_code", "/auth/device/code",
			"device_token", "/auth/device/token",
			"oauth_authorize", "/auth/oauth/authorize",
			"oauth_callback", "/auth/oauth/callback",
			"oauth_metadata", "/oauth-client-metadata.json")
	}

	// Register credential helper version API (public endpoint)
	mainRouter.Handle("/api/credential-helper/version", &uihandlers.CredentialHelperVersionHandler{
		Version:     cfg.CredentialHelper.Version,
		TangledRepo: cfg.CredentialHelper.TangledRepo,
		Checksums:   cfg.CredentialHelper.Checksums,
	})
	if cfg.CredentialHelper.Version != "" {
		slog.Info("Credential helper version API enabled",
			"endpoint", "/api/credential-helper/version",
			"version", cfg.CredentialHelper.Version)
	}

	// Create HTTP server.
	// Only the header read is bounded: it stops clients that trickle request
	// headers from pinning connections forever. Read/Write timeouts are left
	// unset on purpose because they would also cap request/response bodies,
	// and registry blob transfers can legitimately run for a long time.
	server := &http.Server{
		Addr:              cfg.Server.Addr,
		Handler:           mainRouter,
		ReadHeaderTimeout: 10 * time.Second,
	}

	// Handle graceful shutdown
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(stop)

	// Start server in goroutine. errChan is buffered so the goroutine never
	// blocks on send, even if nobody is receiving any more.
	errChan := make(chan error, 1)
	go func() {
		slog.Info("Starting registry server", "addr", cfg.Server.Addr)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			errChan <- err
		}
	}()

	// Wait for shutdown signal or error
	select {
	case <-stop:
		slog.Info("Shutting down registry server")

		// Stop health worker first
		slog.Info("Stopping hold health worker")
		healthWorker.Stop()

		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			return fmt.Errorf("server shutdown error: %w", err)
		}
	case err := <-errChan:
		// Stop health worker on error (workerCancel called by defer)
		healthWorker.Stop()
		return fmt.Errorf("server error: %w", err)
	}
	return nil
}
// createTokenIssuer builds the token.Issuer used by the auth handlers from
// the Auth section of cfg. cfg.Auth.ServiceName is passed twice: once as the
// issuer argument and once as the service argument. The result of
// token.NewIssuer is returned unchanged.
func createTokenIssuer(cfg *appview.Config) (*token.Issuer, error) {
	keyPath := cfg.Auth.KeyPath
	issuerName := cfg.Auth.ServiceName
	serviceName := cfg.Auth.ServiceName
	expiration := cfg.Auth.TokenExpiration

	return token.NewIssuer(keyPath, issuerName, serviceName, expiration)
}
// initializeJetstream starts the background Jetstream goroutines and returns
// immediately:
//
//   - a real-time worker that is recreated in an endless loop, passing the
//     cursor reported by the previous worker to the next one;
//   - when jetstreamCfg.BackfillEnabled is set, a one-off backfill run after a
//     short startup delay plus a ticker-driven periodic backfill.
//
// Parameters:
//   - database: handle passed to both the real-time and backfill workers.
//   - jetstreamCfg: supplies URL, BackfillEnabled, RelayEndpoint and
//     BackfillInterval.
//   - defaultHoldDID, testMode, refresher: forwarded to
//     jetstream.NewBackfillWorker.
//
// All goroutines run with context.Background() and are never stopped by this
// function. Failures are logged, never returned. If the backfill worker
// cannot be created, only the real-time worker runs. A non-positive
// BackfillInterval disables the periodic scheduler (with a warning) instead
// of being handed to time.NewTicker, which panics on such values.
func initializeJetstream(database *sql.DB, jetstreamCfg *appview.JetstreamConfig, defaultHoldDID string, testMode bool, refresher *oauth.Refresher) {
	jetstreamURL := jetstreamCfg.URL

	// Start real-time Jetstream worker with cursor tracking for reconnects
	go func() {
		const reconnectDelay = 10 * time.Second

		// Zero on the first connect.
		// NOTE(review): presumably a zero cursor means "start from live" —
		// confirm against jetstream.NewWorker.
		var lastCursor int64
		for {
			worker := jetstream.NewWorker(database, jetstreamURL, lastCursor)
			err := worker.Start(context.Background())

			// Remember where this connection got to regardless of how Start
			// returned, so the next worker does not resume from a stale cursor.
			lastCursor = worker.GetLastCursor()

			if err != nil {
				slog.Warn("Jetstream real-time worker error, reconnecting", "component", "jetstream", "error", err, "reconnect_delay", reconnectDelay.String())
			} else {
				slog.Info("Jetstream real-time worker stopped, reconnecting", "component", "jetstream", "reconnect_delay", reconnectDelay.String())
			}

			// Back off on every exit path so a worker that returns
			// immediately cannot turn this loop into a busy spin.
			time.Sleep(reconnectDelay)
		}
	}()
	slog.Info("Jetstream real-time worker started", "component", "jetstream")

	// Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable)
	if !jetstreamCfg.BackfillEnabled {
		return
	}

	// Relay endpoint for the sync API, taken from configuration
	relayEndpoint := jetstreamCfg.RelayEndpoint
	backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint, defaultHoldDID, testMode, refresher)
	if err != nil {
		slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err)
		return
	}

	// Run initial backfill with startup delay for Docker compose
	go func() {
		// Wait for hold service to be ready (Docker startup race condition)
		startupDelay := 5 * time.Second
		slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", startupDelay)
		time.Sleep(startupDelay)

		slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoint", relayEndpoint)
		if err := backfillWorker.Start(context.Background()); err != nil {
			slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", err)
		} else {
			slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
		}
	}()

	// Start periodic backfill scheduler
	interval := jetstreamCfg.BackfillInterval
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration, and a panic in
		// the scheduler goroutine would bring down the whole server.
		slog.Warn("Periodic backfill scheduler disabled: non-positive interval", "component", "jetstream/backfill", "interval", interval)
		return
	}
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", interval)
			if err := backfillWorker.Start(context.Background()); err != nil {
				slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", err)
			} else {
				slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
			}
		}
	}()
	slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", interval)
}