Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
123 lines
3.9 KiB
Go
123 lines
3.9 KiB
Go
//go:build integration
|
|
|
|
package test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"testing"
|
|
|
|
"anchorage/internal/pkg/ids"
|
|
"anchorage/internal/pkg/store"
|
|
)
|
|
|
|
// TestRefcountSharedAcrossOrgs verifies the per-(node, cid) refcount
|
|
// semantic: two orgs pinning the same CID co-located on the same node
|
|
// share a single Kubo pin (refcount=2). Deleting one org's pin drops
|
|
// the count to 1 without unpinning Kubo; deleting the second deletes
|
|
// the row, which is the trigger the scheduler uses to issue `ipfs pin
|
|
// rm` on Kubo.
|
|
//
|
|
// Exercises: PinStore.IncRefcount, DecRefcount, DeleteRefcountIfZero,
|
|
// and the CASCADE on pins → pin_placements.
|
|
func TestRefcountSharedAcrossOrgs(t *testing.T) {
|
|
s, cleanup := startPostgres(t)
|
|
defer cleanup()
|
|
ctx := context.Background()
|
|
|
|
// Two orgs + one node.
|
|
orgA := ids.MustNewOrg()
|
|
orgB := ids.MustNewOrg()
|
|
for _, o := range []struct {
|
|
id ids.OrgID
|
|
slug string
|
|
}{{orgA, "a"}, {orgB, "b"}} {
|
|
if _, err := s.Orgs().Create(ctx, o.id, o.slug, o.slug); err != nil {
|
|
t.Fatalf("create org %s: %v", o.slug, err)
|
|
}
|
|
}
|
|
node := ids.MustNewNode()
|
|
if err := s.Nodes().Upsert(ctx, &store.Node{
|
|
ID: node, Multiaddrs: []string{"/dns4/x/tcp/4001/p2p/Q"}, RPCURL: "http://k:5001",
|
|
}); err != nil {
|
|
t.Fatalf("upsert node: %v", err)
|
|
}
|
|
|
|
const sharedCID = "bafy-shared-dataset"
|
|
|
|
// Org A pins first.
|
|
pinA := ids.MustNewPin()
|
|
if err := s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
|
|
if err := tx.Pins().Create(txCtx, &store.Pin{
|
|
RequestID: pinA, OrgID: orgA, CID: sharedCID, Status: store.PinStatusPinned,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Pins().InsertPlacement(txCtx, pinA, node, 1); err != nil {
|
|
return err
|
|
}
|
|
return tx.Pins().IncRefcount(txCtx, node, sharedCID)
|
|
}); err != nil {
|
|
t.Fatalf("seed org A pin: %v", err)
|
|
}
|
|
|
|
// Org B pins the same CID onto the same node.
|
|
pinB := ids.MustNewPin()
|
|
if err := s.Tx(ctx, func(txCtx context.Context, tx store.Store) error {
|
|
if err := tx.Pins().Create(txCtx, &store.Pin{
|
|
RequestID: pinB, OrgID: orgB, CID: sharedCID, Status: store.PinStatusPinned,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
if _, err := tx.Pins().InsertPlacement(txCtx, pinB, node, 1); err != nil {
|
|
return err
|
|
}
|
|
return tx.Pins().IncRefcount(txCtx, node, sharedCID)
|
|
}); err != nil {
|
|
t.Fatalf("seed org B pin: %v", err)
|
|
}
|
|
|
|
// Delete org A's pin. In production pin.Service.Delete would walk
|
|
// placements and publish unpin jobs; the consumer decrements the
|
|
// refcount. We simulate that path directly here to test the store
|
|
// semantic without spinning up a full scheduler.
|
|
if err := s.Pins().Delete(ctx, orgA, pinA); err != nil {
|
|
t.Fatalf("delete org A pin: %v", err)
|
|
}
|
|
remaining, err := s.Pins().DecRefcount(ctx, node, sharedCID)
|
|
if err != nil {
|
|
t.Fatalf("dec after org A delete: %v", err)
|
|
}
|
|
if remaining != 1 {
|
|
t.Errorf("refcount after first delete = %d, want 1", remaining)
|
|
}
|
|
|
|
// Org B's pin + placement must still exist.
|
|
if _, err := s.Pins().Get(ctx, orgB, pinB); err != nil {
|
|
t.Errorf("org B pin should survive org A's delete: %v", err)
|
|
}
|
|
if _, err := s.Pins().GetPlacement(ctx, pinB, node); err != nil {
|
|
t.Errorf("org B placement should survive: %v", err)
|
|
}
|
|
|
|
// Delete org B's pin → refcount goes to 0 → row removed.
|
|
if err := s.Pins().Delete(ctx, orgB, pinB); err != nil {
|
|
t.Fatalf("delete org B pin: %v", err)
|
|
}
|
|
remaining, err = s.Pins().DecRefcount(ctx, node, sharedCID)
|
|
if err != nil {
|
|
t.Fatalf("dec after org B delete: %v", err)
|
|
}
|
|
if remaining != 0 {
|
|
t.Errorf("refcount after second delete = %d, want 0", remaining)
|
|
}
|
|
if err := s.Pins().DeleteRefcountIfZero(ctx, node, sharedCID); err != nil {
|
|
t.Fatalf("DeleteRefcountIfZero: %v", err)
|
|
}
|
|
|
|
// A subsequent DecRefcount should now fail with ErrNotFound — row is gone.
|
|
if _, err := s.Pins().DecRefcount(ctx, node, sharedCID); !errors.Is(err, store.ErrNotFound) {
|
|
t.Errorf("expected ErrNotFound after refcount row removed, got %v", err)
|
|
}
|
|
}
|