Files
William Gill 90ac4b169c
Some checks failed
Security / Vulnerability Check (push) Successful in 1m47s
Test / Build & Unit Tests (push) Successful in 5m4s
Test / Lint (push) Successful in 27s
Test / Integration Tests (push) Failing after 1m58s
ci: fix first CI run — tidy, gofmt, ordering test, govulncheck pin
- ids: TestIDsAreTimeOrdered asserted strict lexicographic ordering of
  back-to-back UUIDv7s, but the sub-ms tail is random and not required
  to be monotonic. Sleep between samples so each ID lands in a distinct
  millisecond — the property that actually gives Postgres index
  locality on (org_id, id desc).
- go.mod/go.sum: run go mod tidy. keyfunc/v3, prometheus/client_golang
  and testcontainers-go/modules/postgres are imported directly and
  should not be marked // indirect; also drops stale sum entries.
- gofmt -w across 12 files flagged by the lint job.
- security.yml: pin govulncheck to v1.2.0. @latest triggers a proxy
  lookup every run, which is the step that hung for 16m on the Gitea
  runner.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 09:18:49 -05:00

501 lines
14 KiB
Go

package openapi
import (
"context"
"errors"
"time"
"github.com/danielgtaylor/huma/v2"
"anchorage/internal/pkg/auth"
"anchorage/internal/pkg/cache"
"anchorage/internal/pkg/ids"
"anchorage/internal/pkg/maintenance"
"anchorage/internal/pkg/node"
"anchorage/internal/pkg/scheduler"
"anchorage/internal/pkg/store"
)
// auditBody is the JSON projection of a store.AuditEntry returned by
// GET /admin/audit. Kept separate from the store type so the HTTP
// response shape stays stable even if the persistence model evolves.
type auditBody struct {
	ID     int64          `json:"id"`
	Action string         `json:"action"`
	Target string         `json:"target"`
	Result string         `json:"result"`
	// Detail carries action-specific context (e.g. the email on a
	// grant-sysadmin entry); omitted entirely when nil.
	Detail map[string]any `json:"detail,omitempty"`
	// ActorUserID / OrgID are flattened to strings; "" means the
	// underlying pointer was nil (system-initiated or org-less entry).
	ActorUserID string    `json:"actor_user_id,omitempty"`
	OrgID       string    `json:"org_id,omitempty"`
	Created     time.Time `json:"created"`
}
// ifUserID renders a possibly-nil user ID as a string, mapping nil to
// the empty string (which auditBody then omits via omitempty).
func ifUserID(p *ids.UserID) string {
	if p != nil {
		return p.String()
	}
	return ""
}
// ifOrgID renders a possibly-nil org ID as a string, mapping nil to
// the empty string (which auditBody then omits via omitempty).
func ifOrgID(p *ids.OrgID) string {
	if p != nil {
		return p.String()
	}
	return ""
}
// AdminDeps holds the collaborators the admin endpoints need.
type AdminDeps struct {
	// Store provides node, user, token, and audit persistence.
	Store store.Store
	// Maint sets/clears/reports the cluster-wide maintenance flag.
	Maint *maintenance.Manager
	// Rebalancer runs one-shot rebalance previews/applies; may be nil,
	// in which case /admin/rebalance returns 503.
	Rebalancer *scheduler.Rebalancer
	// Presences is the source of real-time NATS heartbeat data
	// (distinct from the DB view via Store.Nodes().ListAll()). Used by
	// /v1/admin/cluster/presences to let operators compare what each
	// node is broadcasting right now against the DB registry. May be
	// nil, in which case the endpoint returns 503.
	Presences PresenceSource
}
// PresenceSource is implemented by *node.Runner. Narrow interface so
// tests can stub; the real implementation returns a snapshot of the
// in-memory heartbeat map keyed by NodeID.
type PresenceSource interface {
	// LivePresences returns the current set of peers this node has
	// heard heartbeats from, one entry per NodeID.
	LivePresences() []node.Presence
}
// NodePresence is the admin-API projection of node.Presence. Kept in
// openapi so the HTTP response shape stays stable even if the internal
// node.Presence struct evolves.
type NodePresence struct {
	NodeID     string   `json:"node_id"`
	Multiaddrs []string `json:"multiaddrs"`
	Status     string   `json:"status"`
	// SentAt is the timestamp the peer stamped on its heartbeat.
	SentAt time.Time `json:"sent_at"`
	// AgeSeconds is derived server-side so clients don't have to
	// reason about clock skew — operators eyeballing the response can
	// see "last heard 12s ago" directly.
	AgeSeconds float64 `json:"age_seconds"`
}
// RegisterAdmin wires drain/uncordon/maintenance endpoints at
// /v1/admin/*. Access is restricted to sysadmins; orgadmins cannot
// drain nodes (that would be a cluster-level action).
//
// All handlers authorize via requireSysadmin/requireSysadminCC except
// adminListAudit, which allows non-sysadmins scoped to their own org.
// Handlers that mutate state (grant-sysadmin, prune-denylist, applied
// rebalance) best-effort insert an audit entry; those inserts
// deliberately ignore errors so auditing never fails the operation.
func RegisterAdmin(api huma.API, deps AdminDeps) {
	// ---- drain --------------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminDrain",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/drain",
		Summary:     "Drain a cluster node",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Drain(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("drain", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})
	// ---- uncordon -----------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminUncordon",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/uncordon",
		Summary:     "Return a drained node to service",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Uncordon(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("uncordon", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})
	// Shared response shape for the three maintenance endpoints.
	// NOTE(review): omitempty on the non-pointer time.Time fields does
	// not actually omit the zero value with encoding/json — zero SetAt/
	// ExpiresAt will serialize as "0001-01-01T00:00:00Z". Confirm
	// whether clients depend on that before changing to *time.Time.
	type maintenanceStatusBody struct {
		On        bool      `json:"on"`
		Reason    string    `json:"reason,omitempty"`
		SetBy     string    `json:"set_by,omitempty"`
		SetAt     time.Time `json:"set_at,omitempty"`
		ExpiresAt time.Time `json:"expires_at,omitempty"`
	}
	// ---- maintenance on ------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOn",
		Method:      "POST",
		Path:        "/admin/maintenance/on",
		Summary:     "Enable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body struct {
			Reason string `json:"reason,omitempty"`
			// TTL is a time.Duration, so it arrives as integer
			// nanoseconds on the wire.
			TTL time.Duration `json:"ttl,omitempty"`
		}
	}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		// Need the full client context (not just a yes/no) to record
		// who set the flag.
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		f := maintenance.Flag{
			Reason: in.Body.Reason,
			SetBy:  cc.User.String(),
			SetAt:  time.Now().UTC(),
		}
		// A positive TTL makes the flag self-expiring; zero/negative
		// leaves ExpiresAt unset (flag persists until cleared).
		if in.Body.TTL > 0 {
			f.ExpiresAt = f.SetAt.Add(in.Body.TTL)
		}
		if err := deps.Maint.Set(ctx, f); err != nil {
			return nil, huma.Error500InternalServerError("maintenance on", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		out.Body = maintenanceStatusBody{
			On: true, Reason: f.Reason, SetBy: f.SetBy, SetAt: f.SetAt, ExpiresAt: f.ExpiresAt,
		}
		return out, nil
	})
	// ---- maintenance off -----------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOff",
		Method:      "POST",
		Path:        "/admin/maintenance/off",
		Summary:     "Disable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		if err := deps.Maint.Clear(ctx); err != nil {
			return nil, huma.Error500InternalServerError("maintenance off", err)
		}
		// Zero-value body: On=false, everything else empty.
		out := &struct{ Body maintenanceStatusBody }{}
		return out, nil
	})
	// ---- grant sysadmin --------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminGrantSysadmin",
		Method:      "POST",
		Path:        "/admin/users/{email}/grant-sysadmin",
		Summary:     "Promote an existing user to sysadmin",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Email string `path:"email"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		// Only pre-existing users can be promoted; there is no
		// create-on-promote path.
		user, err := deps.Store.Users().GetByEmail(ctx, in.Email)
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("user (must have logged in at least once via Authentik)")
			}
			return nil, huma.Error500InternalServerError("lookup", err)
		}
		if err := deps.Store.Users().PromoteSysadmin(ctx, user.ID); err != nil {
			return nil, huma.Error500InternalServerError("promote", err)
		}
		// Best-effort audit: the promotion already succeeded, so an
		// audit failure is deliberately swallowed.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.grant-sysadmin",
			Target:      user.ID.String(),
			Result:      "ok",
			Detail:      map[string]any{"email": in.Email},
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})
	// ---- audit listing ----------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminListAudit",
		Method:      "GET",
		Path:        "/admin/audit",
		Summary:     "Paged audit-log listing",
		Description: "sysadmin with no org_id sees every org; org members can only read their own org's audit",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		OrgID  string `query:"org_id"`
		Limit  int    `query:"limit" default:"50" minimum:"1" maximum:"1000"`
		Offset int    `query:"offset" default:"0" minimum:"0"`
	}) (*struct {
		Body struct {
			Results []auditBody `json:"results"`
		}
	}, error) {
		// This endpoint is NOT sysadmin-only: any authenticated user
		// may call it, but non-sysadmins are force-scoped to their org.
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		var scopeOrg ids.OrgID
		if in.OrgID != "" {
			scopeOrg, err = ids.ParseOrg(in.OrgID)
			if err != nil {
				return nil, huma.Error400BadRequest("bad org id")
			}
			if cc.Role != "sysadmin" && scopeOrg != cc.Org {
				return nil, huma.Error403Forbidden("cannot view other org's audit")
			}
		} else if cc.Role != "sysadmin" {
			// No explicit org requested: non-sysadmins default to
			// their own org; sysadmins keep the zero OrgID, which
			// presumably means "all orgs" in Audit().List — confirm
			// against the store implementation.
			scopeOrg = cc.Org
		}
		rows, err := deps.Store.Audit().List(ctx, scopeOrg, in.Limit, in.Offset)
		if err != nil {
			return nil, huma.Error500InternalServerError("list", err)
		}
		out := &struct {
			Body struct {
				Results []auditBody `json:"results"`
			}
		}{}
		for _, e := range rows {
			out.Body.Results = append(out.Body.Results, auditBody{
				ID:          e.ID,
				Action:      e.Action,
				Target:      e.Target,
				Result:      e.Result,
				Detail:      e.Detail,
				ActorUserID: ifUserID(e.ActorUserID),
				OrgID:       ifOrgID(e.OrgID),
				Created:     e.Created,
			})
		}
		return out, nil
	})
	// ---- maintenance status ------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceStatus",
		Method:      "GET",
		Path:        "/admin/maintenance",
		Summary:     "Report current cluster-maintenance state",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		f, on, err := deps.Maint.IsOn(ctx)
		if err != nil {
			return nil, huma.Error500InternalServerError("maintenance status", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		// Off (or missing flag) yields the zero body with On=false.
		if on && f != nil {
			out.Body = maintenanceStatusBody{
				On: true, Reason: f.Reason, SetBy: f.SetBy,
				SetAt: f.SetAt, ExpiresAt: f.ExpiresAt,
			}
		}
		return out, nil
	})
	// ---- prune-denylist ----------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminPruneTokenDenylist",
		Method:      "POST",
		Path:        "/admin/tokens/prune-denylist",
		Summary:     "Delete denylist rows whose expires_at has passed",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		if err := deps.Store.Tokens().PruneDenylist(ctx); err != nil {
			return nil, huma.Error500InternalServerError("prune", err)
		}
		// Best-effort audit; failure does not fail the prune.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.prune-denylist",
			Target:      "token_denylist",
			Result:      "ok",
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})
	// ---- rebalance ---------------------------------------------------------
	type rebalanceReq struct {
		Apply bool `json:"apply,omitempty" doc:"when true, execute the moves; otherwise return a dry-run preview"`
	}
	type rebalanceResp struct {
		Body struct {
			Applied bool                    `json:"applied"`
			Moves   []scheduler.PlannedMove `json:"moves"`
		}
	}
	huma.Register(api, huma.Operation{
		OperationID: "adminRebalance",
		Method:      "POST",
		Path:        "/admin/rebalance",
		Summary:     "One-shot rebalance preview or apply (leader-loop keeps running regardless)",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body rebalanceReq
	}) (*rebalanceResp, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		// Rebalancer is optional wiring (see AdminDeps).
		if deps.Rebalancer == nil {
			return nil, huma.Error503ServiceUnavailable("rebalancer not available")
		}
		var moves []scheduler.PlannedMove
		if in.Body.Apply {
			moves, err = deps.Rebalancer.RunOnce(ctx)
		} else {
			moves, err = deps.Rebalancer.DryRun(ctx)
		}
		if err != nil {
			return nil, huma.Error500InternalServerError("rebalance", err)
		}
		// Only an applied run is audit-worthy; dry runs are read-only.
		if in.Body.Apply {
			_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
				ActorUserID: &cc.User,
				Action:      "admin.rebalance",
				Target:      "cluster",
				Result:      "ok",
				Detail:      map[string]any{"moves": len(moves)},
			})
		}
		out := &rebalanceResp{}
		out.Body.Applied = in.Body.Apply
		out.Body.Moves = moves
		return out, nil
	})
	// ---- cache-stats -------------------------------------------------------
	huma.Register(api, huma.Operation{
		OperationID: "adminCacheStats",
		Method:      "GET",
		Path:        "/admin/cache-stats",
		Summary:     "In-memory cache hit/miss/eviction counters per named cache",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Caches []cache.Stats `json:"caches"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		out := &struct {
			Body struct {
				Caches []cache.Stats `json:"caches"`
			}
		}{}
		out.Body.Caches = cache.AllStats()
		return out, nil
	})
	// ---- cluster presences -------------------------------------------------
	//
	// Real-time heartbeat view, distinct from the DB registry. The DB
	// updates status via the stale-sweeper (leader-gated, fires every
	// downAfter interval); this endpoint shows what each peer is
	// broadcasting RIGHT NOW over NATS. Operators comparing the two
	// diagnose "the DB says X is up but nobody's heard from it" gaps.
	huma.Register(api, huma.Operation{
		OperationID: "adminClusterPresences",
		Method:      "GET",
		Path:        "/admin/cluster/presences",
		Summary:     "Real-time NATS heartbeat view of every peer this node has heard from",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Presences []NodePresence `json:"presences"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		// Presence source is optional wiring (see AdminDeps).
		if deps.Presences == nil {
			return nil, huma.Error503ServiceUnavailable("presence source not wired")
		}
		// Snapshot "now" once so every AgeSeconds is computed against
		// the same instant.
		now := time.Now()
		raw := deps.Presences.LivePresences()
		out := &struct {
			Body struct {
				Presences []NodePresence `json:"presences"`
			}
		}{}
		out.Body.Presences = make([]NodePresence, 0, len(raw))
		for _, p := range raw {
			out.Body.Presences = append(out.Body.Presences, NodePresence{
				NodeID:     p.NodeID,
				Multiaddrs: p.Multiaddrs,
				Status:     p.Status,
				SentAt:     p.SentAt,
				AgeSeconds: now.Sub(p.SentAt).Seconds(),
			})
		}
		return out, nil
	})
}
// requireSysadmin rejects any request without a sysadmin client
// context. Convenience wrapper over requireSysadminCC for handlers
// that don't need the context itself.
func requireSysadmin(ctx context.Context) error {
	if _, err := requireSysadminCC(ctx); err != nil {
		return err
	}
	return nil
}
// requireSysadminCC authenticates the request and returns its client
// context only when the caller holds the sysadmin role: 401 when
// unauthenticated, 403 when authenticated but not sysadmin.
func requireSysadminCC(ctx context.Context) (*auth.ClientContext, error) {
	cc, err := auth.Require(ctx)
	switch {
	case err != nil:
		return nil, huma.Error401Unauthorized("auth")
	case cc.Role != "sysadmin":
		return nil, huma.Error403Forbidden("sysadmin required")
	default:
		return cc, nil
	}
}