Files
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

601 lines
14 KiB
Go

// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: pins.sql
package sqlc
import (
"context"
"anchorage/internal/pkg/ids"
"github.com/jackc/pgx/v5/pgtype"
)
const createPin = `-- name: CreatePin :one
INSERT INTO pins (request_id, org_id, cid, name, meta, origins, status)
VALUES ($1, $2, $3, $4, $5, $6, 'queued')
RETURNING request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
`
type CreatePinParams struct {
RequestID ids.PinID
OrgID ids.OrgID
Cid string
Name *string
Meta []byte
Origins []string
}
func (q *Queries) CreatePin(ctx context.Context, arg CreatePinParams) (Pin, error) {
row := q.db.QueryRow(ctx, createPin,
arg.RequestID,
arg.OrgID,
arg.Cid,
arg.Name,
arg.Meta,
arg.Origins,
)
var i Pin
err := row.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
)
return i, err
}
const decRefcount = `-- name: DecRefcount :one
UPDATE pin_refcount
SET count = count - 1
WHERE node_id = $1 AND cid = $2
RETURNING count
`
type DecRefcountParams struct {
NodeID ids.NodeID
Cid string
}
func (q *Queries) DecRefcount(ctx context.Context, arg DecRefcountParams) (int32, error) {
row := q.db.QueryRow(ctx, decRefcount, arg.NodeID, arg.Cid)
var count int32
err := row.Scan(&count)
return count, err
}
const deletePin = `-- name: DeletePin :exec
DELETE FROM pins WHERE request_id = $1 AND org_id = $2
`
type DeletePinParams struct {
RequestID ids.PinID
OrgID ids.OrgID
}
func (q *Queries) DeletePin(ctx context.Context, arg DeletePinParams) error {
_, err := q.db.Exec(ctx, deletePin, arg.RequestID, arg.OrgID)
return err
}
const deleteRefcount = `-- name: DeleteRefcount :exec
DELETE FROM pin_refcount WHERE node_id = $1 AND cid = $2 AND count <= 0
`
type DeleteRefcountParams struct {
NodeID ids.NodeID
Cid string
}
func (q *Queries) DeleteRefcount(ctx context.Context, arg DeleteRefcountParams) error {
_, err := q.db.Exec(ctx, deleteRefcount, arg.NodeID, arg.Cid)
return err
}
const filterPins = `-- name: FilterPins :many
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
FROM pins
WHERE org_id = $1
AND (COALESCE(array_length($2::text[], 1), 0) = 0
OR cid = ANY($2::text[]))
AND (COALESCE(array_length($3::text[], 1), 0) = 0
OR status = ANY($3::text[]))
AND ($4::text IS NULL
OR ($5::text = 'exact' AND name = $4::text)
OR ($5::text = 'iexact' AND lower(name) = lower($4::text))
OR ($5::text = 'partial' AND name LIKE '%' || $4::text || '%')
OR ($5::text = 'ipartial' AND name ILIKE '%' || $4::text || '%'))
AND ($6::timestamptz IS NULL OR created < $6::timestamptz)
AND ($7::timestamptz IS NULL OR created >= $7::timestamptz)
AND ($8::jsonb = '{}'::jsonb
OR meta @> $8::jsonb)
ORDER BY created DESC
LIMIT $10
OFFSET $9
`
type FilterPinsParams struct {
OrgID ids.OrgID
Cids []string
Statuses []string
Name *string
MatchMode string
Before pgtype.Timestamptz
After pgtype.Timestamptz
MetaFilter []byte
PinOffset int32
PinLimit int32
}
// FilterPins honors the full IPFS Pinning API spec filter surface.
//
// All filters are optional; empty arrays / NULL values short-circuit to
// "no restriction" via the IS NULL / array-length guards. Name matching
// supports four modes through a single `match` discriminator:
//
// 'exact' — LIKE (case-sensitive, no wildcards)
// 'iexact' — ILIKE anchored both ends
// 'partial' — LIKE %name%
// 'ipartial' — ILIKE %name% (uses pg_trgm GIN)
//
// `meta` is treated as a jsonb "contains" check — the pin's meta must
// be a superset of the requested map.
func (q *Queries) FilterPins(ctx context.Context, arg FilterPinsParams) ([]Pin, error) {
rows, err := q.db.Query(ctx, filterPins,
arg.OrgID,
arg.Cids,
arg.Statuses,
arg.Name,
arg.MatchMode,
arg.Before,
arg.After,
arg.MetaFilter,
arg.PinOffset,
arg.PinLimit,
)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Pin
for rows.Next() {
var i Pin
if err := rows.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getExistingLivePinByCID = `-- name: GetExistingLivePinByCID :one
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins
WHERE org_id = $1 AND cid = $2 AND status <> 'failed'
LIMIT 1
`
type GetExistingLivePinByCIDParams struct {
OrgID ids.OrgID
Cid string
}
func (q *Queries) GetExistingLivePinByCID(ctx context.Context, arg GetExistingLivePinByCIDParams) (Pin, error) {
row := q.db.QueryRow(ctx, getExistingLivePinByCID, arg.OrgID, arg.Cid)
var i Pin
err := row.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
)
return i, err
}
const getPin = `-- name: GetPin :one
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins WHERE request_id = $1 AND org_id = $2
`
type GetPinParams struct {
RequestID ids.PinID
OrgID ids.OrgID
}
func (q *Queries) GetPin(ctx context.Context, arg GetPinParams) (Pin, error) {
row := q.db.QueryRow(ctx, getPin, arg.RequestID, arg.OrgID)
var i Pin
err := row.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
)
return i, err
}
const getPlacement = `-- name: GetPlacement :one
SELECT request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at FROM pin_placements WHERE request_id = $1 AND node_id = $2
`
type GetPlacementParams struct {
RequestID ids.PinID
NodeID ids.NodeID
}
func (q *Queries) GetPlacement(ctx context.Context, arg GetPlacementParams) (PinPlacement, error) {
row := q.db.QueryRow(ctx, getPlacement, arg.RequestID, arg.NodeID)
var i PinPlacement
err := row.Scan(
&i.RequestID,
&i.NodeID,
&i.Status,
&i.FailureReason,
&i.Attempts,
&i.Fence,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const incRefcount = `-- name: IncRefcount :exec
INSERT INTO pin_refcount (node_id, cid, count)
VALUES ($1, $2, 1)
ON CONFLICT (node_id, cid) DO UPDATE SET count = pin_refcount.count + 1
`
type IncRefcountParams struct {
NodeID ids.NodeID
Cid string
}
func (q *Queries) IncRefcount(ctx context.Context, arg IncRefcountParams) error {
_, err := q.db.Exec(ctx, incRefcount, arg.NodeID, arg.Cid)
return err
}
const insertPlacement = `-- name: InsertPlacement :one
INSERT INTO pin_placements (request_id, node_id, status, fence)
VALUES ($1, $2, 'queued', $3)
RETURNING request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at
`
type InsertPlacementParams struct {
RequestID ids.PinID
NodeID ids.NodeID
Fence int64
}
func (q *Queries) InsertPlacement(ctx context.Context, arg InsertPlacementParams) (PinPlacement, error) {
row := q.db.QueryRow(ctx, insertPlacement, arg.RequestID, arg.NodeID, arg.Fence)
var i PinPlacement
err := row.Scan(
&i.RequestID,
&i.NodeID,
&i.Status,
&i.FailureReason,
&i.Attempts,
&i.Fence,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const listPins = `-- name: ListPins :many
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins
WHERE org_id = $1
ORDER BY created DESC
LIMIT $2 OFFSET $3
`
type ListPinsParams struct {
OrgID ids.OrgID
Limit int32
Offset int32
}
func (q *Queries) ListPins(ctx context.Context, arg ListPinsParams) ([]Pin, error) {
rows, err := q.db.Query(ctx, listPins, arg.OrgID, arg.Limit, arg.Offset)
if err != nil {
return nil, err
}
defer rows.Close()
var items []Pin
for rows.Next() {
var i Pin
if err := rows.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listPlacementsForNode = `-- name: ListPlacementsForNode :many
SELECT request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at FROM pin_placements
WHERE node_id = $1 AND status = $2
ORDER BY created_at
`
type ListPlacementsForNodeParams struct {
NodeID ids.NodeID
Status string
}
func (q *Queries) ListPlacementsForNode(ctx context.Context, arg ListPlacementsForNodeParams) ([]PinPlacement, error) {
rows, err := q.db.Query(ctx, listPlacementsForNode, arg.NodeID, arg.Status)
if err != nil {
return nil, err
}
defer rows.Close()
var items []PinPlacement
for rows.Next() {
var i PinPlacement
if err := rows.Scan(
&i.RequestID,
&i.NodeID,
&i.Status,
&i.FailureReason,
&i.Attempts,
&i.Fence,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listPlacementsForPin = `-- name: ListPlacementsForPin :many
SELECT pp.request_id, pp.node_id, pp.status, pp.failure_reason, pp.attempts, pp.fence, pp.created_at, pp.updated_at, n.multiaddrs
FROM pin_placements pp JOIN nodes n ON n.id = pp.node_id
WHERE pp.request_id = $1
`
type ListPlacementsForPinRow struct {
RequestID ids.PinID
NodeID ids.NodeID
Status string
FailureReason *string
Attempts int32
Fence int64
CreatedAt pgtype.Timestamptz
UpdatedAt pgtype.Timestamptz
Multiaddrs []string
}
func (q *Queries) ListPlacementsForPin(ctx context.Context, requestID ids.PinID) ([]ListPlacementsForPinRow, error) {
rows, err := q.db.Query(ctx, listPlacementsForPin, requestID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []ListPlacementsForPinRow
for rows.Next() {
var i ListPlacementsForPinRow
if err := rows.Scan(
&i.RequestID,
&i.NodeID,
&i.Status,
&i.FailureReason,
&i.Attempts,
&i.Fence,
&i.CreatedAt,
&i.UpdatedAt,
&i.Multiaddrs,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const replacePin = `-- name: ReplacePin :one
UPDATE pins
SET cid = $3,
name = $4,
meta = $5,
origins = $6,
status = 'queued',
failure_reason = NULL
WHERE request_id = $1 AND org_id = $2
RETURNING request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
`
type ReplacePinParams struct {
RequestID ids.PinID
OrgID ids.OrgID
Cid string
Name *string
Meta []byte
Origins []string
}
// ReplacePin swaps a pin's CID atomically. Used by POST /v1/pins/{rid}
// (pin replace per spec). Placements + refcount are left to the caller
// to rearrange in the same transaction.
func (q *Queries) ReplacePin(ctx context.Context, arg ReplacePinParams) (Pin, error) {
row := q.db.QueryRow(ctx, replacePin,
arg.RequestID,
arg.OrgID,
arg.Cid,
arg.Name,
arg.Meta,
arg.Origins,
)
var i Pin
err := row.Scan(
&i.RequestID,
&i.OrgID,
&i.Cid,
&i.Name,
&i.Meta,
&i.Origins,
&i.Status,
&i.FailureReason,
&i.Created,
&i.UpdatedAt,
)
return i, err
}
const replacePlacementFence = `-- name: ReplacePlacementFence :one
UPDATE pin_placements
SET node_id = $3, fence = fence + 1, status = 'queued', failure_reason = NULL
WHERE request_id = $1 AND node_id = $2
RETURNING request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at
`
type ReplacePlacementFenceParams struct {
RequestID ids.PinID
NodeID ids.NodeID
NodeID_2 ids.NodeID
}
func (q *Queries) ReplacePlacementFence(ctx context.Context, arg ReplacePlacementFenceParams) (PinPlacement, error) {
row := q.db.QueryRow(ctx, replacePlacementFence, arg.RequestID, arg.NodeID, arg.NodeID_2)
var i PinPlacement
err := row.Scan(
&i.RequestID,
&i.NodeID,
&i.Status,
&i.FailureReason,
&i.Attempts,
&i.Fence,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const stuckPlacements = `-- name: StuckPlacements :many
SELECT request_id, node_id, fence
FROM pin_placements
WHERE status = 'queued' AND created_at < (now() - make_interval(secs => $1))
`
type StuckPlacementsRow struct {
RequestID ids.PinID
NodeID ids.NodeID
Fence int64
}
func (q *Queries) StuckPlacements(ctx context.Context, secs float64) ([]StuckPlacementsRow, error) {
rows, err := q.db.Query(ctx, stuckPlacements, secs)
if err != nil {
return nil, err
}
defer rows.Close()
var items []StuckPlacementsRow
for rows.Next() {
var i StuckPlacementsRow
if err := rows.Scan(&i.RequestID, &i.NodeID, &i.Fence); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const updatePinStatus = `-- name: UpdatePinStatus :exec
UPDATE pins SET status = $2, failure_reason = $3 WHERE request_id = $1
`
type UpdatePinStatusParams struct {
RequestID ids.PinID
Status string
FailureReason *string
}
func (q *Queries) UpdatePinStatus(ctx context.Context, arg UpdatePinStatusParams) error {
_, err := q.db.Exec(ctx, updatePinStatus, arg.RequestID, arg.Status, arg.FailureReason)
return err
}
const updatePlacementStatusFenced = `-- name: UpdatePlacementStatusFenced :execrows
UPDATE pin_placements
SET status = $3, failure_reason = $4, attempts = attempts + 1
WHERE request_id = $1 AND node_id = $2 AND fence = $5
`
type UpdatePlacementStatusFencedParams struct {
RequestID ids.PinID
NodeID ids.NodeID
Status string
FailureReason *string
Fence int64
}
func (q *Queries) UpdatePlacementStatusFenced(ctx context.Context, arg UpdatePlacementStatusFencedParams) (int64, error) {
result, err := q.db.Exec(ctx, updatePlacementStatusFenced,
arg.RequestID,
arg.NodeID,
arg.Status,
arg.FailureReason,
arg.Fence,
)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}