Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
143 lines
4.4 KiB
Go
143 lines
4.4 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/golang-migrate/migrate/v4"
|
|
_ "github.com/golang-migrate/migrate/v4/database/postgres" // register postgres:// driver
|
|
"github.com/golang-migrate/migrate/v4/source/iofs"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"anchorage/internal/pkg/store/postgres/migrations"
|
|
)
|
|
|
|
// MigrateDirection selects up/down for Migrate.
|
|
type MigrateDirection int
|
|
|
|
const (
|
|
// MigrateUp applies all pending up migrations.
|
|
MigrateUp MigrateDirection = iota
|
|
// MigrateDown reverts every applied migration (dev / test only).
|
|
MigrateDown
|
|
)
|
|
|
|
// MigrateOptions tunes migration behavior.
|
|
type MigrateOptions struct {
|
|
// LockID is the key used with pg_advisory_lock to serialize concurrent
|
|
// migration attempts from N replicas starting simultaneously. Must be
|
|
// stable across deploys. Any non-zero int64 works; the default is fine
|
|
// unless an operator has a collision with another app sharing the DB.
|
|
LockID int64
|
|
}
|
|
|
|
// DefaultLockID is the advisory-lock key anchorage uses for schema migrations.
|
|
// Chosen arbitrarily but stably.
|
|
const DefaultLockID int64 = 0x616e63686f726167 // "anchorag" as hex
|
|
|
|
// Migrate runs the embedded schema migrations against the DSN.
|
|
//
|
|
// The pgxpool is only used to discover the DSN so we don't open two
|
|
// connections to the same database — golang-migrate's postgres driver
|
|
// wants a *sql.DB, which pgx does not provide directly. We read the DSN
|
|
// back from the pool config and let migrate open its own driver.
|
|
//
|
|
// golang-migrate acquires a Postgres advisory lock before running, so N
|
|
// replicas starting simultaneously all block on the same lock and exactly
|
|
// one of them actually runs the migration; the others see a no-op.
|
|
func Migrate(ctx context.Context, pool *pgxpool.Pool, dir MigrateDirection, opts MigrateOptions) error {
|
|
if pool == nil {
|
|
return errors.New("migrate: pool is required")
|
|
}
|
|
dsn := pool.Config().ConnString()
|
|
return MigrateDSN(ctx, dsn, dir, opts)
|
|
}
|
|
|
|
// MigrateDSN is the pool-less variant, used by CLI subcommands that have
|
|
// a DSN but don't need a warm pool.
|
|
func MigrateDSN(ctx context.Context, dsn string, dir MigrateDirection, opts MigrateOptions) error {
|
|
src, err := iofs.New(migrations.FS, ".")
|
|
if err != nil {
|
|
return fmt.Errorf("open embedded migrations: %w", err)
|
|
}
|
|
|
|
lockID := opts.LockID
|
|
if lockID == 0 {
|
|
lockID = DefaultLockID
|
|
}
|
|
|
|
// golang-migrate v4's postgres driver reads config from URL query params:
|
|
// x-migrations-table, x-advisory-lock-id, and others. Append ours without
|
|
// clobbering user-supplied params.
|
|
migrateDSN := appendQuery(dsn, "x-migrations-table", "anchorage_schema_migrations")
|
|
migrateDSN = appendQuery(migrateDSN, "x-advisory-lock-id", fmt.Sprintf("%d", lockID))
|
|
|
|
m, err := migrate.NewWithSourceInstance("iofs", src, migrateDSN)
|
|
if err != nil {
|
|
return fmt.Errorf("init migrate: %w", err)
|
|
}
|
|
defer func() {
|
|
_, _ = m.Close()
|
|
}()
|
|
|
|
runCh := make(chan error, 1)
|
|
go func() {
|
|
switch dir {
|
|
case MigrateUp:
|
|
runCh <- m.Up()
|
|
case MigrateDown:
|
|
runCh <- m.Down()
|
|
default:
|
|
runCh <- fmt.Errorf("unknown migrate direction %d", dir)
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case err := <-runCh:
|
|
if err != nil && !errors.Is(err, migrate.ErrNoChange) {
|
|
return fmt.Errorf("run migrations: %w", err)
|
|
}
|
|
return nil
|
|
case <-ctx.Done():
|
|
// migrate has no ctx plumbing; surface the cancellation but let
|
|
// the goroutine finish so we don't leak a connection.
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// MigrationVersion returns the currently-applied schema version and
|
|
// whether the last migration is in a dirty state. Used by `anchorage
|
|
// migrate status`.
|
|
func MigrationVersion(ctx context.Context, dsn string) (version uint, dirty bool, err error) {
|
|
src, err := iofs.New(migrations.FS, ".")
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("open embedded migrations: %w", err)
|
|
}
|
|
m, err := migrate.NewWithSourceInstance("iofs", src, dsn)
|
|
if err != nil {
|
|
return 0, false, fmt.Errorf("init migrate: %w", err)
|
|
}
|
|
defer func() { _, _ = m.Close() }()
|
|
|
|
v, d, err := m.Version()
|
|
if errors.Is(err, migrate.ErrNilVersion) {
|
|
return 0, false, nil
|
|
}
|
|
return v, d, err
|
|
}
|
|
|
|
// appendQuery adds a key=value to a DSN's query string, preserving
|
|
// existing params. Handles both postgres:// URL form and space-delimited
|
|
// libpq form; on failure to parse, falls back to unmodified.
|
|
func appendQuery(dsn, k, v string) string {
|
|
sep := "?"
|
|
for _, c := range dsn {
|
|
if c == '?' {
|
|
sep = "&"
|
|
break
|
|
}
|
|
}
|
|
return dsn + sep + k + "=" + v
|
|
}
|