893 lines
31 KiB
Go
893 lines
31 KiB
Go
package appview
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"html/template"
|
|
"log/slog"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/distribution/distribution/v3/registry/api/errcode"
|
|
"github.com/distribution/distribution/v3/registry/handlers"
|
|
"github.com/go-chi/chi/v5"
|
|
chimiddleware "github.com/go-chi/chi/v5/middleware"
|
|
|
|
"atcr.io/pkg/appview/db"
|
|
"atcr.io/pkg/appview/holdhealth"
|
|
"atcr.io/pkg/appview/jetstream"
|
|
"atcr.io/pkg/appview/middleware"
|
|
"atcr.io/pkg/appview/readme"
|
|
"atcr.io/pkg/appview/routes"
|
|
"atcr.io/pkg/appview/storage"
|
|
"atcr.io/pkg/appview/webhooks"
|
|
"atcr.io/pkg/atproto"
|
|
"atcr.io/pkg/auth"
|
|
"atcr.io/pkg/auth/oauth"
|
|
"atcr.io/pkg/auth/token"
|
|
"atcr.io/pkg/billing"
|
|
"atcr.io/pkg/logging"
|
|
|
|
"github.com/bluesky-social/indigo/atproto/atcrypto"
|
|
indigooauth "github.com/bluesky-social/indigo/atproto/auth/oauth"
|
|
)
|
|
|
|
// OAuthPostAuthHook is called after the default OAuth post-auth logic
|
|
// (profile creation, avatar fetch, crew registration). Hooks added after
|
|
// NewAppViewServer but before the first request work correctly.
|
|
type OAuthPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error
|
|
|
|
// TokenPostAuthHook is called after the default token post-auth logic
|
|
// (profile creation). Hooks added after NewAppViewServer but before the
|
|
// first request work correctly.
|
|
type TokenPostAuthHook func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error
|
|
|
|
// AppViewServer is the AppView service with an exposed router for extensibility.
|
|
// Consumers can add routes to Router and hooks before calling Serve().
|
|
type AppViewServer struct {
|
|
// Router is the chi router. Add routes before calling Serve().
|
|
Router chi.Router
|
|
|
|
// Config is the AppView configuration.
|
|
Config *Config
|
|
|
|
// Database is the read-write SQLite database.
|
|
Database *sql.DB
|
|
|
|
// ReadOnlyDB is the read-only SQLite database connection.
|
|
ReadOnlyDB *sql.DB
|
|
|
|
// SessionStore manages web UI sessions.
|
|
SessionStore *db.SessionStore
|
|
|
|
// DeviceStore manages device authorization flows.
|
|
DeviceStore *db.DeviceStore
|
|
|
|
// OAuthStore manages OAuth session persistence.
|
|
OAuthStore *db.OAuthStore
|
|
|
|
// OAuthServer handles OAuth authorization and callback endpoints.
|
|
OAuthServer *oauth.Server
|
|
|
|
// OAuthClientApp is the indigo OAuth client application.
|
|
OAuthClientApp *indigooauth.ClientApp
|
|
|
|
// Refresher manages OAuth session refresh and caching.
|
|
Refresher *oauth.Refresher
|
|
|
|
// Templates are the parsed HTML templates.
|
|
Templates *template.Template
|
|
|
|
// HealthChecker checks hold service health.
|
|
HealthChecker *holdhealth.Checker
|
|
|
|
// ReadmeFetcher fetches README content for repository pages.
|
|
ReadmeFetcher *readme.Fetcher
|
|
|
|
// TokenIssuer issues registry JWTs (nil if auth is not configured).
|
|
TokenIssuer *token.Issuer
|
|
|
|
// HoldAuthorizer checks hold access permissions.
|
|
HoldAuthorizer auth.HoldAuthorizer
|
|
|
|
// OAuthKey is the P-256 private key used for OAuth client auth and appview service identity.
|
|
OAuthKey *atcrypto.PrivateKeyP256
|
|
|
|
// BillingManager handles Stripe billing and tier updates (nil if billing disabled).
|
|
BillingManager *billing.Manager
|
|
|
|
// WebhookDispatcher dispatches scan webhooks (stored in appview DB).
|
|
WebhookDispatcher *webhooks.Dispatcher
|
|
|
|
// Private fields for lifecycle management
|
|
oauthHooks []OAuthPostAuthHook
|
|
tokenHooks []TokenPostAuthHook
|
|
httpServer *http.Server
|
|
healthWorker *holdhealth.Worker
|
|
workerCancel context.CancelFunc
|
|
branding *BrandingOverrides
|
|
}
|
|
|
|
// AddOAuthPostAuthHook registers a hook that runs after the default OAuth
|
|
// post-auth logic. Multiple hooks run in registration order.
|
|
func (s *AppViewServer) AddOAuthPostAuthHook(hook OAuthPostAuthHook) {
|
|
s.oauthHooks = append(s.oauthHooks, hook)
|
|
}
|
|
|
|
// AddTokenPostAuthHook registers a hook that runs after the default token
|
|
// post-auth logic. Multiple hooks run in registration order.
|
|
func (s *AppViewServer) AddTokenPostAuthHook(hook TokenPostAuthHook) {
|
|
s.tokenHooks = append(s.tokenHooks, hook)
|
|
}
|
|
|
|
// NewAppViewServer creates a fully-initialized AppView server ready for the
|
|
// consumer to add routes and hooks before calling Serve(). Pass nil for
|
|
// branding to use default atcr.io assets and templates.
|
|
func NewAppViewServer(cfg *Config, branding *BrandingOverrides) (*AppViewServer, error) {
|
|
// Initialize structured logging with optional remote shipping
|
|
logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
|
|
Backend: cfg.LogShipper.Backend,
|
|
URL: cfg.LogShipper.URL,
|
|
BatchSize: cfg.LogShipper.BatchSize,
|
|
FlushInterval: cfg.LogShipper.FlushInterval,
|
|
Service: "appview",
|
|
Username: cfg.LogShipper.Username,
|
|
Password: cfg.LogShipper.Password,
|
|
})
|
|
|
|
slog.Info("Configuration loaded successfully from environment")
|
|
|
|
s := &AppViewServer{
|
|
Config: cfg,
|
|
branding: branding,
|
|
}
|
|
|
|
// Initialize UI database (required for all stores)
|
|
slog.Info("Initializing UI database", "path", cfg.UI.DatabasePath)
|
|
libsqlCfg := db.LibsqlConfig{
|
|
SyncURL: cfg.UI.LibsqlSyncURL,
|
|
AuthToken: cfg.UI.LibsqlAuthToken,
|
|
SyncInterval: cfg.UI.LibsqlSyncInterval,
|
|
}
|
|
s.Database, s.ReadOnlyDB, s.SessionStore = db.InitializeDatabase(cfg.UI.DatabasePath, libsqlCfg)
|
|
if s.Database == nil {
|
|
return nil, fmt.Errorf("failed to initialize UI database - required for session storage")
|
|
}
|
|
|
|
// Initialize hold health checker
|
|
slog.Info("Initializing hold health checker", "cache_ttl", cfg.Health.CacheTTL)
|
|
s.HealthChecker = holdhealth.NewChecker(cfg.Health.CacheTTL)
|
|
|
|
// Initialize README fetcher for rendering repo page descriptions
|
|
s.ReadmeFetcher = readme.NewFetcher()
|
|
|
|
// Start background health check worker
|
|
startupDelay := 5 * time.Second
|
|
dbAdapter := holdhealth.NewDBAdapter(s.Database)
|
|
s.healthWorker = holdhealth.NewWorkerWithStartupDelay(s.HealthChecker, dbAdapter, cfg.Health.CheckInterval, startupDelay)
|
|
|
|
workerCtx, workerCancel := context.WithCancel(context.Background())
|
|
s.workerCancel = workerCancel
|
|
s.healthWorker.Start(workerCtx)
|
|
slog.Info("Hold health worker started", "startup_delay", startupDelay, "refresh_interval", cfg.Health.CheckInterval, "cache_ttl", cfg.Health.CacheTTL)
|
|
|
|
// Initialize OAuth components
|
|
slog.Info("Initializing OAuth components")
|
|
|
|
s.OAuthStore = db.NewOAuthStore(s.Database)
|
|
slog.Info("Using SQLite for OAuth session storage")
|
|
|
|
s.DeviceStore = db.NewDeviceStore(s.Database)
|
|
slog.Info("Using SQLite for device storage")
|
|
|
|
baseURL := cfg.Server.BaseURL
|
|
defaultHoldDID := cfg.Server.DefaultHoldDID
|
|
testMode := cfg.Server.TestMode
|
|
|
|
slog.Debug("Base URL for OAuth", "base_url", baseURL)
|
|
if testMode {
|
|
slog.Info("TEST_MODE enabled - will use HTTP for local DID resolution")
|
|
atproto.SetTestMode(true)
|
|
}
|
|
|
|
// Load crypto keys from database (with file fallback and migration)
|
|
oauthKey, err := loadOAuthKey(s.Database, cfg.Server.OAuthKeyPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load OAuth key: %w", err)
|
|
}
|
|
s.OAuthKey = oauthKey
|
|
|
|
// Create OAuth client app
|
|
desiredScopes := oauth.GetDefaultScopes(defaultHoldDID)
|
|
s.OAuthClientApp, err = oauth.NewClientAppWithKey(baseURL, s.OAuthStore, desiredScopes, oauthKey, cfg.Server.ClientName)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create OAuth client app: %w", err)
|
|
}
|
|
|
|
// Invalidate sessions with mismatched scopes on startup
|
|
invalidatedCount, err := s.OAuthStore.InvalidateSessionsWithMismatchedScopes(context.Background(), desiredScopes)
|
|
if err != nil {
|
|
slog.Warn("Failed to invalidate sessions with mismatched scopes", "error", err)
|
|
} else if invalidatedCount > 0 {
|
|
slog.Info("Invalidated OAuth sessions due to scope changes", "count", invalidatedCount)
|
|
}
|
|
|
|
// Create oauth token refresher
|
|
s.Refresher = oauth.NewRefresher(s.OAuthClientApp)
|
|
|
|
// Wire up UI session store to refresher
|
|
if s.SessionStore != nil {
|
|
s.Refresher.SetUISessionStore(s.SessionStore)
|
|
}
|
|
|
|
// Set global refresher for middleware
|
|
middleware.SetGlobalRefresher(s.Refresher)
|
|
|
|
// Set global database for hold DID lookups and manifest reference checks
|
|
holdDIDDB := db.NewHoldDIDDB(s.Database)
|
|
middleware.SetGlobalDatabase(holdDIDDB)
|
|
middleware.SetGlobalManifestRefChecker(holdDIDDB)
|
|
|
|
// Create RemoteHoldAuthorizer for hold authorization with caching
|
|
s.HoldAuthorizer = auth.NewRemoteHoldAuthorizer(s.Database, testMode)
|
|
middleware.SetGlobalAuthorizer(s.HoldAuthorizer)
|
|
slog.Info("Hold authorizer initialized with database caching")
|
|
|
|
// Clear all denial caches on startup for a clean slate
|
|
if remote, ok := s.HoldAuthorizer.(*auth.RemoteHoldAuthorizer); ok {
|
|
go func() {
|
|
if err := remote.ClearAllDenials(); err != nil {
|
|
slog.Warn("Failed to clear denial caches on startup", "error", err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Initialize billing manager
|
|
appviewDID := DIDFromBaseURL(baseURL)
|
|
s.BillingManager = billing.New(
|
|
&cfg.Billing,
|
|
oauthKey,
|
|
appviewDID,
|
|
cfg.Server.ManagedHolds,
|
|
baseURL,
|
|
)
|
|
// Allow hold captains to bypass billing feature gates
|
|
if len(cfg.Server.ManagedHolds) > 0 {
|
|
managedHolds := cfg.Server.ManagedHolds
|
|
roDB := s.ReadOnlyDB
|
|
s.BillingManager.SetCaptainChecker(func(userDID string) bool {
|
|
isCaptain, _ := db.IsHoldCaptain(roDB, userDID, managedHolds)
|
|
return isCaptain
|
|
})
|
|
}
|
|
if s.BillingManager.Enabled() {
|
|
slog.Info("Billing enabled", "appview_did", appviewDID, "managed_holds", len(cfg.Server.ManagedHolds))
|
|
go s.BillingManager.RefreshHoldTiers()
|
|
}
|
|
|
|
// Create webhook dispatcher
|
|
appviewMeta := atproto.AppviewMetadata{
|
|
ClientName: cfg.Server.ClientName,
|
|
ClientShortName: cfg.Server.ClientShortName,
|
|
BaseURL: cfg.Server.BaseURL,
|
|
FaviconURL: cfg.Server.BaseURL + "/favicon-96x96.png",
|
|
RegistryDomains: cfg.Server.RegistryDomains,
|
|
}
|
|
s.WebhookDispatcher = webhooks.NewDispatcher(s.Database, appviewMeta)
|
|
middleware.SetGlobalWebhookDispatcher(s.WebhookDispatcher)
|
|
|
|
// Initialize Jetstream workers
|
|
s.initializeJetstream()
|
|
|
|
// Create main chi router
|
|
mainRouter := chi.NewRouter()
|
|
|
|
mainRouter.Use(chimiddleware.RealIP)
|
|
mainRouter.Use(chimiddleware.Logger)
|
|
mainRouter.Use(chimiddleware.Recoverer)
|
|
mainRouter.Use(chimiddleware.GetHead)
|
|
mainRouter.Use(routes.CORSMiddleware())
|
|
|
|
// Domain routing middleware
|
|
if len(cfg.Server.RegistryDomains) > 0 {
|
|
mainRouter.Use(DomainRoutingMiddleware(cfg.Server.RegistryDomains, cfg.Server.BaseURL))
|
|
slog.Info("Domain routing middleware enabled",
|
|
"registry_domains", cfg.Server.RegistryDomains,
|
|
"ui_base_url", cfg.Server.BaseURL)
|
|
}
|
|
|
|
// Load templates
|
|
ComputeAssetHashes(branding)
|
|
s.Templates, err = Templates(branding)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load UI templates: %w", err)
|
|
}
|
|
|
|
// Register UI routes
|
|
routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{
|
|
Database: s.Database,
|
|
ReadOnlyDB: s.ReadOnlyDB,
|
|
SessionStore: s.SessionStore,
|
|
OAuthClientApp: s.OAuthClientApp,
|
|
OAuthStore: s.OAuthStore,
|
|
Refresher: s.Refresher,
|
|
BaseURL: baseURL,
|
|
RegistryDomain: primaryRegistryDomain(cfg.Server.RegistryDomains),
|
|
DeviceStore: s.DeviceStore,
|
|
HealthChecker: s.HealthChecker,
|
|
ReadmeFetcher: s.ReadmeFetcher,
|
|
Templates: s.Templates,
|
|
DefaultHoldDID: defaultHoldDID,
|
|
ClientName: cfg.Server.ClientName,
|
|
ClientShortName: cfg.Server.ClientShortName,
|
|
BillingManager: s.BillingManager,
|
|
WebhookDispatcher: s.WebhookDispatcher,
|
|
LegalConfig: routes.LegalConfig{
|
|
CompanyName: cfg.Legal.CompanyName,
|
|
Jurisdiction: cfg.Legal.Jurisdiction,
|
|
},
|
|
})
|
|
|
|
// Register Stripe webhook route (if billing enabled)
|
|
s.BillingManager.RegisterRoutes(mainRouter)
|
|
|
|
// Create OAuth server
|
|
s.OAuthServer = oauth.NewServer(s.OAuthClientApp)
|
|
s.OAuthServer.SetRefresher(s.Refresher)
|
|
if s.SessionStore != nil {
|
|
s.OAuthServer.SetUISessionStore(s.SessionStore)
|
|
}
|
|
|
|
// Register OAuth post-auth callback (closure captures s for hook dispatch)
|
|
s.OAuthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error {
|
|
slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did)
|
|
|
|
// Create ATProto client with session provider
|
|
client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, s.Refresher)
|
|
|
|
// Ensure sailor profile exists
|
|
slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
|
|
if err := storage.EnsureProfile(ctx, client, defaultHoldDID); err != nil {
|
|
slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
|
|
} else {
|
|
slog.Debug("Profile ensured", "component", "appview/callback", "did", did)
|
|
}
|
|
|
|
// Fetch user's profile record from PDS
|
|
profileRecord, err := client.GetProfileRecord(ctx, did)
|
|
if err != nil {
|
|
slog.Warn("Failed to fetch profile record", "component", "appview/callback", "did", did, "error", err)
|
|
profileRecord = nil
|
|
}
|
|
|
|
// Construct avatar URL from blob CID
|
|
avatarURL := ""
|
|
if profileRecord != nil && profileRecord.Avatar != nil && profileRecord.Avatar.Ref.Link != "" {
|
|
avatarURL = atproto.BlobCDNURL(did, profileRecord.Avatar.Ref.Link)
|
|
slog.Debug("Constructed avatar URL", "component", "appview/callback", "avatar_url", avatarURL)
|
|
}
|
|
|
|
// Store user in database
|
|
if avatarURL != "" {
|
|
err = db.UpsertUser(s.Database, &db.User{
|
|
DID: did,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatarURL,
|
|
LastSeen: time.Now(),
|
|
})
|
|
} else {
|
|
err = db.UpsertUserIgnoreAvatar(s.Database, &db.User{
|
|
DID: did,
|
|
Handle: handle,
|
|
PDSEndpoint: pdsEndpoint,
|
|
Avatar: avatarURL,
|
|
LastSeen: time.Now(),
|
|
})
|
|
}
|
|
if err != nil {
|
|
slog.Warn("Failed to store user in database", "component", "appview/callback", "error", err)
|
|
return nil
|
|
}
|
|
|
|
slog.Debug("Stored user", "component", "appview/callback", "did", did, "has_avatar", avatarURL != "")
|
|
|
|
// Migrate profile URL→DID if needed
|
|
profile, err := storage.GetProfile(ctx, client)
|
|
if err != nil {
|
|
slog.Warn("Failed to get profile", "component", "appview/callback", "did", did, "error", err)
|
|
return nil
|
|
}
|
|
|
|
var holdDID string
|
|
if profile != nil && profile.DefaultHold != "" {
|
|
if strings.HasPrefix(profile.DefaultHold, "http://") || strings.HasPrefix(profile.DefaultHold, "https://") {
|
|
slog.Debug("Migrating hold URL to DID", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold)
|
|
|
|
if resolvedDID, resolveErr := atproto.ResolveHoldDID(ctx, profile.DefaultHold); resolveErr != nil {
|
|
slog.Warn("Failed to resolve hold DID from URL", "component", "appview/callback", "did", did, "hold_url", profile.DefaultHold, "error", resolveErr)
|
|
} else {
|
|
holdDID = resolvedDID
|
|
profile.DefaultHold = holdDID
|
|
if err := storage.UpdateProfile(ctx, client, profile); err != nil {
|
|
slog.Warn("Failed to update profile with hold DID", "component", "appview/callback", "did", did, "error", err)
|
|
} else {
|
|
slog.Debug("Updated profile with hold DID", "component", "appview/callback", "hold_did", holdDID)
|
|
}
|
|
}
|
|
} else {
|
|
holdDID = profile.DefaultHold
|
|
}
|
|
// Register crew in background
|
|
slog.Debug("Attempting crew registration", "component", "appview/callback", "did", did, "hold_did", holdDID)
|
|
go func(client *atproto.Client, refresher *oauth.Refresher, holdDID string, authorizer auth.HoldAuthorizer) {
|
|
ctx := context.Background()
|
|
storage.EnsureCrewMembership(ctx, client, refresher, holdDID, authorizer)
|
|
}(client, s.Refresher, holdDID, s.HoldAuthorizer)
|
|
}
|
|
|
|
// Migrate old-format star records to AT URI format in background
|
|
go func(client *atproto.Client, did string) {
|
|
ctx := context.Background()
|
|
migrated, err := atproto.MigrateStarRecords(ctx, client)
|
|
if err != nil {
|
|
slog.Warn("Star record migration failed", "component", "appview/callback", "did", did, "error", err, "migrated", migrated)
|
|
} else if migrated > 0 {
|
|
slog.Info("Migrated star records to AT URI format", "component", "appview/callback", "did", did, "count", migrated)
|
|
}
|
|
}(client, did)
|
|
|
|
// Drain manifests from old hold to successor in background
|
|
go func(client *atproto.Client, did string) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
defer cancel()
|
|
storage.MigrateManifestsForSuccessor(ctx, client, s.HoldAuthorizer, db.NewHoldDIDDB(s.Database), did)
|
|
}(client, did)
|
|
|
|
// Run consumer hooks
|
|
for _, hook := range s.oauthHooks {
|
|
if err := hook(ctx, did, handle, pdsEndpoint, sessionID); err != nil {
|
|
slog.Warn("OAuth post-auth hook error", "component", "appview/callback", "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
// Create token issuer
|
|
if cfg.Distribution.Auth["token"] != nil {
|
|
rsaKey, certDER, err := loadJWTKeyAndCert(s.Database, cfg.Auth.KeyPath, cfg.Auth.CertPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load JWT key material: %w", err)
|
|
}
|
|
s.TokenIssuer = token.NewIssuerFromKey(rsaKey, certDER, cfg.Auth.ServiceName, cfg.Auth.ServiceName, cfg.Auth.TokenExpiration)
|
|
slog.Info("Auth keys initialized")
|
|
}
|
|
|
|
// Create registry app (distribution library handler)
|
|
ctx := context.Background()
|
|
app := handlers.NewApp(ctx, cfg.Distribution)
|
|
|
|
// Wrap with auth method extraction middleware
|
|
wrappedApp := middleware.ExtractAuthMethod(app)
|
|
|
|
// Mount registry at /v2/
|
|
mainRouter.Handle("/v2/*", wrappedApp)
|
|
|
|
// Mount static files
|
|
if s.SessionStore != nil && s.Templates != nil {
|
|
publicHandler := CacheMiddleware(PublicHandler(branding), 31536000)
|
|
rootFiles, err := PublicRootFiles(branding)
|
|
if err != nil {
|
|
slog.Warn("Failed to scan static root files", "error", err)
|
|
} else {
|
|
for _, filename := range rootFiles {
|
|
file := filename
|
|
mainRouter.Get("/"+file, func(w http.ResponseWriter, r *http.Request) {
|
|
r.URL.Path = "/" + file
|
|
publicHandler.ServeHTTP(w, r)
|
|
})
|
|
}
|
|
slog.Info("Registered dynamic root file routes", "count", len(rootFiles), "files", rootFiles)
|
|
}
|
|
|
|
mainRouter.Handle("/css/*", CacheMiddleware(http.StripPrefix("/css/", PublicSubdir("css", branding)), 31536000))
|
|
mainRouter.Handle("/js/*", CacheMiddleware(http.StripPrefix("/js/", PublicSubdir("js", branding)), 31536000))
|
|
mainRouter.Handle("/static/*", CacheMiddleware(http.StripPrefix("/static/", PublicSubdir("static", branding)), 31536000))
|
|
|
|
slog.Info("UI enabled", "home", "/", "settings", "/settings")
|
|
}
|
|
|
|
// Mount OAuth endpoints
|
|
mainRouter.Get("/auth/oauth/authorize", s.OAuthServer.ServeAuthorize)
|
|
mainRouter.Get("/auth/oauth/callback", s.OAuthServer.ServeCallback)
|
|
|
|
// OAuth client metadata endpoint
|
|
mainRouter.Get("/oauth-client-metadata.json", func(w http.ResponseWriter, r *http.Request) {
|
|
config := s.OAuthClientApp.Config
|
|
logoURI := cfg.Server.BaseURL + "/web-app-manifest-192x192.png"
|
|
policyURI := cfg.Server.BaseURL + "/privacy"
|
|
tosURI := cfg.Server.BaseURL + "/terms"
|
|
|
|
metadata := config.ClientMetadata()
|
|
metadata.ClientName = &cfg.Server.ClientName
|
|
metadata.ClientURI = &cfg.Server.BaseURL
|
|
metadata.LogoURI = &logoURI
|
|
metadata.PolicyURI = &policyURI
|
|
metadata.TosURI = &tosURI
|
|
|
|
if config.IsConfidential() && metadata.JWKS == nil {
|
|
jwks := config.PublicJWKS()
|
|
metadata.JWKS = &jwks
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.Header().Set("Cache-Control", "public, max-age=300")
|
|
if err := json.NewEncoder(w).Encode(metadata); err != nil {
|
|
http.Error(w, "Failed to encode metadata", http.StatusInternalServerError)
|
|
}
|
|
})
|
|
|
|
// Mount auth endpoints
|
|
if s.TokenIssuer != nil {
|
|
tokenHandler := token.NewHandler(s.TokenIssuer, s.DeviceStore)
|
|
|
|
tokenHandler.SetOAuthSessionValidator(s.Refresher)
|
|
|
|
// Token post-auth callback (closure captures s for hook dispatch)
|
|
tokenHandler.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, accessToken string) error {
|
|
slog.Debug("Token post-auth callback", "component", "appview/callback", "did", did)
|
|
|
|
atprotoClient := atproto.NewClient(pdsEndpoint, did, accessToken)
|
|
|
|
if err := storage.EnsureProfile(ctx, atprotoClient, defaultHoldDID); err != nil {
|
|
slog.Warn("Failed to ensure profile", "component", "appview/callback", "did", did, "error", err)
|
|
} else {
|
|
slog.Debug("Profile ensured with default hold", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
|
|
}
|
|
|
|
// Run consumer hooks
|
|
for _, hook := range s.tokenHooks {
|
|
if err := hook(ctx, did, handle, pdsEndpoint, accessToken); err != nil {
|
|
slog.Warn("Token post-auth hook error", "component", "appview/callback", "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
mainRouter.Get("/auth/token", tokenHandler.ServeHTTP)
|
|
|
|
// Device authorization endpoints (public)
|
|
routes.RegisterDeviceEndpoints(mainRouter, s.DeviceStore, baseURL)
|
|
|
|
slog.Info("Auth endpoints enabled",
|
|
"basic_auth", "/auth/token",
|
|
"device_code", "/auth/device/code",
|
|
"device_token", "/auth/device/token",
|
|
"oauth_authorize", "/auth/oauth/authorize",
|
|
"oauth_callback", "/auth/oauth/callback",
|
|
"oauth_metadata", "/client-metadata.json")
|
|
}
|
|
|
|
// Health check endpoint (for Docker health checks / load balancers)
|
|
mainRouter.Get("/health", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
if err := json.NewEncoder(w).Encode(map[string]string{"status": "ok"}); err != nil {
|
|
http.Error(w, "encode error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
})
|
|
|
|
// Appview metadata endpoint (public, used by holds for branding)
|
|
mainRouter.Get(atproto.AppviewGetMetadata, func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Header().Set("Cache-Control", "public, max-age=3600")
|
|
if err := json.NewEncoder(w).Encode(atproto.AppviewMetadata{
|
|
ClientName: cfg.Server.ClientName,
|
|
ClientShortName: cfg.Server.ClientShortName,
|
|
BaseURL: cfg.Server.BaseURL,
|
|
FaviconURL: cfg.Server.BaseURL + "/favicon-96x96.png",
|
|
RegistryDomains: cfg.Server.RegistryDomains,
|
|
}); err != nil {
|
|
http.Error(w, "encode error", http.StatusInternalServerError)
|
|
}
|
|
})
|
|
|
|
// Appview DID document endpoint (service identity for key discovery)
|
|
mainRouter.Get("/.well-known/did.json", s.handleDIDDocument)
|
|
|
|
// Register credential helper version API (public endpoint)
|
|
routes.RegisterCredentialHelperEndpoint(mainRouter, cfg.CredentialHelper.TangledRepo)
|
|
|
|
s.Router = mainRouter
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Serve starts the HTTP server and blocks until shutdown signal.
|
|
func (s *AppViewServer) Serve() error {
|
|
listener, err := net.Listen("tcp", s.Config.Server.Addr)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create listener: %w", err)
|
|
}
|
|
|
|
s.httpServer = &http.Server{
|
|
Handler: s.Router,
|
|
}
|
|
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
|
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
slog.Info("Starting registry server", "addr", s.Config.Server.Addr)
|
|
if err := s.httpServer.Serve(listener); err != nil && err != http.ErrServerClosed {
|
|
errChan <- err
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-stop:
|
|
slog.Info("Shutting down registry server")
|
|
|
|
if s.Config.Server.TestMode {
|
|
listener.Close()
|
|
}
|
|
|
|
slog.Info("Stopping hold health worker")
|
|
s.healthWorker.Stop()
|
|
|
|
if s.workerCancel != nil {
|
|
s.workerCancel()
|
|
}
|
|
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := s.httpServer.Shutdown(shutdownCtx); err != nil && err != http.ErrServerClosed {
|
|
logging.Shutdown()
|
|
return fmt.Errorf("server shutdown error: %w", err)
|
|
}
|
|
case err := <-errChan:
|
|
s.healthWorker.Stop()
|
|
if s.workerCancel != nil {
|
|
s.workerCancel()
|
|
}
|
|
logging.Shutdown()
|
|
return fmt.Errorf("server error: %w", err)
|
|
}
|
|
|
|
logging.Shutdown()
|
|
return nil
|
|
}
|
|
|
|
// DomainRoutingMiddleware enforces three-tier domain routing:
|
|
//
|
|
// 1. UI domain (BaseURL hostname): serves web UI, auth, and static assets.
|
|
// Blocks /v2/* with an OCI UNSUPPORTED error — registry API lives on
|
|
// the dedicated registry domain(s).
|
|
// 2. Registry domains: allows /v2/* for Docker clients. Redirects everything
|
|
// else to the UI domain with 307 Temporary Redirect.
|
|
// 3. Unknown domains (CDN origins, IPs, etc.): redirects all requests to the
|
|
// UI domain with 307, except /health for load balancer probes.
|
|
func DomainRoutingMiddleware(registryDomains []string, uiBaseURL string) func(http.Handler) http.Handler {
|
|
regDomains := make(map[string]bool, len(registryDomains))
|
|
for _, d := range registryDomains {
|
|
regDomains[d] = true
|
|
}
|
|
|
|
// Extract UI hostname from BaseURL (e.g., "https://seamark.dev" -> "seamark.dev")
|
|
var uiHost string
|
|
if parsed, err := url.Parse(uiBaseURL); err == nil {
|
|
uiHost = parsed.Hostname()
|
|
}
|
|
|
|
primaryReg := primaryRegistryDomain(registryDomains)
|
|
|
|
return func(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
host := r.Host
|
|
if idx := strings.LastIndex(host, ":"); idx != -1 {
|
|
host = host[:idx]
|
|
}
|
|
|
|
path := r.URL.Path
|
|
isV2 := path == "/v2" || path == "/v2/" || strings.HasPrefix(path, "/v2/")
|
|
|
|
switch {
|
|
case host == uiHost:
|
|
// UI domain: block /v2/*, serve everything else
|
|
if isV2 {
|
|
if err := errcode.ServeJSON(w, errcode.ErrorCodeUnsupported.WithMessage(
|
|
fmt.Sprintf("registry API is not available on this domain, use %s", primaryReg),
|
|
)); err != nil {
|
|
slog.Error("failed to write OCI error response", "error", err)
|
|
}
|
|
return
|
|
}
|
|
next.ServeHTTP(w, r)
|
|
|
|
case regDomains[host]:
|
|
// Registry domain: allow /v2/*, /auth/token, /auth/device/*, redirect everything else
|
|
// Auth endpoints must be served directly to avoid 307 redirects that strip
|
|
// the Authorization header on cross-host redirects (Go http.Client behavior).
|
|
isAuth := path == "/auth/token" || strings.HasPrefix(path, "/auth/device/")
|
|
if isV2 || isAuth {
|
|
next.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
http.Redirect(w, r, uiBaseURL+r.URL.RequestURI(), http.StatusTemporaryRedirect)
|
|
|
|
default:
|
|
// Unknown domain: allow /health, redirect everything else
|
|
if path == "/health" {
|
|
next.ServeHTTP(w, r)
|
|
return
|
|
}
|
|
http.Redirect(w, r, uiBaseURL+r.URL.RequestURI(), http.StatusTemporaryRedirect)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
// primaryRegistryDomain returns the first registry domain, or empty string if none.
|
|
func primaryRegistryDomain(domains []string) string {
|
|
if len(domains) > 0 {
|
|
return domains[0]
|
|
}
|
|
return ""
|
|
}
|
|
|
|
// DID returns the appview's did:web identity derived from its BaseURL.
|
|
func (s *AppViewServer) DID() string {
|
|
return DIDFromBaseURL(s.Config.Server.BaseURL)
|
|
}
|
|
|
|
// DIDFromBaseURL derives a did:web identifier from a base URL.
|
|
// Per the did:web spec, non-standard ports are percent-encoded.
|
|
// Examples:
|
|
//
|
|
// "https://atcr.io" → "did:web:atcr.io"
|
|
// "http://localhost:5000" → "did:web:localhost%3A5000"
|
|
func DIDFromBaseURL(baseURL string) string {
|
|
u, err := url.Parse(baseURL)
|
|
if err != nil {
|
|
return "did:web:localhost"
|
|
}
|
|
|
|
hostname := u.Hostname()
|
|
if hostname == "" {
|
|
hostname = "localhost"
|
|
}
|
|
|
|
port := u.Port()
|
|
isStandardPort := (u.Scheme == "https" && port == "443") ||
|
|
(u.Scheme == "http" && port == "80") ||
|
|
port == ""
|
|
|
|
if isStandardPort {
|
|
return "did:web:" + hostname
|
|
}
|
|
return fmt.Sprintf("did:web:%s%%3A%s", hostname, port)
|
|
}
|
|
|
|
// handleDIDDocument serves the appview's DID document at /.well-known/did.json.
|
|
// This is a service identity for key discovery — no PDS, no repo, no firehose.
|
|
// Holds use this to discover the appview's P-256 public key for JWT verification.
|
|
func (s *AppViewServer) handleDIDDocument(w http.ResponseWriter, r *http.Request) {
|
|
did := s.DID()
|
|
|
|
pubKey, err := s.OAuthKey.PublicKey()
|
|
if err != nil {
|
|
slog.Error("Failed to get public key for DID document", "error", err)
|
|
http.Error(w, "internal error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
doc := map[string]any{
|
|
"@context": []string{
|
|
"https://www.w3.org/ns/did/v1",
|
|
"https://w3id.org/security/multikey/v1",
|
|
},
|
|
"id": did,
|
|
"verificationMethod": []map[string]any{
|
|
{
|
|
"id": did + "#appview",
|
|
"type": "Multikey",
|
|
"controller": did,
|
|
"publicKeyMultibase": pubKey.Multibase(),
|
|
},
|
|
},
|
|
"authentication": []string{
|
|
did + "#appview",
|
|
},
|
|
"assertionMethod": []string{
|
|
did + "#appview",
|
|
},
|
|
"service": []map[string]any{
|
|
{
|
|
"id": "#atcr_appview",
|
|
"type": "AtcrAppView",
|
|
"serviceEndpoint": s.Config.Server.BaseURL,
|
|
},
|
|
},
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/did+ld+json")
|
|
w.Header().Set("Cache-Control", "public, max-age=3600")
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
if err := json.NewEncoder(w).Encode(doc); err != nil {
|
|
slog.Error("Failed to encode DID document", "error", err)
|
|
}
|
|
}
|
|
|
|
// initializeJetstream initializes the Jetstream workers for real-time events and backfill.
|
|
func (s *AppViewServer) initializeJetstream() {
|
|
jetstreamURLs := s.Config.Jetstream.URLs
|
|
|
|
go func() {
|
|
worker := jetstream.NewWorker(s.Database, jetstreamURLs, 0)
|
|
// Set webhook dispatcher on live worker (backfill skips dispatch)
|
|
if s.WebhookDispatcher != nil {
|
|
worker.Processor().SetWebhookDispatcher(s.WebhookDispatcher)
|
|
}
|
|
worker.StartWithFailover(context.Background())
|
|
}()
|
|
slog.Info("Jetstream real-time worker started", "component", "jetstream", "endpoints", len(jetstreamURLs))
|
|
|
|
if s.Config.Jetstream.BackfillEnabled {
|
|
relayEndpoints := s.Config.Jetstream.RelayEndpoints
|
|
defaultHoldDID := s.Config.Server.DefaultHoldDID
|
|
testMode := s.Config.Server.TestMode
|
|
|
|
backfillWorker, err := jetstream.NewBackfillWorker(s.Database, relayEndpoints, defaultHoldDID, testMode, s.Refresher)
|
|
if err != nil {
|
|
slog.Warn("Failed to create backfill worker", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
go func() {
|
|
startupDelay := 5 * time.Second
|
|
slog.Info("Waiting for services to be ready", "component", "jetstream/backfill", "startup_delay", startupDelay)
|
|
time.Sleep(startupDelay)
|
|
|
|
slog.Info("Starting sync-based backfill", "component", "jetstream/backfill", "relay_endpoints", relayEndpoints)
|
|
if err := backfillWorker.Start(context.Background()); err != nil {
|
|
slog.Warn("Backfill finished with error", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
slog.Info("Backfill completed successfully", "component", "jetstream/backfill")
|
|
}
|
|
}()
|
|
|
|
interval := s.Config.Jetstream.BackfillInterval
|
|
if interval > 0 {
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
slog.Info("Starting periodic backfill", "component", "jetstream/backfill", "interval", interval)
|
|
if err := backfillWorker.Start(context.Background()); err != nil {
|
|
slog.Warn("Periodic backfill finished with error", "component", "jetstream/backfill", "error", err)
|
|
} else {
|
|
slog.Info("Periodic backfill completed successfully", "component", "jetstream/backfill")
|
|
}
|
|
}
|
|
}()
|
|
slog.Info("Periodic backfill scheduler started", "component", "jetstream/backfill", "interval", interval)
|
|
} else {
|
|
slog.Info("Periodic backfill disabled (interval=0), only startup backfill will run", "component", "jetstream/backfill")
|
|
}
|
|
}
|
|
}
|
|
}
|