Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens (sketched
below).
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
  (sketched below)
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
  (placements live-nodes + token denylist; sketched below)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min); sketched below
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
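
A minimal sketch of the kid-rotated HS256 token scheme named above,
using github.com/golang-jwt/jwt/v5: the signer stamps the current key
ID into the token's kid header, and verification looks the HMAC secret
up by that header, so a rotation just adds a ring entry and flips
current while older tokens keep verifying. The keyring contents, claim
set, and function names here are illustrative, not the service's actual
code.

package tokens

import (
	"fmt"
	"time"

	"github.com/golang-jwt/jwt/v5"
)

// keyring maps key IDs to HMAC secrets; rotate-key would add a new
// entry and flip current, leaving prior keys in place until every
// token signed with them has expired.
var (
	keyring = map[string][]byte{"2024-06": []byte("old"), "2024-07": []byte("new")}
	current = "2024-07"
)

func Mint(sub string, ttl time.Duration) (string, error) {
	tok := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
		"sub": sub,
		"exp": time.Now().Add(ttl).Unix(),
	})
	tok.Header["kid"] = current // verifiers select the secret by this header
	return tok.SignedString(keyring[current])
}

func Verify(raw string) (*jwt.Token, error) {
	return jwt.Parse(raw, func(t *jwt.Token) (any, error) {
		kid, _ := t.Header["kid"].(string)
		secret, ok := keyring[kid]
		if !ok {
			return nil, fmt.Errorf("unknown kid %q", kid)
		}
		return secret, nil
	}, jwt.WithValidMethods([]string{"HS256"}))
}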
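
The TTL leader election + fence token item, sketched against the NATS
KV API. Bucket/key names and the renewal cadence are assumptions; the
mechanism the sketch shows is that a KV entry with a TTL expires unless
the leader renews it, and the monotonically increasing KV revision
doubles as the fence token, so work signed with a stale revision can be
rejected downstream.

package election

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func campaign(ctx context.Context, js nats.JetStreamContext, self string) {
	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket: "leader",
		TTL:    10 * time.Second, // entry expires unless renewed -> automatic failover
	})
	if err != nil {
		panic(err)
	}

	var fence uint64 // last revision we wrote; our fence token while leading
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if fence == 0 {
				// Create only succeeds if no (unexpired) leader entry exists.
				rev, err := kv.Create("scheduler", []byte(self))
				if err == nil {
					fence = rev
					fmt.Println("became leader, fence =", fence)
				}
				continue
			}
			// Renew via compare-and-swap on our own revision. If another
			// node took over, the CAS fails and we drop back to follower.
			rev, err := kv.Update("scheduler", []byte(self), fence)
			if err != nil {
				fence = 0
				continue
			}
			fence = rev
		}
	}
}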
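
The cache item, sketched with ristretto's pre-generics v0 API and a
plain NATS subject (subject and wrapper names are illustrative): every
node subscribes, and whichever node mutates the source of truth
publishes the affected key so all peers drop it.

package cachex

import (
	"github.com/dgraph-io/ristretto"
	"github.com/nats-io/nats.go"
)

type InvalidatingCache struct {
	c *ristretto.Cache
}

func New(nc *nats.Conn, subject string) (*InvalidatingCache, error) {
	c, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e5,     // keys tracked for admission frequency
		MaxCost:     1 << 24, // ~16 MiB total cost budget
		BufferItems: 64,
	})
	if err != nil {
		return nil, err
	}
	ic := &InvalidatingCache{c: c}
	// Every node subscribes, including the publisher itself.
	if _, err := nc.Subscribe(subject, func(m *nats.Msg) {
		ic.c.Del(string(m.Data))
	}); err != nil {
		return nil, err
	}
	return ic, nil
}

func (ic *InvalidatingCache) Get(key string) (any, bool) { return ic.c.Get(key) }

func (ic *InvalidatingCache) Set(key string, v any, cost int64) {
	ic.c.Set(key, v, cost)
}

// Invalidate removes key locally and tells every peer to do the same.
func (ic *InvalidatingCache) Invalidate(nc *nats.Conn, subject, key string) error {
	ic.c.Del(key)
	return nc.Publish(subject, []byte(key))
}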
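
The two rate-limit tiers, sketched with Fiber v2's stock limiter
middleware; the Locals key used to detect an authenticated session is
an assumption.

package middlewarex

import (
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/limiter"
)

func Register(app *fiber.App) {
	// Anonymous/global tier: 120 requests per minute keyed by client IP.
	app.Use(limiter.New(limiter.Config{
		Max:        120,
		Expiration: time.Minute,
		KeyGenerator: func(c *fiber.Ctx) string {
			return c.IP()
		},
	}))

	// Session tier: a tighter 10/min bucket keyed by the authenticated
	// session, applied only once auth middleware has populated Locals.
	app.Use(limiter.New(limiter.Config{
		Max:        10,
		Expiration: time.Minute,
		Next: func(c *fiber.Ctx) bool {
			// Skip unauthenticated requests; the global bucket covers them.
			return c.Locals("sessionID") == nil
		},
		KeyGenerator: func(c *fiber.Ctx) string {
			return c.Locals("sessionID").(string)
		},
	}))
}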
//go:build integration

// Package test hosts anchorage's cross-package integration tests. They
// are gated by a build tag because they pull in Docker via
// testcontainers-go; running the normal `go test ./...` suite does NOT
// exercise them.
//
// Run locally with:
//
//	go test -tags=integration -timeout=5m ./test/...
package test

import (
	"context"
	"testing"

	"anchorage/internal/pkg/ids"
	"anchorage/internal/pkg/store"
)

// ---------------------------------------------------------------------------

// TestPostgresEndToEnd drives the full store API against a real
// Postgres: create org + user + membership, mint a pin with a placement
// and a refcount, filter it back through the spec-compliant query
// surface, fetch it by request ID, drain and uncordon a node, then
// delete the pin and confirm CASCADE cleans up its placements.
func TestPostgresEndToEnd(t *testing.T) {
	s, cleanup := startPostgres(t)
	defer cleanup()

	ctx := context.Background()

	// --- org + user ---
	orgID := ids.MustNewOrg()
	org, err := s.Orgs().Create(ctx, orgID, "acme", "Acme Corp")
	if err != nil {
		t.Fatalf("create org: %v", err)
	}
	if org.Slug != "acme" {
		t.Fatalf("slug = %q, want %q", org.Slug, "acme")
	}

	userID := ids.MustNewUser()
	user, err := s.Users().UpsertByAuthentikSub(ctx, userID, "sub-1", "alice@example.com", "Alice", true)
	if err != nil {
		t.Fatalf("upsert user: %v", err)
	}
	if !user.IsSysadmin {
		t.Error("user should be sysadmin")
	}
	if err := s.Memberships().Add(ctx, orgID, userID, store.RoleOrgAdmin); err != nil {
		t.Fatalf("add member: %v", err)
	}

	// --- node registry ---
	nodeID := ids.MustNewNode()
	if err := s.Nodes().Upsert(ctx, &store.Node{
		ID: nodeID, DisplayName: "node-1",
		Multiaddrs: []string{"/dns4/node-1/tcp/4001/p2p/QmX"},
		RPCURL:     "http://localhost:5001",
		Status:     store.NodeStatusUp,
	}); err != nil {
		t.Fatalf("upsert node: %v", err)
	}

	// --- pin create ---
	pinID := ids.MustNewPin()
	name := "dataset"
	err = s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
		if err := tx.Pins().Create(txCtx, &store.Pin{
			RequestID: pinID, OrgID: orgID, CID: "bafy1",
			Name: &name, Meta: map[string]any{"env": "prod"},
			Status: store.PinStatusQueued,
		}); err != nil {
			return err
		}
		if _, err := tx.Pins().InsertPlacement(txCtx, pinID, nodeID, 1); err != nil {
			return err
		}
		return tx.Pins().IncRefcount(txCtx, nodeID, "bafy1")
	})
	if err != nil {
		t.Fatalf("tx: %v", err)
	}

	// --- filter exercises the full spec-compliant surface ---
	pins, err := s.Pins().Filter(ctx, orgID, store.PinFilter{
		CIDs:   []string{"bafy1"},
		Name:   "dataset",
		Match:  "exact",
		Status: []string{store.PinStatusQueued},
		Meta:   map[string]any{"env": "prod"},
		Limit:  10,
	})
	if err != nil {
		t.Fatalf("filter: %v", err)
	}
	if len(pins) != 1 || pins[0].RequestID != pinID {
		t.Fatalf("filter got %d pins, expected 1 with the created RequestID", len(pins))
	}

	// --- by-request-id (scheduler path) ---
	fetched, err := s.Pins().GetByRequestID(ctx, pinID)
	if err != nil {
		t.Fatalf("get-by-rid: %v", err)
	}
	if fetched.CID != "bafy1" {
		t.Errorf("cid drift: %q", fetched.CID)
	}

	// --- drain + uncordon ---
	if err := s.Nodes().Drain(ctx, nodeID); err != nil {
		t.Fatalf("drain: %v", err)
	}
	n, err := s.Nodes().Get(ctx, nodeID)
	if err != nil {
		t.Fatalf("get node: %v", err)
	}
	if n.Status != store.NodeStatusDrained {
		t.Errorf("drain status = %s", n.Status)
	}
	if err := s.Nodes().Uncordon(ctx, nodeID); err != nil {
		t.Fatalf("uncordon: %v", err)
	}

	// --- delete + cascade ---
	if err := s.Pins().Delete(ctx, orgID, pinID); err != nil {
		t.Fatalf("delete: %v", err)
	}
	if _, err := s.Pins().GetPlacement(ctx, pinID, nodeID); err == nil {
		t.Error("placement survived pin delete; CASCADE broken?")
	}
}
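
The startPostgres helper the test opens with is defined elsewhere in
the repo. A plausible sketch of what such a helper does with
testcontainers-go, stopping at the DSN (image tag, credentials, and
the migrate/store wiring the real helper presumably returns are all
assumptions):

func startPostgresSketch(t *testing.T) (dsn string, cleanup func()) {
	t.Helper()
	ctx := context.Background()

	pg, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:        "postgres:16-alpine",
			Env:          map[string]string{"POSTGRES_PASSWORD": "test", "POSTGRES_DB": "anchorage"},
			ExposedPorts: []string{"5432/tcp"},
			// Postgres logs this twice: once during initdb, once for real.
			WaitingFor: wait.ForLog("database system is ready to accept connections").WithOccurrence(2),
		},
		Started: true,
	})
	if err != nil {
		t.Fatalf("start postgres: %v", err)
	}

	host, err := pg.Host(ctx)
	if err != nil {
		t.Fatalf("host: %v", err)
	}
	port, err := pg.MappedPort(ctx, "5432")
	if err != nil {
		t.Fatalf("port: %v", err)
	}
	dsn = fmt.Sprintf("postgres://postgres:test@%s:%s/anchorage?sslmode=disable", host, port.Port())

	// The real helper presumably runs golang-migrate here and returns a
	// store.Store built on pgx; this sketch stops at the DSN.
	return dsn, func() { _ = pg.Terminate(ctx) }
}

(Imports assumed for the sketch: context, fmt, testing,
github.com/testcontainers/testcontainers-go, and its wait subpackage.)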