Files
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

143 lines
4.4 KiB
Go

package postgres
import (
"context"
"errors"
"fmt"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres" // register postgres:// driver
"github.com/golang-migrate/migrate/v4/source/iofs"
"github.com/jackc/pgx/v5/pgxpool"
"anchorage/internal/pkg/store/postgres/migrations"
)
// MigrateDirection selects up/down for Migrate.
type MigrateDirection int
const (
// MigrateUp applies all pending up migrations.
MigrateUp MigrateDirection = iota
// MigrateDown reverts every applied migration (dev / test only).
MigrateDown
)
// MigrateOptions tunes migration behavior.
type MigrateOptions struct {
// LockID is the key used with pg_advisory_lock to serialize concurrent
// migration attempts from N replicas starting simultaneously. Must be
// stable across deploys. Any non-zero int64 works; the default is fine
// unless an operator has a collision with another app sharing the DB.
LockID int64
}
// DefaultLockID is the advisory-lock key anchorage uses for schema migrations.
// Chosen arbitrarily but stably.
const DefaultLockID int64 = 0x616e63686f726167 // "anchorag" as hex
// Migrate runs the embedded schema migrations against the DSN.
//
// The pgxpool is only used to discover the DSN so we don't open two
// connections to the same database — golang-migrate's postgres driver
// wants a *sql.DB, which pgx does not provide directly. We read the DSN
// back from the pool config and let migrate open its own driver.
//
// golang-migrate acquires a Postgres advisory lock before running, so N
// replicas starting simultaneously all block on the same lock and exactly
// one of them actually runs the migration; the others see a no-op.
func Migrate(ctx context.Context, pool *pgxpool.Pool, dir MigrateDirection, opts MigrateOptions) error {
if pool == nil {
return errors.New("migrate: pool is required")
}
dsn := pool.Config().ConnString()
return MigrateDSN(ctx, dsn, dir, opts)
}
// MigrateDSN is the pool-less variant, used by CLI subcommands that have
// a DSN but don't need a warm pool.
func MigrateDSN(ctx context.Context, dsn string, dir MigrateDirection, opts MigrateOptions) error {
src, err := iofs.New(migrations.FS, ".")
if err != nil {
return fmt.Errorf("open embedded migrations: %w", err)
}
lockID := opts.LockID
if lockID == 0 {
lockID = DefaultLockID
}
// golang-migrate v4's postgres driver reads config from URL query params:
// x-migrations-table, x-advisory-lock-id, and others. Append ours without
// clobbering user-supplied params.
migrateDSN := appendQuery(dsn, "x-migrations-table", "anchorage_schema_migrations")
migrateDSN = appendQuery(migrateDSN, "x-advisory-lock-id", fmt.Sprintf("%d", lockID))
m, err := migrate.NewWithSourceInstance("iofs", src, migrateDSN)
if err != nil {
return fmt.Errorf("init migrate: %w", err)
}
defer func() {
_, _ = m.Close()
}()
runCh := make(chan error, 1)
go func() {
switch dir {
case MigrateUp:
runCh <- m.Up()
case MigrateDown:
runCh <- m.Down()
default:
runCh <- fmt.Errorf("unknown migrate direction %d", dir)
}
}()
select {
case err := <-runCh:
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
return fmt.Errorf("run migrations: %w", err)
}
return nil
case <-ctx.Done():
// migrate has no ctx plumbing; surface the cancellation but let
// the goroutine finish so we don't leak a connection.
return ctx.Err()
}
}
// MigrationVersion returns the currently-applied schema version and
// whether the last migration is in a dirty state. Used by `anchorage
// migrate status`.
func MigrationVersion(ctx context.Context, dsn string) (version uint, dirty bool, err error) {
src, err := iofs.New(migrations.FS, ".")
if err != nil {
return 0, false, fmt.Errorf("open embedded migrations: %w", err)
}
m, err := migrate.NewWithSourceInstance("iofs", src, dsn)
if err != nil {
return 0, false, fmt.Errorf("init migrate: %w", err)
}
defer func() { _, _ = m.Close() }()
v, d, err := m.Version()
if errors.Is(err, migrate.ErrNilVersion) {
return 0, false, nil
}
return v, d, err
}
// appendQuery adds a key=value to a DSN's query string, preserving
// existing params. Handles both postgres:// URL form and space-delimited
// libpq form; on failure to parse, falls back to unmodified.
func appendQuery(dsn, k, v string) string {
sep := "?"
for _, c := range dsn {
if c == '?' {
sep = "&"
break
}
}
return dsn + sep + k + "=" + v
}