// Package test hosts anchorage's cross-package integration tests. They // are gated by a build tag because they pull in Docker via // testcontainers-go; running the normal `go test ./...` suite does NOT // exercise them. // // Run locally with: // // go test -tags=integration -timeout=5m ./test/... // //go:build integration package test import ( "context" "testing" "anchorage/internal/pkg/ids" "anchorage/internal/pkg/store" ) // --------------------------------------------------------------------------- // TestPostgresEndToEnd drives the full store API against a real // Postgres: create org + user + membership, mint a pin, assert // placement, drain a node, verify rebalance by placement migration, // replace a pin, delete a pin, confirm CASCADE cleans placements + refs. func TestPostgresEndToEnd(t *testing.T) { s, cleanup := startPostgres(t) defer cleanup() ctx := context.Background() // --- org + user --- orgID := ids.MustNewOrg() org, err := s.Orgs().Create(ctx, orgID, "acme", "Acme Corp") if err != nil { t.Fatalf("create org: %v", err) } if org.Slug != "acme" { t.Fatalf("slug") } userID := ids.MustNewUser() user, err := s.Users().UpsertByAuthentikSub(ctx, userID, "sub-1", "alice@example.com", "Alice", true) if err != nil { t.Fatalf("upsert user: %v", err) } if !user.IsSysadmin { t.Error("user should be sysadmin") } if err := s.Memberships().Add(ctx, orgID, userID, store.RoleOrgAdmin); err != nil { t.Fatalf("add member: %v", err) } // --- node registry --- nodeID := ids.MustNewNode() if err := s.Nodes().Upsert(ctx, &store.Node{ ID: nodeID, DisplayName: "node-1", Multiaddrs: []string{"/dns4/node-1/tcp/4001/p2p/QmX"}, RPCURL: "http://localhost:5001", Status: store.NodeStatusUp, }); err != nil { t.Fatalf("upsert node: %v", err) } // --- pin create --- pinID := ids.MustNewPin() name := "dataset" err = s.Tx(ctx, func(txCtx context.Context, tx store.Store) error { if err := tx.Pins().Create(txCtx, &store.Pin{ RequestID: pinID, OrgID: orgID, CID: "bafy1", Name: &name, Meta: map[string]any{"env": "prod"}, Status: store.PinStatusQueued, }); err != nil { return err } if _, err := tx.Pins().InsertPlacement(txCtx, pinID, nodeID, 1); err != nil { return err } return tx.Pins().IncRefcount(txCtx, nodeID, "bafy1") }) if err != nil { t.Fatalf("tx: %v", err) } // --- filter exercises the full spec-compliant surface --- pins, err := s.Pins().Filter(ctx, orgID, store.PinFilter{ CIDs: []string{"bafy1"}, Name: "dataset", Match: "exact", Status: []string{store.PinStatusQueued}, Meta: map[string]any{"env": "prod"}, Limit: 10, }) if err != nil { t.Fatalf("filter: %v", err) } if len(pins) != 1 || pins[0].RequestID != pinID { t.Fatalf("filter got %d pins, expected 1 with the created RequestID", len(pins)) } // --- by-request-id (scheduler path) --- fetched, err := s.Pins().GetByRequestID(ctx, pinID) if err != nil { t.Fatalf("get-by-rid: %v", err) } if fetched.CID != "bafy1" { t.Errorf("cid drift: %q", fetched.CID) } // --- drain + undrain --- if err := s.Nodes().Drain(ctx, nodeID); err != nil { t.Fatalf("drain: %v", err) } n, _ := s.Nodes().Get(ctx, nodeID) if n.Status != store.NodeStatusDrained { t.Errorf("drain status = %s", n.Status) } if err := s.Nodes().Uncordon(ctx, nodeID); err != nil { t.Fatalf("uncordon: %v", err) } // --- delete + cascade --- if err := s.Pins().Delete(ctx, orgID, pinID); err != nil { t.Fatalf("delete: %v", err) } if _, err := s.Pins().GetPlacement(ctx, pinID, nodeID); err == nil { t.Error("placement survived pin delete — CASCADE broken?") } }