Files
at-container-registry/cmd/hold/main.go
2025-12-18 11:19:49 -06:00

230 lines
7.0 KiB
Go

package main
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"atcr.io/pkg/hold"
	"atcr.io/pkg/hold/oci"
	"atcr.io/pkg/hold/pds"
	"atcr.io/pkg/logging"
	"atcr.io/pkg/s3"

	// Import storage drivers
	"github.com/distribution/distribution/v3/registry/storage/driver/factory"
	_ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem"
	_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
)
// main boots the hold service and blocks until the server fails or the
// process receives SIGINT/SIGTERM.
//
// Startup sequence:
//  1. Load configuration from the environment and initialize logging.
//  2. Initialize and bootstrap the embedded PDS (a database path is required).
//  3. Create the event broadcaster and attach it to the PDS repo manager.
//  4. Build the PDS and OCI XRPC handlers and mount them on a chi router.
//  5. Bind the listening socket, start serving, then set the status post to
//     "online" and, if a relay endpoint is configured, request a crawl.
//
// Fatal startup errors are logged and terminate the process with exit code 1.
// On a signal the status post is set to "offline", the HTTP server is drained
// (10 second limit) and the broadcaster is closed.
func main() {
	// Load configuration from environment variables.
	cfg, err := hold.LoadConfigFromEnv()
	if err != nil {
		slog.Error("Failed to load config", "error", err)
		os.Exit(1)
	}

	// Initialize structured logging.
	logging.InitLogger(cfg.LogLevel)

	// The embedded PDS backs authorization, so there is nothing useful to run
	// without a database path. Checking this up front keeps the rest of the
	// function free of "is the PDS configured?" branches.
	if cfg.Database.Path == "" {
		slog.Error("Database path is required for embedded PDS authorization")
		os.Exit(1)
	}

	ctx := context.Background()

	// Generate did:web from public URL.
	holdDID := pds.GenerateDIDFromURL(cfg.Server.PublicURL)
	slog.Info("Initializing embedded PDS", "did", holdDID)

	// Initialize PDS with carstore and keys.
	holdPDS, err := pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
	if err != nil {
		slog.Error("Failed to initialize embedded PDS", "error", err)
		os.Exit(1)
	}

	// Create the storage driver once; it is used for the bootstrap profile
	// avatar and then shared by both XRPC handlers.
	driver, err := factory.Create(ctx, cfg.Storage.Type(), cfg.Storage.Parameters())
	if err != nil {
		slog.Error("Failed to create storage driver", "error", err)
		os.Exit(1)
	}

	// Bootstrap PDS with captain record, hold owner as first crew member, and profile.
	if err := holdPDS.Bootstrap(ctx, driver, cfg.Registration.OwnerDID, cfg.Server.Public, cfg.Registration.AllowAllCrew, cfg.Registration.ProfileAvatarURL); err != nil {
		slog.Error("Failed to bootstrap PDS", "error", err)
		os.Exit(1)
	}

	// Create event broadcaster for subscribeRepos firehose.
	// Database path: carstore creates db.sqlite3 inside cfg.Database.Path;
	// the special value ":memory:" is passed through untouched.
	dbPath := ":memory:"
	if cfg.Database.Path != ":memory:" {
		dbPath = filepath.Join(cfg.Database.Path, "db.sqlite3")
	}
	// NOTE(review): 100 is passed through unchanged; presumably a buffer or
	// history size. Confirm against pds.NewEventBroadcaster.
	broadcaster := pds.NewEventBroadcaster(holdDID, 100, dbPath)

	// Bootstrap events from existing repo records (one-time migration).
	// Failure here is not fatal.
	if err := broadcaster.BootstrapFromRepo(holdPDS); err != nil {
		slog.Warn("Failed to bootstrap events from repo", "error", err)
	}

	// Wire up repo event handler to broadcaster.
	holdPDS.RepomgrRef().SetEventHandler(broadcaster.SetRepoEventHandler(), true)
	slog.Info("Embedded PDS initialized successfully with firehose enabled")

	s3Service, err := s3.NewS3Service(cfg.Storage.Parameters(), cfg.Server.DisablePresignedURLs, cfg.Storage.Type())
	if err != nil {
		slog.Error("Failed to create S3 service", "error", err)
		os.Exit(1)
	}

	// Create PDS XRPC handler (ATProto endpoints).
	xrpcHandler := pds.NewXRPCHandler(holdPDS, *s3Service, driver, broadcaster, nil)
	// Create OCI XRPC handler (multipart upload endpoints).
	ociHandler := oci.NewXRPCHandler(holdPDS, *s3Service, driver, cfg.Server.DisablePresignedURLs, cfg.Registration.EnableBlueskyPosts, nil)

	// Setup HTTP routes with chi router. Middleware must be installed before
	// any route is registered.
	router := chi.NewRouter()
	router.Use(middleware.RealIP) // extract real client IP from proxy headers
	router.Use(middleware.Logger) // log all HTTP requests
	router.Use(xrpcHandler.CORSMiddleware())

	// Root page.
	router.Get("/", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprint(w, "This is a hold server. More info at https://atcr.io")
	})

	slog.Info("Registering ATProto PDS endpoints")
	xrpcHandler.RegisterHandlers(router)

	slog.Info("Registering OCI multipart upload endpoints")
	ociHandler.RegisterHandlers(router)

	// Create server.
	server := &http.Server{
		Addr:         cfg.Server.Addr,
		Handler:      router,
		ReadTimeout:  cfg.Server.ReadTimeout,
		WriteTimeout: cfg.Server.WriteTimeout,
	}

	// Bind the socket synchronously so that a bind failure (for example the
	// port already being in use) is detected before the service announces
	// itself as online or asks a relay to crawl it.
	listenAddr := server.Addr
	if listenAddr == "" {
		listenAddr = ":http" // same default that ListenAndServe applies
	}
	listener, err := net.Listen("tcp", listenAddr)
	if err != nil {
		slog.Error("Server failed", "error", err)
		os.Exit(1)
	}

	// Set up signal handling for graceful shutdown.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Serve in a goroutine; the channel is buffered so the goroutine can
	// report an error and exit even if main is not yet receiving.
	serverErr := make(chan error, 1)
	go func() {
		slog.Info("Starting hold service", "addr", cfg.Server.Addr)
		if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErr <- err
		}
	}()

	// Update status post to "online" now that the listener is bound.
	if err := holdPDS.SetStatus(ctx, "online"); err != nil {
		slog.Warn("Failed to set status post to online", "error", err)
	} else {
		slog.Info("Status post set to online")
	}

	// Request crawl from relay to make PDS discoverable.
	if cfg.Server.RelayEndpoint != "" {
		slog.Info("Requesting crawl from relay", "relay", cfg.Server.RelayEndpoint)
		if err := hold.RequestCrawl(cfg.Server.RelayEndpoint, cfg.Server.PublicURL); err != nil {
			slog.Warn("Failed to request crawl from relay", "error", err)
		} else {
			slog.Info("Crawl requested successfully")
		}
	}

	// markOffline updates the status post to "offline"; failures are only logged.
	markOffline := func() {
		if err := holdPDS.SetStatus(context.Background(), "offline"); err != nil {
			slog.Warn("Failed to set status post to offline", "error", err)
			return
		}
		slog.Info("Status post set to offline")
	}

	// closeBroadcaster closes the broadcaster database connection; failures
	// are only logged.
	closeBroadcaster := func() {
		if err := broadcaster.Close(); err != nil {
			slog.Warn("Failed to close broadcaster database", "error", err)
			return
		}
		slog.Info("Broadcaster database closed")
	}

	// Wait for signal or server error.
	select {
	case err := <-serverErr:
		slog.Error("Server failed", "error", err)
		// The service was already announced as online; undo that and release
		// the broadcaster explicitly because os.Exit skips deferred calls.
		markOffline()
		closeBroadcaster()
		os.Exit(1)

	case sig := <-sigChan:
		slog.Info("Received signal, shutting down gracefully", "signal", sig)

		// Update status post to "offline" before shutdown.
		markOffline()

		// Graceful shutdown with 10 second timeout.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil {
			slog.Error("Server shutdown error", "error", err)
		} else {
			slog.Info("Server shutdown complete")
		}

		// Close the broadcaster only after the HTTP server has drained: the
		// PDS XRPC handler was constructed with it, so in-flight requests may
		// still reach it until Shutdown returns.
		closeBroadcaster()
	}
}