anchorage/test/refcount_integration_test.go
William Gill 12bf35caf8 anchorage v1.0 initial tree (2026-04-16 18:13:36 -05:00)
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
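
As a rough sketch of the RLS tenancy pattern only (assuming pgx v5, a
hypothetical app.org_id GUC, and an illustrative pins_by_org policy;
not Anchorage's actual DDL or names):

package tenancy

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// Assumed policy on the tenant tables (illustrative):
//
//	ALTER TABLE pins ENABLE ROW LEVEL SECURITY;
//	CREATE POLICY pins_by_org ON pins
//	    USING (org_id = current_setting('app.org_id'));

// PinsForOrg reads pins with tenancy enforced by the database rather
// than by a WHERE clause the application must remember to add.
func PinsForOrg(ctx context.Context, conn *pgx.Conn, orgID string) ([]string, error) {
	tx, err := conn.Begin(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback(ctx)

	// set_config with is_local=true scopes the GUC to this transaction,
	// so the policy above filters every statement in it to orgID.
	if _, err := tx.Exec(ctx, "SELECT set_config('app.org_id', $1, true)", orgID); err != nil {
		return nil, err
	}

	rows, err := tx.Query(ctx, "SELECT cid FROM pins")
	if err != nil {
		return nil, err
	}
	cids, err := pgx.CollectRows(rows, pgx.RowTo[string])
	if err != nil {
		return nil, err
	}
	return cids, tx.Commit(ctx)
}

Note this only holds if the service connects as a non-owner role
without BYPASSRLS; owners and superusers skip RLS policies.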

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
    (first sketch below)
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist; second sketch below)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
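
A minimal sketch of the leader-election item, using nats.go's JetStream
KeyValue API; the bucket name, key, TTL, and node ID are illustrative,
not Anchorage's actual values:

package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()
	js, err := nc.JetStream()
	if err != nil {
		panic(err)
	}

	// Keys in this bucket expire after the TTL, so a dead leader's
	// claim lapses and a standby can win the next Create.
	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket: "leader",
		TTL:    10 * time.Second,
	})
	if err != nil {
		panic(err)
	}

	// Create succeeds only if the key is absent; the returned revision
	// doubles as the fence token stamped on the leader's writes.
	fence, err := kv.Create("scheduler", []byte("node-1"))
	if err != nil {
		fmt.Println("another node holds leadership:", err)
		return
	}
	fmt.Println("elected, fence token:", fence)

	// Heartbeat before the TTL lapses. Update is compare-and-set on
	// the last revision, so a usurped leader fails here instead of
	// split-braining.
	for range time.Tick(3 * time.Second) {
		if fence, err = kv.Update("scheduler", []byte("node-1"), fence); err != nil {
			fmt.Println("lost leadership:", err)
			return
		}
	}
}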
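
And a minimal sketch of the cross-node cache invalidation, pairing
ristretto with a plain NATS subject; the subject name, sizing, and
DenylistCache type are hypothetical:

package cache

import (
	"github.com/dgraph-io/ristretto"
	"github.com/nats-io/nats.go"
)

const invalidateSubj = "cache.invalidate.denylist" // hypothetical subject

type DenylistCache struct {
	local *ristretto.Cache
	nc    *nats.Conn
}

func NewDenylistCache(nc *nats.Conn) (*DenylistCache, error) {
	local, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: 1e5, // keys tracked for admission frequency
		MaxCost:     1e7, // total cost budget
		BufferItems: 64,
	})
	if err != nil {
		return nil, err
	}
	c := &DenylistCache{local: local, nc: nc}
	// Every node drops its local entry when any node announces a
	// change; the next read misses and repopulates from Postgres.
	if _, err := nc.Subscribe(invalidateSubj, func(m *nats.Msg) {
		c.local.Del(string(m.Data))
	}); err != nil {
		return nil, err
	}
	return c, nil
}

// Revoke evicts the token locally and tells every other node to
// forget its cached copy too.
func (c *DenylistCache) Revoke(tokenID string) error {
	c.local.Del(tokenID)
	return c.nc.Publish(invalidateSubj, []byte(tokenID))
}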

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.

//go:build integration

package test

import (
	"context"
	"errors"
	"testing"

	"anchorage/internal/pkg/ids"
	"anchorage/internal/pkg/store"
)

// TestRefcountSharedAcrossOrgs verifies the per-(node, cid) refcount
// semantic: two orgs pinning the same CID co-located on the same node
// share a single Kubo pin (refcount=2). Deleting one org's pin drops
// the count to 1 without unpinning Kubo; deleting the second deletes
// the row, which is the trigger the scheduler uses to issue `ipfs pin
// rm` on Kubo.
//
// Exercises: PinStore.IncRefcount, DecRefcount, DeleteRefcountIfZero,
// and the CASCADE on pins → pin_placements.
func TestRefcountSharedAcrossOrgs(t *testing.T) {
	s, cleanup := startPostgres(t)
	defer cleanup()
	ctx := context.Background()

	// Two orgs + one node.
	orgA := ids.MustNewOrg()
	orgB := ids.MustNewOrg()
	for _, o := range []struct {
		id   ids.OrgID
		slug string
	}{{orgA, "a"}, {orgB, "b"}} {
		if _, err := s.Orgs().Create(ctx, o.id, o.slug, o.slug); err != nil {
			t.Fatalf("create org %s: %v", o.slug, err)
		}
	}
	node := ids.MustNewNode()
	if err := s.Nodes().Upsert(ctx, &store.Node{
		ID: node, Multiaddrs: []string{"/dns4/x/tcp/4001/p2p/Q"}, RPCURL: "http://k:5001",
	}); err != nil {
		t.Fatalf("upsert node: %v", err)
	}

	const sharedCID = "bafy-shared-dataset"

	// Org A pins first.
	pinA := ids.MustNewPin()
	if err := s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
		if err := tx.Pins().Create(txCtx, &store.Pin{
			RequestID: pinA, OrgID: orgA, CID: sharedCID, Status: store.PinStatusPinned,
		}); err != nil {
			return err
		}
		if _, err := tx.Pins().InsertPlacement(txCtx, pinA, node, 1); err != nil {
			return err
		}
		return tx.Pins().IncRefcount(txCtx, node, sharedCID)
	}); err != nil {
		t.Fatalf("seed org A pin: %v", err)
	}

	// Org B pins the same CID onto the same node.
	pinB := ids.MustNewPin()
	if err := s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
		if err := tx.Pins().Create(txCtx, &store.Pin{
			RequestID: pinB, OrgID: orgB, CID: sharedCID, Status: store.PinStatusPinned,
		}); err != nil {
			return err
		}
		if _, err := tx.Pins().InsertPlacement(txCtx, pinB, node, 1); err != nil {
			return err
		}
		return tx.Pins().IncRefcount(txCtx, node, sharedCID)
	}); err != nil {
		t.Fatalf("seed org B pin: %v", err)
	}

	// Delete org A's pin. In production pin.Service.Delete would walk
	// placements and publish unpin jobs; the consumer decrements the
	// refcount. We simulate that path directly here to test the store
	// semantic without spinning up a full scheduler.
	if err := s.Pins().Delete(ctx, orgA, pinA); err != nil {
		t.Fatalf("delete org A pin: %v", err)
	}
	remaining, err := s.Pins().DecRefcount(ctx, node, sharedCID)
	if err != nil {
		t.Fatalf("dec after org A delete: %v", err)
	}
	if remaining != 1 {
		t.Errorf("refcount after first delete = %d, want 1", remaining)
	}

	// Org B's pin + placement must still exist.
	if _, err := s.Pins().Get(ctx, orgB, pinB); err != nil {
		t.Errorf("org B pin should survive org A's delete: %v", err)
	}
	if _, err := s.Pins().GetPlacement(ctx, pinB, node); err != nil {
		t.Errorf("org B placement should survive: %v", err)
	}

	// Delete org B's pin → refcount goes to 0 → row removed.
	if err := s.Pins().Delete(ctx, orgB, pinB); err != nil {
		t.Fatalf("delete org B pin: %v", err)
	}
	remaining, err = s.Pins().DecRefcount(ctx, node, sharedCID)
	if err != nil {
		t.Fatalf("dec after org B delete: %v", err)
	}
	if remaining != 0 {
		t.Errorf("refcount after second delete = %d, want 0", remaining)
	}
	if err := s.Pins().DeleteRefcountIfZero(ctx, node, sharedCID); err != nil {
		t.Fatalf("DeleteRefcountIfZero: %v", err)
	}

	// A subsequent DecRefcount should now fail with ErrNotFound — row is gone.
	if _, err := s.Pins().DecRefcount(ctx, node, sharedCID); !errors.Is(err, store.ErrNotFound) {
		t.Errorf("expected ErrNotFound after refcount row removed, got %v", err)
	}
}