Files
at-container-registry/cmd/appview/serve.go
2025-10-12 22:09:03 -05:00

694 lines
22 KiB
Go

package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"html/template"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/distribution/distribution/v3/configuration"
"github.com/distribution/distribution/v3/registry"
"github.com/distribution/distribution/v3/registry/handlers"
sqlite3 "github.com/mattn/go-sqlite3"
"github.com/spf13/cobra"
"atcr.io/pkg/appview/middleware"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
// UI components
"atcr.io/pkg/appview"
"atcr.io/pkg/appview/db"
uihandlers "atcr.io/pkg/appview/handlers"
"atcr.io/pkg/appview/jetstream"
"github.com/gorilla/mux"
)
// sensitiveTables is the deny-list consulted by readOnlyAuthorizerCallback.
// A table named here must never be reachable through public queries.
var sensitiveTables = map[string]bool{
	"devices":             true, // device secret hashes
	"oauth_auth_requests": true, // OAuth state
	"oauth_sessions":      true, // OAuth tokens
	"pending_device_auth": true, // pending device secrets
	"ui_sessions":         true, // session IDs
}
// readOnlyAuthorizerCallback is the SQLite authorizer attached to connections
// of the public read-only driver. For read, update, insert, delete and select
// actions it denies the operation (and logs the refusal) when arg1 names a
// table listed in sensitiveTables. Every other operation is allowed.
func readOnlyAuthorizerCallback(action int, arg1, arg2, dbName string) int {
	switch action {
	case sqlite3.SQLITE_READ,
		sqlite3.SQLITE_UPDATE,
		sqlite3.SQLITE_INSERT,
		sqlite3.SQLITE_DELETE,
		sqlite3.SQLITE_SELECT:
		// arg1 contains the table name for most operations.
		if sensitiveTables[arg1] {
			fmt.Printf("SECURITY: Blocked access to sensitive table '%s' (action=%d)\n", arg1, action)
			return sqlite3.SQLITE_DENY
		}
	}
	// Anything not explicitly refused above is permitted.
	return sqlite3.SQLITE_OK
}
// serveCmd is the "serve" subcommand. It accepts no positional arguments and
// delegates to serveRegistry.
var serveCmd = &cobra.Command{
	Use:   "serve",
	Args:  cobra.NoArgs,
	RunE:  serveRegistry,
	Short: "Start the ATCR registry server",
	Long: `Start the ATCR registry server with authentication endpoints.
Configuration is loaded from environment variables.
See .env.appview.example for available environment variables.`,
}
// init runs at package load. It registers the "sqlite3_readonly_public"
// database/sql driver, whose connections all carry readOnlyAuthorizerCallback,
// and replaces the registry root command's "serve" subcommand with serveCmd.
func init() {
	// Attach the authorizer to every connection opened through this driver.
	attachAuthorizer := func(conn *sqlite3.SQLiteConn) error {
		conn.RegisterAuthorizer(readOnlyAuthorizerCallback)
		return nil
	}
	sql.Register("sqlite3_readonly_public", &sqlite3.SQLiteDriver{ConnectHook: attachAuthorizer})

	// Overwrite the first command named "serve" in the root command's list.
	commands := registry.RootCmd.Commands()
	for i := range commands {
		if commands[i].Name() != "serve" {
			continue
		}
		commands[i] = serveCmd
		break
	}
}
// serveRegistry is the RunE handler for serveCmd. It loads configuration from
// the environment, initializes the UI database and the OAuth, device and
// token components, mounts the registry, UI, OAuth and auth endpoints on a
// single HTTP mux, and serves until SIGINT/SIGTERM or a listener error.
//
// It returns an error when configuration loading, UI database setup, OAuth app
// creation, auth key/issuer setup, the listener, or graceful shutdown fails.
// On a signal-triggered shutdown that completes within 10 seconds it returns
// nil.
func serveRegistry(cmd *cobra.Command, args []string) error {
	// Load configuration from environment variables
	fmt.Println("Loading configuration from environment variables...")
	config, err := loadConfigFromEnv()
	if err != nil {
		return fmt.Errorf("failed to load config from environment: %w", err)
	}
	fmt.Println("Configuration loaded successfully from environment")

	// Initialize UI database first (required for all stores)
	fmt.Println("Initializing UI database...")
	uiDatabase, uiReadOnlyDB, uiSessionStore := initializeDatabase()
	if uiDatabase == nil {
		return fmt.Errorf("failed to initialize UI database - required for session storage")
	}

	// Initialize OAuth components
	fmt.Println("Initializing OAuth components...")

	// 1. Create OAuth session storage (SQLite-backed)
	oauthStore := db.NewOAuthStore(uiDatabase)
	fmt.Println("Using SQLite for OAuth session storage")

	// 2. Create device store (SQLite-backed)
	deviceStore := db.NewDeviceStore(uiDatabase)
	fmt.Println("Using SQLite for device storage")

	// 3. Get base URL from the environment, falling back to the listen address
	baseURL := os.Getenv("ATCR_BASE_URL")
	if baseURL == "" {
		addr := config.HTTP.Addr
		// An empty address or a bare port (e.g. ":5000") has no host part, so
		// use loopback. Testing for "" first avoids indexing an empty string,
		// which would panic.
		if addr == "" || addr[0] == ':' {
			baseURL = fmt.Sprintf("http://127.0.0.1%s", addr)
		} else {
			baseURL = fmt.Sprintf("http://%s", addr)
		}
	}
	fmt.Printf("DEBUG: Base URL for OAuth: %s\n", baseURL)

	// 4. Create OAuth app (indigo client)
	oauthApp, err := oauth.NewApp(baseURL, oauthStore)
	if err != nil {
		return fmt.Errorf("failed to create OAuth app: %w", err)
	}
	fmt.Println("Using full OAuth scopes (including blob: scope)")

	// 5. Create refresher
	refresher := oauth.NewRefresher(oauthApp)

	// 6. Set global refresher for middleware
	middleware.SetGlobalRefresher(refresher)

	// 6.5. Set global database for pull/push metrics tracking
	metricsDB := db.NewMetricsDB(uiDatabase)
	middleware.SetGlobalDatabase(metricsDB)

	// 7. Initialize UI routes with OAuth app, refresher, and device store
	uiTemplates, uiRouter := initializeUIRoutes(uiDatabase, uiReadOnlyDB, uiSessionStore, oauthApp, refresher, baseURL, deviceStore)

	// 8. Create OAuth server
	oauthServer := oauth.NewServer(oauthApp)
	// Connect server to refresher for cache invalidation
	oauthServer.SetRefresher(refresher)
	// Connect UI session store for web login
	if uiSessionStore != nil {
		oauthServer.SetUISessionStore(uiSessionStore)
	}
	// Connect database for user avatar management
	oauthServer.SetDatabase(uiDatabase)

	// 8.5. Extract default hold endpoint and set it on OAuth server
	// This is used to create sailor profiles on first login
	defaultHoldEndpoint := extractDefaultHoldEndpoint(config)
	if defaultHoldEndpoint != "" {
		oauthServer.SetDefaultHoldEndpoint(defaultHoldEndpoint)
		fmt.Printf("OAuth server will create profiles with default hold: %s\n", defaultHoldEndpoint)
	}

	// 9. Initialize auth keys and create token issuer. The issuer stays nil
	// when token auth is not configured, which disables the auth endpoints
	// mounted further down.
	var issuer *token.Issuer
	if config.Auth["token"] != nil {
		if err := initializeAuthKeys(config); err != nil {
			return fmt.Errorf("failed to initialize auth keys: %w", err)
		}
		// Create token issuer for auth handlers
		issuer, err = createTokenIssuer(config)
		if err != nil {
			return fmt.Errorf("failed to create token issuer: %w", err)
		}
	}

	// Create registry app (returns http.Handler)
	ctx := context.Background()
	app := handlers.NewApp(ctx, config)

	// Create main HTTP mux. The variable is deliberately not called "mux" so
	// it does not shadow the imported gorilla mux package.
	rootMux := http.NewServeMux()

	// Mount registry at /v2/
	rootMux.Handle("/v2/", app)

	// Mount UI routes if enabled
	if uiSessionStore != nil && uiTemplates != nil && uiRouter != nil {
		// Mount static files
		rootMux.Handle("/static/", http.StripPrefix("/static/", appview.StaticHandler()))
		// Mount UI routes directly at root level
		rootMux.Handle("/", uiRouter)
		fmt.Printf("UI enabled:\n")
		fmt.Printf(" - Home: /\n")
		fmt.Printf(" - Settings: /settings\n")
	}

	// Mount OAuth endpoints
	rootMux.HandleFunc("/auth/oauth/authorize", oauthServer.ServeAuthorize)
	rootMux.HandleFunc("/auth/oauth/callback", oauthServer.ServeCallback)

	// OAuth client metadata endpoint
	rootMux.HandleFunc("/client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
		clientConfig := oauth.NewClientConfig(baseURL)
		metadata := clientConfig.ClientMetadata()
		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		if err := json.NewEncoder(w).Encode(metadata); err != nil {
			http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
		}
	})

	// Note: Indigo handles OAuth state cleanup internally via its store

	// Mount auth endpoints if enabled
	if issuer != nil {
		// Basic Auth token endpoint (supports device secrets and app passwords)
		// Reuse defaultHoldEndpoint extracted earlier
		tokenHandler := token.NewHandler(issuer, deviceStore, defaultHoldEndpoint)
		tokenHandler.RegisterRoutes(rootMux)

		// Device authorization endpoints (public)
		rootMux.Handle("/auth/device/code", &uihandlers.DeviceCodeHandler{
			Store:          deviceStore,
			AppViewBaseURL: baseURL,
		})
		rootMux.Handle("/auth/device/token", &uihandlers.DeviceTokenHandler{
			Store: deviceStore,
		})

		fmt.Printf("Auth endpoints enabled:\n")
		fmt.Printf(" - Basic Auth: /auth/token (device secrets + app passwords)\n")
		fmt.Printf(" - Device Auth: /auth/device/code\n")
		fmt.Printf(" - Device Auth: /auth/device/token\n")
		fmt.Printf(" - OAuth: /auth/oauth/authorize\n")
		fmt.Printf(" - OAuth: /auth/oauth/callback\n")
		fmt.Printf(" - OAuth Meta: /client-metadata.json\n")
	}

	// Create HTTP server
	server := &http.Server{
		Addr:    config.HTTP.Addr,
		Handler: rootMux,
	}

	// Handle graceful shutdown
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	// Start server in goroutine. The channel is buffered so the goroutine can
	// report a failure and exit even if nobody is receiving yet.
	errChan := make(chan error, 1)
	go func() {
		fmt.Printf("Starting registry server on %s\n", config.HTTP.Addr)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			errChan <- err
		}
	}()

	// Wait for shutdown signal or error
	select {
	case <-stop:
		fmt.Println("Shutting down registry server...")
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			return fmt.Errorf("server shutdown error: %w", err)
		}
	case err := <-errChan:
		return fmt.Errorf("server error: %w", err)
	}
	return nil
}
// initializeAuthKeys builds a throwaway token issuer from the "token" auth
// parameters so that the signing key is created if it does not exist yet.
// It is a no-op returning nil when token auth is not configured.
func initializeAuthKeys(config *configuration.Configuration) error {
	params, configured := config.Auth["token"]
	if !configured {
		return nil
	}

	var (
		keyPath = getStringParam(params, "privatekey", "/var/lib/atcr/auth/private-key.pem")
		issuer  = getStringParam(params, "issuer", "atcr.io")
		service = getStringParam(params, "service", "atcr.io")
		ttl     = time.Duration(getIntParam(params, "expiration", 300)) * time.Second
	)

	// The issuer itself is discarded; constructing it is what generates the
	// key when it is missing.
	if _, err := token.NewIssuer(keyPath, issuer, service, ttl); err != nil {
		return fmt.Errorf("failed to initialize token issuer: %w", err)
	}

	fmt.Printf("Auth keys initialized at %s\n", keyPath)
	return nil
}
// createTokenIssuer builds the token issuer used by the auth handlers from the
// "token" auth parameters. It returns an error when token auth is not
// configured, and otherwise whatever token.NewIssuer returns.
func createTokenIssuer(config *configuration.Configuration) (*token.Issuer, error) {
	params, configured := config.Auth["token"]
	if !configured {
		return nil, fmt.Errorf("token auth not configured")
	}

	var (
		keyPath = getStringParam(params, "privatekey", "/var/lib/atcr/auth/private-key.pem")
		issuer  = getStringParam(params, "issuer", "atcr.io")
		service = getStringParam(params, "service", "atcr.io")
		ttl     = time.Duration(getIntParam(params, "expiration", 300)) * time.Second
	)
	return token.NewIssuer(keyPath, issuer, service, ttl)
}
// Helpers for reading typed values out of config parameters.

