Files
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

292 lines
8.8 KiB
Go

package openapi
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/danielgtaylor/huma/v2"
"anchorage/internal/pkg/auth"
"anchorage/internal/pkg/ids"
"anchorage/internal/pkg/pin"
"anchorage/internal/pkg/store"
)
// pinService is the subset of pin.Service the handlers need, defined as
// an interface so tests can stub it.
//
// Only the mutating operations go through this interface; the handlers in
// this file read pins and placements directly from store.Store.
type pinService interface {
// Create submits the pin request described by in and returns the pin
// together with its placements. The createPin handler tolerates a
// store.ErrConflict result and, when the returned pin is non-nil, treats
// it as the already-existing pin.
Create(ctx context.Context, in pin.CreateInput) (*store.Pin, []*store.Placement, error)
// Delete removes the pin rid on behalf of orgID. The deletePin handler
// maps store.ErrNotFound to a 404 and any other error to a 500.
Delete(ctx context.Context, orgID ids.OrgID, rid ids.PinID) error
// Replace swaps the pin rid of orgID for the request described by in and
// returns the resulting pin and placements. The replacePin handler maps
// store.ErrNotFound to a 404 and any other error to a 500.
Replace(ctx context.Context, orgID ids.OrgID, rid ids.PinID, in pin.CreateInput) (*store.Pin, []*store.Placement, error)
}
// parsePinFilter converts the listPins query parameters into a
// store.PinFilter.
//
// CID, Name, Match, Status, Limit and Offset are copied through unchanged.
// Before and After, when non-empty, must be RFC3339 timestamps and are stored
// as *time.Time. Meta, when non-empty, is JSON-decoded into the filter's Meta
// field.
//
// Parameters are checked in the order before, after, meta; the first failure
// is returned wrapped with the parameter name as prefix ("before: ...",
// "after: ...", "meta: ..."), alongside the filter as populated so far.
func parsePinFilter(in *struct {
	CID    []string `query:"cid" maxItems:"10"`
	Name   string   `query:"name"`
	Match  string   `query:"match" enum:"exact,iexact,partial,ipartial" default:"exact"`
	Status []string `query:"status"`
	Before string   `query:"before" doc:"RFC3339; return only pins created before this timestamp"`
	After  string   `query:"after" doc:"RFC3339; return only pins created at or after this timestamp"`
	Meta   string   `query:"meta" doc:"JSON object; pin.meta must contain every key/value"`
	Limit  int      `query:"limit" default:"10" minimum:"1" maximum:"1000"`
	Offset int      `query:"offset" default:"0" minimum:"0"`
}) (store.PinFilter, error) {
	filter := store.PinFilter{
		CIDs:   in.CID,
		Name:   in.Name,
		Match:  in.Match,
		Status: in.Status,
		Limit:  in.Limit,
		Offset: in.Offset,
	}

	// parseBound turns one optional RFC3339 query value into a timestamp
	// pointer; an empty value means "no bound" and yields nil.
	parseBound := func(label, raw string) (*time.Time, error) {
		if raw == "" {
			return nil, nil
		}
		ts, err := time.Parse(time.RFC3339, raw)
		if err != nil {
			return nil, fmt.Errorf("%s: %w", label, err)
		}
		return &ts, nil
	}

	var err error
	if filter.Before, err = parseBound("before", in.Before); err != nil {
		return filter, err
	}
	if filter.After, err = parseBound("after", in.After); err != nil {
		return filter, err
	}

	if in.Meta == "" {
		return filter, nil
	}
	if err := json.Unmarshal([]byte(in.Meta), &filter.Meta); err != nil {
		return filter, fmt.Errorf("meta: %w", err)
	}
	return filter, nil
}
// pinStatusBody is the spec-compliant PinStatus response.
//
// It is the JSON body returned by createPin, getPin and replacePin, and the
// element type of the listPins results array. Values are built by toBody.
type pinStatusBody struct {
// RequestID is the string form of the pin's request id.
RequestID string `json:"requestid"`
// Status is copied from the stored pin; the schema restricts it to
// queued, pinning, pinned or failed.
Status string `json:"status" enum:"queued,pinning,pinned,failed"`
// Created is the stored pin's creation timestamp.
Created time.Time `json:"created"`
// Pin echoes the pinned object (CID, name, origins, meta).
Pin pinBody `json:"pin"`
// Delegates holds the multiaddrs of every placement; toBody substitutes
// the single sentinel "/unknown" when there are none.
Delegates []string `json:"delegates"`
// Info is optional and omitted when empty; toBody in this file never
// sets it.
Info map[string]any `json:"info,omitempty"`
}
// pinBody is the Pin object: the request body of createPin and replacePin,
// and the "pin" member of every pinStatusBody.
type pinBody struct {
// CID is the content identifier to pin.
CID string `json:"cid"`
// Name is optional; an empty name is omitted from responses and is
// passed to the pin service as nil (see optString).
Name string `json:"name,omitempty"`
// Origins is optional and omitted from JSON when empty.
Origins []string `json:"origins,omitempty"`
// Meta is optional and omitted from JSON when empty.
Meta map[string]any `json:"meta,omitempty"`
}
// RegisterPins wires the 5 spec-compliant pin endpoints onto api:
// createPin, getPin, listPins, replacePin and deletePin.
//
// pins carries out the mutating operations (create, replace, delete); s is
// read directly for single-pin lookups, filtered listing and placement data.
// Every handler calls auth.Require first and answers 401 when it fails; pin
// lookups and mutations are passed the caller's cc.Org.
//
// NOTE(review): no operation sets DefaultStatus, so huma picks the success
// status itself. The Pinning Services spec documents 202 Accepted for
// createPin, replacePin and deletePin — confirm whether clients rely on the
// current codes before changing them.
func RegisterPins(api huma.API, pins pinService, s store.Store) {
	huma.Register(api, huma.Operation{
		OperationID: "createPin",
		Method:      "POST",
		Path:        "/pins",
		Summary:     "Create a pin request",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body pinBody
	}) (*struct {
		Body pinStatusBody
	}, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		created, placements, err := pins.Create(ctx, pin.CreateInput{
			OrgID:   cc.Org,
			CID:     in.Body.CID,
			Name:    optString(in.Body.Name),
			Meta:    in.Body.Meta,
			Origins: in.Body.Origins,
		})
		switch {
		case err == nil:
			// Fresh pin: respond with what the service returned.
		case !errors.Is(err, store.ErrConflict):
			return nil, huma.Error500InternalServerError("create pin", err)
		case created == nil:
			// Conflict, but the service did not hand back the existing
			// pin. There is nothing meaningful to echo, so report the
			// conflict instead of a success with a zero-valued body.
			return nil, huma.Error409Conflict("pin already exists", err)
		default:
			// Idempotent re-add: load the existing pin's real placements
			// so the caller gets accurate delegates, not the /unknown
			// sentinel from an empty list. Best-effort: on lookup failure
			// the placements returned by Create are kept.
			if existing, lerr := s.Pins().ListPlacements(ctx, created.RequestID); lerr == nil {
				placements = existing
			}
		}
		return &struct{ Body pinStatusBody }{Body: toBody(created, placements)}, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "getPin",
		Method:      "GET",
		Path:        "/pins/{requestid}",
		Summary:     "Get a pin by request id",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		RequestID string `path:"requestid"`
	}) (*struct{ Body pinStatusBody }, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		rid, err := ids.ParsePin(in.RequestID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad request id")
		}
		p, err := s.Pins().Get(ctx, cc.Org, rid)
		if err != nil {
			// Only a genuine miss is a 404; anything else is a backend
			// failure and must not masquerade as "no such pin". Mirrors
			// the replacePin/deletePin error mapping.
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("pin")
			}
			return nil, huma.Error500InternalServerError("get pin", err)
		}
		// Delegates are best-effort: if placements cannot be loaded the
		// pin is still returned and toBody emits the "/unknown" sentinel.
		pls, lerr := s.Pins().ListPlacements(ctx, rid)
		if lerr != nil {
			pls = nil
		}
		return &struct{ Body pinStatusBody }{Body: toBody(p, pls)}, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "listPins",
		Method:      "GET",
		Path:        "/pins",
		Summary:     "List pins with the full spec-compliant filter set",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		CID    []string `query:"cid" maxItems:"10"`
		Name   string   `query:"name"`
		Match  string   `query:"match" enum:"exact,iexact,partial,ipartial" default:"exact"`
		Status []string `query:"status"`
		Before string   `query:"before" doc:"RFC3339; return only pins created before this timestamp"`
		After  string   `query:"after" doc:"RFC3339; return only pins created at or after this timestamp"`
		Meta   string   `query:"meta" doc:"JSON object; pin.meta must contain every key/value"`
		Limit  int      `query:"limit" default:"10" minimum:"1" maximum:"1000"`
		Offset int      `query:"offset" default:"0" minimum:"0"`
	}) (*struct {
		Body struct {
			Results []pinStatusBody `json:"results"`
		}
	}, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		filter, err := parsePinFilter(in)
		if err != nil {
			return nil, huma.Error400BadRequest("bad filter", err)
		}
		found, err := s.Pins().Filter(ctx, cc.Org, filter)
		if err != nil {
			return nil, huma.Error500InternalServerError("list pins", err)
		}
		out := &struct {
			Body struct {
				Results []pinStatusBody `json:"results"`
			}
		}{}
		// Start from a non-nil slice so an empty match encodes as
		// "results": [] rather than "results": null.
		out.Body.Results = make([]pinStatusBody, 0, len(found))
		// NOTE(review): the spec's PinResults also carries a required
		// total "count"; emitting it needs a total from the store, which
		// Filter does not return here — confirm and follow up.
		for _, p := range found {
			// One placement lookup per pin, best-effort: a failed lookup
			// degrades that entry's delegates to the "/unknown" sentinel
			// instead of failing the whole listing.
			pls, lerr := s.Pins().ListPlacements(ctx, p.RequestID)
			if lerr != nil {
				pls = nil
			}
			out.Body.Results = append(out.Body.Results, toBody(p, pls))
		}
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "replacePin",
		Method:      "POST",
		Path:        "/pins/{requestid}",
		Summary:     "Replace a pin's CID (spec-compliant pin update)",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		RequestID string `path:"requestid"`
		Body      pinBody
	}) (*struct{ Body pinStatusBody }, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		rid, err := ids.ParsePin(in.RequestID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad request id")
		}
		replaced, placements, err := pins.Replace(ctx, cc.Org, rid, pin.CreateInput{
			OrgID:   cc.Org,
			CID:     in.Body.CID,
			Name:    optString(in.Body.Name),
			Meta:    in.Body.Meta,
			Origins: in.Body.Origins,
		})
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("pin")
			}
			return nil, huma.Error500InternalServerError("replace pin", err)
		}
		return &struct{ Body pinStatusBody }{Body: toBody(replaced, placements)}, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "deletePin",
		Method:      "DELETE",
		Path:        "/pins/{requestid}",
		Summary:     "Remove a pin",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		RequestID string `path:"requestid"`
	}) (*struct{}, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		rid, err := ids.ParsePin(in.RequestID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad request id")
		}
		if err := pins.Delete(ctx, cc.Org, rid); err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("pin")
			}
			return nil, huma.Error500InternalServerError("delete pin", err)
		}
		// Success carries no body.
		return &struct{}{}, nil
	})
}
// toBody renders a stored pin and its placements as a PinStatus response.
//
// A nil pin yields the zero pinStatusBody. Otherwise the delegates are the
// concatenated multiaddrs of every placement, in order; when that list is
// empty the single sentinel "/unknown" is used instead. The pin's name is
// copied only when it is set.
func toBody(p *store.Pin, placements []*store.Placement) pinStatusBody {
	var body pinStatusBody
	if p == nil {
		return body
	}

	addrs := make([]string, 0, len(placements))
	for i := range placements {
		addrs = append(addrs, placements[i].Multiaddrs...)
	}
	if len(addrs) == 0 {
		// No placement advertises an address: fall back to the sentinel
		// rather than an empty delegates list.
		addrs = []string{"/unknown"}
	}

	body.RequestID = p.RequestID.String()
	body.Status = p.Status
	body.Created = p.Created
	body.Pin = pinBody{
		CID:     p.CID,
		Origins: p.Origins,
		Meta:    p.Meta,
	}
	body.Delegates = addrs
	if name := p.Name; name != nil {
		body.Pin.Name = *name
	}
	return body
}
// optString maps an optional string field to pointer form: the empty string
// becomes nil, anything else becomes a pointer to a copy of s.
func optString(s string) *string {
	if len(s) > 0 {
		return &s
	}
	return nil
}