anchorage v1.0 initial tree

Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
This commit is contained in:
2026-04-16 18:10:52 -05:00
commit 12bf35caf8
112 changed files with 16884 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
package httpserver
import (
"log/slog"
"net/http"
"time"
"github.com/gofiber/fiber/v2"
"anchorage/internal/pkg/metrics"
)
// accessLog is a minimal, slog-friendly request logger that also feeds
// the Prometheus HTTP request counter. Full structured logging with
// tenant + user attribution lives in the auth middleware once a JWT
// is validated.
//
// /metrics itself is skipped — self-observing scrapes would show up as
// a ton of 200s that tell the operator nothing.
func accessLog() fiber.Handler {
return func(c *fiber.Ctx) error {
start := time.Now()
err := c.Next()
dur := time.Since(start)
status := c.Response().StatusCode()
lvl := slog.LevelInfo
if status >= http.StatusInternalServerError {
lvl = slog.LevelError
} else if status >= http.StatusBadRequest {
lvl = slog.LevelWarn
}
slog.Log(c.UserContext(), lvl, "http",
"method", c.Method(),
"path", c.Path(),
"status", status,
"duration_ms", dur.Milliseconds(),
"request_id", c.Get(fiber.HeaderXRequestID),
"remote", c.IP())
if c.Path() != "/metrics" {
metrics.HTTPRequests.
WithLabelValues(c.Method(), metrics.StatusClass(status)).
Inc()
}
return err
}
}

View File

@@ -0,0 +1,70 @@
package httpserver
import (
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/limiter"
"anchorage/internal/pkg/auth"
)
// SessionLimiter is the brute-force guard on `POST /v1/auth/session`.
// Applies only to that specific method+path via Fiber's `Next` skip
// function — so you can install it globally with `app.Use(...)` and
// have it no-op on every other request.
//
// perMinute == 0 falls back to 10 attempts per IP per minute.
func SessionLimiter(perMinute int) fiber.Handler {
if perMinute <= 0 {
perMinute = 10
}
return limiter.New(limiter.Config{
Max: perMinute,
Expiration: time.Minute,
Next: func(c *fiber.Ctx) bool {
return !(c.Method() == fiber.MethodPost && c.Path() == "/v1/auth/session")
},
LimitReached: func(c *fiber.Ctx) error {
return c.Status(fiber.StatusTooManyRequests).JSON(fiber.Map{
"error": "Too Many Requests",
"message": "too many session attempts; slow down",
})
},
})
}
// AnonymousLimiter caps unauthenticated requests per IP across the whole
// API. Authenticated requests (those carrying a valid Bearer or session
// cookie the BearerMiddleware already resolved) skip the limiter. Probe
// paths (/health, /ready, /metrics) are also exempt.
//
// perMinute == 0 falls back to 120 — a generous bound for a single IP
// hitting /openapi.json, /docs, and unauthenticated pre-login traffic.
func AnonymousLimiter(perMinute int) fiber.Handler {
if perMinute <= 0 {
perMinute = 120
}
return limiter.New(limiter.Config{
Max: perMinute,
Expiration: time.Minute,
Next: func(c *fiber.Ctx) bool {
// Skip health / metrics — orchestrator probes hit these far
// more often than the per-minute bound and we don't want to
// DoS ourselves.
switch c.Path() {
case "/v1/health", "/v1/ready", "/metrics", "/lbz":
return true
}
// Skip authenticated traffic — real API clients burst
// legitimately and we don't want to throttle them here.
return auth.FromContext(c.UserContext()) != nil
},
LimitReached: func(c *fiber.Ctx) error {
return c.Status(fiber.StatusTooManyRequests).JSON(fiber.Map{
"error": "Too Many Requests",
"message": "anonymous request rate exceeded",
})
},
})
}

View File

@@ -0,0 +1,91 @@
// Package httpserver composes the Fiber app and its middleware stack.
//
// The actual route tree is registered by internal/pkg/openapi through the
// humafiber adapter, so this package only owns lifecycle: configuration,
// graceful start, graceful shutdown, and basic observability middleware.
package httpserver
import (
"context"
"fmt"
"log/slog"
"net/http"
"time"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
"anchorage/internal/pkg/metrics"
)
// Options configures a Server.
type Options struct {
Host string
Port int
ReadTimeout time.Duration
WriteTimeout time.Duration
// MetricsACLCIDRs is the allowlist for /metrics. Nil → use the
// metrics package default (loopback + RFC1918). Explicit empty
// slice → no restriction (firewall-only).
MetricsACLCIDRs []string
}
// Server wraps *fiber.App with lifecycle helpers.
type Server struct {
App *fiber.App
opts Options
}
// New constructs a Server with the standard anchorage middleware stack.
//
// The returned *fiber.App is exposed on Server.App so callers can register
// routes before Start is called.
func New(opts Options) *Server {
app := fiber.New(fiber.Config{
AppName: "anchorage",
DisableStartupMessage: true,
ReadTimeout: opts.ReadTimeout,
WriteTimeout: opts.WriteTimeout,
ErrorHandler: func(c *fiber.Ctx, err error) error {
// Fiber default returns an HTML body; JSON is more useful
// for our API clients.
code := fiber.StatusInternalServerError
if fe, ok := err.(*fiber.Error); ok {
code = fe.Code
}
return c.Status(code).JSON(fiber.Map{
"error": http.StatusText(code),
"message": err.Error(),
})
},
})
app.Use(recover.New())
app.Use(requestid.New())
app.Use(accessLog())
// /metrics lives at root (NOT under /v1) so Prometheus scrapers
// find it at the conventional path. Gated by a CIDR ACL — see
// metrics.ACL and MetricsConfig.AllowCIDRs.
if acl, err := metrics.ACL(opts.MetricsACLCIDRs); err != nil {
slog.Error("httpserver: bad metrics ACL; /metrics disabled", "err", err)
} else {
app.Get("/metrics", acl, metrics.Handler())
}
return &Server{App: app, opts: opts}
}
// Start binds and listens. Returns only on listener error.
func (s *Server) Start(_ context.Context) error {
addr := fmt.Sprintf("%s:%d", s.opts.Host, s.opts.Port)
slog.Info("httpserver: listening", "addr", addr)
return s.App.Listen(addr)
}
// Shutdown drains in-flight requests with a bounded grace period.
func (s *Server) Shutdown(ctx context.Context) error {
return s.App.ShutdownWithContext(ctx)
}