Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
601 lines
14 KiB
Go
601 lines
14 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: pins.sql
|
|
|
|
package sqlc
|
|
|
|
import (
|
|
"context"
|
|
|
|
"anchorage/internal/pkg/ids"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const createPin = `-- name: CreatePin :one
|
|
INSERT INTO pins (request_id, org_id, cid, name, meta, origins, status)
|
|
VALUES ($1, $2, $3, $4, $5, $6, 'queued')
|
|
RETURNING request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
|
|
`
|
|
|
|
type CreatePinParams struct {
|
|
RequestID ids.PinID
|
|
OrgID ids.OrgID
|
|
Cid string
|
|
Name *string
|
|
Meta []byte
|
|
Origins []string
|
|
}
|
|
|
|
func (q *Queries) CreatePin(ctx context.Context, arg CreatePinParams) (Pin, error) {
|
|
row := q.db.QueryRow(ctx, createPin,
|
|
arg.RequestID,
|
|
arg.OrgID,
|
|
arg.Cid,
|
|
arg.Name,
|
|
arg.Meta,
|
|
arg.Origins,
|
|
)
|
|
var i Pin
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const decRefcount = `-- name: DecRefcount :one
|
|
UPDATE pin_refcount
|
|
SET count = count - 1
|
|
WHERE node_id = $1 AND cid = $2
|
|
RETURNING count
|
|
`
|
|
|
|
type DecRefcountParams struct {
|
|
NodeID ids.NodeID
|
|
Cid string
|
|
}
|
|
|
|
func (q *Queries) DecRefcount(ctx context.Context, arg DecRefcountParams) (int32, error) {
|
|
row := q.db.QueryRow(ctx, decRefcount, arg.NodeID, arg.Cid)
|
|
var count int32
|
|
err := row.Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
const deletePin = `-- name: DeletePin :exec
|
|
DELETE FROM pins WHERE request_id = $1 AND org_id = $2
|
|
`
|
|
|
|
type DeletePinParams struct {
|
|
RequestID ids.PinID
|
|
OrgID ids.OrgID
|
|
}
|
|
|
|
func (q *Queries) DeletePin(ctx context.Context, arg DeletePinParams) error {
|
|
_, err := q.db.Exec(ctx, deletePin, arg.RequestID, arg.OrgID)
|
|
return err
|
|
}
|
|
|
|
const deleteRefcount = `-- name: DeleteRefcount :exec
|
|
DELETE FROM pin_refcount WHERE node_id = $1 AND cid = $2 AND count <= 0
|
|
`
|
|
|
|
type DeleteRefcountParams struct {
|
|
NodeID ids.NodeID
|
|
Cid string
|
|
}
|
|
|
|
func (q *Queries) DeleteRefcount(ctx context.Context, arg DeleteRefcountParams) error {
|
|
_, err := q.db.Exec(ctx, deleteRefcount, arg.NodeID, arg.Cid)
|
|
return err
|
|
}
|
|
|
|
const filterPins = `-- name: FilterPins :many
|
|
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
|
|
FROM pins
|
|
WHERE org_id = $1
|
|
AND (COALESCE(array_length($2::text[], 1), 0) = 0
|
|
OR cid = ANY($2::text[]))
|
|
AND (COALESCE(array_length($3::text[], 1), 0) = 0
|
|
OR status = ANY($3::text[]))
|
|
AND ($4::text IS NULL
|
|
OR ($5::text = 'exact' AND name = $4::text)
|
|
OR ($5::text = 'iexact' AND lower(name) = lower($4::text))
|
|
OR ($5::text = 'partial' AND name LIKE '%' || $4::text || '%')
|
|
OR ($5::text = 'ipartial' AND name ILIKE '%' || $4::text || '%'))
|
|
AND ($6::timestamptz IS NULL OR created < $6::timestamptz)
|
|
AND ($7::timestamptz IS NULL OR created >= $7::timestamptz)
|
|
AND ($8::jsonb = '{}'::jsonb
|
|
OR meta @> $8::jsonb)
|
|
ORDER BY created DESC
|
|
LIMIT $10
|
|
OFFSET $9
|
|
`
|
|
|
|
type FilterPinsParams struct {
|
|
OrgID ids.OrgID
|
|
Cids []string
|
|
Statuses []string
|
|
Name *string
|
|
MatchMode string
|
|
Before pgtype.Timestamptz
|
|
After pgtype.Timestamptz
|
|
MetaFilter []byte
|
|
PinOffset int32
|
|
PinLimit int32
|
|
}
|
|
|
|
// FilterPins honors the full IPFS Pinning API spec filter surface.
|
|
//
|
|
// All filters are optional; empty arrays / NULL values short-circuit to
|
|
// "no restriction" via the IS NULL / array-length guards. Name matching
|
|
// supports four modes through a single `match` discriminator:
|
|
//
|
|
// 'exact' — LIKE (case-sensitive, no wildcards)
|
|
// 'iexact' — ILIKE anchored both ends
|
|
// 'partial' — LIKE %name%
|
|
// 'ipartial' — ILIKE %name% (uses pg_trgm GIN)
|
|
//
|
|
// `meta` is treated as a jsonb "contains" check — the pin's meta must
|
|
// be a superset of the requested map.
|
|
func (q *Queries) FilterPins(ctx context.Context, arg FilterPinsParams) ([]Pin, error) {
|
|
rows, err := q.db.Query(ctx, filterPins,
|
|
arg.OrgID,
|
|
arg.Cids,
|
|
arg.Statuses,
|
|
arg.Name,
|
|
arg.MatchMode,
|
|
arg.Before,
|
|
arg.After,
|
|
arg.MetaFilter,
|
|
arg.PinOffset,
|
|
arg.PinLimit,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []Pin
|
|
for rows.Next() {
|
|
var i Pin
|
|
if err := rows.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getExistingLivePinByCID = `-- name: GetExistingLivePinByCID :one
|
|
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins
|
|
WHERE org_id = $1 AND cid = $2 AND status <> 'failed'
|
|
LIMIT 1
|
|
`
|
|
|
|
type GetExistingLivePinByCIDParams struct {
|
|
OrgID ids.OrgID
|
|
Cid string
|
|
}
|
|
|
|
func (q *Queries) GetExistingLivePinByCID(ctx context.Context, arg GetExistingLivePinByCIDParams) (Pin, error) {
|
|
row := q.db.QueryRow(ctx, getExistingLivePinByCID, arg.OrgID, arg.Cid)
|
|
var i Pin
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getPin = `-- name: GetPin :one
|
|
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins WHERE request_id = $1 AND org_id = $2
|
|
`
|
|
|
|
type GetPinParams struct {
|
|
RequestID ids.PinID
|
|
OrgID ids.OrgID
|
|
}
|
|
|
|
func (q *Queries) GetPin(ctx context.Context, arg GetPinParams) (Pin, error) {
|
|
row := q.db.QueryRow(ctx, getPin, arg.RequestID, arg.OrgID)
|
|
var i Pin
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getPlacement = `-- name: GetPlacement :one
|
|
SELECT request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at FROM pin_placements WHERE request_id = $1 AND node_id = $2
|
|
`
|
|
|
|
type GetPlacementParams struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
}
|
|
|
|
func (q *Queries) GetPlacement(ctx context.Context, arg GetPlacementParams) (PinPlacement, error) {
|
|
row := q.db.QueryRow(ctx, getPlacement, arg.RequestID, arg.NodeID)
|
|
var i PinPlacement
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.NodeID,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Attempts,
|
|
&i.Fence,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const incRefcount = `-- name: IncRefcount :exec
|
|
INSERT INTO pin_refcount (node_id, cid, count)
|
|
VALUES ($1, $2, 1)
|
|
ON CONFLICT (node_id, cid) DO UPDATE SET count = pin_refcount.count + 1
|
|
`
|
|
|
|
type IncRefcountParams struct {
|
|
NodeID ids.NodeID
|
|
Cid string
|
|
}
|
|
|
|
func (q *Queries) IncRefcount(ctx context.Context, arg IncRefcountParams) error {
|
|
_, err := q.db.Exec(ctx, incRefcount, arg.NodeID, arg.Cid)
|
|
return err
|
|
}
|
|
|
|
const insertPlacement = `-- name: InsertPlacement :one
|
|
INSERT INTO pin_placements (request_id, node_id, status, fence)
|
|
VALUES ($1, $2, 'queued', $3)
|
|
RETURNING request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at
|
|
`
|
|
|
|
type InsertPlacementParams struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
Fence int64
|
|
}
|
|
|
|
func (q *Queries) InsertPlacement(ctx context.Context, arg InsertPlacementParams) (PinPlacement, error) {
|
|
row := q.db.QueryRow(ctx, insertPlacement, arg.RequestID, arg.NodeID, arg.Fence)
|
|
var i PinPlacement
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.NodeID,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Attempts,
|
|
&i.Fence,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const listPins = `-- name: ListPins :many
|
|
SELECT request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins
|
|
WHERE org_id = $1
|
|
ORDER BY created DESC
|
|
LIMIT $2 OFFSET $3
|
|
`
|
|
|
|
type ListPinsParams struct {
|
|
OrgID ids.OrgID
|
|
Limit int32
|
|
Offset int32
|
|
}
|
|
|
|
func (q *Queries) ListPins(ctx context.Context, arg ListPinsParams) ([]Pin, error) {
|
|
rows, err := q.db.Query(ctx, listPins, arg.OrgID, arg.Limit, arg.Offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []Pin
|
|
for rows.Next() {
|
|
var i Pin
|
|
if err := rows.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listPlacementsForNode = `-- name: ListPlacementsForNode :many
|
|
SELECT request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at FROM pin_placements
|
|
WHERE node_id = $1 AND status = $2
|
|
ORDER BY created_at
|
|
`
|
|
|
|
type ListPlacementsForNodeParams struct {
|
|
NodeID ids.NodeID
|
|
Status string
|
|
}
|
|
|
|
func (q *Queries) ListPlacementsForNode(ctx context.Context, arg ListPlacementsForNodeParams) ([]PinPlacement, error) {
|
|
rows, err := q.db.Query(ctx, listPlacementsForNode, arg.NodeID, arg.Status)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []PinPlacement
|
|
for rows.Next() {
|
|
var i PinPlacement
|
|
if err := rows.Scan(
|
|
&i.RequestID,
|
|
&i.NodeID,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Attempts,
|
|
&i.Fence,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listPlacementsForPin = `-- name: ListPlacementsForPin :many
|
|
SELECT pp.request_id, pp.node_id, pp.status, pp.failure_reason, pp.attempts, pp.fence, pp.created_at, pp.updated_at, n.multiaddrs
|
|
FROM pin_placements pp JOIN nodes n ON n.id = pp.node_id
|
|
WHERE pp.request_id = $1
|
|
`
|
|
|
|
type ListPlacementsForPinRow struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
Status string
|
|
FailureReason *string
|
|
Attempts int32
|
|
Fence int64
|
|
CreatedAt pgtype.Timestamptz
|
|
UpdatedAt pgtype.Timestamptz
|
|
Multiaddrs []string
|
|
}
|
|
|
|
func (q *Queries) ListPlacementsForPin(ctx context.Context, requestID ids.PinID) ([]ListPlacementsForPinRow, error) {
|
|
rows, err := q.db.Query(ctx, listPlacementsForPin, requestID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListPlacementsForPinRow
|
|
for rows.Next() {
|
|
var i ListPlacementsForPinRow
|
|
if err := rows.Scan(
|
|
&i.RequestID,
|
|
&i.NodeID,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Attempts,
|
|
&i.Fence,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Multiaddrs,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const replacePin = `-- name: ReplacePin :one
|
|
UPDATE pins
|
|
SET cid = $3,
|
|
name = $4,
|
|
meta = $5,
|
|
origins = $6,
|
|
status = 'queued',
|
|
failure_reason = NULL
|
|
WHERE request_id = $1 AND org_id = $2
|
|
RETURNING request_id, org_id, cid, name, meta, origins, status, failure_reason, created, updated_at
|
|
`
|
|
|
|
type ReplacePinParams struct {
|
|
RequestID ids.PinID
|
|
OrgID ids.OrgID
|
|
Cid string
|
|
Name *string
|
|
Meta []byte
|
|
Origins []string
|
|
}
|
|
|
|
// ReplacePin swaps a pin's CID atomically. Used by POST /v1/pins/{rid}
|
|
// (pin replace per spec). Placements + refcount are left to the caller
|
|
// to rearrange in the same transaction.
|
|
func (q *Queries) ReplacePin(ctx context.Context, arg ReplacePinParams) (Pin, error) {
|
|
row := q.db.QueryRow(ctx, replacePin,
|
|
arg.RequestID,
|
|
arg.OrgID,
|
|
arg.Cid,
|
|
arg.Name,
|
|
arg.Meta,
|
|
arg.Origins,
|
|
)
|
|
var i Pin
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.OrgID,
|
|
&i.Cid,
|
|
&i.Name,
|
|
&i.Meta,
|
|
&i.Origins,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Created,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const replacePlacementFence = `-- name: ReplacePlacementFence :one
|
|
UPDATE pin_placements
|
|
SET node_id = $3, fence = fence + 1, status = 'queued', failure_reason = NULL
|
|
WHERE request_id = $1 AND node_id = $2
|
|
RETURNING request_id, node_id, status, failure_reason, attempts, fence, created_at, updated_at
|
|
`
|
|
|
|
type ReplacePlacementFenceParams struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
NodeID_2 ids.NodeID
|
|
}
|
|
|
|
func (q *Queries) ReplacePlacementFence(ctx context.Context, arg ReplacePlacementFenceParams) (PinPlacement, error) {
|
|
row := q.db.QueryRow(ctx, replacePlacementFence, arg.RequestID, arg.NodeID, arg.NodeID_2)
|
|
var i PinPlacement
|
|
err := row.Scan(
|
|
&i.RequestID,
|
|
&i.NodeID,
|
|
&i.Status,
|
|
&i.FailureReason,
|
|
&i.Attempts,
|
|
&i.Fence,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const stuckPlacements = `-- name: StuckPlacements :many
|
|
SELECT request_id, node_id, fence
|
|
FROM pin_placements
|
|
WHERE status = 'queued' AND created_at < (now() - make_interval(secs => $1))
|
|
`
|
|
|
|
type StuckPlacementsRow struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
Fence int64
|
|
}
|
|
|
|
func (q *Queries) StuckPlacements(ctx context.Context, secs float64) ([]StuckPlacementsRow, error) {
|
|
rows, err := q.db.Query(ctx, stuckPlacements, secs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []StuckPlacementsRow
|
|
for rows.Next() {
|
|
var i StuckPlacementsRow
|
|
if err := rows.Scan(&i.RequestID, &i.NodeID, &i.Fence); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const updatePinStatus = `-- name: UpdatePinStatus :exec
|
|
UPDATE pins SET status = $2, failure_reason = $3 WHERE request_id = $1
|
|
`
|
|
|
|
type UpdatePinStatusParams struct {
|
|
RequestID ids.PinID
|
|
Status string
|
|
FailureReason *string
|
|
}
|
|
|
|
func (q *Queries) UpdatePinStatus(ctx context.Context, arg UpdatePinStatusParams) error {
|
|
_, err := q.db.Exec(ctx, updatePinStatus, arg.RequestID, arg.Status, arg.FailureReason)
|
|
return err
|
|
}
|
|
|
|
const updatePlacementStatusFenced = `-- name: UpdatePlacementStatusFenced :execrows
|
|
UPDATE pin_placements
|
|
SET status = $3, failure_reason = $4, attempts = attempts + 1
|
|
WHERE request_id = $1 AND node_id = $2 AND fence = $5
|
|
`
|
|
|
|
type UpdatePlacementStatusFencedParams struct {
|
|
RequestID ids.PinID
|
|
NodeID ids.NodeID
|
|
Status string
|
|
FailureReason *string
|
|
Fence int64
|
|
}
|
|
|
|
func (q *Queries) UpdatePlacementStatusFenced(ctx context.Context, arg UpdatePlacementStatusFencedParams) (int64, error) {
|
|
result, err := q.db.Exec(ctx, updatePlacementStatusFenced,
|
|
arg.RequestID,
|
|
arg.NodeID,
|
|
arg.Status,
|
|
arg.FailureReason,
|
|
arg.Fence,
|
|
)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|