// getStringParam returns params[key] when it is present and holds a string,
// and defaultValue otherwise.
func getStringParam(params configuration.Parameters, key, defaultValue string) string {
	// Asserting on the lookup result covers both a missing key and a
	// non-string value in one step.
	if text, isString := params[key].(string); isString {
		return text
	}
	return defaultValue
}
// getIntParam returns params[key] when it is present and holds an int, and
// defaultValue otherwise. Values of any other type, including other integer
// widths and numeric strings, fall back to defaultValue.
func getIntParam(params configuration.Parameters, key string, defaultValue int) int {
	if number, isInt := params[key].(int); isInt {
		return number
	}
	return defaultValue
}
// extractDefaultHoldEndpoint reads
// middleware.registry[].options.default_storage_endpoint from the first
// "atproto-resolver" registry middleware that carries it as a string.
// It returns "" when no such entry exists.
func extractDefaultHoldEndpoint(config *configuration.Configuration) string {
	// Ranging over a missing "registry" key iterates zero times.
	for _, entry := range config.Middleware["registry"] {
		if entry.Name != "atproto-resolver" {
			continue
		}
		// Options is a map; a nil map or absent key simply fails the assertion.
		if endpoint, isString := entry.Options["default_storage_endpoint"].(string); isString {
			return endpoint
		}
	}
	return ""
}
// initializeDatabase opens the SQLite UI database and builds the session store.
//
// Returns (read-write DB, read-only DB, session store). All three are nil when
// ATCR_UI_ENABLED is "false", or when the database directory, the read-write
// database, or the read-only handle cannot be set up. Failures are printed as
// warnings rather than returned.
//
// The database path comes from ATCR_UI_DATABASE_PATH and defaults to
// /var/lib/atcr/ui.db. On success a background goroutine is started that runs
// store cleanup every five minutes; it is never stopped.
func initializeDatabase() (*sql.DB, *sql.DB, *db.SessionStore) {
	// Check if UI is enabled (optional configuration)
	if os.Getenv("ATCR_UI_ENABLED") == "false" {
		return nil, nil, nil
	}

	// Get database path
	dbPath := os.Getenv("ATCR_UI_DATABASE_PATH")
	if dbPath == "" {
		dbPath = "/var/lib/atcr/ui.db"
	}

	// Ensure directory exists
	if err := os.MkdirAll(filepath.Dir(dbPath), 0700); err != nil {
		fmt.Printf("Warning: Failed to create UI database directory: %v\n", err)
		return nil, nil, nil
	}

	// Initialize read-write database (for writes and auth operations)
	database, err := db.InitDB(dbPath)
	if err != nil {
		fmt.Printf("Warning: Failed to initialize UI database: %v\n", err)
		return nil, nil, nil
	}

	// Open read-only connection for public queries (search, user pages, etc.)
	// Uses custom driver with SQLite authorizer that blocks sensitive tables
	// This prevents accidental writes and blocks access to sensitive tables even if SQL injection occurs
	readOnlyDB, err := sql.Open("sqlite3_readonly_public", "file:"+dbPath+"?mode=ro")
	if err != nil {
		fmt.Printf("Warning: Failed to open read-only database connection: %v\n", err)
		// The caller receives nil handles on this path, so release the
		// read-write handle here instead of leaking it.
		if closeErr := database.Close(); closeErr != nil {
			fmt.Printf("Warning: Failed to close UI database: %v\n", closeErr)
		}
		return nil, nil, nil
	}
	fmt.Printf("UI database (readonly) initialized at %s\n", dbPath)

	// Create SQLite-backed session store
	sessionStore := db.NewSessionStore(database)

	// The cleanup stores wrap the same handle on every tick, so build them
	// once rather than per iteration.
	oauthStore := db.NewOAuthStore(database)
	deviceStore := db.NewDeviceStore(database)

	// Start cleanup goroutine for all SQLite stores
	go func() {
		ticker := time.NewTicker(5 * time.Minute)
		defer ticker.Stop()
		for range ticker.C {
			ctx := context.Background()
			// Cleanup UI sessions
			sessionStore.Cleanup()
			// Cleanup OAuth sessions (older than 30 days)
			oauthStore.CleanupOldSessions(ctx, 30*24*time.Hour)
			oauthStore.CleanupExpiredAuthRequests(ctx)
			// Cleanup device pending auths
			deviceStore.CleanupExpired()
		}
	}()

	return database, readOnlyDB, sessionStore
}
// initializeUIRoutes builds the web UI router and starts the background
// ingestion workers.
//
// database: read-write connection for auth and writes
// readOnlyDB: read-only connection for public queries (search, user pages, etc.)
//
// It returns (nil, nil) when ATCR_UI_ENABLED is "false" or the templates fail
// to load. Otherwise it returns the loaded templates and a router carrying the
// public, authenticated and logout routes.
//
// Side effects: a Jetstream worker goroutine is always started (URL from
// JETSTREAM_URL), and unless ATCR_BACKFILL_ENABLED is "false" an initial
// backfill plus a periodic backfill scheduler (ATCR_BACKFILL_INTERVAL, default
// 1h; relay from ATCR_RELAY_ENDPOINT) are started too. None of these
// goroutines is ever stopped.
func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.SessionStore, oauthApp *oauth.App, refresher *oauth.Refresher, baseURL string, deviceStore *db.DeviceStore) (*template.Template, *mux.Router) {
	// Check if UI is enabled
	if os.Getenv("ATCR_UI_ENABLED") == "false" {
		return nil, nil
	}

	// Load templates
	templates, err := appview.Templates()
	if err != nil {
		fmt.Printf("Warning: Failed to load UI templates: %v\n", err)
		return nil, nil
	}

	// Every page handler receives the same trimmed registry URL.
	registryURL := uihandlers.TrimRegistryURL(baseURL)

	// Create router
	router := mux.NewRouter()

	// OAuth login routes (public)
	router.Handle("/auth/oauth/login", &uihandlers.LoginHandler{
		Templates: templates,
	}).Methods("GET")
	router.Handle("/auth/oauth/login", &uihandlers.LoginSubmitHandler{}).Methods("POST")

	// Public routes (with optional auth for navbar)
	// SECURITY: Public pages use read-only DB
	router.Handle("/", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.HomeHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")
	router.Handle("/api/recent-pushes", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.RecentPushesHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")

	// SECURITY: Search uses read-only DB to prevent writes and limit access to sensitive tables
	router.Handle("/search", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.SearchHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")
	router.Handle("/api/search-results", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.SearchResultsHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")

	// Install page (public)
	router.Handle("/install", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.InstallHandler{
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")

	// API route for repository stats (public, read-only)
	router.Handle("/api/stats/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.GetStatsHandler{
			DB:        readOnlyDB,
			Directory: oauthApp.Directory(),
		},
	)).Methods("GET")

	// API routes for stars (require authentication)
	router.Handle("/api/stars/{handle}/{repository}", middleware.RequireAuth(sessionStore, database)(
		&uihandlers.StarRepositoryHandler{
			DB:        database, // Needs write access
			Directory: oauthApp.Directory(),
			Refresher: refresher,
		},
	)).Methods("POST")
	router.Handle("/api/stars/{handle}/{repository}", middleware.RequireAuth(sessionStore, database)(
		&uihandlers.UnstarRepositoryHandler{
			DB:        database, // Needs write access
			Directory: oauthApp.Directory(),
			Refresher: refresher,
		},
	)).Methods("DELETE")
	router.Handle("/api/stars/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.CheckStarHandler{
			DB:        readOnlyDB, // Read-only check
			Directory: oauthApp.Directory(),
			Refresher: refresher,
		},
	)).Methods("GET")

	router.Handle("/u/{handle}", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.UserPageHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
		},
	)).Methods("GET")
	router.Handle("/r/{handle}/{repository}", middleware.OptionalAuth(sessionStore, database)(
		&uihandlers.RepositoryPageHandler{
			DB:          readOnlyDB,
			Templates:   templates,
			RegistryURL: registryURL,
			Directory:   oauthApp.Directory(),
			Refresher:   refresher,
		},
	)).Methods("GET")

	// Authenticated routes
	authRouter := router.NewRoute().Subrouter()
	authRouter.Use(middleware.RequireAuth(sessionStore, database))
	authRouter.Handle("/settings", &uihandlers.SettingsHandler{
		Templates:   templates,
		Refresher:   refresher,
		RegistryURL: registryURL,
	}).Methods("GET")
	authRouter.Handle("/api/profile/default-hold", &uihandlers.UpdateDefaultHoldHandler{
		Refresher: refresher,
	}).Methods("POST")
	authRouter.Handle("/api/images/{repository}/tags/{tag}", &uihandlers.DeleteTagHandler{
		DB: database,
	}).Methods("DELETE")
	authRouter.Handle("/api/images/{repository}/manifests/{digest}", &uihandlers.DeleteManifestHandler{
		DB: database,
	}).Methods("DELETE")

	// Device approval page (authenticated)
	authRouter.Handle("/device", &uihandlers.DeviceApprovalPageHandler{
		Store:        deviceStore,
		SessionStore: sessionStore,
	}).Methods("GET")
	authRouter.Handle("/device/approve", &uihandlers.DeviceApproveHandler{
		Store:        deviceStore,
		SessionStore: sessionStore,
	}).Methods("POST")

	// Device management routes
	authRouter.Handle("/api/devices", &uihandlers.ListDevicesHandler{
		Store:        deviceStore,
		SessionStore: sessionStore,
	}).Methods("GET")
	authRouter.Handle("/api/devices/{id}", &uihandlers.RevokeDeviceHandler{
		Store:        deviceStore,
		SessionStore: sessionStore,
	}).Methods("DELETE")

	// Logout endpoint (supports both GET and POST)
	router.HandleFunc("/auth/logout", func(w http.ResponseWriter, r *http.Request) {
		if sessionID, ok := db.GetSessionID(r); ok {
			sessionStore.Delete(sessionID)
		}
		db.ClearCookie(w)
		http.Redirect(w, r, "/", http.StatusFound)
	}).Methods("GET", "POST")

	// Start Jetstream worker
	jetstreamURL := os.Getenv("JETSTREAM_URL")
	if jetstreamURL == "" {
		jetstreamURL = "wss://jetstream2.us-west.bsky.network/subscribe"
	}

	// Start real-time Jetstream worker with cursor tracking for reconnects
	go func() {
		var lastCursor int64 // 0 on the first connect (start from now)
		for {
			worker := jetstream.NewWorker(database, jetstreamURL, lastCursor)
			err := worker.Start(context.Background())

			// Carry the cursor forward however the connection ended, so the
			// next worker resumes from where this one stopped even when
			// Start returned without an error.
			lastCursor = worker.GetLastCursor()
			if err != nil {
				fmt.Printf("Jetstream: Real-time worker error: %v, reconnecting in 10s...\n", err)
			} else {
				fmt.Println("Jetstream: Real-time worker stopped, reconnecting in 10s...")
			}

			// Always back off before reconnecting; without this a Start that
			// keeps returning nil immediately would spin in a tight loop.
			time.Sleep(10 * time.Second)
		}
	}()
	fmt.Println("Jetstream: Real-time worker started")

	// Start backfill worker (enabled by default, set ATCR_BACKFILL_ENABLED=false to disable)
	if backfillEnabled := os.Getenv("ATCR_BACKFILL_ENABLED"); backfillEnabled != "false" {
		// Get relay endpoint for sync API (defaults to Bluesky's relay)
		relayEndpoint := os.Getenv("ATCR_RELAY_ENDPOINT")
		if relayEndpoint == "" {
			relayEndpoint = "https://relay1.us-east.bsky.network"
		}

		backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint)
		if err != nil {
			fmt.Printf("Warning: Failed to create backfill worker: %v\n", err)
		} else {
			// Run initial backfill
			go func() {
				fmt.Printf("Backfill: Starting sync-based backfill from %s...\n", relayEndpoint)
				if err := backfillWorker.Start(context.Background()); err != nil {
					fmt.Printf("Backfill: Finished with error: %v\n", err)
				} else {
					fmt.Println("Backfill: Completed successfully!")
				}
			}()

			// Start periodic backfill scheduler
			backfillInterval := os.Getenv("ATCR_BACKFILL_INTERVAL")
			if backfillInterval == "" {
				backfillInterval = "1h" // Default to 1 hour
			}
			interval, err := time.ParseDuration(backfillInterval)
			if err != nil {
				fmt.Printf("Warning: Invalid ATCR_BACKFILL_INTERVAL '%s', using default 1h: %v\n", backfillInterval, err)
				interval = time.Hour
			}

			go func() {
				ticker := time.NewTicker(interval)
				defer ticker.Stop()
				for range ticker.C {
					fmt.Printf("Backfill: Starting periodic backfill (runs every %s)...\n", interval)
					if err := backfillWorker.Start(context.Background()); err != nil {
						fmt.Printf("Backfill: Periodic backfill finished with error: %v\n", err)
					} else {
						fmt.Println("Backfill: Periodic backfill completed successfully!")
					}
				}
			}()
			fmt.Printf("Backfill: Periodic scheduler started (interval: %s)\n", interval)
		}
	}

	return templates, router
}