230 lines
7.0 KiB
Go
230 lines
7.0 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"atcr.io/pkg/hold"
|
|
"atcr.io/pkg/hold/oci"
|
|
"atcr.io/pkg/hold/pds"
|
|
"atcr.io/pkg/logging"
|
|
"atcr.io/pkg/s3"
|
|
|
|
// Import storage drivers
|
|
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
|
_ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem"
|
|
_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/go-chi/chi/v5/middleware"
|
|
)
|
|
|
|
func main() {
|
|
// Load configuration from environment variables
|
|
cfg, err := hold.LoadConfigFromEnv()
|
|
if err != nil {
|
|
slog.Error("Failed to load config", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Initialize structured logging
|
|
logging.InitLogger(cfg.LogLevel)
|
|
|
|
// Initialize embedded PDS if database path is configured
|
|
// This must happen before creating HoldService since service needs PDS for authorization
|
|
var holdPDS *pds.HoldPDS
|
|
var xrpcHandler *pds.XRPCHandler
|
|
var broadcaster *pds.EventBroadcaster
|
|
if cfg.Database.Path != "" {
|
|
// Generate did:web from public URL
|
|
holdDID := pds.GenerateDIDFromURL(cfg.Server.PublicURL)
|
|
slog.Info("Initializing embedded PDS", "did", holdDID)
|
|
|
|
// Initialize PDS with carstore and keys
|
|
ctx := context.Background()
|
|
holdPDS, err = pds.NewHoldPDS(ctx, holdDID, cfg.Server.PublicURL, cfg.Database.Path, cfg.Database.KeyPath, cfg.Registration.EnableBlueskyPosts)
|
|
if err != nil {
|
|
slog.Error("Failed to initialize embedded PDS", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Create storage driver from config (needed for bootstrap profile avatar)
|
|
driver, err := factory.Create(ctx, cfg.Storage.Type(), cfg.Storage.Parameters())
|
|
if err != nil {
|
|
slog.Error("Failed to create storage driver", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Bootstrap PDS with captain record, hold owner as first crew member, and profile
|
|
if err := holdPDS.Bootstrap(ctx, driver, cfg.Registration.OwnerDID, cfg.Server.Public, cfg.Registration.AllowAllCrew, cfg.Registration.ProfileAvatarURL); err != nil {
|
|
slog.Error("Failed to bootstrap PDS", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Create event broadcaster for subscribeRepos firehose
|
|
// Database path: carstore creates db.sqlite3 inside cfg.Database.Path
|
|
var dbPath string
|
|
if cfg.Database.Path != ":memory:" {
|
|
dbPath = cfg.Database.Path + "/db.sqlite3"
|
|
} else {
|
|
dbPath = ":memory:"
|
|
}
|
|
broadcaster = pds.NewEventBroadcaster(holdDID, 100, dbPath)
|
|
|
|
// Bootstrap events from existing repo records (one-time migration)
|
|
if err := broadcaster.BootstrapFromRepo(holdPDS); err != nil {
|
|
slog.Warn("Failed to bootstrap events from repo", "error", err)
|
|
}
|
|
|
|
// Wire up repo event handler to broadcaster
|
|
holdPDS.RepomgrRef().SetEventHandler(broadcaster.SetRepoEventHandler(), true)
|
|
|
|
slog.Info("Embedded PDS initialized successfully with firehose enabled")
|
|
} else {
|
|
slog.Error("Database path is required for embedded PDS authorization")
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Create blob store adapter and XRPC handlers
|
|
var ociHandler *oci.XRPCHandler
|
|
if holdPDS != nil {
|
|
// Create storage driver from config
|
|
ctx := context.Background()
|
|
driver, err := factory.Create(ctx, cfg.Storage.Type(), cfg.Storage.Parameters())
|
|
if err != nil {
|
|
slog.Error("Failed to create storage driver", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
s3Service, err := s3.NewS3Service(cfg.Storage.Parameters(), cfg.Server.DisablePresignedURLs, cfg.Storage.Type())
|
|
if err != nil {
|
|
slog.Error("Failed to create S3 service", "error", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
// Create PDS XRPC handler (ATProto endpoints)
|
|
xrpcHandler = pds.NewXRPCHandler(holdPDS, *s3Service, driver, broadcaster, nil)
|
|
|
|
// Create OCI XRPC handler (multipart upload endpoints)
|
|
ociHandler = oci.NewXRPCHandler(holdPDS, *s3Service, driver, cfg.Server.DisablePresignedURLs, cfg.Registration.EnableBlueskyPosts, nil)
|
|
}
|
|
|
|
// Setup HTTP routes with chi router
|
|
r := chi.NewRouter()
|
|
|
|
// Add RealIP middleware to extract real client IP from proxy headers
|
|
r.Use(middleware.RealIP)
|
|
|
|
// Add logging middleware to log all HTTP requests
|
|
r.Use(middleware.Logger)
|
|
|
|
// Add CORS middleware (must be before routes)
|
|
if xrpcHandler != nil {
|
|
r.Use(xrpcHandler.CORSMiddleware())
|
|
}
|
|
|
|
// Root page
|
|
r.Get("/", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/plain")
|
|
fmt.Fprintf(w, "This is a hold server. More info at https://atcr.io")
|
|
})
|
|
|
|
// Register XRPC/ATProto PDS endpoints if PDS is initialized
|
|
if xrpcHandler != nil {
|
|
slog.Info("Registering ATProto PDS endpoints")
|
|
xrpcHandler.RegisterHandlers(r)
|
|
}
|
|
|
|
// Register OCI multipart upload endpoints
|
|
if ociHandler != nil {
|
|
slog.Info("Registering OCI multipart upload endpoints")
|
|
ociHandler.RegisterHandlers(r)
|
|
}
|
|
|
|
// Create server
|
|
server := &http.Server{
|
|
Addr: cfg.Server.Addr,
|
|
Handler: r,
|
|
ReadTimeout: cfg.Server.ReadTimeout,
|
|
WriteTimeout: cfg.Server.WriteTimeout,
|
|
}
|
|
|
|
// Set up signal handling for graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
|
|
|
// Start server in goroutine
|
|
serverErr := make(chan error, 1)
|
|
go func() {
|
|
slog.Info("Starting hold service", "addr", cfg.Server.Addr)
|
|
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
serverErr <- err
|
|
}
|
|
}()
|
|
|
|
// Update status post to "online" after server starts
|
|
if holdPDS != nil {
|
|
ctx := context.Background()
|
|
|
|
if err := holdPDS.SetStatus(ctx, "online"); err != nil {
|
|
slog.Warn("Failed to set status post to online", "error", err)
|
|
} else {
|
|
slog.Info("Status post set to online")
|
|
}
|
|
}
|
|
|
|
// Request crawl from relay to make PDS discoverable
|
|
if cfg.Server.RelayEndpoint != "" {
|
|
slog.Info("Requesting crawl from relay", "relay", cfg.Server.RelayEndpoint)
|
|
if err := hold.RequestCrawl(cfg.Server.RelayEndpoint, cfg.Server.PublicURL); err != nil {
|
|
slog.Warn("Failed to request crawl from relay", "error", err)
|
|
} else {
|
|
slog.Info("Crawl requested successfully")
|
|
}
|
|
}
|
|
|
|
// Wait for signal or server error
|
|
select {
|
|
case err := <-serverErr:
|
|
slog.Error("Server failed", "error", err)
|
|
os.Exit(1)
|
|
case sig := <-sigChan:
|
|
slog.Info("Received signal, shutting down gracefully", "signal", sig)
|
|
|
|
// Update status post to "offline" before shutdown
|
|
if holdPDS != nil {
|
|
ctx := context.Background()
|
|
if err := holdPDS.SetStatus(ctx, "offline"); err != nil {
|
|
slog.Warn("Failed to set status post to offline", "error", err)
|
|
} else {
|
|
slog.Info("Status post set to offline")
|
|
}
|
|
}
|
|
|
|
// Close broadcaster database connection
|
|
if broadcaster != nil {
|
|
if err := broadcaster.Close(); err != nil {
|
|
slog.Warn("Failed to close broadcaster database", "error", err)
|
|
} else {
|
|
slog.Info("Broadcaster database closed")
|
|
}
|
|
}
|
|
|
|
// Graceful shutdown with 10 second timeout
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
if err := server.Shutdown(shutdownCtx); err != nil {
|
|
slog.Error("Server shutdown error", "error", err)
|
|
} else {
|
|
slog.Info("Server shutdown complete")
|
|
}
|
|
}
|
|
}
|