Files

893 lines
31 KiB
Go

package appview
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"html/template"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/registry/handlers"
"github.com/go-chi/chi/v5"
chimiddleware "github.com/go-chi/chi/v5/middleware"
"atcr.io/pkg/appview/db"
"atcr.io/pkg/appview/holdhealth"
"atcr.io/pkg/appview/jetstream"
"atcr.io/pkg/appview/middleware"
"atcr.io/pkg/appview/readme"
"atcr.io/pkg/appview/routes"
"atcr.io/pkg/appview/storage"
"atcr.io/pkg/appview/webhooks"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth"
"atcr.io/pkg/auth/oauth"
"atcr.io/pkg/auth/token"
"atcr.io/pkg/billing"
"atcr.io/pkg/logging"
"github.com/bluesky-social/indigo/atproto/atcrypto"
indigooauth "github.com/bluesky-social/indigo/atproto/auth/oauth"
)
// OAuthPostAuthHook is called after the default OAuth post-auth logic
// (profile creation, avatar fetch, crew registration). Hooks added after
// NewAppViewServer but before the first request work correctly, because the
// callback reads the registered hooks each time it runs.
//
// The hook receives the callback's context together with the authenticated
// user's DID, handle, PDS endpoint and OAuth session ID. A non-nil error is
// logged as a warning; it does not fail the login and does not prevent later
// hooks from running. Hooks are not invoked when the default logic returns
// early because storing the user or fetching the profile failed.
type OAuthPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error
// TokenPostAuthHook is called after the default token post-auth logic
// (profile creation). Hooks added after NewAppViewServer but before the
// first request work correctly, because the callback reads the registered
// hooks each time it runs.
//
// The hook receives the callback's context together with the user's DID,
// handle, PDS endpoint and access token. A non-nil error is logged as a
// warning; it does not fail the token request and does not prevent later
// hooks from running. The token callback is only installed when a token
// issuer is configured.
type TokenPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error
// AppViewServer is the AppView service with an exposed router for extensibility.
// Consumers can add routes to Router and hooks before calling Serve().
//
// Instances are expected to be created with NewAppViewServer, which populates
// the exported fields and starts the background workers.
type AppViewServer struct {
// Router is the chi router. Add routes before calling Serve().
Router chi.Router
// Config is the AppView configuration.
Config *Config
// Database is the read-write SQLite database.
Database *sql.DB
// ReadOnlyDB is the read-only SQLite database connection.
ReadOnlyDB *sql.DB
// SessionStore manages web UI sessions.
SessionStore *db.SessionStore
// DeviceStore manages device authorization flows.
DeviceStore *db.DeviceStore
// OAuthStore manages OAuth session persistence.
OAuthStore *db.OAuthStore
// OAuthServer handles OAuth authorization and callback endpoints.
OAuthServer *oauth.Server
// OAuthClientApp is the indigo OAuth client application.
OAuthClientApp *indigooauth.ClientApp
// Refresher manages OAuth session refresh and caching.
Refresher *oauth.Refresher
// Templates are the parsed HTML templates.
Templates *template.Template
// HealthChecker checks hold service health.
HealthChecker *holdhealth.Checker
// ReadmeFetcher fetches README content for repository pages.
ReadmeFetcher *readme.Fetcher
// TokenIssuer issues registry JWTs (nil if auth is not configured).
TokenIssuer *token.Issuer
// HoldAuthorizer checks hold access permissions.
HoldAuthorizer auth.HoldAuthorizer
// OAuthKey is the P-256 private key used for OAuth client auth and appview service identity.
OAuthKey *atcrypto.PrivateKeyP256
// BillingManager handles Stripe billing and tier updates (nil if billing disabled).
BillingManager *billing.Manager
// WebhookDispatcher dispatches scan webhooks (stored in appview DB).
WebhookDispatcher *webhooks.Dispatcher
// Private fields for lifecycle management
// oauthHooks are the consumer hooks run by the OAuth post-auth callback,
// in registration order.
oauthHooks []OAuthPostAuthHook
// tokenHooks are the consumer hooks run by the token post-auth callback,
// in registration order.
tokenHooks []TokenPostAuthHook
// httpServer is created by Serve and shut down when Serve exits.
httpServer *http.Server
// healthWorker is the background hold health worker started by
// NewAppViewServer and stopped by Serve.
healthWorker *holdhealth.Worker
// workerCancel cancels the context the health worker was started with.
workerCancel context.CancelFunc
// branding holds the overrides passed to NewAppViewServer (nil for defaults).
branding *BrandingOverrides
}
// AddOAuthPostAuthHook registers a hook that runs after the default OAuth
// post-auth logic. Multiple hooks run in registration order.
//
// A nil hook is ignored: appending it would only defer the failure to a
// nil-function panic inside the OAuth callback on the next login.
//
// Registration is not synchronized; add hooks before the server starts
// handling requests.
func (s *AppViewServer) AddOAuthPostAuthHook(hook OAuthPostAuthHook) {
	if hook == nil {
		return
	}
	s.oauthHooks = append(s.oauthHooks, hook)
}
// AddTokenPostAuthHook registers a hook that runs after the default token
// post-auth logic. Multiple hooks run in registration order.
//
// A nil hook is ignored: appending it would only defer the failure to a
// nil-function panic inside the token callback on the next token request.
//
// Registration is not synchronized; add hooks before the server starts
// handling requests.
func (s *AppViewServer) AddTokenPostAuthHook(hook TokenPostAuthHook) {
	if hook == nil {
		return
	}
	s.tokenHooks = append(s.tokenHooks, hook)
}
// NewAppViewServer creates a fully-initialized AppView server ready for the
// consumer to add routes and hooks before calling Serve(). Pass nil for
// branding to use default atcr.io assets and templates.
//
// Besides building the router, construction has process-wide side effects:
// it initializes logging, installs the global refresher, database, manifest
// reference checker, authorizer and webhook dispatcher used by the middleware
// package, and starts background goroutines (hold health worker, denial-cache
// clearing, billing tier refresh, Jetstream workers).
//
// An error is returned when the UI database cannot be initialized, the OAuth
// key cannot be loaded, the OAuth client app cannot be created, the UI
// templates fail to load, or (when token auth is configured) the JWT key
// material cannot be loaded. On such a failure the hold health worker started
// here is stopped again; other background goroutines and the database handles
// are not torn down.
func NewAppViewServer(cfg *Config, branding *BrandingOverrides) (*AppViewServer, error) {
	// Initialize structured logging with optional remote shipping
	logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
		Backend:       cfg.LogShipper.Backend,
		URL:           cfg.LogShipper.URL,
		BatchSize:     cfg.LogShipper.BatchSize,
		FlushInterval: cfg.LogShipper.FlushInterval,
		Service:       "appview",
		Username:      cfg.LogShipper.Username,
		Password:      cfg.LogShipper.Password,
	})
	slog.Info("Configuration loaded successfully from environment")

	s := &AppViewServer{
		Config:   cfg,
		branding: branding,
	}

	// Initialize UI database (required for all stores)
	slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
	libsqlCfg := db.LibsqlConfig{
		SyncURL:      cfg.UI.LibsqlSyncURL,
		AuthToken:    cfg.UI.LibsqlAuthToken,
		SyncInterval: cfg.UI.LibsqlSyncInterval,
	}
	s.Database, s.ReadOnlyDB, s.SessionStore = db.InitializeDatabase(cfg.UI.DatabasePath, libsqlCfg)
	if s.Database == nil {
		return nil, fmt.Errorf("failed to initialize UI database - required for session storage")
	}

	// Initialize hold health checker
	slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
	s.HealthChecker = holdhealth.NewChecker(cfg.Health.CacheTTL)

	// Initialize README fetcher for rendering repo page descriptions
	s.ReadmeFetcher = readme.NewFetcher()

	// Start background health check worker
	startupDelay := 5 * time.Second
	dbAdapter := holdhealth.NewDBAdapter(s.Database)
	s.healthWorker = holdhealth.NewWorkerWithStartupDelay(s.HealthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)
	workerCtx, workerCancel := context.WithCancel(context.Background())
	s.workerCancel = workerCancel
	s.healthWorker.Start(workerCtx)
	slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)

	// From here on a failed construction returns a nil server, so the caller
	// has no handle to stop the worker. Stop it ourselves on any error return,
	// using the same order as Serve (Stop, then cancel the context).
	constructed := false
	defer func() {
		if constructed {
			return
		}
		s.healthWorker.Stop()
		workerCancel()
	}()

	// Initialize OAuth components
	slog.Info("Initializing OAuth components")
	s.OAuthStore = db.NewOAuthStore(s.Database)
	slog.Info("Using SQLite for OAuth session storage")
	s.DeviceStore = db.NewDeviceStore(s.Database)
	slog.Info("Using SQLite for device storage")

	baseURL := cfg.Server.BaseURL
	defaultHoldDID := cfg.Server.DefaultHoldDID
	testMode := cfg.Server.TestMode
	slog.Debug("Base URL for OAuth", "base_url", baseURL)
	if testMode {
		slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution")
		atproto.SetTestMode(true)
	}

	// Load crypto keys from database (with file fallback and migration)
	oauthKey, err := loadOAuthKey(s.Database, cfg.Server.OAuthKeyPath)
	if err != nil {
		return nil, fmt.Errorf("failed to load OAuth key: %w", err)
	}
	s.OAuthKey = oauthKey

	// Create OAuth client app
	desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
	s.OAuthClientApp, err = oauth.NewClientAppWithKey(baseURL, s.OAuthStore, desiredScopes, oauthKey, cfg.Server.ClientName)
	if err != nil {
		return nil, fmt.Errorf("failed to create OAuth client app: %w", err)
	}

	// Invalidate sessions with mismatched scopes on startup (best effort:
	// a failure is logged and startup continues)
	invalidatedCount, err := s.OAuthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
	if err != nil {
		slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
	} else if invalidatedCount > 0 {
		slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
	}

	// Create oauth token refresher
	s.Refresher = oauth.NewRefresher(s.OAuthClientApp)

	// Wire up UI session store to refresher
	if s.SessionStore != nil {
		s.Refresher.SetUISessionStore(s.SessionStore)
	}

	// Set global refresher for middleware
	middleware.SetGlobalRefresher(s.Refresher)

	// Set global database for hold DID lookups and manifest reference checks
	holdDIDDB := db.NewHoldDIDDB(s.Database)
	middleware.SetGlobalDatabase(holdDIDDB)
	middleware.SetGlobalManifestRefChecker(holdDIDDB)

	// Create RemoteHoldAuthorizer for hold authorization with caching
	s.HoldAuthorizer = auth.NewRemoteHoldAuthorizer(s.Database, testMode)
	middleware.SetGlobalAuthorizer(s.HoldAuthorizer)
	slog.Info("Hold authorizer initialized with database caching")

	// Clear all denial caches on startup for a clean slate
	if remote, ok := s.HoldAuthorizer.(*auth.RemoteHoldAuthorizer); ok {
		go func() {
			if err := remote.ClearAllDenials(); err != nil {
				slog.Warn("Failed to clear denial caches on startup", "error", err)
			}
		}()
	}

	// Initialize billing manager
	appviewDID := DIDFromBaseURL(baseURL)
	s.BillingManager = billing.New(
		&cfg.Billing,
		oauthKey,
		appviewDID,
		cfg.Server.ManagedHolds,
		baseURL,
	)

	// Allow hold captains to bypass billing feature gates
	if len(cfg.Server.ManagedHolds) > 0 {
		managedHolds := cfg.Server.ManagedHolds
		roDB := s.ReadOnlyDB
		s.BillingManager.SetCaptainChecker(func(userDID string) bool {
			// The second result is deliberately ignored: the checker can only
			// answer yes or no, so whatever isCaptain holds is used as-is.
			isCaptain, _ := db.IsHoldCaptain(roDB, userDID, managedHolds)
			return isCaptain
		})
	}
	if s.BillingManager.Enabled() {
		slog.Info("Billing enabled", "appview_did", appviewDID, "managed_holds", len(cfg.Server.ManagedHolds))
		go s.BillingManager.RefreshHoldTiers()
	}

	// Create webhook dispatcher
	appviewMeta := atproto.AppviewMetadata{
		ClientName:      cfg.Server.ClientName,
		ClientShortName: cfg.Server.ClientShortName,
		BaseURL:         cfg.Server.BaseURL,
		FaviconURL:      cfg.Server.BaseURL + "/favicon-96x96.png",
		RegistryDomains: cfg.Server.RegistryDomains,
	}
	s.WebhookDispatcher = webhooks.NewDispatcher(s.Database, appviewMeta)
	middleware.SetGlobalWebhookDispatcher(s.WebhookDispatcher)

	// Initialize Jetstream workers (needs Refresher and WebhookDispatcher set)
	s.initializeJetstream()

	// Create main chi router
	mainRouter := chi.NewRouter()
	mainRouter.Use(chimiddleware.RealIP)
	mainRouter.Use(chimiddleware.Logger)
	mainRouter.Use(chimiddleware.Recoverer)
	mainRouter.Use(chimiddleware.GetHead)
	mainRouter.Use(routes.CORSMiddleware())

	// Domain routing middleware
	if len(cfg.Server.RegistryDomains) > 0 {
		mainRouter.Use(DomainRoutingMiddleware(cfg.Server.RegistryDomains, cfg.Server.BaseURL))
		slog.Info("Domain routing middleware enabled",
			"registry_domains", cfg.Server.RegistryDomains,
			"ui_base_url", cfg.Server.BaseURL)
	}

	// Load templates
	ComputeAssetHashes(branding)
	s.Templates, err = Templates(branding)
	if err != nil {
		return nil, fmt.Errorf("failed to load UI templates: %w", err)
	}

	// Register UI routes
	routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
		Database:          s.Database,
		ReadOnlyDB:        s.ReadOnlyDB,
		SessionStore:      s.SessionStore,
		OAuthClientApp:    s.OAuthClientApp,
		OAuthStore:        s.OAuthStore,
		Refresher:         s.Refresher,
		BaseURL:           baseURL,
		RegistryDomain:    primaryRegistryDomain(cfg.Server.RegistryDomains),
		DeviceStore:       s.DeviceStore,
		HealthChecker:     s.HealthChecker,
		ReadmeFetcher:     s.ReadmeFetcher,
		Templates:         s.Templates,
		DefaultHoldDID:    defaultHoldDID,
		ClientName:        cfg.Server.ClientName,
		ClientShortName:   cfg.Server.ClientShortName,
		BillingManager:    s.BillingManager,
		WebhookDispatcher: s.WebhookDispatcher,
		LegalConfig: routes.LegalConfig{
			CompanyName:  cfg.Legal.CompanyName,
			Jurisdiction: cfg.Legal.Jurisdiction,
		},
	})

	// Register Stripe webhook route (if billing enabled)
	s.BillingManager.RegisterRoutes(mainRouter)

	// Create OAuth server
	s.OAuthServer = oauth.NewServer(s.OAuthClientApp)
	s.OAuthServer.SetRefresher(s.Refresher)
	if s.SessionStore != nil {
		s.OAuthServer.SetUISessionStore(s.SessionStore)
	}

	// Register OAuth post-auth callback (closure captures s for hook dispatch).
	// Every step is best effort: failures are logged and the callback always
	// returns nil so that a login is never rejected by post-auth bookkeeping.
	s.OAuthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
		slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)

		// Create ATProto client with session provider
		client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, s.Refresher)

		// Ensure sailor profile exists
		slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
		if err := storage.EnsureProfile(ctx, client, defaultHoldDID); err != nil {
			slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
		} else {
			slog.Debug("Profile ensured", "component", "appview/callback", "did", did)
		}

		// Fetch user's profile record from PDS
		profileRecord, err := client.GetProfileRecord(ctx, did)
		if err != nil {
			slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
			profileRecord = nil
		}

		// Construct avatar URL from blob CID
		avatarURL := ""
		if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
			avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
			slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
		}

		// Store user in database. Without a freshly resolved avatar the
		// IgnoreAvatar variant is used instead of writing an empty avatar.
		if avatarURL != "" {
			err = db.UpsertUser(s.Database, &db.User{
				DID:         did,
				Handle:      handle,
				PDSEndpoint: pdsEndpoint,
				Avatar:      avatarURL,
				LastSeen:    time.Now(),
			})
		} else {
			err = db.UpsertUserIgnoreAvatar(s.Database, &db.User{
				DID:         did,
				Handle:      handle,
				PDSEndpoint: pdsEndpoint,
				Avatar:      avatarURL,
				LastSeen:    time.Now(),
			})
		}
		if err != nil {
			slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
			return nil
		}
		slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")

		// Migrate profile URL→DID if needed
		profile, err := storage.GetProfile(ctx, client)
		if err != nil {
			slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
			return nil
		}
		var holdDID string
		if profile != nil && profile.DefaultHold != "" {
			if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
				slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)
				if resolvedDID, resolveErr := atproto.ResolveHoldDID(ctx, profile.DefaultHold); resolveErr != nil {
					slog.Warn("Failed to resolve hold DID from URL", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold, "error", resolveErr)
				} else {
					holdDID = resolvedDID
					profile.DefaultHold = holdDID
					if err := storage.UpdateProfile(ctx, client, profile); err != nil {
						slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
					} else {
						slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
					}
				}
			} else {
				holdDID = profile.DefaultHold
			}

			// Register crew in background; uses a fresh context so the work
			// is not tied to the request context passed to this callback.
			slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID)
			go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) {
				ctx := context.Background()
				storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer)
			}(client, s.Refresher, holdDID, s.HoldAuthorizer)
		}

		// Migrate old-format star records to AT URI format in background
		go func(client *atproto.Client, did string) {
			ctx := context.Background()
			migrated, err := atproto.MigrateStarRecords(ctx, client)
			if err != nil {
				slog.Warn("Star record migration failed", "component", "appview/callback", "did", did, "error", err, "migrated", migrated)
			} else if migrated > 0 {
				slog.Info("Migrated star records to AT URI format", "component", "appview/callback", "did", did, "count", migrated)
			}
		}(client, did)

		// Drain manifests from old hold to successor in background
		go func(client *atproto.Client, did string) {
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
			defer cancel()
			storage.MigrateManifestsForSuccessor(ctx, client, s.HoldAuthorizer, db.NewHoldDIDDB(s.Database), did)
		}(client, did)

		// Run consumer hooks
		for _, hook := range s.oauthHooks {
			if err := hook(ctx, did, handle, pdsEndpoint, sessionID); err != nil {
				slog.Warn("OAuth post-auth hook error", "component", "appview/callback", "error", err)
			}
		}
		return nil
	})

	// Create token issuer
	if cfg.Distribution.Auth["token"] != nil {
		rsaKey, certDER, err := loadJWTKeyAndCert(s.Database, cfg.Auth.KeyPath, cfg.Auth.CertPath)
		if err != nil {
			return nil, fmt.Errorf("failed to load JWT key material: %w", err)
		}
		// NOTE(review): ServiceName is passed for two consecutive arguments —
		// confirm against token.NewIssuerFromKey that this is intended.
		s.TokenIssuer = token.NewIssuerFromKey(rsaKey, certDER, cfg.Auth.ServiceName, cfg.Auth.ServiceName, cfg.Auth.TokenExpiration)
		slog.Info("Auth keys initialized")
	}

	// Create registry app (distribution library handler)
	ctx := context.Background()
	app := handlers.NewApp(ctx, cfg.Distribution)

	// Wrap with auth method extraction middleware
	wrappedApp := middleware.ExtractAuthMethod(app)

	// Mount registry at /v2/
	mainRouter.Handle("/v2/*", wrappedApp)

	// Mount static files
	if s.SessionStore != nil && s.Templates != nil {
		publicHandler := CacheMiddleware(PublicHandler(branding), 31536000)
		rootFiles, err := PublicRootFiles(branding)
		if err != nil {
			slog.Warn("Failed to scan static root files", "error", err)
		} else {
			for _, filename := range rootFiles {
				// Per-iteration copy so each handler closure serves its own file.
				file := filename
				mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
					r.URL.Path = "/" + file
					publicHandler.ServeHTTP(w, r)
				})
			}
			slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
		}
		mainRouter.Handle("/css/*", CacheMiddleware(http.StripPrefix("/css/", PublicSubdir("css", branding)), 31536000))
		mainRouter.Handle("/js/*", CacheMiddleware(http.StripPrefix("/js/", PublicSubdir("js", branding)), 31536000))
		mainRouter.Handle("/static/*", CacheMiddleware(http.StripPrefix("/static/", PublicSubdir("static", branding)), 31536000))
		slog.Info("UI enabled", "home", "/", "settings", "/settings")
	}

	// Mount OAuth endpoints
	mainRouter.Get("/auth/oauth/authorize", s.OAuthServer.ServeAuthorize)
	mainRouter.Get("/auth/oauth/callback", s.OAuthServer.ServeCallback)

	// OAuth client metadata endpoint
	mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
		config := s.OAuthClientApp.Config
		logoURI := cfg.Server.BaseURL + "/web-app-manifest-192x192.png"
		policyURI := cfg.Server.BaseURL + "/privacy"
		tosURI := cfg.Server.BaseURL + "/terms"

		metadata := config.ClientMetadata()
		metadata.ClientName = &cfg.Server.ClientName
		metadata.ClientURI = &cfg.Server.BaseURL
		metadata.LogoURI = &logoURI
		metadata.PolicyURI = &policyURI
		metadata.TosURI = &tosURI
		if config.IsConfidential() && metadata.JWKS == nil {
			jwks := config.PublicJWKS()
			metadata.JWKS = &jwks
		}

		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Access-Control-Allow-Origin", "*")
		w.Header().Set("Cache-Control", "public, max-age=300")
		if err := json.NewEncoder(w).Encode(metadata); err != nil {
			http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
		}
	})

	// Mount auth endpoints
	if s.TokenIssuer != nil {
		tokenHandler := token.NewHandler(s.TokenIssuer, s.DeviceStore)
		tokenHandler.SetOAuthSessionValidator(s.Refresher)

		// Token post-auth callback (closure captures s for hook dispatch)
		tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
			slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
			atprotoClient := atproto.NewClient(pdsEndpoint, did, accessToken)
			if err := storage.EnsureProfile(ctx, atprotoClient, defaultHoldDID); err != nil {
				slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
			} else {
				slog.Debug("Profile ensured with default hold", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
			}

			// Run consumer hooks
			for _, hook := range s.tokenHooks {
				if err := hook(ctx, did, handle, pdsEndpoint, accessToken); err != nil {
					slog.Warn("Token post-auth hook error", "component", "appview/callback", "error", err)
				}
			}
			return nil
		})
		mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)

		// Device authorization endpoints (public)
		routes.RegisterDeviceEndpoints(mainRouter, s.DeviceStore, baseURL)

		// The metadata path logged here must match the route registered above.
		slog.Info("Auth endpoints enabled",
			"basic_auth", "/auth/token",
			"device_code", "/auth/device/code",
			"device_token", "/auth/device/token",
			"oauth_authorize", "/auth/oauth/authorize",
			"oauth_callback", "/auth/oauth/callback",
			"oauth_metadata", "/oauth-client-metadata.json")
	}

	// Health check endpoint (for Docker health checks / load balancers)
	mainRouter.Get("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		if err := json.NewEncoder(w).Encode(map[string]string{"status": "ok"}); err != nil {
			// The 200 status is already committed, so an error response can
			// no longer be sent; record the failure instead.
			slog.Warn("Failed to write health response", "error", err)
		}
	})

	// Appview metadata endpoint (public, used by holds for branding)
	mainRouter.Get(atproto.AppviewGetMetadata, func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.Header().Set("Cache-Control", "public, max-age=3600")
		if err := json.NewEncoder(w).Encode(atproto.AppviewMetadata{
			ClientName:      cfg.Server.ClientName,
			ClientShortName: cfg.Server.ClientShortName,
			BaseURL:         cfg.Server.BaseURL,
			FaviconURL:      cfg.Server.BaseURL + "/favicon-96x96.png",
			RegistryDomains: cfg.Server.RegistryDomains,
		}); err != nil {
			http.Error(w, "encode error", http.StatusInternalServerError)
		}
	})

	// Appview DID document endpoint (service identity for key discovery)
	mainRouter.Get("/.well-known/did.json", s.handleDIDDocument)

	// Register credential helper version API (public endpoint)
	routes.RegisterCredentialHelperEndpoint(mainRouter, cfg.CredentialHelper.TangledRepo)

	s.Router = mainRouter
	constructed = true
	return s, nil
}
// Serve starts the HTTP server on Config.Server.Addr and blocks until a
// shutdown signal (SIGINT or SIGTERM) is received or the server fails.
//
// On a signal the hold health worker is stopped and the HTTP server is given
// up to 10 seconds to shut down gracefully. Logging is flushed on every exit
// path once the listener has been created.
//
// Returns an error when the listener cannot be created, when the server
// stops with an unexpected error, or when graceful shutdown fails; returns
// nil after a clean signal-triggered shutdown.
func (s *AppViewServer) Serve() error {
	listener, err := net.Listen("tcp", s.Config.Server.Addr)
	if err != nil {
		return fmt.Errorf("failed to create listener: %w", err)
	}
	defer logging.Shutdown()

	s.httpServer = &http.Server{
		Handler: s.Router,
		// Bound only the header phase: it protects against clients that hold
		// connections open by trickling headers, without limiting how long a
		// large request or response body may take.
		ReadHeaderTimeout: 30 * time.Second,
	}

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	// Release the signal registration once Serve returns.
	defer signal.Stop(stop)

	// stopWorkers halts the background health worker and cancels its context.
	stopWorkers := func() {
		if s.healthWorker != nil {
			s.healthWorker.Stop()
		}
		if s.workerCancel != nil {
			s.workerCancel()
		}
	}

	// Buffered so the serving goroutine can report an error and exit even if
	// Serve has already taken the signal branch.
	errChan := make(chan error, 1)
	go func() {
		slog.Info("Starting registry server", "addr", s.Config.Server.Addr)
		if err := s.httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
			errChan <- err
		}
	}()

	select {
	case <-stop:
		slog.Info("Shutting down registry server")
		if s.Config.Server.TestMode {
			listener.Close()
		}
		slog.Info("Stopping hold health worker")
		stopWorkers()

		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := s.httpServer.Shutdown(shutdownCtx); err != nil && err != http.ErrServerClosed {
			return fmt.Errorf("server shutdown error: %w", err)
		}
	case err := <-errChan:
		stopWorkers()
		return fmt.Errorf("server error: %w", err)
	}
	return nil
}
// DomainRoutingMiddleware enforces three-tier domain routing:
//
//  1. UI domain (BaseURL hostname): serves web UI, auth, and static assets.
//     Blocks /v2/* with an OCI UNSUPPORTED error — registry API lives on
//     the dedicated registry domain(s).
//  2. Registry domains: allows /v2/* for Docker clients, plus /auth/token and
//     /auth/device/* so credentials are never sent through a cross-host
//     redirect. Redirects everything else to the UI domain with 307
//     Temporary Redirect.
//  3. Unknown domains (CDN origins, IPs, etc.): redirects all requests to the
//     UI domain with 307, except /health for load balancer probes.
//
// The request's Host header is compared by hostname only: any port is
// removed, IPv6 brackets are stripped, and the comparison is
// case-insensitive. registryDomains lists the registry hostnames; uiBaseURL
// is the UI base URL, used both to derive the UI hostname and as the
// redirect target prefix. If uiBaseURL cannot be parsed the UI hostname is
// empty.
func DomainRoutingMiddleware(registryDomains []string, uiBaseURL string) func(http.Handler) http.Handler {
	// Hostnames are case-insensitive, so both sides are lowercased.
	regDomains := make(map[string]bool, len(registryDomains))
	for _, d := range registryDomains {
		regDomains[strings.ToLower(d)] = true
	}

	// Extract UI hostname from BaseURL (e.g., "https://seamark.dev" -> "seamark.dev")
	var uiHost string
	if parsed, err := url.Parse(uiBaseURL); err == nil {
		uiHost = strings.ToLower(parsed.Hostname())
	}
	primaryReg := primaryRegistryDomain(registryDomains)

	// hostnameOf reduces a Host header value to a bare lowercase hostname.
	// net.SplitHostPort handles "host:port" and "[v6]:port"; when it fails
	// (no port present) the value is used as-is. Brackets around a bare IPv6
	// literal are removed so it matches url.URL.Hostname output.
	hostnameOf := func(hostport string) string {
		host := hostport
		if h, _, err := net.SplitHostPort(hostport); err == nil {
			host = h
		}
		host = strings.TrimSuffix(strings.TrimPrefix(host, "["), "]")
		return strings.ToLower(host)
	}

	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			host := hostnameOf(r.Host)
			path := r.URL.Path
			isV2 := path == "/v2" || path == "/v2/" || strings.HasPrefix(path, "/v2/")

			switch {
			case host == uiHost:
				// UI domain: block /v2/*, serve everything else
				if isV2 {
					if err := errcode.ServeJSON(w, errcode.ErrorCodeUnsupported.WithMessage(
						fmt.Sprintf("registry API is not available on this domain, use %s", primaryReg),
					)); err != nil {
						slog.Error("failed to write OCI error response", "error", err)
					}
					return
				}
				next.ServeHTTP(w, r)
			case regDomains[host]:
				// Registry domain: allow /v2/*, /auth/token, /auth/device/*, redirect everything else
				// Auth endpoints must be served directly to avoid 307 redirects that strip
				// the Authorization header on cross-host redirects (Go http.Client behavior).
				isAuth := path == "/auth/token" || strings.HasPrefix(path, "/auth/device/")
				if isV2 || isAuth {
					next.ServeHTTP(w, r)
					return
				}
				http.Redirect(w, r, uiBaseURL+r.URL.RequestURI(), http.StatusTemporaryRedirect)
			default:
				// Unknown domain: allow /health, redirect everything else
				if path == "/health" {
					next.ServeHTTP(w, r)
					return
				}
				http.Redirect(w, r, uiBaseURL+r.URL.RequestURI(), http.StatusTemporaryRedirect)
			}
		})
	}
}
// primaryRegistryDomain picks the primary registry domain, which is the
// first entry of domains. An empty or nil slice yields the empty string.
func primaryRegistryDomain(domains []string) string {
	if len(domains) == 0 {
		return ""
	}
	return domains[0]
}
// DID reports the did:web identity of this appview, computed from the
// configured server BaseURL via DIDFromBaseURL.
func (s *AppViewServer) DID() string {
	baseURL := s.Config.Server.BaseURL
	return DIDFromBaseURL(baseURL)
}
// DIDFromBaseURL turns a base URL into a did:web identifier.
//
// The hostname becomes the method-specific identifier. A port is appended,
// percent-encoded as "%3A<port>", unless it is absent or is the default for
// the scheme (443 for https, 80 for http). A URL that fails to parse, or one
// without a hostname, falls back to "localhost".
//
// Examples:
//
//	"https://atcr.io"       → "did:web:atcr.io"
//	"http://localhost:5000" → "did:web:localhost%3A5000"
func DIDFromBaseURL(baseURL string) string {
	const prefix = "did:web:"

	parsed, parseErr := url.Parse(baseURL)
	if parseErr != nil {
		return prefix + "localhost"
	}

	host := parsed.Hostname()
	if host == "" {
		host = "localhost"
	}

	port := parsed.Port()
	switch {
	case port == "":
		// No explicit port.
	case parsed.Scheme == "https" && port == "443":
		// Default HTTPS port.
	case parsed.Scheme == "http" && port == "80":
		// Default HTTP port.
	default:
		return fmt.Sprintf("did:web:%s%%3A%s", host, port)
	}
	return prefix + host
}
// handleDIDDocument responds with the appview's DID document; it is mounted
// at /.well-known/did.json.
//
// The document is a service identity for key discovery — no PDS, no repo, no
// firehose. It publishes the public half of OAuthKey as a Multikey
// verification method so that holds can discover the appview's P-256 public
// key for JWT verification. If the public key cannot be derived the handler
// answers 500; an encoding failure is only logged.
func (s *AppViewServer) handleDIDDocument(w http.ResponseWriter, r *http.Request) {
	publicKey, keyErr := s.OAuthKey.PublicKey()
	if keyErr != nil {
		slog.Error("Failed to get public key for DID document", "error", keyErr)
		http.Error(w, "internal error", http.StatusInternalServerError)
		return
	}

	did := s.DID()
	// Every key reference in the document points at the single "#appview" key.
	keyRef := did + "#appview"

	verificationMethod := map[string]any{
		"id":                 keyRef,
		"type":               "Multikey",
		"controller":         did,
		"publicKeyMultibase": publicKey.Multibase(),
	}
	appviewService := map[string]any{
		"id":              "#atcr_appview",
		"type":            "AtcrAppView",
		"serviceEndpoint": s.Config.Server.BaseURL,
	}
	document := map[string]any{
		"@context": []string{
			"https://www.w3.org/ns/did/v1",
			"https://w3id.org/security/multikey/v1",
		},
		"id":                 did,
		"verificationMethod": []map[string]any{verificationMethod},
		"authentication":     []string{keyRef},
		"assertionMethod":    []string{keyRef},
		"service":            []map[string]any{appviewService},
	}

	header := w.Header()
	header.Set("Content-Type", "application/did+ld+json")
	header.Set("Cache-Control", "public, max-age=3600")
	header.Set("Access-Control-Allow-Origin", "*")

	encodeErr := json.NewEncoder(w).Encode(document)
	if encodeErr != nil {
		slog.Error("Failed to encode DID document", "error", encodeErr)
	}
}
// initializeJetstream launches the Jetstream background workers.
//
// A live worker is always started in its own goroutine against the configured
// Jetstream URLs, with the webhook dispatcher attached when one is set. When
// backfill is enabled, a backfill worker additionally runs once after a
// five-second startup delay and, if a positive backfill interval is
// configured, again on every tick of that interval. All workers run with
// context.Background(); failures are logged and never returned.
func (s *AppViewServer) initializeJetstream() {
	endpoints := s.Config.Jetstream.URLs
	go func() {
		liveWorker := jetstream.NewWorker(s.Database, endpoints, 0)
		// Set webhook dispatcher on live worker (backfill skips dispatch)
		if s.WebhookDispatcher != nil {
			liveWorker.Processor().SetWebhookDispatcher(s.WebhookDispatcher)
		}
		liveWorker.StartWithFailover(context.Background())
	}()
	slog.Info("Jetstream real-time worker started", "component", "jetstream", "endpoints", len(endpoints))

	if !s.Config.Jetstream.BackfillEnabled {
		return
	}

	relays := s.Config.Jetstream.RelayEndpoints
	backfill, createErr := jetstream.NewBackfillWorker(
		s.Database,
		relays,
		s.Config.Server.DefaultHoldDID,
		s.Config.Server.TestMode,
		s.Refresher,
	)
	if createErr != nil {
		slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", createErr)
		return
	}

	// One-off startup backfill, delayed so other services can come up first.
	go func() {
		delay := 5 * time.Second
		slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", delay)
		time.Sleep(delay)
		slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoints", relays)
		runErr := backfill.Start(context.Background())
		if runErr != nil {
			slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", runErr)
			return
		}
		slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
	}()

	every := s.Config.Jetstream.BackfillInterval
	if every <= 0 {
		slog.Info("Periodic backfill disabled (interval=0), only startup backfill will run", "component", "jetstream/backfill")
		return
	}

	// Recurring backfill on a fixed interval.
	go func() {
		tick := time.NewTicker(every)
		defer tick.Stop()
		for range tick.C {
			slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", every)
			runErr := backfill.Start(context.Background())
			if runErr != nil {
				slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", runErr)
				continue
			}
			slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
		}
	}()
	slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", every)
}