422 lines
14 KiB
Go
422 lines
14 KiB
Go
package hold
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"atcr.io/pkg/atproto"
|
|
"atcr.io/pkg/hold/admin"
|
|
holddb "atcr.io/pkg/hold/db"
|
|
"atcr.io/pkg/hold/gc"
|
|
"atcr.io/pkg/hold/oci"
|
|
"atcr.io/pkg/hold/pds"
|
|
"atcr.io/pkg/hold/quota"
|
|
"atcr.io/pkg/logging"
|
|
"atcr.io/pkg/s3"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
)
|
|
|
|
// HoldServer is the hold service with an exposed router for extensibility.
|
|
// Consumers can add routes to Router before calling Serve().
|
|
type HoldServer struct {
|
|
// Router is the chi router. Add routes before calling Serve().
|
|
Router chi.Router
|
|
|
|
// PDS is the embedded ATProto PDS. Nil if database path is not configured.
|
|
PDS *pds.HoldPDS
|
|
|
|
// QuotaManager manages storage quotas per tier.
|
|
QuotaManager *quota.Manager
|
|
|
|
// Config is the hold service configuration.
|
|
Config *Config
|
|
|
|
// internal fields for shutdown
|
|
httpServer *http.Server
|
|
broadcaster *pds.EventBroadcaster
|
|
scanBroadcaster *pds.ScanBroadcaster
|
|
garbageCollector *gc.GarbageCollector
|
|
adminUI *admin.AdminUI
|
|
holdDB *holddb.HoldDB // shared database connection (nil for :memory:)
|
|
}
|
|
|
|
// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
|
|
// before starting. Consumer can add routes to Router before calling Serve().
|
|
func NewHoldServer(cfg *Config) (*HoldServer, error) {
|
|
// Initialize structured logging with optional remote shipping
|
|
logging.InitLoggerWithShipper(cfg.LogLevel, logging.ShipperConfig{
|
|
Backend: cfg.LogShipper.Backend,
|
|
URL: cfg.LogShipper.URL,
|
|
BatchSize: cfg.LogShipper.BatchSize,
|
|
FlushInterval: cfg.LogShipper.FlushInterval,
|
|
Service: "hold",
|
|
Username: cfg.LogShipper.Username,
|
|
Password: cfg.LogShipper.Password,
|
|
})
|
|
|
|
s := &HoldServer{
|
|
Config: cfg,
|
|
}
|
|
|
|
if cfg.Server.TestMode {
|
|
atproto.SetTestMode(true)
|
|
}
|
|
|
|
// Initialize embedded PDS if database path is configured
|
|
var xrpcHandler *pds.XRPCHandler
|
|
var s3Service *s3.S3Service
|
|
if cfg.Database.Path != "" {
|
|
ctx := context.Background()
|
|
|
|
holdDID, err := pds.LoadOrCreateDID(ctx, pds.DIDConfig{
|
|
DID: cfg.Database.DID,
|
|
DIDMethod: cfg.Database.DIDMethod,
|
|
PublicURL: cfg.Server.PublicURL,
|
|
DBPath: cfg.Database.Path,
|
|
SigningKeyPath: cfg.Database.KeyPath,
|
|
RotationKey: cfg.Database.RotationKey,
|
|
PLCDirectoryURL: cfg.Database.PLCDirectoryURL,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to resolve hold DID: %w", err)
|
|
}
|
|
slog.Info("Initializing embedded PDS", "did", holdDID)
|
|
|
|
if cfg.Database.Path != ":memory:" {
|
|
// File mode: open centralized shared DB (supports embedded replica sync)
|
|
dbFilePath := cfg.Database.Path + "/db.sqlite3"
|
|
libsqlCfg := holddb.LibsqlConfig{
|
|
SyncURL: cfg.Database.LibsqlSyncURL,
|
|
AuthToken: cfg.Database.LibsqlAuthToken,
|
|
SyncInterval: cfg.Database.LibsqlSyncInterval,
|
|
}
|
|
s.holdDB, err = holddb.OpenHoldDB(dbFilePath, libsqlCfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open hold database: %w", err)
|
|
}
|
|
|
|
// Use shared DB for all subsystems
|
|
s.PDS, err = pds.NewHoldPDSWithDB(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts, s.holdDB.DB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
|
|
}
|
|
|
|
s.broadcaster = pds.NewEventBroadcasterWithDB(holdDID, 100, s.holdDB.DB)
|
|
} else {
|
|
// In-memory mode (tests): each subsystem opens its own connection
|
|
s.PDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Server.AppviewURL(), cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize embedded PDS: %w", err)
|
|
}
|
|
|
|
s.broadcaster = pds.NewEventBroadcaster(holdDID, 100, ":memory:")
|
|
}
|
|
|
|
// Create S3 service (used for bootstrap, handlers, GC, etc.)
|
|
s3Service, err = s3.NewS3Service(cfg.Storage.S3Params())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create S3 service: %w", err)
|
|
}
|
|
|
|
// Bootstrap events from existing repo records (one-time migration).
|
|
// Must run BEFORE the live event handler is wired, so it captures
|
|
// the full historical state without interference from new writes.
|
|
if err := s.broadcaster.BootstrapFromRepo(s.PDS); err != nil {
|
|
slog.Warn("Failed to bootstrap events from repo", "error", err)
|
|
}
|
|
|
|
// Backfill records index from existing MST data (one-time on startup)
|
|
if err := s.PDS.BackfillRecordsIndex(ctx); err != nil {
|
|
slog.Warn("Failed to backfill records index", "error", err)
|
|
}
|
|
|
|
// Wire up repo event handler with records indexing + broadcaster.
|
|
// Must be BEFORE Bootstrap so that record creates/updates during
|
|
// bootstrap (captain, crew, profile) emit to the firehose.
|
|
indexingHandler := s.PDS.CreateRecordsIndexEventHandler(s.broadcaster.SetRepoEventHandler())
|
|
s.PDS.RepomgrRef().SetEventHandler(indexingHandler, true)
|
|
|
|
// Bootstrap PDS with captain record, hold owner as first crew member, and profile.
|
|
// Now that the event handler is wired, any changes here emit to the firehose.
|
|
if err := s.PDS.Bootstrap(ctx, s3Service, pds.BootstrapConfig{
|
|
OwnerDID: cfg.Registration.OwnerDID,
|
|
Public: cfg.Server.Public,
|
|
AllowAllCrew: cfg.Registration.AllowAllCrew,
|
|
ProfileAvatarURL: cfg.Registration.ProfileAvatarURL,
|
|
ProfileDisplayName: cfg.Registration.ProfileDisplayName,
|
|
ProfileDescription: cfg.Registration.ProfileDescription,
|
|
Region: cfg.Registration.Region,
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("failed to bootstrap PDS: %w", err)
|
|
}
|
|
|
|
// Sync successor from config (if set) — separate from Bootstrap to avoid changing its signature
|
|
if cfg.Server.Successor != "" {
|
|
if _, captain, err := s.PDS.GetCaptainRecord(ctx); err == nil && captain.Successor != cfg.Server.Successor {
|
|
captain.Successor = cfg.Server.Successor
|
|
if _, err := s.PDS.UpdateCaptainRecord(ctx, captain); err != nil {
|
|
slog.Warn("Failed to sync successor from config", "error", err)
|
|
} else {
|
|
slog.Info("Synced successor from config", "successor", cfg.Server.Successor)
|
|
}
|
|
}
|
|
}
|
|
|
|
slog.Info("Embedded PDS initialized successfully with firehose and records index enabled")
|
|
} else {
|
|
return nil, fmt.Errorf("database path is required for embedded PDS authorization")
|
|
}
|
|
|
|
// Initialize quota manager from config
|
|
var err error
|
|
s.QuotaManager, err = quota.NewManagerFromConfig(&cfg.Quota)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to load quota config: %w", err)
|
|
}
|
|
if s.QuotaManager.IsEnabled() {
|
|
slog.Info("Quota enforcement enabled", "tiers", s.QuotaManager.TierCount(), "defaultTier", s.QuotaManager.GetDefaultTier())
|
|
} else {
|
|
slog.Info("Quota enforcement disabled (no quota tiers configured)")
|
|
}
|
|
|
|
// Create XRPC handlers
|
|
var ociHandler *oci.XRPCHandler
|
|
if s.PDS != nil {
|
|
xrpcHandler = pds.NewXRPCHandler(s.PDS, *s3Service, s.broadcaster, nil, s.QuotaManager)
|
|
if cfg.Server.AppviewDID != "" {
|
|
xrpcHandler.SetAppviewDID(cfg.Server.AppviewDID)
|
|
}
|
|
ociHandler = oci.NewXRPCHandler(s.PDS, *s3Service, cfg.Registration.EnableBlueskyPosts, nil, s.QuotaManager)
|
|
|
|
// Initialize scan broadcaster if scanner secret is configured
|
|
if cfg.Scanner.Secret != "" {
|
|
holdDID := s.PDS.DID()
|
|
rescanInterval := cfg.Scanner.RescanInterval
|
|
var sb *pds.ScanBroadcaster
|
|
if s.holdDB != nil {
|
|
sb, err = pds.NewScanBroadcasterWithDB(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, s.holdDB.DB, s3Service, s.PDS, rescanInterval)
|
|
} else {
|
|
scanDBPath := cfg.Database.Path + "/db.sqlite3"
|
|
sb, err = pds.NewScanBroadcaster(holdDID, cfg.Server.PublicURL, cfg.Scanner.Secret, cfg.Server.RelayEndpoint, scanDBPath, s3Service, s.PDS, rescanInterval)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize scan broadcaster: %w", err)
|
|
}
|
|
s.scanBroadcaster = sb
|
|
xrpcHandler.SetScanBroadcaster(sb)
|
|
ociHandler.SetScanBroadcaster(sb)
|
|
slog.Info("Scan broadcaster initialized (scanner WebSocket enabled)",
|
|
"rescanInterval", rescanInterval)
|
|
}
|
|
|
|
// Initialize garbage collector
|
|
s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
|
|
slog.Info("Garbage collector initialized",
|
|
"enabled", cfg.GC.Enabled)
|
|
}
|
|
|
|
// Setup HTTP routes with chi router
|
|
r := chi.NewRouter()
|
|
r.Use(middleware.RealIP)
|
|
r.Use(middleware.Maybe(middleware.Logger, func(r *http.Request) bool {
|
|
return r.URL.Path != "/xrpc/_health"
|
|
}))
|
|
|
|
if xrpcHandler != nil {
|
|
r.Use(xrpcHandler.CORSMiddleware())
|
|
}
|
|
|
|
// Root page
|
|
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain")
|
|
fmt.Fprintf(w, "This is a hold server. More info at https://atcr.io")
|
|
})
|
|
|
|
// Robots.txt - disallow crawling of all endpoints except root
|
|
r.Get("/robots.txt", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain")
|
|
fmt.Fprint(w, "User-agent: *\nAllow: /\nDisallow: /xrpc/\nDisallow: /admin/\n")
|
|
})
|
|
|
|
// Register XRPC/ATProto PDS endpoints
|
|
if xrpcHandler != nil {
|
|
slog.Info("Registering ATProto PDS endpoints")
|
|
xrpcHandler.RegisterHandlers(r)
|
|
}
|
|
|
|
// Register OCI multipart upload endpoints
|
|
if ociHandler != nil {
|
|
slog.Info("Registering OCI multipart upload endpoints")
|
|
ociHandler.RegisterHandlers(r)
|
|
}
|
|
|
|
// Initialize and register admin panel if enabled
|
|
if cfg.Admin.Enabled && s.PDS != nil {
|
|
adminCfg := admin.AdminConfig{
|
|
Enabled: true,
|
|
PublicURL: cfg.Server.PublicURL,
|
|
ConfigPath: cfg.ConfigPath(),
|
|
}
|
|
|
|
s.adminUI, err = admin.NewAdminUI(context.Background(), s.PDS, s.QuotaManager, s.garbageCollector, adminCfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize admin panel: %w", err)
|
|
}
|
|
|
|
if s.adminUI != nil {
|
|
slog.Info("Registering admin panel routes")
|
|
s.adminUI.RegisterRoutes(r)
|
|
}
|
|
}
|
|
|
|
s.Router = r
|
|
|
|
return s, nil
|
|
}
|
|
|
|
// Serve starts the HTTP server and blocks until shutdown signal.
|
|
func (s *HoldServer) Serve() error {
|
|
s.httpServer = &http.Server{
|
|
Addr: s.Config.Server.Addr,
|
|
Handler: s.Router,
|
|
ReadTimeout: s.Config.Server.ReadTimeout,
|
|
WriteTimeout: s.Config.Server.WriteTimeout,
|
|
}
|
|
|
|
// Set up signal handling for graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
|
|
|
// Start server in goroutine
|
|
serverErr := make(chan error, 1)
|
|
go func() {
|
|
slog.Info("Starting hold service", "addr", s.Config.Server.Addr)
|
|
if err := s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
serverErr <- err
|
|
}
|
|
}()
|
|
|
|
// Update status post to "online" after server starts
|
|
if s.PDS != nil {
|
|
ctx := context.Background()
|
|
if err := s.PDS.SetStatus(ctx, "online"); err != nil {
|
|
slog.Warn("Failed to set status post to online", "error", err)
|
|
} else {
|
|
slog.Info("Status post set to online")
|
|
}
|
|
}
|
|
|
|
// Fetch appview metadata for branding (Bluesky posts)
|
|
if s.Config.Server.AppviewURL() != "" {
|
|
meta, err := atproto.FetchAppviewMetadata(context.Background(), s.Config.Server.AppviewURL())
|
|
if err != nil {
|
|
slog.Warn("Failed to fetch appview metadata, using defaults", "appview_url", s.Config.Server.AppviewURL(), "error", err)
|
|
} else {
|
|
s.PDS.SetAppviewMeta(meta)
|
|
slog.Info("Fetched appview metadata", "clientName", meta.ClientName, "clientShortName", meta.ClientShortName)
|
|
}
|
|
}
|
|
|
|
// Request crawl from relay to make PDS discoverable
|
|
if s.Config.Server.RelayEndpoint != "" {
|
|
slog.Info("Requesting crawl from relay", "relay", s.Config.Server.RelayEndpoint)
|
|
if err := atproto.RequestCrawl(s.Config.Server.RelayEndpoint, s.Config.Server.PublicURL); err != nil {
|
|
slog.Warn("Failed to request crawl from relay", "error", err)
|
|
} else {
|
|
slog.Info("Crawl requested successfully")
|
|
}
|
|
}
|
|
|
|
// Start garbage collector (runs on startup + nightly)
|
|
if s.garbageCollector != nil {
|
|
s.garbageCollector.Start(context.Background())
|
|
}
|
|
|
|
// Wait for signal or server error
|
|
select {
|
|
case err := <-serverErr:
|
|
slog.Error("Server failed", "error", err)
|
|
logging.Shutdown()
|
|
return err
|
|
case sig := <-sigChan:
|
|
slog.Info("Received signal, shutting down gracefully", "signal", sig)
|
|
s.shutdown()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *HoldServer) shutdown() {
|
|
// Update status post to "offline" before shutdown
|
|
if s.PDS != nil {
|
|
ctx := context.Background()
|
|
if err := s.PDS.SetStatus(ctx, "offline"); err != nil {
|
|
slog.Warn("Failed to set status post to offline", "error", err)
|
|
} else {
|
|
slog.Info("Status post set to offline")
|
|
}
|
|
}
|
|
|
|
// Stop garbage collector
|
|
if s.garbageCollector != nil {
|
|
s.garbageCollector.Stop()
|
|
slog.Info("Garbage collector stopped")
|
|
}
|
|
|
|
// Close scan broadcaster database connection
|
|
if s.scanBroadcaster != nil {
|
|
if err := s.scanBroadcaster.Close(); err != nil {
|
|
slog.Warn("Failed to close scan broadcaster database", "error", err)
|
|
} else {
|
|
slog.Info("Scan broadcaster database closed")
|
|
}
|
|
}
|
|
|
|
// Close broadcaster database connection
|
|
if s.broadcaster != nil {
|
|
if err := s.broadcaster.Close(); err != nil {
|
|
slog.Warn("Failed to close broadcaster database", "error", err)
|
|
} else {
|
|
slog.Info("Broadcaster database closed")
|
|
}
|
|
}
|
|
|
|
// Close admin panel
|
|
if s.adminUI != nil {
|
|
if err := s.adminUI.Close(); err != nil {
|
|
slog.Warn("Failed to close admin panel", "error", err)
|
|
} else {
|
|
slog.Info("Admin panel closed")
|
|
}
|
|
}
|
|
|
|
// Close shared database connection and connector (after all subsystems)
|
|
if s.holdDB != nil {
|
|
if err := s.holdDB.Close(); err != nil {
|
|
slog.Warn("Failed to close hold database", "error", err)
|
|
} else {
|
|
slog.Info("Hold database closed")
|
|
}
|
|
}
|
|
|
|
// Graceful shutdown with 10 second timeout
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
|
|
slog.Error("Server shutdown error", "error", err)
|
|
} else {
|
|
slog.Info("Server shutdown complete")
|
|
}
|
|
|
|
logging.Shutdown()
|
|
}
|