Files
William Gill fa0df451d5
Some checks failed
Test / Build & Unit Tests (push) Successful in 4m58s
Test / Lint (push) Successful in 27s
Test / Integration Tests (push) Failing after 2m19s
Security / Vulnerability Check (push) Failing after 1m22s
fix: coerce nil Origins to empty slice at pg store boundary
Two bugs exposed by the integration suite:

1. (production) pins.origins is NOT NULL DEFAULT '{}', but pgx
   serialises a Go nil []string as SQL NULL — so every Create/Replace
   whose caller omitted origins (an optional field per the IPFS
   Pinning Service spec) was 500ing on the NOT NULL constraint. The
   openapi/pin-service paths pass origins through verbatim, so any
   client POST without "origins" hit this. Normalise nil -> []string{}
   at the store boundary in both pinStore.Create and pinStore.Replace.

2. (test I introduced last commit) SET LOCAL does not accept bound
   parameters; the RLS integration test was getting a 42601 syntax
   error. Switch to SELECT set_config('anchorage.org_id', $1, true),
   which is parameterisable and keeps the value out of the SQL string.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 09:45:37 -05:00

811 lines
25 KiB
Go

// Package postgres is anchorage's pgx-backed implementation of
// store.Store. Sub-store methods are thin wrappers over sqlc-generated
// query objects in internal/pkg/store/postgres/sqlc; the file you're
// reading is the bridge between anchorage's domain types and the
// sqlc Pgx types (pgtype.Timestamptz → time.Time, etc).
//
// Transactions: Tx opens a pgx tx, sets the `anchorage.org_id` GUC when
// a tenant context is active, and hands the closure a Store wrapping
// the transactional Queries. Commit / rollback follows the standard
// Go "defer rollback-unless-committed" idiom.
package postgres
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"anchorage/internal/pkg/ids"
"anchorage/internal/pkg/store"
"anchorage/internal/pkg/store/postgres/sqlc"
)
// ctxKey namespaces the context values this package stashes (currently
// just the RLS org id).
type ctxKey struct{ name string }
var orgCtxKey = ctxKey{"anchorage-org-id"}
// Store implements store.Store on top of a pgxpool.Pool. A separate
// *Store is constructed per transaction (see Tx); the top-level one
// holds the pool directly.
type Store struct {
pool *pgxpool.Pool
// If non-nil, all sub-stores use this Queries (scoped to a
// transaction). The top-level Store has txQ=nil and lazily wraps
// pool.
txQ *sqlc.Queries
txOrgID *ids.OrgID // set inside Tx so GUC stays consistent
}
// New builds a pool-backed Store.
func New(pool *pgxpool.Pool) *Store {
return &Store{pool: pool}
}
// queries returns a sqlc.Queries over the correct dbtx handle — either
// the transactional one from Tx, or a pool-backed one.
func (s *Store) queries() *sqlc.Queries {
if s.txQ != nil {
return s.txQ
}
return sqlc.New(s.pool)
}
// ---- Root store.Store interface --------------------------------------------
func (s *Store) Orgs() store.OrgStore { return orgStore{s} }
func (s *Store) Users() store.UserStore { return userStore{s} }
func (s *Store) Memberships() store.MembershipStore { return membershipStore{s} }
func (s *Store) Tokens() store.TokenStore { return tokenStore{s} }
func (s *Store) Nodes() store.NodeStore { return nodeStore{s} }
func (s *Store) Pins() store.PinStore { return pinStore{s} }
func (s *Store) Audit() store.AuditStore { return auditStore{s} }
// WithOrgContext returns a context tagged with orgID. The Tx handler
// reads this to SET LOCAL anchorage.org_id inside the transaction.
func (s *Store) WithOrgContext(ctx context.Context, orgID ids.OrgID) (context.Context, error) {
return context.WithValue(ctx, orgCtxKey, orgID), nil
}
// Tx opens a pgx transaction, sets the anchorage.org_id GUC if an org
// has been tagged on ctx via WithOrgContext, and passes fn a Store
// bound to that transaction.
func (s *Store) Tx(ctx context.Context, fn func(txCtx context.Context, tx store.Store) error) error {
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer func() { _ = tx.Rollback(ctx) }()
q := sqlc.New(tx).WithTx(tx)
txStore := &Store{pool: s.pool, txQ: q}
// If the caller wrapped ctx with WithOrgContext, apply the GUC so
// RLS policies on the tenant tables kick in.
if v, ok := ctx.Value(orgCtxKey).(ids.OrgID); ok && !v.IsZero() {
if _, err := tx.Exec(ctx, "SET LOCAL anchorage.org_id = $1", v.String()); err != nil {
return fmt.Errorf("set org guc: %w", err)
}
txStore.txOrgID = &v
}
if err := fn(ctx, txStore); err != nil {
return err
}
return tx.Commit(ctx)
}
// ---- Converters ------------------------------------------------------------
func tsToTime(t pgtype.Timestamptz) time.Time {
if !t.Valid {
return time.Time{}
}
return t.Time
}
func timePtr(t pgtype.Timestamptz) *time.Time {
if !t.Valid {
return nil
}
v := t.Time
return &v
}
func tsFrom(t time.Time) pgtype.Timestamptz {
return pgtype.Timestamptz{Time: t.UTC(), Valid: !t.IsZero()}
}
func tsFromPtr(t *time.Time) pgtype.Timestamptz {
if t == nil {
return pgtype.Timestamptz{}
}
return tsFrom(*t)
}
func mapErr(err error) error {
switch {
case err == nil:
return nil
case errors.Is(err, pgx.ErrNoRows):
return store.ErrNotFound
}
return err
}
// ---- Orgs -----------------------------------------------------------------
type orgStore struct{ s *Store }
func (o orgStore) Create(ctx context.Context, id ids.OrgID, slug, name string) (*store.Org, error) {
row, err := o.s.queries().CreateOrg(ctx, sqlc.CreateOrgParams{ID: id, Slug: slug, Name: name})
if err != nil {
// pgx reports unique-constraint violations with SQLSTATE 23505;
// surface as ErrConflict so callers can 409 cleanly.
if isUniqueViolation(err) {
return nil, store.ErrConflict
}
return nil, err
}
return orgFromRow(row), nil
}
func (o orgStore) GetByID(ctx context.Context, id ids.OrgID) (*store.Org, error) {
row, err := o.s.queries().GetOrgByID(ctx, id)
if err != nil {
return nil, mapErr(err)
}
return orgFromRow(row), nil
}
func (o orgStore) GetBySlug(ctx context.Context, slug string) (*store.Org, error) {
row, err := o.s.queries().GetOrgBySlug(ctx, slug)
if err != nil {
return nil, mapErr(err)
}
return orgFromRow(row), nil
}
func (o orgStore) UpdateName(ctx context.Context, id ids.OrgID, name string) (*store.Org, error) {
row, err := o.s.queries().UpdateOrgName(ctx, sqlc.UpdateOrgNameParams{ID: id, Name: name})
if err != nil {
return nil, mapErr(err)
}
return orgFromRow(row), nil
}
func (o orgStore) List(ctx context.Context, limit, offset int) ([]*store.Org, error) {
rows, err := o.s.queries().ListOrgs(ctx, sqlc.ListOrgsParams{Limit: int32(limit), Offset: int32(offset)})
if err != nil {
return nil, err
}
out := make([]*store.Org, 0, len(rows))
for _, r := range rows {
out = append(out, orgFromRow(r))
}
return out, nil
}
func orgFromRow(r sqlc.Org) *store.Org {
return &store.Org{
ID: r.ID, Slug: r.Slug, Name: r.Name,
CreatedAt: tsToTime(r.CreatedAt),
UpdatedAt: tsToTime(r.UpdatedAt),
}
}
// ---- Users ----------------------------------------------------------------
type userStore struct{ s *Store }
func (u userStore) UpsertByAuthentikSub(ctx context.Context, id ids.UserID, sub, email, displayName string, isSysadmin bool) (*store.User, error) {
row, err := u.s.queries().UpsertUserByAuthentikSub(ctx, sqlc.UpsertUserByAuthentikSubParams{
ID: id,
AuthentikSub: &sub,
Email: email,
DisplayName: displayName,
IsSysadmin: isSysadmin,
})
if err != nil {
return nil, err
}
return userFromRow(row), nil
}
func (u userStore) GetByID(ctx context.Context, id ids.UserID) (*store.User, error) {
row, err := u.s.queries().GetUserByID(ctx, id)
if err != nil {
return nil, mapErr(err)
}
return userFromRow(row), nil
}
func (u userStore) GetByEmail(ctx context.Context, email string) (*store.User, error) {
row, err := u.s.queries().GetUserByEmail(ctx, email)
if err != nil {
return nil, mapErr(err)
}
return userFromRow(row), nil
}
func (u userStore) PromoteSysadmin(ctx context.Context, id ids.UserID) error {
return mapErr(u.s.queries().PromoteSysadmin(ctx, id))
}
func userFromRow(r sqlc.User) *store.User {
out := &store.User{
ID: r.ID, Email: r.Email, DisplayName: r.DisplayName, IsSysadmin: r.IsSysadmin,
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
}
if r.AuthentikSub != nil {
out.AuthentikSub = *r.AuthentikSub
}
return out
}
// ---- Memberships ---------------------------------------------------------
type membershipStore struct{ s *Store }
func (m membershipStore) Add(ctx context.Context, orgID ids.OrgID, userID ids.UserID, role string) error {
return m.s.queries().AddMembership(ctx, sqlc.AddMembershipParams{OrgID: orgID, UserID: userID, Role: role})
}
func (m membershipStore) Remove(ctx context.Context, orgID ids.OrgID, userID ids.UserID) error {
return m.s.queries().RemoveMembership(ctx, sqlc.RemoveMembershipParams{OrgID: orgID, UserID: userID})
}
func (m membershipStore) ListForUser(ctx context.Context, userID ids.UserID) ([]*store.Membership, error) {
rows, err := m.s.queries().ListMemberships(ctx, userID)
if err != nil {
return nil, err
}
out := make([]*store.Membership, 0, len(rows))
for _, r := range rows {
out = append(out, &store.Membership{
OrgID: r.OrgID, UserID: r.UserID, Role: r.Role,
OrgSlug: r.Slug, OrgName: r.Name,
Created: tsToTime(r.CreatedAt),
})
}
return out, nil
}
// ---- Tokens --------------------------------------------------------------
type tokenStore struct{ s *Store }
func (t tokenStore) Create(ctx context.Context, tk *store.APIToken) error {
_, err := t.s.queries().CreateAPIToken(ctx, sqlc.CreateAPITokenParams{
Jti: tk.JTI,
OrgID: tk.OrgID,
UserID: tk.UserID,
Label: tk.Label,
Scopes: tk.Scopes,
ExpiresAt: tsFrom(tk.ExpiresAt),
})
return err
}
func (t tokenStore) GetByJTI(ctx context.Context, jti ids.TokenID) (*store.APIToken, error) {
row, err := t.s.queries().GetAPITokenByJTI(ctx, jti)
if err != nil {
return nil, mapErr(err)
}
return apiTokenFromRow(row), nil
}
func (t tokenStore) ListForUser(ctx context.Context, orgID ids.OrgID, userID ids.UserID) ([]*store.APIToken, error) {
rows, err := t.s.queries().ListAPITokensForUser(ctx, sqlc.ListAPITokensForUserParams{OrgID: orgID, UserID: userID})
if err != nil {
return nil, err
}
out := make([]*store.APIToken, 0, len(rows))
for _, r := range rows {
out = append(out, apiTokenFromRow(r))
}
return out, nil
}
func (t tokenStore) Revoke(ctx context.Context, jti ids.TokenID) error {
return mapErr(t.s.queries().RevokeAPIToken(ctx, jti))
}
func (t tokenStore) TouchLastUsed(ctx context.Context, jti ids.TokenID) error {
return mapErr(t.s.queries().TouchAPITokenLastUsed(ctx, jti))
}
func (t tokenStore) AddDenylist(ctx context.Context, jti ids.TokenID, expiresAt time.Time, reason string) error {
return t.s.queries().AddTokenDenylist(ctx, sqlc.AddTokenDenylistParams{
Jti: jti.String(), ExpiresAt: tsFrom(expiresAt), Reason: reason,
})
}
func (t tokenStore) IsDenied(ctx context.Context, jti ids.TokenID) (bool, error) {
return t.s.queries().IsTokenDenied(ctx, jti.String())
}
func (t tokenStore) PruneDenylist(ctx context.Context) error {
return t.s.queries().PruneTokenDenylist(ctx)
}
func apiTokenFromRow(r sqlc.ApiToken) *store.APIToken {
return &store.APIToken{
JTI: r.Jti, OrgID: r.OrgID, UserID: r.UserID,
Label: r.Label, Scopes: r.Scopes,
ExpiresAt: tsToTime(r.ExpiresAt),
RevokedAt: timePtr(r.RevokedAt), LastUsedAt: timePtr(r.LastUsedAt),
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
}
}
// ---- Nodes ---------------------------------------------------------------
type nodeStore struct{ s *Store }
func (n nodeStore) Upsert(ctx context.Context, nd *store.Node) error {
_, err := n.s.queries().UpsertNode(ctx, sqlc.UpsertNodeParams{
ID: nd.ID, DisplayName: nd.DisplayName, Multiaddrs: nd.Multiaddrs, RpcUrl: nd.RPCURL,
})
return err
}
func (n nodeStore) Get(ctx context.Context, id ids.NodeID) (*store.Node, error) {
row, err := n.s.queries().GetNode(ctx, id)
if err != nil {
return nil, mapErr(err)
}
return nodeFromRow(row), nil
}
func (n nodeStore) ListAll(ctx context.Context) ([]*store.Node, error) {
rows, err := n.s.queries().ListAllNodes(ctx)
if err != nil {
return nil, err
}
out := make([]*store.Node, 0, len(rows))
for _, r := range rows {
out = append(out, nodeFromRow(r))
}
return out, nil
}
func (n nodeStore) ListLive(ctx context.Context) ([]*store.Node, error) {
rows, err := n.s.queries().ListLiveNodes(ctx)
if err != nil {
return nil, err
}
out := make([]*store.Node, 0, len(rows))
for _, r := range rows {
out = append(out, nodeFromRow(r))
}
return out, nil
}
func (n nodeStore) TouchHeartbeat(ctx context.Context, id ids.NodeID) error {
return n.s.queries().TouchNodeHeartbeat(ctx, id)
}
func (n nodeStore) MarkStaleDown(ctx context.Context, staleAfter time.Duration) ([]ids.NodeID, error) {
return n.s.queries().MarkStaleNodesDown(ctx, staleAfter.Seconds())
}
func (n nodeStore) Drain(ctx context.Context, id ids.NodeID) error {
return n.s.queries().DrainNode(ctx, id)
}
func (n nodeStore) Uncordon(ctx context.Context, id ids.NodeID) error {
return n.s.queries().UncordonNode(ctx, id)
}
func nodeFromRow(r sqlc.Node) *store.Node {
return &store.Node{
ID: r.ID, DisplayName: r.DisplayName, Multiaddrs: r.Multiaddrs, RPCURL: r.RpcUrl,
Status: r.Status,
LastSeenAt: tsToTime(r.LastSeenAt), JoinedAt: tsToTime(r.JoinedAt), UpdatedAt: tsToTime(r.UpdatedAt),
}
}
// ---- Pins ----------------------------------------------------------------
type pinStore struct{ s *Store }
func (p pinStore) Create(ctx context.Context, pn *store.Pin) error {
meta, err := json.Marshal(pn.Meta)
if err != nil {
return fmt.Errorf("marshal meta: %w", err)
}
// pins.origins is NOT NULL DEFAULT '{}'. pgx serialises a nil
// []string as SQL NULL, which would fail the constraint, so coerce
// nil to an empty slice at the boundary.
origins := pn.Origins
if origins == nil {
origins = []string{}
}
_, err = p.s.queries().CreatePin(ctx, sqlc.CreatePinParams{
RequestID: pn.RequestID, OrgID: pn.OrgID, Cid: pn.CID,
Name: pn.Name, Meta: meta, Origins: origins,
})
if isUniqueViolation(err) {
return store.ErrConflict
}
return err
}
func (p pinStore) Get(ctx context.Context, orgID ids.OrgID, rid ids.PinID) (*store.Pin, error) {
row, err := p.s.queries().GetPin(ctx, sqlc.GetPinParams{RequestID: rid, OrgID: orgID})
if err != nil {
return nil, mapErr(err)
}
return pinFromRow(row)
}
func (p pinStore) GetByRequestID(ctx context.Context, rid ids.PinID) (*store.Pin, error) {
// The sqlc GetPin requires org_id; fall back to filtered list of one.
// We work around by using GetPin with a scratched filter — but sqlc
// doesn't expose that. Use a raw query via the pool.
var orgID ids.OrgID
var cid string
var name *string
var meta []byte
var origins []string
var status string
var failureReason *string
var created, updated pgtype.Timestamptz
err := p.s.pool.QueryRow(ctx,
"SELECT org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins WHERE request_id = $1",
rid,
).Scan(&orgID, &cid, &name, &meta, &origins, &status, &failureReason, &created, &updated)
if err != nil {
return nil, mapErr(err)
}
return pinFromFields(rid, orgID, cid, name, meta, origins, status, failureReason, created, updated)
}
func (p pinStore) GetLiveByCID(ctx context.Context, orgID ids.OrgID, cid string) (*store.Pin, error) {
row, err := p.s.queries().GetExistingLivePinByCID(ctx, sqlc.GetExistingLivePinByCIDParams{OrgID: orgID, Cid: cid})
if err != nil {
return nil, mapErr(err)
}
return pinFromRow(row)
}
func (p pinStore) UpdateStatus(ctx context.Context, rid ids.PinID, status string, failureReason *string) error {
return p.s.queries().UpdatePinStatus(ctx, sqlc.UpdatePinStatusParams{
RequestID: rid, Status: status, FailureReason: failureReason,
})
}
func (p pinStore) Delete(ctx context.Context, orgID ids.OrgID, rid ids.PinID) error {
return mapErr(p.s.queries().DeletePin(ctx, sqlc.DeletePinParams{RequestID: rid, OrgID: orgID}))
}
func (p pinStore) List(ctx context.Context, orgID ids.OrgID, limit, offset int) ([]*store.Pin, error) {
rows, err := p.s.queries().ListPins(ctx, sqlc.ListPinsParams{
OrgID: orgID, Limit: int32(limit), Offset: int32(offset),
})
if err != nil {
return nil, err
}
out := make([]*store.Pin, 0, len(rows))
for _, r := range rows {
p, err := pinFromRow(r)
if err != nil {
return nil, err
}
out = append(out, p)
}
return out, nil
}
func (p pinStore) Filter(ctx context.Context, orgID ids.OrgID, f store.PinFilter) ([]*store.Pin, error) {
meta := []byte("{}")
if len(f.Meta) > 0 {
b, err := json.Marshal(f.Meta)
if err != nil {
return nil, fmt.Errorf("marshal meta filter: %w", err)
}
meta = b
}
var namePtr *string
if f.Name != "" {
namePtr = &f.Name
}
matchMode := f.Match
if matchMode == "" {
matchMode = "exact"
}
limit := int32(f.Limit)
if limit <= 0 {
limit = 10
}
rows, err := p.s.queries().FilterPins(ctx, sqlc.FilterPinsParams{
OrgID: orgID,
Cids: f.CIDs,
Statuses: f.Status,
Name: namePtr,
MatchMode: matchMode,
Before: tsFromPtr(f.Before),
After: tsFromPtr(f.After),
MetaFilter: meta,
PinLimit: limit,
PinOffset: int32(f.Offset),
})
if err != nil {
return nil, err
}
out := make([]*store.Pin, 0, len(rows))
for _, r := range rows {
p, err := pinFromRow(r)
if err != nil {
return nil, err
}
out = append(out, p)
}
return out, nil
}
func (p pinStore) Replace(ctx context.Context, orgID ids.OrgID, rid ids.PinID, newCID string, name *string, meta map[string]any, origins []string) (*store.Pin, error) {
metaBytes, err := json.Marshal(meta)
if err != nil {
return nil, err
}
if origins == nil {
origins = []string{}
}
row, err := p.s.queries().ReplacePin(ctx, sqlc.ReplacePinParams{
RequestID: rid, OrgID: orgID, Cid: newCID, Name: name, Meta: metaBytes, Origins: origins,
})
if err != nil {
return nil, mapErr(err)
}
return pinFromRow(row)
}
func (p pinStore) InsertPlacement(ctx context.Context, rid ids.PinID, nid ids.NodeID, fence int64) (*store.Placement, error) {
row, err := p.s.queries().InsertPlacement(ctx, sqlc.InsertPlacementParams{RequestID: rid, NodeID: nid, Fence: fence})
if err != nil {
return nil, err
}
return placementFromRow(row), nil
}
func (p pinStore) GetPlacement(ctx context.Context, rid ids.PinID, nid ids.NodeID) (*store.Placement, error) {
row, err := p.s.queries().GetPlacement(ctx, sqlc.GetPlacementParams{RequestID: rid, NodeID: nid})
if err != nil {
return nil, mapErr(err)
}
return placementFromRow(row), nil
}
func (p pinStore) ListPlacements(ctx context.Context, rid ids.PinID) ([]*store.Placement, error) {
rows, err := p.s.queries().ListPlacementsForPin(ctx, rid)
if err != nil {
return nil, err
}
out := make([]*store.Placement, 0, len(rows))
for _, r := range rows {
out = append(out, &store.Placement{
RequestID: r.RequestID, NodeID: r.NodeID,
Status: r.Status, FailureReason: r.FailureReason,
Attempts: int(r.Attempts), Fence: r.Fence,
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
Multiaddrs: r.Multiaddrs,
})
}
return out, nil
}
func (p pinStore) ListPlacementsForNode(ctx context.Context, nid ids.NodeID, status string) ([]*store.Placement, error) {
rows, err := p.s.queries().ListPlacementsForNode(ctx, sqlc.ListPlacementsForNodeParams{NodeID: nid, Status: status})
if err != nil {
return nil, err
}
out := make([]*store.Placement, 0, len(rows))
for _, r := range rows {
out = append(out, placementFromRow(r))
}
return out, nil
}
func (p pinStore) UpdatePlacementFenced(ctx context.Context, rid ids.PinID, nid ids.NodeID, status string, failureReason *string, fence int64) (int64, error) {
return p.s.queries().UpdatePlacementStatusFenced(ctx, sqlc.UpdatePlacementStatusFencedParams{
RequestID: rid, NodeID: nid, Status: status, FailureReason: failureReason, Fence: fence,
})
}
func (p pinStore) ReplacePlacement(ctx context.Context, rid ids.PinID, oldNode, newNode ids.NodeID) (*store.Placement, error) {
row, err := p.s.queries().ReplacePlacementFence(ctx, sqlc.ReplacePlacementFenceParams{
RequestID: rid, NodeID: oldNode, NodeID_2: newNode,
})
if err != nil {
return nil, mapErr(err)
}
return placementFromRow(row), nil
}
func (p pinStore) StuckPlacements(ctx context.Context, stuckAfter time.Duration) ([]store.Placement, error) {
rows, err := p.s.queries().StuckPlacements(ctx, stuckAfter.Seconds())
if err != nil {
return nil, err
}
out := make([]store.Placement, 0, len(rows))
for _, r := range rows {
out = append(out, store.Placement{
RequestID: r.RequestID, NodeID: r.NodeID, Fence: r.Fence,
})
}
return out, nil
}
func (p pinStore) IncRefcount(ctx context.Context, nid ids.NodeID, cid string) error {
return p.s.queries().IncRefcount(ctx, sqlc.IncRefcountParams{NodeID: nid, Cid: cid})
}
func (p pinStore) DecRefcount(ctx context.Context, nid ids.NodeID, cid string) (int, error) {
v, err := p.s.queries().DecRefcount(ctx, sqlc.DecRefcountParams{NodeID: nid, Cid: cid})
if err != nil {
return 0, mapErr(err)
}
return int(v), nil
}
func (p pinStore) DeleteRefcountIfZero(ctx context.Context, nid ids.NodeID, cid string) error {
return p.s.queries().DeleteRefcount(ctx, sqlc.DeleteRefcountParams{NodeID: nid, Cid: cid})
}
// CountPlacementsByStatus runs a GROUP BY query directly against the
// pool. Not in sqlc because we never needed a typed result for it
// elsewhere — only the gauge-refresh goroutine consumes it.
func (p pinStore) CountPlacementsByStatus(ctx context.Context) (map[string]int, error) {
rows, err := p.s.pool.Query(ctx,
`SELECT status, COUNT(*) FROM pin_placements GROUP BY status`)
if err != nil {
return nil, err
}
defer rows.Close()
out := map[string]int{}
for rows.Next() {
var status string
var n int
if err := rows.Scan(&status, &n); err != nil {
return nil, err
}
out[status] = n
}
return out, rows.Err()
}
func pinFromRow(r sqlc.Pin) (*store.Pin, error) {
return pinFromFields(r.RequestID, r.OrgID, r.Cid, r.Name, r.Meta, r.Origins, r.Status, r.FailureReason, r.Created, r.UpdatedAt)
}
func pinFromFields(rid ids.PinID, orgID ids.OrgID, cid string, name *string, meta []byte, origins []string, status string, fr *string, created, updated pgtype.Timestamptz) (*store.Pin, error) {
var metaMap map[string]any
if len(meta) > 0 {
if err := json.Unmarshal(meta, &metaMap); err != nil {
return nil, fmt.Errorf("decode meta: %w", err)
}
}
return &store.Pin{
RequestID: rid, OrgID: orgID, CID: cid, Name: name,
Meta: metaMap, Origins: origins, Status: status, FailureReason: fr,
Created: tsToTime(created), UpdatedAt: tsToTime(updated),
}, nil
}
func placementFromRow(r sqlc.PinPlacement) *store.Placement {
return &store.Placement{
RequestID: r.RequestID, NodeID: r.NodeID,
Status: r.Status, FailureReason: r.FailureReason,
Attempts: int(r.Attempts), Fence: r.Fence,
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
}
}
// ---- Audit ---------------------------------------------------------------
type auditStore struct{ s *Store }
func (a auditStore) Insert(ctx context.Context, e *store.AuditEntry) error {
detail, err := json.Marshal(e.Detail)
if err != nil {
return err
}
var orgID ids.OrgID
if e.OrgID != nil {
orgID = *e.OrgID
}
var userID ids.UserID
if e.ActorUserID != nil {
userID = *e.ActorUserID
}
return a.s.queries().InsertAudit(ctx, sqlc.InsertAuditParams{
OrgID: orgID, ActorUserID: userID, ActorTokenJti: e.ActorTokenJTI,
Action: e.Action, Target: e.Target, Result: e.Result, Detail: detail,
})
}
func (a auditStore) List(ctx context.Context, orgID ids.OrgID, limit, offset int) ([]*store.AuditEntry, error) {
// Zero OrgID → sysadmin cluster-wide listing; sqlc's ListAudit is
// scoped per-org so we drop to a raw query for that path.
if orgID.IsZero() {
rows, err := a.s.pool.Query(ctx,
"SELECT id, org_id, actor_user_id, actor_token_jti, action, target, result, detail, created FROM audit_log ORDER BY created DESC LIMIT $1 OFFSET $2",
limit, offset)
if err != nil {
return nil, err
}
defer rows.Close()
out := []*store.AuditEntry{}
for rows.Next() {
var (
id int64
org ids.OrgID
actorUser ids.UserID
actorTokenJti *string
action, target, result string
detail []byte
created pgtype.Timestamptz
)
if err := rows.Scan(&id, &org, &actorUser, &actorTokenJti, &action, &target, &result, &detail, &created); err != nil {
return nil, err
}
out = append(out, auditRowToEntry(id, org, actorUser, actorTokenJti, action, target, result, detail, created))
}
return out, rows.Err()
}
rows, err := a.s.queries().ListAudit(ctx, sqlc.ListAuditParams{OrgID: orgID, Limit: int32(limit), Offset: int32(offset)})
if err != nil {
return nil, err
}
out := make([]*store.AuditEntry, 0, len(rows))
for _, r := range rows {
out = append(out, auditRowToEntry(r.ID, r.OrgID, r.ActorUserID, r.ActorTokenJti, r.Action, r.Target, r.Result, r.Detail, r.Created))
}
return out, nil
}
func auditRowToEntry(id int64, orgID ids.OrgID, actorUser ids.UserID, actorTokenJti *string, action, target, result string, detail []byte, created pgtype.Timestamptz) *store.AuditEntry {
var detailMap map[string]any
if len(detail) > 0 {
_ = json.Unmarshal(detail, &detailMap)
}
out := &store.AuditEntry{
ID: id, Action: action, Target: target, Result: result,
Detail: detailMap, ActorTokenJTI: actorTokenJti,
Created: tsToTime(created),
}
if !orgID.IsZero() {
v := orgID
out.OrgID = &v
}
if !actorUser.IsZero() {
v := actorUser
out.ActorUserID = &v
}
return out
}
// ---- Helpers ---------------------------------------------------------------
// isUniqueViolation recognises Postgres SQLSTATE 23505.
func isUniqueViolation(err error) bool {
if err == nil {
return false
}
var pgErr interface{ SQLState() string }
if !errors.As(err, &pgErr) {
return false
}
return pgErr.SQLState() == "23505"
}