// Package hold implements the hold service: an HTTP server embedding an
// ATProto PDS, OCI upload endpoints, quota enforcement, garbage collection,
// and an optional admin panel.
package hold
import (
"context"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"atcr.io/pkg/atproto"
"atcr.io/pkg/hold/admin"
holddb "atcr.io/pkg/hold/db"
"atcr.io/pkg/hold/gc"
"atcr.io/pkg/hold/oci"
"atcr.io/pkg/hold/pds"
"atcr.io/pkg/hold/quota"
"atcr.io/pkg/logging"
"atcr.io/pkg/s3"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
)
// HoldServer is the hold service with an exposed router for extensibility.
// Consumers can add routes to Router before calling Serve().
type HoldServer struct {
	// Router is the chi router. Add routes before calling Serve().
	Router chi.Router
	// PDS is the embedded ATProto PDS. Nil if database path is not configured.
	PDS *pds.HoldPDS
	// QuotaManager manages storage quotas per tier.
	QuotaManager *quota.Manager
	// Config is the hold service configuration.
	Config *Config

	// Internal fields, owned by NewHoldServer/Serve and released in shutdown().
	httpServer       *http.Server          // bound in Serve(); drained on graceful shutdown
	broadcaster      *pds.EventBroadcaster // firehose event fan-out; Close()d on shutdown
	scanBroadcaster  *pds.ScanBroadcaster  // scanner WebSocket fan-out; non-nil only when Scanner.Secret is set
	garbageCollector *gc.GarbageCollector  // nightly + startup GC; Stop()ped on shutdown
	adminUI          *admin.AdminUI        // admin panel; non-nil only when Admin.Enabled
	holdDB           *holddb.HoldDB        // shared database connection (nil for :memory:)
}
// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
// before starting. Consumers can add routes to Router before calling Serve().
//
// It returns an error when the database path is missing, the hold DID cannot
// be resolved, or any subsystem (PDS, S3, quota, scan broadcaster, admin
// panel) fails to initialize.
func NewHoldServer(cfg *Config) (*HoldServer, error) {
	// Initialize structured logging with optional remote shipping.
	logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
		Backend:       cfg.LogShipper.Backend,
		URL:           cfg.LogShipper.URL,
		BatchSize:     cfg.LogShipper.BatchSize,
		FlushInterval: cfg.LogShipper.FlushInterval,
		Service:       "hold",
		Username:      cfg.LogShipper.Username,
		Password:      cfg.LogShipper.Password,
	})

	// The embedded PDS is mandatory for authorization, so fail fast instead
	// of carrying a trailing else branch through the whole constructor.
	if cfg.Database.Path == "" {
		return nil, fmt.Errorf("database path is required for embedded PDS authorization")
	}

	s := &HoldServer{Config: cfg}

	if cfg.Server.TestMode {
		atproto.SetTestMode(true)
	}

	ctx := context.Background()

	// Resolve (or create) the DID identifying this hold.
	holdDID, err := pds.LoadOrCreateDID(ctx, pds.DIDConfig{
		DID:             cfg.Database.DID,
		DIDMethod:       cfg.Database.DIDMethod,
		PublicURL:       cfg.Server.PublicURL,
		DBPath:          cfg.Database.Path,
		SigningKeyPath:  cfg.Database.KeyPath,
		RotationKey:     cfg.Database.RotationKey,
		PLCDirectoryURL: cfg.Database.PLCDirectoryURL,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to resolve hold DID: %w", err)
	}
	slog.Info("Initializing embedded PDS", "did", holdDID)

	if cfg.Database.Path != ":memory:" {
		// File mode: open the centralized shared DB (supports embedded
		// replica sync) and hand the same connection to every subsystem.
		dbFilePath := cfg.Database.Path + "/db.sqlite3"
		libsqlCfg := holddb.LibsqlConfig{
			SyncURL:      cfg.Database.LibsqlSyncURL,
			AuthToken:    cfg.Database.LibsqlAuthToken,
			SyncInterval: cfg.Database.LibsqlSyncInterval,
		}
		s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
		if err != nil {
			return nil, fmt.Errorf("failed to open hold database: %w", err)
		}
		s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
		}
		s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
	} else {
		// In-memory mode (tests): each subsystem opens its own connection;
		// s.holdDB stays nil.
		s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
		}
		s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
	}

	// Create the S3 service (used for bootstrap, handlers, GC, etc.).
	s3Service, err := s3.NewS3Service(cfg.Storage.S3Params())
	if err != nil {
		return nil, fmt.Errorf("failed to create S3 service: %w", err)
	}

	// Bootstrap events from existing repo records (one-time migration).
	// Must run BEFORE the live event handler is wired, so it captures
	// the full historical state without interference from new writes.
	if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
		slog.Warn("Failed to bootstrap events from repo", "error", err)
	}
	// Backfill records index from existing MST data (one-time on startup).
	if err := s.PDS.BackfillRecordsIndex(ctx); err != nil {
		slog.Warn("Failed to backfill records index", "error", err)
	}

	// Wire up repo event handler with records indexing + broadcaster.
	// Must be BEFORE Bootstrap so that record creates/updates during
	// bootstrap (captain, crew, profile) emit to the firehose.
	indexingHandler := s.PDS.CreateRecordsIndexEventHandler(s.broadcaster.SetRepoEventHandler())
	s.PDS.RepomgrRef().SetEventHandler(indexingHandler, true)

	// Bootstrap PDS with captain record, hold owner as first crew member,
	// and profile. Now that the event handler is wired, any changes here
	// emit to the firehose.
	if err := s.PDS.Bootstrap(ctx, s3Service, pds.BootstrapConfig{
		OwnerDID:           cfg.Registration.OwnerDID,
		Public:             cfg.Server.Public,
		AllowAllCrew:       cfg.Registration.AllowAllCrew,
		ProfileAvatarURL:   cfg.Registration.ProfileAvatarURL,
		ProfileDisplayName: cfg.Registration.ProfileDisplayName,
		ProfileDescription: cfg.Registration.ProfileDescription,
		Region:             cfg.Registration.Region,
	}); err != nil {
		return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
	}

	// Sync successor from config (if set) — kept separate from Bootstrap to
	// avoid changing its signature.
	if cfg.Server.Successor != "" {
		if _, captain, err := s.PDS.GetCaptainRecord(ctx); err == nil && captain.Successor != cfg.Server.Successor {
			captain.Successor = cfg.Server.Successor
			if _, err := s.PDS.UpdateCaptainRecord(ctx, captain); err != nil {
				slog.Warn("Failed to sync successor from config", "error", err)
			} else {
				slog.Info("Synced successor from config", "successor", cfg.Server.Successor)
			}
		}
	}
	slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")

	// Initialize quota manager from config.
	s.QuotaManager, err = quota.NewManagerFromConfig(&cfg.Quota)
	if err != nil {
		return nil, fmt.Errorf("failed to load quota config: %w", err)
	}
	if s.QuotaManager.IsEnabled() {
		slog.Info("Quota enforcement enabled", "tiers", s.QuotaManager.TierCount(), "defaultTier", s.QuotaManager.GetDefaultTier())
	} else {
		slog.Info("Quota enforcement disabled (no quota tiers configured)")
	}

	// Create XRPC handlers. s.PDS is guaranteed non-nil here because the
	// missing-database case already returned above.
	xrpcHandler := pds.NewXRPCHandler(s.PDS, *s3Service, s.broadcaster, nil, s.QuotaManager)
	if cfg.Server.AppviewDID != "" {
		xrpcHandler.SetAppviewDID(cfg.Server.AppviewDID)
	}
	ociHandler := oci.NewXRPCHandler(s.PDS, *s3Service, cfg.Registration.EnableBlueskyPosts, nil, s.QuotaManager)

	// Initialize scan broadcaster if a scanner secret is configured.
	if cfg.Scanner.Secret != "" {
		rescanInterval := cfg.Scanner.RescanInterval
		var sb *pds.ScanBroadcaster
		if s.holdDB != nil {
			sb, err = pds.NewScanBroadcasterWithDB(s.PDS.DID(), cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, s.holdDB.DB, s3Service, s.PDS, rescanInterval)
		} else {
			// s.holdDB is nil only in :memory: mode, so pass the path
			// through unchanged — matching the event broadcaster above.
			// (Previously this appended "/db.sqlite3" to ":memory:",
			// producing an invalid on-disk path.)
			sb, err = pds.NewScanBroadcaster(s.PDS.DID(), cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, cfg.Database.Path, s3Service, s.PDS, rescanInterval)
		}
		if err != nil {
			return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
		}
		s.scanBroadcaster = sb
		xrpcHandler.SetScanBroadcaster(sb)
		ociHandler.SetScanBroadcaster(sb)
		slog.Info("Scan broadcaster initialized (scanner WebSocket enabled)",
			"rescanInterval", rescanInterval)
	}

	// Initialize garbage collector (started later, in Serve()).
	s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
	slog.Info("Garbage collector initialized", "enabled", cfg.GC.Enabled)

	// Set up HTTP routes with the chi router.
	r := chi.NewRouter()
	r.Use(middleware.RealIP)
	// Log every request except the high-frequency health check.
	r.Use(middleware.Maybe(middleware.Logger, func(r *http.Request) bool {
		return r.URL.Path != "/xrpc/_health"
	}))
	r.Use(xrpcHandler.CORSMiddleware())

	// Root page.
	r.Get("/", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprintf(w, "This is a hold server. More info at https://atcr.io")
	})
	// robots.txt — disallow crawling of all endpoints except root.
	r.Get("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprint(w, "User-agent: *\nAllow: /\nDisallow: /xrpc/\nDisallow: /admin/\n")
	})

	// Register XRPC/ATProto PDS endpoints.
	slog.Info("Registering ATProto PDS endpoints")
	xrpcHandler.RegisterHandlers(r)
	// Register OCI multipart upload endpoints.
	slog.Info("Registering OCI multipart upload endpoints")
	ociHandler.RegisterHandlers(r)

	// Initialize and register the admin panel if enabled.
	if cfg.Admin.Enabled {
		adminCfg := admin.AdminConfig{
			Enabled:    true,
			PublicURL:  cfg.Server.PublicURL,
			ConfigPath: cfg.ConfigPath(),
		}
		s.adminUI, err = admin.NewAdminUI(ctx, s.PDS, s.QuotaManager, s.garbageCollector, adminCfg)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize admin panel: %w", err)
		}
		if s.adminUI != nil {
			slog.Info("Registering admin panel routes")
			s.adminUI.RegisterRoutes(r)
		}
	}

	s.Router = r
	return s, nil
}
// Serve starts the HTTP server and blocks until a shutdown signal
// (SIGINT/SIGTERM) is received or the server fails.
//
// After the listener goroutine starts, it best-effort publishes an
// "online" status post, fetches appview branding metadata, requests a
// relay crawl, and starts the garbage collector. On signal it performs
// a graceful shutdown and returns nil; on server error it returns the
// error.
func (s *HoldServer) Serve() error {
	s.httpServer = &http.Server{
		Addr:         s.Config.Server.Addr,
		Handler:      s.Router,
		ReadTimeout:  s.Config.Server.ReadTimeout,
		WriteTimeout: s.Config.Server.WriteTimeout,
	}

	// Set up signal handling for graceful shutdown; release the
	// registration when Serve returns.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	// Start the server in a goroutine. The channel is buffered so the
	// goroutine never blocks if we exit via the signal path instead.
	serverErr := make(chan error, 1)
	go func() {
		slog.Info("Starting hold service", "addr", s.Config.Server.Addr)
		if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			serverErr <- err
		}
	}()

	// Update status post to "online" after server starts (best effort).
	if s.PDS != nil {
		if err := s.PDS.SetStatus(context.Background(), "online"); err != nil {
			slog.Warn("Failed to set status post to online", "error", err)
		} else {
			slog.Info("Status post set to online")
		}
	}

	// Fetch appview metadata for branding (Bluesky posts). Guard on PDS
	// as well: SetAppviewMeta would otherwise dereference a nil PDS.
	if s.PDS != nil && s.Config.Server.AppviewURL() != "" {
		meta, err := atproto.FetchAppviewMetadata(context.Background(), s.Config.Server.AppviewURL())
		if err != nil {
			slog.Warn("Failed to fetch appview metadata, using defaults", "appview_url", s.Config.Server.AppviewURL(), "error", err)
		} else {
			s.PDS.SetAppviewMeta(meta)
			slog.Info("Fetched appview metadata", "clientName", meta.ClientName, "clientShortName", meta.ClientShortName)
		}
	}

	// Request crawl from relay to make the PDS discoverable (best effort).
	if s.Config.Server.RelayEndpoint != "" {
		slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint)
		if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
			slog.Warn("Failed to request crawl from relay", "error", err)
		} else {
			slog.Info("Crawl requested successfully")
		}
	}

	// Start garbage collector (runs on startup + nightly).
	if s.garbageCollector != nil {
		s.garbageCollector.Start(context.Background())
	}

	// Block until a server error or a termination signal.
	select {
	case err := <-serverErr:
		slog.Error("Server failed", "error", err)
		logging.Shutdown()
		return err
	case sig := <-sigChan:
		slog.Info("Received signal, shutting down gracefully", "signal", sig)
		s.shutdown()
	}
	return nil
}
// shutdown performs an ordered graceful shutdown: publish an "offline"
// status post, drain in-flight HTTP requests, then tear down subsystems,
// closing the shared database last.
func (s *HoldServer) shutdown() {
	// Update status post to "offline" while the database is still open.
	if s.PDS != nil {
		if err := s.PDS.SetStatus(context.Background(), "offline"); err != nil {
			slog.Warn("Failed to set status post to offline", "error", err)
		} else {
			slog.Info("Status post set to offline")
		}
	}

	// Stop accepting new requests and drain in-flight ones FIRST, so no
	// handler races against the subsystem/database closes below.
	// (Previously the databases were closed before the HTTP server,
	// letting in-flight requests hit closed connections.)
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
		slog.Error("Server shutdown error", "error", err)
	} else {
		slog.Info("Server shutdown complete")
	}

	// Stop background garbage collection.
	if s.garbageCollector != nil {
		s.garbageCollector.Stop()
		slog.Info("Garbage collector stopped")
	}

	// Close scan broadcaster database connection.
	if s.scanBroadcaster != nil {
		if err := s.scanBroadcaster.Close(); err != nil {
			slog.Warn("Failed to close scan broadcaster database", "error", err)
		} else {
			slog.Info("Scan broadcaster database closed")
		}
	}
	// Close event broadcaster database connection.
	if s.broadcaster != nil {
		if err := s.broadcaster.Close(); err != nil {
			slog.Warn("Failed to close broadcaster database", "error", err)
		} else {
			slog.Info("Broadcaster database closed")
		}
	}
	// Close admin panel.
	if s.adminUI != nil {
		if err := s.adminUI.Close(); err != nil {
			slog.Warn("Failed to close admin panel", "error", err)
		} else {
			slog.Info("Admin panel closed")
		}
	}
	// Close shared database connection and connector last, after every
	// subsystem that borrows it has shut down.
	if s.holdDB != nil {
		if err := s.holdDB.Close(); err != nil {
			slog.Warn("Failed to close hold database", "error", err)
		} else {
			slog.Info("Hold database closed")
		}
	}

	// Flush any buffered remote log shipments.
	logging.Shutdown()
}