Files
anchorage/internal/pkg/nats/server.go
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

164 lines
4.2 KiB
Go

// Package nats hosts anchorage's embedded NATS server lifecycle plus
// JetStream bootstrap for the PIN_JOBS and PIN_EVENTS streams.
//
// Every anchorage instance runs an in-process NATS server that peers with
// the other instances via cluster routes listed in config. JetStream is
// always enabled; stream replication is min(configured, clusterSize) so a
// single-node deployment works without extra tuning.
//
// Naming note: callers that also import the upstream nats.go client
// should alias it (by convention as `natsio`) so the two packages
// don't collide:
//
//	import (
//		"anchorage/internal/pkg/nats"
//		natsio "github.com/nats-io/nats.go"
//	)
package nats
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"time"
natsserver "github.com/nats-io/nats-server/v2/server"
natsio "github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
// ServerConfig mirrors config.NATSConfig in a flatter, package-local form.
// Start reads it to build the options for the embedded NATS server.
type ServerConfig struct {
// ServerName must be unique per cluster member. Typically the anchorage
// node ID so NATS peers and anchorage peers line up 1:1. When empty,
// Start falls back to "anchorage-" + ClusterName.
ServerName string
// DataDir is used as the JetStream store directory. It is required:
// Start returns an error when it is empty.
DataDir string
// ClientHost is the host the embedded server listens on for clients.
ClientHost string
// ClientPort is the port the embedded server listens on for clients.
ClientPort int
// ClusterName is the NATS cluster name. It is only applied when
// ClusterPort is non-zero, and it also feeds the ServerName fallback.
ClusterName string
// ClusterHost is the cluster listen host; only applied when ClusterPort
// is non-zero.
ClusterHost string
// ClusterPort enables clustering when non-zero. Zero leaves the cluster
// options unset.
ClusterPort int
// Routes lists peer route addresses. Each entry may be a full URL or a
// bare host:port, in which case nats:// is prepended before parsing.
Routes []string
// JSReplicas is not read by Start in this file.
// NOTE(review): presumably the configured stream replica count mentioned
// in the package comment — confirm against the stream bootstrap code.
JSReplicas int
}
// Server wraps a running in-process NATS server and a client connected
// to it. Callers should call Close to drain and stop the server.
type Server struct {
// NS is the embedded NATS server started by Start.
NS *natsserver.Server
// NC is the in-process client connection to NS.
NC *natsio.Conn
// JS is the JetStream handle created from NC.
JS jetstream.JetStream
}
// Start boots an embedded NATS server, waits for readiness, and returns a
// connected in-process client together with a JetStream handle.
//
// cfg.DataDir is required and becomes the JetStream store directory. When
// cfg.ServerName is empty the name falls back to "anchorage-" +
// cfg.ClusterName. Cluster options are only set when cfg.ClusterPort is
// non-zero; every entry in cfg.Routes is normalised with parseURL.
//
// Readiness is bounded by ctx and by an internal 10s timeout, whichever
// ends first. The returned error wraps the context error, so callers can
// tell a cancelled ctx (context.Canceled) from a genuine timeout
// (context.DeadlineExceeded) with errors.Is.
//
// Errors at any stage after the server is constructed tear everything
// down before returning.
func Start(ctx context.Context, cfg ServerConfig) (*Server, error) {
	// readyTimeout caps how long we wait for the embedded server to accept
	// connections, independent of the caller's ctx.
	const readyTimeout = 10 * time.Second

	if cfg.DataDir == "" {
		return nil, errors.New("nats: DataDir is required")
	}

	serverName := cfg.ServerName
	if serverName == "" {
		serverName = "anchorage-" + cfg.ClusterName
	}

	opts := &natsserver.Options{
		ServerName: serverName,
		Host:       cfg.ClientHost,
		Port:       cfg.ClientPort,
		JetStream:  true,
		StoreDir:   cfg.DataDir,
		NoSigs:     true, // don't install signal handlers in embedded mode
	}

	// Clustering is opt-in: only configure it when a cluster port was
	// provided. Tests and single-node deployments get a standalone server.
	if cfg.ClusterPort != 0 {
		opts.Cluster = natsserver.ClusterOpts{
			Name: cfg.ClusterName,
			Host: cfg.ClusterHost,
			Port: cfg.ClusterPort,
		}
	}

	for _, r := range cfg.Routes {
		u, err := parseURL(r)
		if err != nil {
			return nil, fmt.Errorf("parse route %q: %w", r, err)
		}
		opts.Routes = append(opts.Routes, u)
	}

	ns, err := natsserver.NewServer(opts)
	if err != nil {
		return nil, fmt.Errorf("construct nats server: %w", err)
	}

	// stop mirrors the server half of Close so every failure path below
	// tears the server down the same way.
	stop := func() {
		ns.Shutdown()
		ns.WaitForShutdown()
	}

	go ns.Start()

	readyCtx, cancel := context.WithTimeout(ctx, readyTimeout)
	defer cancel()
	if !waitReady(readyCtx, ns) {
		stop()
		// waitReady only gives up once readyCtx is done, so Err is non-nil
		// here and says whether the caller cancelled or the timeout fired.
		return nil, fmt.Errorf("nats: server never became ready within %s: %w", readyTimeout, readyCtx.Err())
	}

	nc, err := natsio.Connect("", natsio.InProcessServer(ns), natsio.Name("anchorage-embedded"))
	if err != nil {
		stop()
		return nil, fmt.Errorf("connect to embedded nats: %w", err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		nc.Close()
		stop()
		return nil, fmt.Errorf("open jetstream: %w", err)
	}

	return &Server{NS: ns, NC: nc, JS: js}, nil
}
// Close drains the client, waits for the drain to finish, and then shuts
// the in-process server down, blocking until shutdown completes. Safe to
// call on a nil receiver and safe to call more than once.
//
// Drain on the NATS client only starts the drain and returns; the
// connection moves to the closed state once draining is done. Close
// therefore waits (for a bounded time) for the connection to report closed
// before stopping the server, so the server is not pulled out from under
// an in-progress drain.
func (s *Server) Close() {
	if s == nil {
		return
	}
	if nc := s.NC; nc != nil {
		// drainWait bounds how long shutdown is held up by a slow drain.
		const drainWait = 5 * time.Second

		// A Drain error is deliberately ignored: it is treated as "nothing
		// left to drain" (e.g. the connection is already closed on a
		// repeated Close), and the explicit Close below covers that case.
		_ = nc.Drain()
		for deadline := time.Now().Add(drainWait); !nc.IsClosed() && time.Now().Before(deadline); {
			time.Sleep(10 * time.Millisecond)
		}
		// Make sure the connection is closed even if the drain did not
		// finish within drainWait.
		nc.Close()
	}
	if s.NS != nil {
		s.NS.Shutdown()
		s.NS.WaitForShutdown()
	}
}
// waitReady reports whether ns started accepting connections before ctx
// was done. Each probe blocks for up to 100ms; between failed probes it
// waits for the next 50ms tick or for ctx to finish, whichever comes first.
func waitReady(ctx context.Context, ns *natsserver.Server) bool {
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()

	for !ns.ReadyForConnections(100 * time.Millisecond) {
		select {
		case <-tick.C:
			// Not ready yet; probe again.
		case <-ctx.Done():
			return false
		}
	}
	return true
}
// parseURL normalises a user-supplied cluster route to a *url.URL.
//
// Leading and trailing whitespace is ignored, so entries taken from a
// comma-separated list such as "a:6222, b:6222" parse cleanly. A bare
// host:port is accepted and gets nats:// prepended. An input that is empty
// (or only whitespace), fails url.Parse, or has no host component (for
// example "nats://") is rejected with an error rather than being handed to
// the server as an unusable route.
func parseURL(raw string) (*url.URL, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return nil, errors.New("empty route")
	}
	if !strings.Contains(raw, "://") {
		raw = "nats://" + raw
	}
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	// A scheme with nothing after it parses successfully but has nowhere
	// to connect to; fail here where the caller can report the bad entry.
	if u.Host == "" {
		return nil, errors.New("route has no host")
	}
	return u, nil
}