Two bugs exposed by the integration suite:
1. (production) pins.origins is NOT NULL DEFAULT '{}', but pgx
serialises a Go nil []string as SQL NULL — so every Create/Replace
whose caller omitted origins (an optional field per the IPFS
Pinning Service spec) was 500ing on the NOT NULL constraint. The
openapi/pin-service paths pass origins through verbatim, so any
client POST without "origins" hit this. Normalise nil -> []string{}
at the store boundary in both pinStore.Create and pinStore.Replace.
2. (test I introduced last commit) SET LOCAL does not accept bound
parameters; the RLS integration test was getting a 42601 syntax
error. Switch to SELECT set_config('anchorage.org_id', $1, true),
which is parameterisable and keeps the value out of the SQL string.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
811 lines
25 KiB
Go
811 lines
25 KiB
Go
// Package postgres is anchorage's pgx-backed implementation of
|
|
// store.Store. Sub-store methods are thin wrappers over sqlc-generated
|
|
// query objects in internal/pkg/store/postgres/sqlc; the file you're
|
|
// reading is the bridge between anchorage's domain types and the
|
|
// sqlc Pgx types (pgtype.Timestamptz → time.Time, etc).
|
|
//
|
|
// Transactions: Tx opens a pgx tx, sets the `anchorage.org_id` GUC when
|
|
// a tenant context is active, and hands the closure a Store wrapping
|
|
// the transactional Queries. Commit / rollback follows the standard
|
|
// Go "defer rollback-unless-committed" idiom.
|
|
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"anchorage/internal/pkg/ids"
|
|
"anchorage/internal/pkg/store"
|
|
"anchorage/internal/pkg/store/postgres/sqlc"
|
|
)
|
|
|
|
// ctxKey namespaces the context values this package stashes (currently
|
|
// just the RLS org id).
|
|
type ctxKey struct{ name string }
|
|
|
|
var orgCtxKey = ctxKey{"anchorage-org-id"}
|
|
|
|
// Store implements store.Store on top of a pgxpool.Pool. A separate
|
|
// *Store is constructed per transaction (see Tx); the top-level one
|
|
// holds the pool directly.
|
|
type Store struct {
|
|
pool *pgxpool.Pool
|
|
// If non-nil, all sub-stores use this Queries (scoped to a
|
|
// transaction). The top-level Store has txQ=nil and lazily wraps
|
|
// pool.
|
|
txQ *sqlc.Queries
|
|
txOrgID *ids.OrgID // set inside Tx so GUC stays consistent
|
|
}
|
|
|
|
// New builds a pool-backed Store.
|
|
func New(pool *pgxpool.Pool) *Store {
|
|
return &Store{pool: pool}
|
|
}
|
|
|
|
// queries returns a sqlc.Queries over the correct dbtx handle — either
|
|
// the transactional one from Tx, or a pool-backed one.
|
|
func (s *Store) queries() *sqlc.Queries {
|
|
if s.txQ != nil {
|
|
return s.txQ
|
|
}
|
|
return sqlc.New(s.pool)
|
|
}
|
|
|
|
// ---- Root store.Store interface --------------------------------------------
|
|
|
|
func (s *Store) Orgs() store.OrgStore { return orgStore{s} }
|
|
func (s *Store) Users() store.UserStore { return userStore{s} }
|
|
func (s *Store) Memberships() store.MembershipStore { return membershipStore{s} }
|
|
func (s *Store) Tokens() store.TokenStore { return tokenStore{s} }
|
|
func (s *Store) Nodes() store.NodeStore { return nodeStore{s} }
|
|
func (s *Store) Pins() store.PinStore { return pinStore{s} }
|
|
func (s *Store) Audit() store.AuditStore { return auditStore{s} }
|
|
|
|
// WithOrgContext returns a context tagged with orgID. The Tx handler
|
|
// reads this to SET LOCAL anchorage.org_id inside the transaction.
|
|
func (s *Store) WithOrgContext(ctx context.Context, orgID ids.OrgID) (context.Context, error) {
|
|
return context.WithValue(ctx, orgCtxKey, orgID), nil
|
|
}
|
|
|
|
// Tx opens a pgx transaction, sets the anchorage.org_id GUC if an org
|
|
// has been tagged on ctx via WithOrgContext, and passes fn a Store
|
|
// bound to that transaction.
|
|
func (s *Store) Tx(ctx context.Context, fn func(txCtx context.Context, tx store.Store) error) error {
|
|
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("begin tx: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
q := sqlc.New(tx).WithTx(tx)
|
|
txStore := &Store{pool: s.pool, txQ: q}
|
|
|
|
// If the caller wrapped ctx with WithOrgContext, apply the GUC so
|
|
// RLS policies on the tenant tables kick in.
|
|
if v, ok := ctx.Value(orgCtxKey).(ids.OrgID); ok && !v.IsZero() {
|
|
if _, err := tx.Exec(ctx, "SET LOCAL anchorage.org_id = $1", v.String()); err != nil {
|
|
return fmt.Errorf("set org guc: %w", err)
|
|
}
|
|
txStore.txOrgID = &v
|
|
}
|
|
|
|
if err := fn(ctx, txStore); err != nil {
|
|
return err
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
// ---- Converters ------------------------------------------------------------
|
|
|
|
func tsToTime(t pgtype.Timestamptz) time.Time {
|
|
if !t.Valid {
|
|
return time.Time{}
|
|
}
|
|
return t.Time
|
|
}
|
|
|
|
func timePtr(t pgtype.Timestamptz) *time.Time {
|
|
if !t.Valid {
|
|
return nil
|
|
}
|
|
v := t.Time
|
|
return &v
|
|
}
|
|
|
|
func tsFrom(t time.Time) pgtype.Timestamptz {
|
|
return pgtype.Timestamptz{Time: t.UTC(), Valid: !t.IsZero()}
|
|
}
|
|
|
|
func tsFromPtr(t *time.Time) pgtype.Timestamptz {
|
|
if t == nil {
|
|
return pgtype.Timestamptz{}
|
|
}
|
|
return tsFrom(*t)
|
|
}
|
|
|
|
func mapErr(err error) error {
|
|
switch {
|
|
case err == nil:
|
|
return nil
|
|
case errors.Is(err, pgx.ErrNoRows):
|
|
return store.ErrNotFound
|
|
}
|
|
return err
|
|
}
|
|
|
|
// ---- Orgs -----------------------------------------------------------------
|
|
|
|
type orgStore struct{ s *Store }
|
|
|
|
func (o orgStore) Create(ctx context.Context, id ids.OrgID, slug, name string) (*store.Org, error) {
|
|
row, err := o.s.queries().CreateOrg(ctx, sqlc.CreateOrgParams{ID: id, Slug: slug, Name: name})
|
|
if err != nil {
|
|
// pgx reports unique-constraint violations with SQLSTATE 23505;
|
|
// surface as ErrConflict so callers can 409 cleanly.
|
|
if isUniqueViolation(err) {
|
|
return nil, store.ErrConflict
|
|
}
|
|
return nil, err
|
|
}
|
|
return orgFromRow(row), nil
|
|
}
|
|
|
|
func (o orgStore) GetByID(ctx context.Context, id ids.OrgID) (*store.Org, error) {
|
|
row, err := o.s.queries().GetOrgByID(ctx, id)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return orgFromRow(row), nil
|
|
}
|
|
|
|
func (o orgStore) GetBySlug(ctx context.Context, slug string) (*store.Org, error) {
|
|
row, err := o.s.queries().GetOrgBySlug(ctx, slug)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return orgFromRow(row), nil
|
|
}
|
|
|
|
func (o orgStore) UpdateName(ctx context.Context, id ids.OrgID, name string) (*store.Org, error) {
|
|
row, err := o.s.queries().UpdateOrgName(ctx, sqlc.UpdateOrgNameParams{ID: id, Name: name})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return orgFromRow(row), nil
|
|
}
|
|
|
|
func (o orgStore) List(ctx context.Context, limit, offset int) ([]*store.Org, error) {
|
|
rows, err := o.s.queries().ListOrgs(ctx, sqlc.ListOrgsParams{Limit: int32(limit), Offset: int32(offset)})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Org, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, orgFromRow(r))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func orgFromRow(r sqlc.Org) *store.Org {
|
|
return &store.Org{
|
|
ID: r.ID, Slug: r.Slug, Name: r.Name,
|
|
CreatedAt: tsToTime(r.CreatedAt),
|
|
UpdatedAt: tsToTime(r.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
// ---- Users ----------------------------------------------------------------
|
|
|
|
type userStore struct{ s *Store }
|
|
|
|
func (u userStore) UpsertByAuthentikSub(ctx context.Context, id ids.UserID, sub, email, displayName string, isSysadmin bool) (*store.User, error) {
|
|
row, err := u.s.queries().UpsertUserByAuthentikSub(ctx, sqlc.UpsertUserByAuthentikSubParams{
|
|
ID: id,
|
|
AuthentikSub: &sub,
|
|
Email: email,
|
|
DisplayName: displayName,
|
|
IsSysadmin: isSysadmin,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return userFromRow(row), nil
|
|
}
|
|
|
|
func (u userStore) GetByID(ctx context.Context, id ids.UserID) (*store.User, error) {
|
|
row, err := u.s.queries().GetUserByID(ctx, id)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return userFromRow(row), nil
|
|
}
|
|
|
|
func (u userStore) GetByEmail(ctx context.Context, email string) (*store.User, error) {
|
|
row, err := u.s.queries().GetUserByEmail(ctx, email)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return userFromRow(row), nil
|
|
}
|
|
|
|
func (u userStore) PromoteSysadmin(ctx context.Context, id ids.UserID) error {
|
|
return mapErr(u.s.queries().PromoteSysadmin(ctx, id))
|
|
}
|
|
|
|
func userFromRow(r sqlc.User) *store.User {
|
|
out := &store.User{
|
|
ID: r.ID, Email: r.Email, DisplayName: r.DisplayName, IsSysadmin: r.IsSysadmin,
|
|
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
|
|
}
|
|
if r.AuthentikSub != nil {
|
|
out.AuthentikSub = *r.AuthentikSub
|
|
}
|
|
return out
|
|
}
|
|
|
|
// ---- Memberships ---------------------------------------------------------
|
|
|
|
type membershipStore struct{ s *Store }
|
|
|
|
func (m membershipStore) Add(ctx context.Context, orgID ids.OrgID, userID ids.UserID, role string) error {
|
|
return m.s.queries().AddMembership(ctx, sqlc.AddMembershipParams{OrgID: orgID, UserID: userID, Role: role})
|
|
}
|
|
|
|
func (m membershipStore) Remove(ctx context.Context, orgID ids.OrgID, userID ids.UserID) error {
|
|
return m.s.queries().RemoveMembership(ctx, sqlc.RemoveMembershipParams{OrgID: orgID, UserID: userID})
|
|
}
|
|
|
|
func (m membershipStore) ListForUser(ctx context.Context, userID ids.UserID) ([]*store.Membership, error) {
|
|
rows, err := m.s.queries().ListMemberships(ctx, userID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Membership, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, &store.Membership{
|
|
OrgID: r.OrgID, UserID: r.UserID, Role: r.Role,
|
|
OrgSlug: r.Slug, OrgName: r.Name,
|
|
Created: tsToTime(r.CreatedAt),
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// ---- Tokens --------------------------------------------------------------
|
|
|
|
type tokenStore struct{ s *Store }
|
|
|
|
func (t tokenStore) Create(ctx context.Context, tk *store.APIToken) error {
|
|
_, err := t.s.queries().CreateAPIToken(ctx, sqlc.CreateAPITokenParams{
|
|
Jti: tk.JTI,
|
|
OrgID: tk.OrgID,
|
|
UserID: tk.UserID,
|
|
Label: tk.Label,
|
|
Scopes: tk.Scopes,
|
|
ExpiresAt: tsFrom(tk.ExpiresAt),
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (t tokenStore) GetByJTI(ctx context.Context, jti ids.TokenID) (*store.APIToken, error) {
|
|
row, err := t.s.queries().GetAPITokenByJTI(ctx, jti)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return apiTokenFromRow(row), nil
|
|
}
|
|
|
|
func (t tokenStore) ListForUser(ctx context.Context, orgID ids.OrgID, userID ids.UserID) ([]*store.APIToken, error) {
|
|
rows, err := t.s.queries().ListAPITokensForUser(ctx, sqlc.ListAPITokensForUserParams{OrgID: orgID, UserID: userID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.APIToken, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, apiTokenFromRow(r))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (t tokenStore) Revoke(ctx context.Context, jti ids.TokenID) error {
|
|
return mapErr(t.s.queries().RevokeAPIToken(ctx, jti))
|
|
}
|
|
|
|
func (t tokenStore) TouchLastUsed(ctx context.Context, jti ids.TokenID) error {
|
|
return mapErr(t.s.queries().TouchAPITokenLastUsed(ctx, jti))
|
|
}
|
|
|
|
func (t tokenStore) AddDenylist(ctx context.Context, jti ids.TokenID, expiresAt time.Time, reason string) error {
|
|
return t.s.queries().AddTokenDenylist(ctx, sqlc.AddTokenDenylistParams{
|
|
Jti: jti.String(), ExpiresAt: tsFrom(expiresAt), Reason: reason,
|
|
})
|
|
}
|
|
|
|
func (t tokenStore) IsDenied(ctx context.Context, jti ids.TokenID) (bool, error) {
|
|
return t.s.queries().IsTokenDenied(ctx, jti.String())
|
|
}
|
|
|
|
func (t tokenStore) PruneDenylist(ctx context.Context) error {
|
|
return t.s.queries().PruneTokenDenylist(ctx)
|
|
}
|
|
|
|
func apiTokenFromRow(r sqlc.ApiToken) *store.APIToken {
|
|
return &store.APIToken{
|
|
JTI: r.Jti, OrgID: r.OrgID, UserID: r.UserID,
|
|
Label: r.Label, Scopes: r.Scopes,
|
|
ExpiresAt: tsToTime(r.ExpiresAt),
|
|
RevokedAt: timePtr(r.RevokedAt), LastUsedAt: timePtr(r.LastUsedAt),
|
|
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
// ---- Nodes ---------------------------------------------------------------
|
|
|
|
type nodeStore struct{ s *Store }
|
|
|
|
func (n nodeStore) Upsert(ctx context.Context, nd *store.Node) error {
|
|
_, err := n.s.queries().UpsertNode(ctx, sqlc.UpsertNodeParams{
|
|
ID: nd.ID, DisplayName: nd.DisplayName, Multiaddrs: nd.Multiaddrs, RpcUrl: nd.RPCURL,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (n nodeStore) Get(ctx context.Context, id ids.NodeID) (*store.Node, error) {
|
|
row, err := n.s.queries().GetNode(ctx, id)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return nodeFromRow(row), nil
|
|
}
|
|
|
|
func (n nodeStore) ListAll(ctx context.Context) ([]*store.Node, error) {
|
|
rows, err := n.s.queries().ListAllNodes(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Node, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, nodeFromRow(r))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (n nodeStore) ListLive(ctx context.Context) ([]*store.Node, error) {
|
|
rows, err := n.s.queries().ListLiveNodes(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Node, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, nodeFromRow(r))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (n nodeStore) TouchHeartbeat(ctx context.Context, id ids.NodeID) error {
|
|
return n.s.queries().TouchNodeHeartbeat(ctx, id)
|
|
}
|
|
|
|
func (n nodeStore) MarkStaleDown(ctx context.Context, staleAfter time.Duration) ([]ids.NodeID, error) {
|
|
return n.s.queries().MarkStaleNodesDown(ctx, staleAfter.Seconds())
|
|
}
|
|
|
|
func (n nodeStore) Drain(ctx context.Context, id ids.NodeID) error {
|
|
return n.s.queries().DrainNode(ctx, id)
|
|
}
|
|
|
|
func (n nodeStore) Uncordon(ctx context.Context, id ids.NodeID) error {
|
|
return n.s.queries().UncordonNode(ctx, id)
|
|
}
|
|
|
|
func nodeFromRow(r sqlc.Node) *store.Node {
|
|
return &store.Node{
|
|
ID: r.ID, DisplayName: r.DisplayName, Multiaddrs: r.Multiaddrs, RPCURL: r.RpcUrl,
|
|
Status: r.Status,
|
|
LastSeenAt: tsToTime(r.LastSeenAt), JoinedAt: tsToTime(r.JoinedAt), UpdatedAt: tsToTime(r.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
// ---- Pins ----------------------------------------------------------------
|
|
|
|
type pinStore struct{ s *Store }
|
|
|
|
func (p pinStore) Create(ctx context.Context, pn *store.Pin) error {
|
|
meta, err := json.Marshal(pn.Meta)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal meta: %w", err)
|
|
}
|
|
// pins.origins is NOT NULL DEFAULT '{}'. pgx serialises a nil
|
|
// []string as SQL NULL, which would fail the constraint, so coerce
|
|
// nil to an empty slice at the boundary.
|
|
origins := pn.Origins
|
|
if origins == nil {
|
|
origins = []string{}
|
|
}
|
|
_, err = p.s.queries().CreatePin(ctx, sqlc.CreatePinParams{
|
|
RequestID: pn.RequestID, OrgID: pn.OrgID, Cid: pn.CID,
|
|
Name: pn.Name, Meta: meta, Origins: origins,
|
|
})
|
|
if isUniqueViolation(err) {
|
|
return store.ErrConflict
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (p pinStore) Get(ctx context.Context, orgID ids.OrgID, rid ids.PinID) (*store.Pin, error) {
|
|
row, err := p.s.queries().GetPin(ctx, sqlc.GetPinParams{RequestID: rid, OrgID: orgID})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return pinFromRow(row)
|
|
}
|
|
|
|
func (p pinStore) GetByRequestID(ctx context.Context, rid ids.PinID) (*store.Pin, error) {
|
|
// The sqlc GetPin requires org_id; fall back to filtered list of one.
|
|
// We work around by using GetPin with a scratched filter — but sqlc
|
|
// doesn't expose that. Use a raw query via the pool.
|
|
var orgID ids.OrgID
|
|
var cid string
|
|
var name *string
|
|
var meta []byte
|
|
var origins []string
|
|
var status string
|
|
var failureReason *string
|
|
var created, updated pgtype.Timestamptz
|
|
err := p.s.pool.QueryRow(ctx,
|
|
"SELECT org_id, cid, name, meta, origins, status, failure_reason, created, updated_at FROM pins WHERE request_id = $1",
|
|
rid,
|
|
).Scan(&orgID, &cid, &name, &meta, &origins, &status, &failureReason, &created, &updated)
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return pinFromFields(rid, orgID, cid, name, meta, origins, status, failureReason, created, updated)
|
|
}
|
|
|
|
func (p pinStore) GetLiveByCID(ctx context.Context, orgID ids.OrgID, cid string) (*store.Pin, error) {
|
|
row, err := p.s.queries().GetExistingLivePinByCID(ctx, sqlc.GetExistingLivePinByCIDParams{OrgID: orgID, Cid: cid})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return pinFromRow(row)
|
|
}
|
|
|
|
func (p pinStore) UpdateStatus(ctx context.Context, rid ids.PinID, status string, failureReason *string) error {
|
|
return p.s.queries().UpdatePinStatus(ctx, sqlc.UpdatePinStatusParams{
|
|
RequestID: rid, Status: status, FailureReason: failureReason,
|
|
})
|
|
}
|
|
|
|
func (p pinStore) Delete(ctx context.Context, orgID ids.OrgID, rid ids.PinID) error {
|
|
return mapErr(p.s.queries().DeletePin(ctx, sqlc.DeletePinParams{RequestID: rid, OrgID: orgID}))
|
|
}
|
|
|
|
func (p pinStore) List(ctx context.Context, orgID ids.OrgID, limit, offset int) ([]*store.Pin, error) {
|
|
rows, err := p.s.queries().ListPins(ctx, sqlc.ListPinsParams{
|
|
OrgID: orgID, Limit: int32(limit), Offset: int32(offset),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Pin, 0, len(rows))
|
|
for _, r := range rows {
|
|
p, err := pinFromRow(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, p)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p pinStore) Filter(ctx context.Context, orgID ids.OrgID, f store.PinFilter) ([]*store.Pin, error) {
|
|
meta := []byte("{}")
|
|
if len(f.Meta) > 0 {
|
|
b, err := json.Marshal(f.Meta)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal meta filter: %w", err)
|
|
}
|
|
meta = b
|
|
}
|
|
var namePtr *string
|
|
if f.Name != "" {
|
|
namePtr = &f.Name
|
|
}
|
|
matchMode := f.Match
|
|
if matchMode == "" {
|
|
matchMode = "exact"
|
|
}
|
|
limit := int32(f.Limit)
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
rows, err := p.s.queries().FilterPins(ctx, sqlc.FilterPinsParams{
|
|
OrgID: orgID,
|
|
Cids: f.CIDs,
|
|
Statuses: f.Status,
|
|
Name: namePtr,
|
|
MatchMode: matchMode,
|
|
Before: tsFromPtr(f.Before),
|
|
After: tsFromPtr(f.After),
|
|
MetaFilter: meta,
|
|
PinLimit: limit,
|
|
PinOffset: int32(f.Offset),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Pin, 0, len(rows))
|
|
for _, r := range rows {
|
|
p, err := pinFromRow(r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, p)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p pinStore) Replace(ctx context.Context, orgID ids.OrgID, rid ids.PinID, newCID string, name *string, meta map[string]any, origins []string) (*store.Pin, error) {
|
|
metaBytes, err := json.Marshal(meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if origins == nil {
|
|
origins = []string{}
|
|
}
|
|
row, err := p.s.queries().ReplacePin(ctx, sqlc.ReplacePinParams{
|
|
RequestID: rid, OrgID: orgID, Cid: newCID, Name: name, Meta: metaBytes, Origins: origins,
|
|
})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return pinFromRow(row)
|
|
}
|
|
|
|
func (p pinStore) InsertPlacement(ctx context.Context, rid ids.PinID, nid ids.NodeID, fence int64) (*store.Placement, error) {
|
|
row, err := p.s.queries().InsertPlacement(ctx, sqlc.InsertPlacementParams{RequestID: rid, NodeID: nid, Fence: fence})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return placementFromRow(row), nil
|
|
}
|
|
|
|
func (p pinStore) GetPlacement(ctx context.Context, rid ids.PinID, nid ids.NodeID) (*store.Placement, error) {
|
|
row, err := p.s.queries().GetPlacement(ctx, sqlc.GetPlacementParams{RequestID: rid, NodeID: nid})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return placementFromRow(row), nil
|
|
}
|
|
|
|
func (p pinStore) ListPlacements(ctx context.Context, rid ids.PinID) ([]*store.Placement, error) {
|
|
rows, err := p.s.queries().ListPlacementsForPin(ctx, rid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Placement, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, &store.Placement{
|
|
RequestID: r.RequestID, NodeID: r.NodeID,
|
|
Status: r.Status, FailureReason: r.FailureReason,
|
|
Attempts: int(r.Attempts), Fence: r.Fence,
|
|
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
|
|
Multiaddrs: r.Multiaddrs,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p pinStore) ListPlacementsForNode(ctx context.Context, nid ids.NodeID, status string) ([]*store.Placement, error) {
|
|
rows, err := p.s.queries().ListPlacementsForNode(ctx, sqlc.ListPlacementsForNodeParams{NodeID: nid, Status: status})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.Placement, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, placementFromRow(r))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p pinStore) UpdatePlacementFenced(ctx context.Context, rid ids.PinID, nid ids.NodeID, status string, failureReason *string, fence int64) (int64, error) {
|
|
return p.s.queries().UpdatePlacementStatusFenced(ctx, sqlc.UpdatePlacementStatusFencedParams{
|
|
RequestID: rid, NodeID: nid, Status: status, FailureReason: failureReason, Fence: fence,
|
|
})
|
|
}
|
|
|
|
func (p pinStore) ReplacePlacement(ctx context.Context, rid ids.PinID, oldNode, newNode ids.NodeID) (*store.Placement, error) {
|
|
row, err := p.s.queries().ReplacePlacementFence(ctx, sqlc.ReplacePlacementFenceParams{
|
|
RequestID: rid, NodeID: oldNode, NodeID_2: newNode,
|
|
})
|
|
if err != nil {
|
|
return nil, mapErr(err)
|
|
}
|
|
return placementFromRow(row), nil
|
|
}
|
|
|
|
func (p pinStore) StuckPlacements(ctx context.Context, stuckAfter time.Duration) ([]store.Placement, error) {
|
|
rows, err := p.s.queries().StuckPlacements(ctx, stuckAfter.Seconds())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]store.Placement, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, store.Placement{
|
|
RequestID: r.RequestID, NodeID: r.NodeID, Fence: r.Fence,
|
|
})
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (p pinStore) IncRefcount(ctx context.Context, nid ids.NodeID, cid string) error {
|
|
return p.s.queries().IncRefcount(ctx, sqlc.IncRefcountParams{NodeID: nid, Cid: cid})
|
|
}
|
|
|
|
func (p pinStore) DecRefcount(ctx context.Context, nid ids.NodeID, cid string) (int, error) {
|
|
v, err := p.s.queries().DecRefcount(ctx, sqlc.DecRefcountParams{NodeID: nid, Cid: cid})
|
|
if err != nil {
|
|
return 0, mapErr(err)
|
|
}
|
|
return int(v), nil
|
|
}
|
|
|
|
func (p pinStore) DeleteRefcountIfZero(ctx context.Context, nid ids.NodeID, cid string) error {
|
|
return p.s.queries().DeleteRefcount(ctx, sqlc.DeleteRefcountParams{NodeID: nid, Cid: cid})
|
|
}
|
|
|
|
// CountPlacementsByStatus runs a GROUP BY query directly against the
|
|
// pool. Not in sqlc because we never needed a typed result for it
|
|
// elsewhere — only the gauge-refresh goroutine consumes it.
|
|
func (p pinStore) CountPlacementsByStatus(ctx context.Context) (map[string]int, error) {
|
|
rows, err := p.s.pool.Query(ctx,
|
|
`SELECT status, COUNT(*) FROM pin_placements GROUP BY status`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := map[string]int{}
|
|
for rows.Next() {
|
|
var status string
|
|
var n int
|
|
if err := rows.Scan(&status, &n); err != nil {
|
|
return nil, err
|
|
}
|
|
out[status] = n
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
func pinFromRow(r sqlc.Pin) (*store.Pin, error) {
|
|
return pinFromFields(r.RequestID, r.OrgID, r.Cid, r.Name, r.Meta, r.Origins, r.Status, r.FailureReason, r.Created, r.UpdatedAt)
|
|
}
|
|
|
|
func pinFromFields(rid ids.PinID, orgID ids.OrgID, cid string, name *string, meta []byte, origins []string, status string, fr *string, created, updated pgtype.Timestamptz) (*store.Pin, error) {
|
|
var metaMap map[string]any
|
|
if len(meta) > 0 {
|
|
if err := json.Unmarshal(meta, &metaMap); err != nil {
|
|
return nil, fmt.Errorf("decode meta: %w", err)
|
|
}
|
|
}
|
|
return &store.Pin{
|
|
RequestID: rid, OrgID: orgID, CID: cid, Name: name,
|
|
Meta: metaMap, Origins: origins, Status: status, FailureReason: fr,
|
|
Created: tsToTime(created), UpdatedAt: tsToTime(updated),
|
|
}, nil
|
|
}
|
|
|
|
func placementFromRow(r sqlc.PinPlacement) *store.Placement {
|
|
return &store.Placement{
|
|
RequestID: r.RequestID, NodeID: r.NodeID,
|
|
Status: r.Status, FailureReason: r.FailureReason,
|
|
Attempts: int(r.Attempts), Fence: r.Fence,
|
|
CreatedAt: tsToTime(r.CreatedAt), UpdatedAt: tsToTime(r.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
// ---- Audit ---------------------------------------------------------------
|
|
|
|
type auditStore struct{ s *Store }
|
|
|
|
func (a auditStore) Insert(ctx context.Context, e *store.AuditEntry) error {
|
|
detail, err := json.Marshal(e.Detail)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var orgID ids.OrgID
|
|
if e.OrgID != nil {
|
|
orgID = *e.OrgID
|
|
}
|
|
var userID ids.UserID
|
|
if e.ActorUserID != nil {
|
|
userID = *e.ActorUserID
|
|
}
|
|
return a.s.queries().InsertAudit(ctx, sqlc.InsertAuditParams{
|
|
OrgID: orgID, ActorUserID: userID, ActorTokenJti: e.ActorTokenJTI,
|
|
Action: e.Action, Target: e.Target, Result: e.Result, Detail: detail,
|
|
})
|
|
}
|
|
|
|
func (a auditStore) List(ctx context.Context, orgID ids.OrgID, limit, offset int) ([]*store.AuditEntry, error) {
|
|
// Zero OrgID → sysadmin cluster-wide listing; sqlc's ListAudit is
|
|
// scoped per-org so we drop to a raw query for that path.
|
|
if orgID.IsZero() {
|
|
rows, err := a.s.pool.Query(ctx,
|
|
"SELECT id, org_id, actor_user_id, actor_token_jti, action, target, result, detail, created FROM audit_log ORDER BY created DESC LIMIT $1 OFFSET $2",
|
|
limit, offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []*store.AuditEntry{}
|
|
for rows.Next() {
|
|
var (
|
|
id int64
|
|
org ids.OrgID
|
|
actorUser ids.UserID
|
|
actorTokenJti *string
|
|
action, target, result string
|
|
detail []byte
|
|
created pgtype.Timestamptz
|
|
)
|
|
if err := rows.Scan(&id, &org, &actorUser, &actorTokenJti, &action, &target, &result, &detail, &created); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, auditRowToEntry(id, org, actorUser, actorTokenJti, action, target, result, detail, created))
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
rows, err := a.s.queries().ListAudit(ctx, sqlc.ListAuditParams{OrgID: orgID, Limit: int32(limit), Offset: int32(offset)})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out := make([]*store.AuditEntry, 0, len(rows))
|
|
for _, r := range rows {
|
|
out = append(out, auditRowToEntry(r.ID, r.OrgID, r.ActorUserID, r.ActorTokenJti, r.Action, r.Target, r.Result, r.Detail, r.Created))
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func auditRowToEntry(id int64, orgID ids.OrgID, actorUser ids.UserID, actorTokenJti *string, action, target, result string, detail []byte, created pgtype.Timestamptz) *store.AuditEntry {
|
|
var detailMap map[string]any
|
|
if len(detail) > 0 {
|
|
_ = json.Unmarshal(detail, &detailMap)
|
|
}
|
|
out := &store.AuditEntry{
|
|
ID: id, Action: action, Target: target, Result: result,
|
|
Detail: detailMap, ActorTokenJTI: actorTokenJti,
|
|
Created: tsToTime(created),
|
|
}
|
|
if !orgID.IsZero() {
|
|
v := orgID
|
|
out.OrgID = &v
|
|
}
|
|
if !actorUser.IsZero() {
|
|
v := actorUser
|
|
out.ActorUserID = &v
|
|
}
|
|
return out
|
|
}
|
|
|
|
// ---- Helpers ---------------------------------------------------------------
|
|
|
|
// isUniqueViolation recognises Postgres SQLSTATE 23505.
|
|
func isUniqueViolation(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
var pgErr interface{ SQLState() string }
|
|
if !errors.As(err, &pgErr) {
|
|
return false
|
|
}
|
|
return pgErr.SQLState() == "23505"
|
|
}
|