Files
anchorage/test/postgres_integration_test.go
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

132 lines
3.6 KiB
Go

// Package test hosts anchorage's cross-package integration tests. They
// are gated by a build tag because they pull in Docker via
// testcontainers-go; running the normal `go test ./...` suite does NOT
// exercise them.
//
// Run locally with:
//
// go test -tags=integration -timeout=5m ./test/...
//
//go:build integration
package test
import (
"context"
"testing"
"anchorage/internal/pkg/ids"
"anchorage/internal/pkg/store"
)
// ---------------------------------------------------------------------------
// TestPostgresEndToEnd drives the store API against a real Postgres:
// create an org, a user and a membership; register a node; create a pin
// together with a placement and a refcount inside one transaction; look
// the pin up through Filter and GetByRequestID; drain and uncordon the
// node; then delete the pin and confirm its placement is gone.
//
// Setup failures abort the test with Fatalf because every later step
// depends on them; independent assertions use Errorf so one run reports
// as many problems as possible.
func TestPostgresEndToEnd(t *testing.T) {
	s, cleanup := startPostgres(t)
	defer cleanup()

	ctx := context.Background()

	// --- org + user ---
	orgID := ids.MustNewOrg()
	org, err := s.Orgs().Create(ctx, orgID, "acme", "Acme Corp")
	if err != nil {
		t.Fatalf("create org: %v", err)
	}
	if org.Slug != "acme" {
		t.Fatalf("org slug = %q, want %q", org.Slug, "acme")
	}

	userID := ids.MustNewUser()
	user, err := s.Users().UpsertByAuthentikSub(ctx, userID, "sub-1", "alice@example.com", "Alice", true)
	if err != nil {
		t.Fatalf("upsert user: %v", err)
	}
	if !user.IsSysadmin {
		t.Error("user should be sysadmin")
	}
	if err := s.Memberships().Add(ctx, orgID, userID, store.RoleOrgAdmin); err != nil {
		t.Fatalf("add member: %v", err)
	}

	// --- node registry ---
	nodeID := ids.MustNewNode()
	if err := s.Nodes().Upsert(ctx, &store.Node{
		ID: nodeID, DisplayName: "node-1",
		Multiaddrs: []string{"/dns4/node-1/tcp/4001/p2p/QmX"},
		RPCURL:     "http://localhost:5001",
		Status:     store.NodeStatusUp,
	}); err != nil {
		t.Fatalf("upsert node: %v", err)
	}

	// --- pin create ---
	// The pin, its placement and the refcount bump are written through the
	// same Tx callback so they are submitted as one unit.
	pinID := ids.MustNewPin()
	name := "dataset"
	err = s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
		if err := tx.Pins().Create(txCtx, &store.Pin{
			RequestID: pinID, OrgID: orgID, CID: "bafy1",
			Name: &name, Meta: map[string]any{"env": "prod"},
			Status: store.PinStatusQueued,
		}); err != nil {
			return err
		}
		if _, err := tx.Pins().InsertPlacement(txCtx, pinID, nodeID, 1); err != nil {
			return err
		}
		return tx.Pins().IncRefcount(txCtx, nodeID, "bafy1")
	})
	if err != nil {
		t.Fatalf("tx: %v", err)
	}

	// --- filter exercises the full spec-compliant surface ---
	pins, err := s.Pins().Filter(ctx, orgID, store.PinFilter{
		CIDs:   []string{"bafy1"},
		Name:   "dataset",
		Match:  "exact",
		Status: []string{store.PinStatusQueued},
		Meta:   map[string]any{"env": "prod"},
		Limit:  10,
	})
	if err != nil {
		t.Fatalf("filter: %v", err)
	}
	if len(pins) != 1 || pins[0].RequestID != pinID {
		t.Fatalf("filter got %d pins, expected 1 with the created RequestID", len(pins))
	}

	// --- by-request-id (scheduler path) ---
	fetched, err := s.Pins().GetByRequestID(ctx, pinID)
	if err != nil {
		t.Fatalf("get-by-rid: %v", err)
	}
	if fetched.CID != "bafy1" {
		t.Errorf("cid drift: %q", fetched.CID)
	}

	// --- drain + undrain ---
	if err := s.Nodes().Drain(ctx, nodeID); err != nil {
		t.Fatalf("drain: %v", err)
	}
	// Fail fast on a lookup error rather than asserting against whatever
	// Get returned alongside it.
	n, err := s.Nodes().Get(ctx, nodeID)
	if err != nil {
		t.Fatalf("get node after drain: %v", err)
	}
	if n.Status != store.NodeStatusDrained {
		t.Errorf("drain status = %s", n.Status)
	}

	if err := s.Nodes().Uncordon(ctx, nodeID); err != nil {
		t.Fatalf("uncordon: %v", err)
	}
	// Only assert that the node left the drained state; the exact status
	// Uncordon restores is not pinned down here.
	n, err = s.Nodes().Get(ctx, nodeID)
	if err != nil {
		t.Fatalf("get node after uncordon: %v", err)
	}
	if n.Status == store.NodeStatusDrained {
		t.Errorf("uncordon left node in status %s", n.Status)
	}

	// --- delete + cascade ---
	if err := s.Pins().Delete(ctx, orgID, pinID); err != nil {
		t.Fatalf("delete: %v", err)
	}
	// A successful placement lookup after the delete means the row outlived
	// its pin.
	if _, err := s.Pins().GetPlacement(ctx, pinID, nodeID); err == nil {
		t.Error("placement survived pin delete — CASCADE broken?")
	}
}