- ids: TestIDsAreTimeOrdered asserted strict lexicographic ordering of back-to-back UUIDv7s, but the sub-ms tail is random and not required to be monotonic. Sleep between samples so each ID lands in a distinct millisecond — the property that actually gives Postgres index locality on (org_id, id desc). - go.mod/go.sum: run go mod tidy. keyfunc/v3, prometheus/client_golang and testcontainers-go/modules/postgres are imported directly and should not be marked // indirect; also drops stale sum entries. - gofmt -w across 12 files flagged by the lint job. - security.yml: pin govulncheck to v1.2.0. @latest triggers a proxy lookup every run, which is the step that hung for 16m on the Gitea runner. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
501 lines
14 KiB
Go
501 lines
14 KiB
Go
package openapi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/danielgtaylor/huma/v2"
|
|
|
|
"anchorage/internal/pkg/auth"
|
|
"anchorage/internal/pkg/cache"
|
|
"anchorage/internal/pkg/ids"
|
|
"anchorage/internal/pkg/maintenance"
|
|
"anchorage/internal/pkg/node"
|
|
"anchorage/internal/pkg/scheduler"
|
|
"anchorage/internal/pkg/store"
|
|
)
|
|
|
|
// auditBody is the JSON projection of a store.AuditEntry as returned
// by the GET /admin/audit listing endpoint. Kept separate from the
// store type so the wire shape stays stable if the store evolves.
type auditBody struct {
	// ID is the audit row's primary key (monotonic per the store).
	ID int64 `json:"id"`
	// Action is the dotted action name, e.g. "admin.grant-sysadmin".
	Action string `json:"action"`
	// Target identifies what was acted on (a user ID, "cluster", a table name, ...).
	Target string `json:"target"`
	// Result is the outcome marker, e.g. "ok".
	Result string `json:"result"`
	// Detail carries action-specific structured context; omitted when empty.
	Detail map[string]any `json:"detail,omitempty"`
	// ActorUserID is the acting user's ID, or "" when the entry has no actor.
	ActorUserID string `json:"actor_user_id,omitempty"`
	// OrgID scopes the entry to an org, or "" for cluster-level entries.
	OrgID string `json:"org_id,omitempty"`
	// Created is the entry's creation timestamp as stored.
	Created time.Time `json:"created"`
}
|
|
|
|
func ifUserID(p *ids.UserID) string {
|
|
if p == nil {
|
|
return ""
|
|
}
|
|
return p.String()
|
|
}
|
|
func ifOrgID(p *ids.OrgID) string {
|
|
if p == nil {
|
|
return ""
|
|
}
|
|
return p.String()
|
|
}
|
|
|
|
// AdminDeps holds the collaborators the admin endpoints need.
type AdminDeps struct {
	// Store provides node, user, token, and audit persistence.
	Store store.Store
	// Maint sets/clears/reads the cluster-wide maintenance flag.
	Maint *maintenance.Manager
	// Rebalancer runs one-shot dry-run or apply rebalances; may be nil,
	// in which case the rebalance endpoint returns 503.
	Rebalancer *scheduler.Rebalancer
	// Presences is the source of real-time NATS heartbeat data
	// (distinct from the DB view via Store.Nodes().ListAll()). Used by
	// /v1/admin/cluster/presences to let operators compare what each
	// node is broadcasting right now against the DB registry. May be
	// nil, in which case that endpoint returns 503.
	Presences PresenceSource
}
|
|
|
|
// PresenceSource is implemented by *node.Runner. Narrow interface so
// tests can stub; the real implementation returns a snapshot of the
// in-memory heartbeat map keyed by NodeID.
type PresenceSource interface {
	// LivePresences returns the current heartbeat snapshot; order is
	// not specified here — presumably map-iteration order. TODO confirm.
	LivePresences() []node.Presence
}
|
|
|
|
// NodePresence is the admin-API projection of node.Presence. Kept in
// openapi so the HTTP response shape stays stable even if the internal
// node.Presence struct evolves.
type NodePresence struct {
	// NodeID is the peer's identifier as broadcast over NATS.
	NodeID string `json:"node_id"`
	// Multiaddrs lists the peer's advertised addresses.
	Multiaddrs []string `json:"multiaddrs"`
	// Status is the peer's self-reported status string.
	Status string `json:"status"`
	// SentAt is when the peer emitted the heartbeat.
	SentAt time.Time `json:"sent_at"`
	// AgeSeconds is derived server-side so clients don't have to
	// reason about clock skew — operators eyeballing the response can
	// see "last heard 12s ago" directly.
	AgeSeconds float64 `json:"age_seconds"`
}
|
|
|
|
// RegisterAdmin wires drain/uncordon/maintenance endpoints at
// /v1/admin/*. Access is restricted to sysadmins; orgadmins cannot
// drain nodes (that would be a cluster-level action).
//
// All handlers use anonymous request/response structs (huma derives
// the OpenAPI schema from them); the only named bodies are the ones
// shared across endpoints (maintenanceStatusBody, rebalanceResp).
func RegisterAdmin(api huma.API, deps AdminDeps) {
	// POST /admin/nodes/{node_id}/drain — mark a node as drained in the
	// store. Returns {"ok": true} on success.
	huma.Register(api, huma.Operation{
		OperationID: "adminDrain",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/drain",
		Summary:     "Drain a cluster node",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			// Parse failure maps to a generic 400; the raw input is not echoed.
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Drain(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("drain", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// POST /admin/nodes/{node_id}/uncordon — mirror of drain: return a
	// previously drained node to service.
	huma.Register(api, huma.Operation{
		OperationID: "adminUncordon",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/uncordon",
		Summary:     "Return a drained node to service",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Uncordon(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("uncordon", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// maintenanceStatusBody is shared by the maintenance on/off/status
	// endpoints below.
	// NOTE(review): `omitempty` is a no-op on time.Time (a struct type),
	// so zero SetAt/ExpiresAt serialize as "0001-01-01T00:00:00Z" rather
	// than being omitted — confirm clients tolerate that.
	type maintenanceStatusBody struct {
		On        bool      `json:"on"`
		Reason    string    `json:"reason,omitempty"`
		SetBy     string    `json:"set_by,omitempty"`
		SetAt     time.Time `json:"set_at,omitempty"`
		ExpiresAt time.Time `json:"expires_at,omitempty"`
	}

	// POST /admin/maintenance/on — raise the cluster-wide maintenance
	// flag, optionally with a reason and a TTL after which it expires.
	// NOTE(review): time.Duration in a JSON body marshals/unmarshals as
	// integer nanoseconds — confirm callers actually send nanoseconds.
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOn",
		Method:      "POST",
		Path:        "/admin/maintenance/on",
		Summary:     "Enable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body struct {
			Reason string        `json:"reason,omitempty"`
			TTL    time.Duration `json:"ttl,omitempty"`
		}
	}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		// Record who set the flag and when; SetAt is UTC so ExpiresAt
		// comparisons are timezone-independent.
		f := maintenance.Flag{
			Reason: in.Body.Reason,
			SetBy:  cc.User.String(),
			SetAt:  time.Now().UTC(),
		}
		// TTL <= 0 means "no expiry": ExpiresAt stays the zero time.
		if in.Body.TTL > 0 {
			f.ExpiresAt = f.SetAt.Add(in.Body.TTL)
		}
		if err := deps.Maint.Set(ctx, f); err != nil {
			return nil, huma.Error500InternalServerError("maintenance on", err)
		}
		// Echo back the flag exactly as stored.
		out := &struct{ Body maintenanceStatusBody }{}
		out.Body = maintenanceStatusBody{
			On: true, Reason: f.Reason, SetBy: f.SetBy, SetAt: f.SetAt, ExpiresAt: f.ExpiresAt,
		}
		return out, nil
	})

	// POST /admin/maintenance/off — clear the maintenance flag. The
	// response body is the zero maintenanceStatusBody (On=false).
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOff",
		Method:      "POST",
		Path:        "/admin/maintenance/off",
		Summary:     "Disable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		if err := deps.Maint.Clear(ctx); err != nil {
			return nil, huma.Error500InternalServerError("maintenance off", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		return out, nil
	})

	// POST /admin/users/{email}/grant-sysadmin — promote an existing
	// user (looked up by email) to sysadmin. The user must already
	// exist in the store; 404 otherwise.
	huma.Register(api, huma.Operation{
		OperationID: "adminGrantSysadmin",
		Method:      "POST",
		Path:        "/admin/users/{email}/grant-sysadmin",
		Summary:     "Promote an existing user to sysadmin",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Email string `path:"email"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		user, err := deps.Store.Users().GetByEmail(ctx, in.Email)
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("user (must have logged in at least once via Authentik)")
			}
			return nil, huma.Error500InternalServerError("lookup", err)
		}
		if err := deps.Store.Users().PromoteSysadmin(ctx, user.ID); err != nil {
			return nil, huma.Error500InternalServerError("promote", err)
		}
		// Best-effort audit: the promotion already succeeded, so an
		// audit-write failure is deliberately not surfaced to the caller.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.grant-sysadmin",
			Target:      user.ID.String(),
			Result:      "ok",
			Detail:      map[string]any{"email": in.Email},
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// GET /admin/audit — paged audit listing. Scoping rules:
	//   - sysadmin, no org_id  -> all orgs (scopeOrg stays zero value)
	//   - sysadmin, org_id set -> that org
	//   - non-sysadmin         -> always forced to the caller's own org
	huma.Register(api, huma.Operation{
		OperationID: "adminListAudit",
		Method:      "GET",
		Path:        "/admin/audit",
		Summary:     "Paged audit-log listing",
		Description: "sysadmin with no org_id sees every org; org members can only read their own org's audit",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		OrgID  string `query:"org_id"`
		Limit  int    `query:"limit" default:"50" minimum:"1" maximum:"1000"`
		Offset int    `query:"offset" default:"0" minimum:"0"`
	}) (*struct {
		Body struct {
			Results []auditBody `json:"results"`
		}
	}, error) {
		// Unlike the other endpoints this one only requires auth, not
		// sysadmin — org members can read their own org's audit.
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		var scopeOrg ids.OrgID
		if in.OrgID != "" {
			scopeOrg, err = ids.ParseOrg(in.OrgID)
			if err != nil {
				return nil, huma.Error400BadRequest("bad org id")
			}
			// Non-sysadmins may only name their own org explicitly.
			if cc.Role != "sysadmin" && scopeOrg != cc.Org {
				return nil, huma.Error403Forbidden("cannot view other org's audit")
			}
		} else if cc.Role != "sysadmin" {
			scopeOrg = cc.Org
		}
		rows, err := deps.Store.Audit().List(ctx, scopeOrg, in.Limit, in.Offset)
		if err != nil {
			return nil, huma.Error500InternalServerError("list", err)
		}
		out := &struct {
			Body struct {
				Results []auditBody `json:"results"`
			}
		}{}
		// Project store rows into the wire shape; nil ID pointers become "".
		for _, e := range rows {
			out.Body.Results = append(out.Body.Results, auditBody{
				ID:          e.ID,
				Action:      e.Action,
				Target:      e.Target,
				Result:      e.Result,
				Detail:      e.Detail,
				ActorUserID: ifUserID(e.ActorUserID),
				OrgID:       ifOrgID(e.OrgID),
				Created:     e.Created,
			})
		}
		return out, nil
	})

	// GET /admin/maintenance — read the current maintenance flag.
	// When the flag is off (or IsOn returns a nil flag) the zero body
	// (On=false, everything else empty) is returned.
	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceStatus",
		Method:      "GET",
		Path:        "/admin/maintenance",
		Summary:     "Report current cluster-maintenance state",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		f, on, err := deps.Maint.IsOn(ctx)
		if err != nil {
			return nil, huma.Error500InternalServerError("maintenance status", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		if on && f != nil {
			out.Body = maintenanceStatusBody{
				On: true, Reason: f.Reason, SetBy: f.SetBy,
				SetAt: f.SetAt, ExpiresAt: f.ExpiresAt,
			}
		}
		return out, nil
	})

	// ---- prune-denylist ----------------------------------------------------
	// POST /admin/tokens/prune-denylist — delete expired token-denylist
	// rows. The action is audited (best-effort, like grant-sysadmin).
	huma.Register(api, huma.Operation{
		OperationID: "adminPruneTokenDenylist",
		Method:      "POST",
		Path:        "/admin/tokens/prune-denylist",
		Summary:     "Delete denylist rows whose expires_at has passed",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		if err := deps.Store.Tokens().PruneDenylist(ctx); err != nil {
			return nil, huma.Error500InternalServerError("prune", err)
		}
		// Best-effort audit; prune already succeeded.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.prune-denylist",
			Target:      "token_denylist",
			Result:      "ok",
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// ---- rebalance ---------------------------------------------------------
	// rebalanceReq selects between a dry-run preview (default) and an
	// actual apply.
	type rebalanceReq struct {
		Apply bool `json:"apply,omitempty" doc:"when true, execute the moves; otherwise return a dry-run preview"`
	}
	// rebalanceResp reports whether moves were applied and what they were.
	type rebalanceResp struct {
		Body struct {
			Applied bool                   `json:"applied"`
			Moves   []scheduler.PlannedMove `json:"moves"`
		}
	}
	// POST /admin/rebalance — one-shot rebalance. Only an applied run is
	// audited; dry-runs leave no audit trail.
	huma.Register(api, huma.Operation{
		OperationID: "adminRebalance",
		Method:      "POST",
		Path:        "/admin/rebalance",
		Summary:     "One-shot rebalance preview or apply (leader-loop keeps running regardless)",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body rebalanceReq
	}) (*rebalanceResp, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		// Rebalancer is optional wiring; absent means 503, not a crash.
		if deps.Rebalancer == nil {
			return nil, huma.Error503ServiceUnavailable("rebalancer not available")
		}
		var moves []scheduler.PlannedMove
		if in.Body.Apply {
			moves, err = deps.Rebalancer.RunOnce(ctx)
		} else {
			moves, err = deps.Rebalancer.DryRun(ctx)
		}
		if err != nil {
			return nil, huma.Error500InternalServerError("rebalance", err)
		}
		if in.Body.Apply {
			// Best-effort audit of applied rebalances only.
			_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
				ActorUserID: &cc.User,
				Action:      "admin.rebalance",
				Target:      "cluster",
				Result:      "ok",
				Detail:      map[string]any{"moves": len(moves)},
			})
		}
		out := &rebalanceResp{}
		out.Body.Applied = in.Body.Apply
		out.Body.Moves = moves
		return out, nil
	})

	// ---- cache-stats -------------------------------------------------------
	// GET /admin/cache-stats — dump hit/miss/eviction counters for every
	// named in-memory cache registered with the cache package.
	huma.Register(api, huma.Operation{
		OperationID: "adminCacheStats",
		Method:      "GET",
		Path:        "/admin/cache-stats",
		Summary:     "In-memory cache hit/miss/eviction counters per named cache",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Caches []cache.Stats `json:"caches"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		out := &struct {
			Body struct {
				Caches []cache.Stats `json:"caches"`
			}
		}{}
		out.Body.Caches = cache.AllStats()
		return out, nil
	})

	// ---- cluster presences -------------------------------------------------
	//
	// Real-time heartbeat view, distinct from the DB registry. The DB
	// updates status via the stale-sweeper (leader-gated, fires every
	// downAfter interval); this endpoint shows what each peer is
	// broadcasting RIGHT NOW over NATS. Operators comparing the two
	// diagnose "the DB says X is up but nobody's heard from it" gaps.
	huma.Register(api, huma.Operation{
		OperationID: "adminClusterPresences",
		Method:      "GET",
		Path:        "/admin/cluster/presences",
		Summary:     "Real-time NATS heartbeat view of every peer this node has heard from",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Presences []NodePresence `json:"presences"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		// Presences is optional wiring; absent means 503, not a crash.
		if deps.Presences == nil {
			return nil, huma.Error503ServiceUnavailable("presence source not wired")
		}
		// One "now" for the whole snapshot so ages are mutually consistent.
		now := time.Now()
		raw := deps.Presences.LivePresences()
		out := &struct {
			Body struct {
				Presences []NodePresence `json:"presences"`
			}
		}{}
		// Pre-sized and non-nil so an empty cluster serializes as [] not null.
		out.Body.Presences = make([]NodePresence, 0, len(raw))
		for _, p := range raw {
			out.Body.Presences = append(out.Body.Presences, NodePresence{
				NodeID:     p.NodeID,
				Multiaddrs: p.Multiaddrs,
				Status:     p.Status,
				SentAt:     p.SentAt,
				AgeSeconds: now.Sub(p.SentAt).Seconds(),
			})
		}
		return out, nil
	})
}
|
|
|
|
// requireSysadmin rejects any request without a sysadmin client context.
|
|
func requireSysadmin(ctx context.Context) error {
|
|
_, err := requireSysadminCC(ctx)
|
|
return err
|
|
}
|
|
|
|
func requireSysadminCC(ctx context.Context) (*auth.ClientContext, error) {
|
|
cc, err := auth.Require(ctx)
|
|
if err != nil {
|
|
return nil, huma.Error401Unauthorized("auth")
|
|
}
|
|
if cc.Role != "sysadmin" {
|
|
return nil, huma.Error403Forbidden("sysadmin required")
|
|
}
|
|
return cc, nil
|
|
}
|