mirror of
https://tangled.org/evan.jarrett.net/at-container-registry
synced 2026-04-24 10:20:32 +00:00
275 lines
8.6 KiB
Go
275 lines
8.6 KiB
Go
package pds
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
|
|
"atcr.io/pkg/atproto"
|
|
"atcr.io/pkg/auth/oauth"
|
|
"github.com/bluesky-social/indigo/atproto/atcrypto"
|
|
"github.com/bluesky-social/indigo/carstore"
|
|
lexutil "github.com/bluesky-social/indigo/lex/util"
|
|
"github.com/bluesky-social/indigo/models"
|
|
"github.com/bluesky-social/indigo/repo"
|
|
"github.com/distribution/distribution/v3/registry/storage/driver"
|
|
"github.com/ipfs/go-cid"
|
|
)
|
|
|
|
// init registers our custom ATProto types with indigo's lexutil type registry
|
|
// This allows repomgr.GetRecord to automatically unmarshal our types
|
|
func init() {
|
|
// Register captain, crew, tangled profile, and layer record types
|
|
// These must match the $type field in the records
|
|
lexutil.RegisterType(atproto.CaptainCollection, &atproto.CaptainRecord{})
|
|
lexutil.RegisterType(atproto.CrewCollection, &atproto.CrewRecord{})
|
|
lexutil.RegisterType(atproto.LayerCollection, &atproto.LayerRecord{})
|
|
lexutil.RegisterType(atproto.TangledProfileCollection, &atproto.TangledProfileRecord{})
|
|
}
|
|
|
|
// HoldPDS is a minimal ATProto PDS implementation for a hold service
|
|
type HoldPDS struct {
|
|
did string
|
|
PublicURL string
|
|
carstore carstore.CarStore
|
|
repomgr *RepoManager
|
|
dbPath string
|
|
uid models.Uid
|
|
signingKey *atcrypto.PrivateKeyK256
|
|
enableBlueskyPosts bool
|
|
}
|
|
|
|
// NewHoldPDS creates or opens a hold PDS with SQLite carstore
|
|
func NewHoldPDS(ctx context.Context, did, publicURL, dbPath, keyPath string, enableBlueskyPosts bool) (*HoldPDS, error) {
|
|
// Generate or load signing key
|
|
signingKey, err := oauth.GenerateOrLoadPDSKey(keyPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize signing key: %w", err)
|
|
}
|
|
|
|
// Create SQLite-backed carstore
|
|
var sqlStore *carstore.SQLiteStore
|
|
|
|
if dbPath == ":memory:" {
|
|
// In-memory mode for tests: create carstore manually and open with :memory:
|
|
sqlStore = new(carstore.SQLiteStore)
|
|
if err := sqlStore.Open(":memory:"); err != nil {
|
|
return nil, fmt.Errorf("failed to open in-memory sqlite store: %w", err)
|
|
}
|
|
} else {
|
|
// File mode for production: create directory and use NewSqliteStore
|
|
dir := filepath.Dir(dbPath)
|
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create database directory: %w", err)
|
|
}
|
|
|
|
// dbPath is the directory, carstore creates and opens db.sqlite3 inside it
|
|
sqlStore, err = carstore.NewSqliteStore(dbPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create sqlite store: %w", err)
|
|
}
|
|
}
|
|
|
|
// Use SQLiteStore directly, not the CarStore() wrapper
|
|
// The wrapper has a bug where GetUserRepoHead checks CarShard.ID which SQLite doesn't populate
|
|
cs := sqlStore
|
|
|
|
// For a single-user hold, we use a fixed UID (1)
|
|
uid := models.Uid(1)
|
|
|
|
// Create KeyManager wrapper for our signing key
|
|
kmgr := NewHoldKeyManager(signingKey)
|
|
|
|
// Create RepoManager - it will handle all session/repo lifecycle
|
|
rm := NewRepoManager(cs, kmgr)
|
|
|
|
// Check if repo already exists, if not create initial commit
|
|
head, err := cs.GetUserRepoHead(ctx, uid)
|
|
hasValidRepo := (err == nil && head.Defined())
|
|
|
|
if !hasValidRepo {
|
|
// Initialize empty repo with first commit
|
|
// RepoManager requires at least one commit to exist
|
|
// We'll create this by doing a dummy operation in Bootstrap
|
|
slog.Info("New hold repo - will be initialized in Bootstrap")
|
|
}
|
|
|
|
return &HoldPDS{
|
|
did: did,
|
|
PublicURL: publicURL,
|
|
carstore: cs,
|
|
repomgr: rm,
|
|
dbPath: dbPath,
|
|
uid: uid,
|
|
signingKey: signingKey,
|
|
enableBlueskyPosts: enableBlueskyPosts,
|
|
}, nil
|
|
}
|
|
|
|
// DID returns the hold's DID
|
|
func (p *HoldPDS) DID() string {
|
|
return p.did
|
|
}
|
|
|
|
// SigningKey returns the hold's signing key
|
|
func (p *HoldPDS) SigningKey() *atcrypto.PrivateKeyK256 {
|
|
return p.signingKey
|
|
}
|
|
|
|
// RepomgrRef returns a reference to the RepoManager for event handler setup
|
|
func (p *HoldPDS) RepomgrRef() *RepoManager {
|
|
return p.repomgr
|
|
}
|
|
|
|
// Bootstrap initializes the hold with the captain record, owner as first crew member, and profile
|
|
func (p *HoldPDS) Bootstrap(ctx context.Context, storageDriver driver.StorageDriver, ownerDID string, public bool, allowAllCrew bool, avatarURL string) error {
|
|
if ownerDID == "" {
|
|
return nil
|
|
}
|
|
|
|
// Check if captain record already exists (idempotent bootstrap)
|
|
_, _, err := p.GetCaptainRecord(ctx)
|
|
captainExists := (err == nil)
|
|
|
|
if captainExists {
|
|
// Captain record exists, skip captain/crew setup but still create profile if needed
|
|
slog.Info("Captain record exists, skipping captain/crew setup")
|
|
} else {
|
|
slog.Info("Bootstrapping hold PDS", "owner", ownerDID)
|
|
}
|
|
|
|
if !captainExists {
|
|
|
|
// Initialize repo if it doesn't exist yet
|
|
// Check if repo exists by trying to get the head
|
|
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
|
|
if err != nil || !head.Defined() {
|
|
// Repo doesn't exist, initialize it
|
|
// InitNewActor creates an empty repo with initial commit
|
|
err = p.repomgr.InitNewActor(ctx, p.uid, "", p.did, "", "", "")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize repo: %w", err)
|
|
}
|
|
slog.Info("Initialized empty repo")
|
|
}
|
|
|
|
// Create captain record (hold ownership and settings)
|
|
_, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew, p.enableBlueskyPosts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create captain record: %w", err)
|
|
}
|
|
|
|
slog.Info("Created captain record",
|
|
"public", public,
|
|
"allowAllCrew", allowAllCrew,
|
|
"enableBlueskyPosts", p.enableBlueskyPosts)
|
|
|
|
// Add hold owner as first crew member with admin role
|
|
_, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to add owner as crew member: %w", err)
|
|
}
|
|
|
|
slog.Info("Added owner as hold admin", "did", ownerDID)
|
|
} else {
|
|
// Captain record exists, check if we need to sync settings from env vars
|
|
_, existingCaptain, err := p.GetCaptainRecord(ctx)
|
|
if err == nil {
|
|
// Check if any settings need updating
|
|
needsUpdate := existingCaptain.Public != public ||
|
|
existingCaptain.AllowAllCrew != allowAllCrew ||
|
|
existingCaptain.EnableBlueskyPosts != p.enableBlueskyPosts
|
|
|
|
if needsUpdate {
|
|
// Update captain record to match env vars
|
|
_, err = p.UpdateCaptainRecord(ctx, public, allowAllCrew, p.enableBlueskyPosts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update captain record: %w", err)
|
|
}
|
|
slog.Info("Synced captain record with env vars",
|
|
"public", public,
|
|
"allowAllCrew", allowAllCrew,
|
|
"enableBlueskyPosts", p.enableBlueskyPosts)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create Bluesky profile record (idempotent - check if exists first)
|
|
// This runs even if captain exists (for existing holds being upgraded)
|
|
// Skip if no storage driver (e.g., in tests)
|
|
if storageDriver != nil {
|
|
_, _, err = p.GetProfileRecord(ctx)
|
|
if err != nil {
|
|
// Bluesky profile doesn't exist, create it
|
|
displayName := "Cargo Hold"
|
|
description := "ahoy from the cargo hold"
|
|
|
|
_, err = p.CreateProfileRecord(ctx, storageDriver, displayName, description, avatarURL)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create bluesky profile record: %w", err)
|
|
}
|
|
slog.Info("Created Bluesky profile record", "displayName", displayName)
|
|
} else {
|
|
slog.Info("Bluesky profile record already exists, skipping")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListCollections returns all collections present in the hold's repository
|
|
func (p *HoldPDS) ListCollections(ctx context.Context) ([]string, error) {
|
|
session, err := p.carstore.ReadOnlySession(p.uid)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create read-only session: %w", err)
|
|
}
|
|
|
|
head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get repo head: %w", err)
|
|
}
|
|
|
|
if !head.Defined() {
|
|
// Empty repo, no collections
|
|
return []string{}, nil
|
|
}
|
|
|
|
r, err := repo.OpenRepo(ctx, session, head)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open repo: %w", err)
|
|
}
|
|
|
|
collections := make(map[string]bool)
|
|
|
|
// Walk all records in the repo to discover collections
|
|
err = r.ForEach(ctx, "", func(k string, v cid.Cid) error {
|
|
// k is like "io.atcr.hold.captain/self" or "io.atcr.hold.crew/3m3by7msdln22"
|
|
parts := strings.Split(k, "/")
|
|
if len(parts) >= 1 {
|
|
collections[parts[0]] = true
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to enumerate collections: %w", err)
|
|
}
|
|
|
|
// Convert map to sorted slice
|
|
result := make([]string, 0, len(collections))
|
|
for collection := range collections {
|
|
result = append(result, collection)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Close closes the carstore
|
|
func (p *HoldPDS) Close() error {
|
|
// TODO: Close session properly
|
|
return nil
|
|
}
|