Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
292 lines
8.8 KiB
Go
292 lines
8.8 KiB
Go
package openapi
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/danielgtaylor/huma/v2"
|
|
|
|
"anchorage/internal/pkg/auth"
|
|
"anchorage/internal/pkg/ids"
|
|
"anchorage/internal/pkg/pin"
|
|
"anchorage/internal/pkg/store"
|
|
)
|
|
|
|
// pinService is the subset of pin.Service the handlers need, defined as
|
|
// an interface so tests can stub it.
|
|
type pinService interface {
|
|
Create(ctx context.Context, in pin.CreateInput) (*store.Pin, []*store.Placement, error)
|
|
Delete(ctx context.Context, orgID ids.OrgID, rid ids.PinID) error
|
|
Replace(ctx context.Context, orgID ids.OrgID, rid ids.PinID, in pin.CreateInput) (*store.Pin, []*store.Placement, error)
|
|
}
|
|
|
|
// parsePinFilter maps the spec's HTTP query params into a store.PinFilter.
|
|
// Returns a *time.Time (not string) for Before/After after RFC3339 parse;
|
|
// returns a map[string]any parsed from the meta JSON string.
|
|
func parsePinFilter(in *struct {
|
|
CID []string `query:"cid" maxItems:"10"`
|
|
Name string `query:"name"`
|
|
Match string `query:"match" enum:"exact,iexact,partial,ipartial" default:"exact"`
|
|
Status []string `query:"status"`
|
|
Before string `query:"before" doc:"RFC3339; return only pins created before this timestamp"`
|
|
After string `query:"after" doc:"RFC3339; return only pins created at or after this timestamp"`
|
|
Meta string `query:"meta" doc:"JSON object; pin.meta must contain every key/value"`
|
|
Limit int `query:"limit" default:"10" minimum:"1" maximum:"1000"`
|
|
Offset int `query:"offset" default:"0" minimum:"0"`
|
|
}) (store.PinFilter, error) {
|
|
f := store.PinFilter{
|
|
CIDs: in.CID,
|
|
Name: in.Name,
|
|
Match: in.Match,
|
|
Status: in.Status,
|
|
Limit: in.Limit,
|
|
Offset: in.Offset,
|
|
}
|
|
if in.Before != "" {
|
|
t, err := time.Parse(time.RFC3339, in.Before)
|
|
if err != nil {
|
|
return f, fmt.Errorf("before: %w", err)
|
|
}
|
|
f.Before = &t
|
|
}
|
|
if in.After != "" {
|
|
t, err := time.Parse(time.RFC3339, in.After)
|
|
if err != nil {
|
|
return f, fmt.Errorf("after: %w", err)
|
|
}
|
|
f.After = &t
|
|
}
|
|
if in.Meta != "" {
|
|
if err := json.Unmarshal([]byte(in.Meta), &f.Meta); err != nil {
|
|
return f, fmt.Errorf("meta: %w", err)
|
|
}
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
// pinStatusBody is the spec-compliant PinStatus response.
|
|
type pinStatusBody struct {
|
|
RequestID string `json:"requestid"`
|
|
Status string `json:"status" enum:"queued,pinning,pinned,failed"`
|
|
Created time.Time `json:"created"`
|
|
Pin pinBody `json:"pin"`
|
|
Delegates []string `json:"delegates"`
|
|
Info map[string]any `json:"info,omitempty"`
|
|
}
|
|
|
|
// pinBody is the JSON shape of the Pin object: it is both the request body
// accepted by the create/replace endpoints and the "pin" member of every
// pinStatusBody response. Only CID is always emitted; the remaining fields
// are dropped from the encoded form when empty.
type pinBody struct {
	// CID is the content identifier to pin.
	CID string `json:"cid"`
	// Name is an optional human-readable label.
	Name string `json:"name,omitempty"`
	// Origins is an optional list of addresses supplied with the pin.
	Origins []string `json:"origins,omitempty"`
	// Meta is optional caller-supplied metadata.
	Meta map[string]any `json:"meta,omitempty"`
}
|
|
|
|
// RegisterPins wires the 5 spec-compliant pin endpoints onto api.
|
|
func RegisterPins(api huma.API, pins pinService, s store.Store) {
|
|
huma.Register(api, huma.Operation{
|
|
OperationID: "createPin",
|
|
Method: "POST",
|
|
Path: "/pins",
|
|
Summary: "Create a pin request",
|
|
Security: []map[string][]string{{"accessToken": {}}},
|
|
}, func(ctx context.Context, in *struct {
|
|
Body pinBody
|
|
}) (*struct {
|
|
Body pinStatusBody
|
|
}, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
created, placements, err := pins.Create(ctx, pin.CreateInput{
|
|
OrgID: cc.Org,
|
|
CID: in.Body.CID,
|
|
Name: optString(in.Body.Name),
|
|
Meta: in.Body.Meta,
|
|
Origins: in.Body.Origins,
|
|
})
|
|
if err != nil {
|
|
if !errors.Is(err, store.ErrConflict) {
|
|
return nil, huma.Error500InternalServerError("create pin", err)
|
|
}
|
|
// Idempotent re-add: load the existing pin's real placements
|
|
// so the caller gets accurate delegates, not the /unknown
|
|
// sentinel from an empty list.
|
|
if created != nil {
|
|
if existing, err := s.Pins().ListPlacements(ctx, created.RequestID); err == nil {
|
|
placements = existing
|
|
}
|
|
}
|
|
}
|
|
return &struct{ Body pinStatusBody }{Body: toBody(created, placements)}, nil
|
|
})
|
|
|
|
huma.Register(api, huma.Operation{
|
|
OperationID: "getPin",
|
|
Method: "GET",
|
|
Path: "/pins/{requestid}",
|
|
Summary: "Get a pin by request id",
|
|
Security: []map[string][]string{{"accessToken": {}}},
|
|
}, func(ctx context.Context, in *struct {
|
|
RequestID string `path:"requestid"`
|
|
}) (*struct{ Body pinStatusBody }, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
rid, err := ids.ParsePin(in.RequestID)
|
|
if err != nil {
|
|
return nil, huma.Error400BadRequest("bad request id")
|
|
}
|
|
p, err := s.Pins().Get(ctx, cc.Org, rid)
|
|
if err != nil {
|
|
return nil, huma.Error404NotFound("pin")
|
|
}
|
|
pls, _ := s.Pins().ListPlacements(ctx, rid)
|
|
return &struct{ Body pinStatusBody }{Body: toBody(p, pls)}, nil
|
|
})
|
|
|
|
huma.Register(api, huma.Operation{
|
|
OperationID: "listPins",
|
|
Method: "GET",
|
|
Path: "/pins",
|
|
Summary: "List pins with the full spec-compliant filter set",
|
|
Security: []map[string][]string{{"accessToken": {}}},
|
|
}, func(ctx context.Context, in *struct {
|
|
CID []string `query:"cid" maxItems:"10"`
|
|
Name string `query:"name"`
|
|
Match string `query:"match" enum:"exact,iexact,partial,ipartial" default:"exact"`
|
|
Status []string `query:"status"`
|
|
Before string `query:"before" doc:"RFC3339; return only pins created before this timestamp"`
|
|
After string `query:"after" doc:"RFC3339; return only pins created at or after this timestamp"`
|
|
Meta string `query:"meta" doc:"JSON object; pin.meta must contain every key/value"`
|
|
Limit int `query:"limit" default:"10" minimum:"1" maximum:"1000"`
|
|
Offset int `query:"offset" default:"0" minimum:"0"`
|
|
}) (*struct {
|
|
Body struct {
|
|
Results []pinStatusBody `json:"results"`
|
|
}
|
|
}, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
filter, err := parsePinFilter(in)
|
|
if err != nil {
|
|
return nil, huma.Error400BadRequest("bad filter", err)
|
|
}
|
|
pins, err := s.Pins().Filter(ctx, cc.Org, filter)
|
|
if err != nil {
|
|
return nil, huma.Error500InternalServerError("list pins", err)
|
|
}
|
|
out := &struct {
|
|
Body struct {
|
|
Results []pinStatusBody `json:"results"`
|
|
}
|
|
}{}
|
|
for _, p := range pins {
|
|
pls, _ := s.Pins().ListPlacements(ctx, p.RequestID)
|
|
out.Body.Results = append(out.Body.Results, toBody(p, pls))
|
|
}
|
|
return out, nil
|
|
})
|
|
|
|
huma.Register(api, huma.Operation{
|
|
OperationID: "replacePin",
|
|
Method: "POST",
|
|
Path: "/pins/{requestid}",
|
|
Summary: "Replace a pin's CID (spec-compliant pin update)",
|
|
Security: []map[string][]string{{"accessToken": {}}},
|
|
}, func(ctx context.Context, in *struct {
|
|
RequestID string `path:"requestid"`
|
|
Body pinBody
|
|
}) (*struct{ Body pinStatusBody }, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
rid, err := ids.ParsePin(in.RequestID)
|
|
if err != nil {
|
|
return nil, huma.Error400BadRequest("bad request id")
|
|
}
|
|
replaced, placements, err := pins.Replace(ctx, cc.Org, rid, pin.CreateInput{
|
|
OrgID: cc.Org,
|
|
CID: in.Body.CID,
|
|
Name: optString(in.Body.Name),
|
|
Meta: in.Body.Meta,
|
|
Origins: in.Body.Origins,
|
|
})
|
|
if err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
return nil, huma.Error404NotFound("pin")
|
|
}
|
|
return nil, huma.Error500InternalServerError("replace pin", err)
|
|
}
|
|
return &struct{ Body pinStatusBody }{Body: toBody(replaced, placements)}, nil
|
|
})
|
|
|
|
huma.Register(api, huma.Operation{
|
|
OperationID: "deletePin",
|
|
Method: "DELETE",
|
|
Path: "/pins/{requestid}",
|
|
Summary: "Remove a pin",
|
|
Security: []map[string][]string{{"accessToken": {}}},
|
|
}, func(ctx context.Context, in *struct {
|
|
RequestID string `path:"requestid"`
|
|
}) (*struct{}, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
rid, err := ids.ParsePin(in.RequestID)
|
|
if err != nil {
|
|
return nil, huma.Error400BadRequest("bad request id")
|
|
}
|
|
if err := pins.Delete(ctx, cc.Org, rid); err != nil {
|
|
if errors.Is(err, store.ErrNotFound) {
|
|
return nil, huma.Error404NotFound("pin")
|
|
}
|
|
return nil, huma.Error500InternalServerError("delete pin", err)
|
|
}
|
|
return &struct{}{}, nil
|
|
})
|
|
}
|
|
|
|
func toBody(p *store.Pin, placements []*store.Placement) pinStatusBody {
|
|
if p == nil {
|
|
return pinStatusBody{}
|
|
}
|
|
delegates := make([]string, 0, len(placements))
|
|
for _, pl := range placements {
|
|
delegates = append(delegates, pl.Multiaddrs...)
|
|
}
|
|
if len(delegates) == 0 {
|
|
delegates = []string{"/unknown"}
|
|
}
|
|
b := pinStatusBody{
|
|
RequestID: p.RequestID.String(),
|
|
Status: p.Status,
|
|
Created: p.Created,
|
|
Pin: pinBody{
|
|
CID: p.CID,
|
|
Origins: p.Origins,
|
|
Meta: p.Meta,
|
|
},
|
|
Delegates: delegates,
|
|
}
|
|
if p.Name != nil {
|
|
b.Pin.Name = *p.Name
|
|
}
|
|
return b
|
|
}
|
|
|
|
// optString maps the "absent" convention of JSON bodies (empty string) onto
// the "absent" convention of optional fields (nil pointer): it returns nil
// for "" and a pointer to a copy of s for anything else.
func optString(s string) *string {
	if len(s) > 0 {
		return &s
	}
	return nil
}
|