Files
at-container-registry/pkg/hold/pds/server.go
2025-10-29 12:06:47 -05:00

275 lines
8.6 KiB
Go

package pds
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"atcr.io/pkg/atproto"
"atcr.io/pkg/auth/oauth"
"github.com/bluesky-social/indigo/atproto/atcrypto"
"github.com/bluesky-social/indigo/carstore"
lexutil "github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/models"
"github.com/bluesky-social/indigo/repo"
"github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/ipfs/go-cid"
)
// init registers our custom ATProto types with indigo's lexutil type registry
// This allows repomgr.GetRecord to automatically unmarshal our types
func init() {
// Register captain, crew, tangled profile, and layer record types
// These must match the $type field in the records
lexutil.RegisterType(atproto.CaptainCollection, &atproto.CaptainRecord{})
lexutil.RegisterType(atproto.CrewCollection, &atproto.CrewRecord{})
lexutil.RegisterType(atproto.LayerCollection, &atproto.LayerRecord{})
lexutil.RegisterType(atproto.TangledProfileCollection, &atproto.TangledProfileRecord{})
}
// HoldPDS is a minimal ATProto PDS implementation for a hold service
type HoldPDS struct {
did string
PublicURL string
carstore carstore.CarStore
repomgr *RepoManager
dbPath string
uid models.Uid
signingKey *atcrypto.PrivateKeyK256
enableBlueskyPosts bool
}
// NewHoldPDS creates or opens a hold PDS with SQLite carstore
func NewHoldPDS(ctx context.Context, did, publicURL, dbPath, keyPath string, enableBlueskyPosts bool) (*HoldPDS, error) {
// Generate or load signing key
signingKey, err := oauth.GenerateOrLoadPDSKey(keyPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize signing key: %w", err)
}
// Create SQLite-backed carstore
var sqlStore *carstore.SQLiteStore
if dbPath == ":memory:" {
// In-memory mode for tests: create carstore manually and open with :memory:
sqlStore = new(carstore.SQLiteStore)
if err := sqlStore.Open(":memory:"); err != nil {
return nil, fmt.Errorf("failed to open in-memory sqlite store: %w", err)
}
} else {
// File mode for production: create directory and use NewSqliteStore
dir := filepath.Dir(dbPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to create database directory: %w", err)
}
// dbPath is the directory, carstore creates and opens db.sqlite3 inside it
sqlStore, err = carstore.NewSqliteStore(dbPath)
if err != nil {
return nil, fmt.Errorf("failed to create sqlite store: %w", err)
}
}
// Use SQLiteStore directly, not the CarStore() wrapper
// The wrapper has a bug where GetUserRepoHead checks CarShard.ID which SQLite doesn't populate
cs := sqlStore
// For a single-user hold, we use a fixed UID (1)
uid := models.Uid(1)
// Create KeyManager wrapper for our signing key
kmgr := NewHoldKeyManager(signingKey)
// Create RepoManager - it will handle all session/repo lifecycle
rm := NewRepoManager(cs, kmgr)
// Check if repo already exists, if not create initial commit
head, err := cs.GetUserRepoHead(ctx, uid)
hasValidRepo := (err == nil && head.Defined())
if !hasValidRepo {
// Initialize empty repo with first commit
// RepoManager requires at least one commit to exist
// We'll create this by doing a dummy operation in Bootstrap
slog.Info("New hold repo - will be initialized in Bootstrap")
}
return &HoldPDS{
did: did,
PublicURL: publicURL,
carstore: cs,
repomgr: rm,
dbPath: dbPath,
uid: uid,
signingKey: signingKey,
enableBlueskyPosts: enableBlueskyPosts,
}, nil
}
// DID returns the hold's DID
func (p *HoldPDS) DID() string {
return p.did
}
// SigningKey returns the hold's signing key
func (p *HoldPDS) SigningKey() *atcrypto.PrivateKeyK256 {
return p.signingKey
}
// RepomgrRef returns a reference to the RepoManager for event handler setup
func (p *HoldPDS) RepomgrRef() *RepoManager {
return p.repomgr
}
// Bootstrap initializes the hold with the captain record, owner as first crew member, and profile
func (p *HoldPDS) Bootstrap(ctx context.Context, storageDriver driver.StorageDriver, ownerDID string, public bool, allowAllCrew bool, avatarURL string) error {
if ownerDID == "" {
return nil
}
// Check if captain record already exists (idempotent bootstrap)
_, _, err := p.GetCaptainRecord(ctx)
captainExists := (err == nil)
if captainExists {
// Captain record exists, skip captain/crew setup but still create profile if needed
slog.Info("Captain record exists, skipping captain/crew setup")
} else {
slog.Info("Bootstrapping hold PDS", "owner", ownerDID)
}
if !captainExists {
// Initialize repo if it doesn't exist yet
// Check if repo exists by trying to get the head
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
if err != nil || !head.Defined() {
// Repo doesn't exist, initialize it
// InitNewActor creates an empty repo with initial commit
err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "")
if err != nil {
return fmt.Errorf("failed to initialize repo: %w", err)
}
slog.Info("Initialized empty repo")
}
// Create captain record (hold ownership and settings)
_, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew, p.enableBlueskyPosts)
if err != nil {
return fmt.Errorf("failed to create captain record: %w", err)
}
slog.Info("Created captain record",
"public", public,
"allowAllCrew", allowAllCrew,
"enableBlueskyPosts", p.enableBlueskyPosts)
// Add hold owner as first crew member with admin role
_, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"})
if err != nil {
return fmt.Errorf("failed to add owner as crew member: %w", err)
}
slog.Info("Added owner as hold admin", "did", ownerDID)
} else {
// Captain record exists, check if we need to sync settings from env vars
_, existingCaptain, err := p.GetCaptainRecord(ctx)
if err == nil {
// Check if any settings need updating
needsUpdate := existingCaptain.Public != public ||
existingCaptain.AllowAllCrew != allowAllCrew ||
existingCaptain.EnableBlueskyPosts != p.enableBlueskyPosts
if needsUpdate {
// Update captain record to match env vars
_, err = p.UpdateCaptainRecord(ctx, public, allowAllCrew, p.enableBlueskyPosts)
if err != nil {
return fmt.Errorf("failed to update captain record: %w", err)
}
slog.Info("Synced captain record with env vars",
"public", public,
"allowAllCrew", allowAllCrew,
"enableBlueskyPosts", p.enableBlueskyPosts)
}
}
}
// Create Bluesky profile record (idempotent - check if exists first)
// This runs even if captain exists (for existing holds being upgraded)
// Skip if no storage driver (e.g., in tests)
if storageDriver != nil {
_, _, err = p.GetProfileRecord(ctx)
if err != nil {
// Bluesky profile doesn't exist, create it
displayName := "Cargo Hold"
description := "ahoy from the cargo hold"
_, err = p.CreateProfileRecord(ctx, storageDriver, displayName, description, avatarURL)
if err != nil {
return fmt.Errorf("failed to create bluesky profile record: %w", err)
}
slog.Info("Created Bluesky profile record", "displayName", displayName)
} else {
slog.Info("Bluesky profile record already exists, skipping")
}
}
return nil
}
// ListCollections returns all collections present in the hold's repository
func (p *HoldPDS) ListCollections(ctx context.Context) ([]string, error) {
session, err := p.carstore.ReadOnlySession(p.uid)
if err != nil {
return nil, fmt.Errorf("failed to create read-only session: %w", err)
}
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
if err != nil {
return nil, fmt.Errorf("failed to get repo head: %w", err)
}
if !head.Defined() {
// Empty repo, no collections
return []string{}, nil
}
r, err := repo.OpenRepo(ctx, session, head)
if err != nil {
return nil, fmt.Errorf("failed to open repo: %w", err)
}
collections := make(map[string]bool)
// Walk all records in the repo to discover collections
err = r.ForEach(ctx, "", func(k string, v cid.Cid) error {
// k is like "io.atcr.hold.captain/self" or "io.atcr.hold.crew/3m3by7msdln22"
parts := strings.Split(k, "/")
if len(parts) >= 1 {
collections[parts[0]] = true
}
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to enumerate collections: %w", err)
}
// Convert map to sorted slice
result := make([]string, 0, len(collections))
for collection := range collections {
result = append(result, collection)
}
return result, nil
}
// Close closes the carstore
func (p *HoldPDS) Close() error {
// TODO: Close session properly
return nil
}