Greenfield, multi-tenant IPFS pinning service written in Go and
wire-compatible with the IPFS Pinning Services API spec. Each instance is
paired 1:1 with Kubo over localhost RPC and clustered via embedded NATS
JetStream. Postgres is the source of truth, with RLS-enforced tenancy.
The HTTP surface uses Fiber + huma v2, and Authentik OIDC handles session
login, with kid-rotated HS256 JWTs as API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
164 lines
4.2 KiB
Go
164 lines
4.2 KiB
Go
// Package nats hosts anchorage's embedded NATS server lifecycle plus
|
|
// JetStream bootstrap for the PIN_JOBS and PIN_EVENTS streams.
|
|
//
|
|
// Every anchorage instance runs an in-process NATS server that peers with
|
|
// the other instances via cluster routes listed in config. JetStream is
|
|
// always enabled; stream replication is min(configured, clusterSize) so a
|
|
// single-node deployment works without extra tuning.
|
|
//
|
|
// Naming note: callers that also import the upstream nats.go client
|
|
// should alias it (by convention as `natsio`) so the two packages
|
|
// don't collide:
|
|
//
|
|
// import (
|
|
// "anchorage/internal/pkg/nats"
|
|
// natsio "github.com/nats-io/nats.go"
|
|
// )
|
|
package nats
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
natsserver "github.com/nats-io/nats-server/v2/server"
|
|
natsio "github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nats.go/jetstream"
|
|
)
|
|
|
|
// ServerConfig is the package-local, flattened counterpart of
// config.NATSConfig. It carries everything Start needs to boot the
// embedded server.
type ServerConfig struct {
	// ServerName must be unique per cluster member. Typically the anchorage
	// node ID so NATS peers and anchorage peers line up 1:1. When empty,
	// Start falls back to "anchorage-" + ClusterName.
	ServerName string

	// DataDir is the JetStream store directory. Start rejects an empty value.
	DataDir string

	// ClientHost and ClientPort are the address the server listens on for
	// client connections.
	ClientHost string
	ClientPort int

	// ClusterName, ClusterHost and ClusterPort describe the cluster
	// listener. Clustering is only configured when ClusterPort is non-zero.
	ClusterName string
	ClusterHost string
	ClusterPort int

	// Routes lists the peer routes to dial. Entries may be full URLs or
	// bare host:port pairs.
	Routes []string

	// JSReplicas is presumably the configured JetStream stream replication
	// factor; it is not consumed by the code visible in this file.
	// NOTE(review): confirm against the stream bootstrap code.
	JSReplicas int
}
|
|
|
|
// Server wraps a running in-process NATS server and a client connected
|
|
// to it. Callers should call Close to drain and stop the server.
|
|
type Server struct {
|
|
NS *natsserver.Server
|
|
NC *natsio.Conn
|
|
JS jetstream.JetStream
|
|
}
|
|
|
|
// Start boots an embedded NATS server, waits for readiness, and returns a
|
|
// connected client. Errors at any stage tear everything down.
|
|
func Start(ctx context.Context, cfg ServerConfig) (*Server, error) {
|
|
if cfg.DataDir == "" {
|
|
return nil, errors.New("nats: DataDir is required")
|
|
}
|
|
|
|
serverName := cfg.ServerName
|
|
if serverName == "" {
|
|
serverName = "anchorage-" + cfg.ClusterName
|
|
}
|
|
|
|
opts := &natsserver.Options{
|
|
ServerName: serverName,
|
|
Host: cfg.ClientHost,
|
|
Port: cfg.ClientPort,
|
|
JetStream: true,
|
|
StoreDir: cfg.DataDir,
|
|
NoSigs: true, // don't install signal handlers in embedded mode
|
|
}
|
|
// Clustering is opt-in: only configure it when a cluster port was
|
|
// provided. Tests and single-node deployments get a standalone server.
|
|
if cfg.ClusterPort != 0 {
|
|
opts.Cluster = natsserver.ClusterOpts{
|
|
Name: cfg.ClusterName,
|
|
Host: cfg.ClusterHost,
|
|
Port: cfg.ClusterPort,
|
|
}
|
|
}
|
|
for _, r := range cfg.Routes {
|
|
u, err := parseURL(r)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse route %q: %w", r, err)
|
|
}
|
|
opts.Routes = append(opts.Routes, u)
|
|
}
|
|
|
|
ns, err := natsserver.NewServer(opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("construct nats server: %w", err)
|
|
}
|
|
|
|
go ns.Start()
|
|
|
|
readyCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
if !waitReady(readyCtx, ns) {
|
|
ns.Shutdown()
|
|
return nil, errors.New("nats: server never became ready within 10s")
|
|
}
|
|
|
|
nc, err := natsio.Connect("", natsio.InProcessServer(ns), natsio.Name("anchorage-embedded"))
|
|
if err != nil {
|
|
ns.Shutdown()
|
|
return nil, fmt.Errorf("connect to embedded nats: %w", err)
|
|
}
|
|
|
|
js, err := jetstream.New(nc)
|
|
if err != nil {
|
|
nc.Close()
|
|
ns.Shutdown()
|
|
return nil, fmt.Errorf("open jetstream: %w", err)
|
|
}
|
|
|
|
return &Server{NS: ns, NC: nc, JS: js}, nil
|
|
}
|
|
|
|
// Close drains the client, waits for the server to flush JetStream state,
|
|
// and shuts the in-process server down. Safe to call more than once.
|
|
func (s *Server) Close() {
|
|
if s == nil {
|
|
return
|
|
}
|
|
if s.NC != nil {
|
|
_ = s.NC.Drain()
|
|
}
|
|
if s.NS != nil {
|
|
s.NS.Shutdown()
|
|
s.NS.WaitForShutdown()
|
|
}
|
|
}
|
|
|
|
// waitReady polls ns.ReadyForConnections until the context expires.
|
|
func waitReady(ctx context.Context, ns *natsserver.Server) bool {
|
|
ticker := time.NewTicker(50 * time.Millisecond)
|
|
defer ticker.Stop()
|
|
for {
|
|
if ns.ReadyForConnections(100 * time.Millisecond) {
|
|
return true
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return false
|
|
case <-ticker.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
// parseURL normalises a user-supplied cluster route to a *url.URL.
//
// Surrounding whitespace is ignored, so values read from config files or
// environment variables with stray spaces or newlines still parse. A bare
// host:port (no "://") gets the nats:// scheme prepended. An empty route,
// or one that has no host component after parsing (e.g. "nats://"), is
// rejected, because such a route cannot be dialled.
func parseURL(raw string) (*url.URL, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return nil, errors.New("empty route")
	}
	if !strings.Contains(raw, "://") {
		raw = "nats://" + raw
	}
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	if u.Host == "" {
		return nil, errors.New("route has no host")
	}
	return u, nil
}
|