package openapi

import (
	"context"
	"errors"
	"time"

	"github.com/danielgtaylor/huma/v2"

	"anchorage/internal/pkg/auth"
	"anchorage/internal/pkg/cache"
	"anchorage/internal/pkg/ids"
	"anchorage/internal/pkg/maintenance"
	"anchorage/internal/pkg/node"
	"anchorage/internal/pkg/scheduler"
	"anchorage/internal/pkg/store"
)

// auditBody is the JSON projection of one audit-log row returned by the
// adminListAudit endpoint. Optional identifiers are rendered as strings and
// omitted when the underlying pointer is nil.
type auditBody struct {
	ID          int64          `json:"id"`
	Action      string         `json:"action"`
	Target      string         `json:"target"`
	Result      string         `json:"result"`
	Detail      map[string]any `json:"detail,omitempty"`
	ActorUserID string         `json:"actor_user_id,omitempty"`
	OrgID       string         `json:"org_id,omitempty"`
	Created     time.Time      `json:"created"`
}

// ifUserID returns p.String(), or "" when p is nil.
func ifUserID(p *ids.UserID) string {
	if p == nil {
		return ""
	}
	return p.String()
}

// ifOrgID returns p.String(), or "" when p is nil.
func ifOrgID(p *ids.OrgID) string {
	if p == nil {
		return ""
	}
	return p.String()
}

// AdminDeps holds the collaborators the admin endpoints need.
type AdminDeps struct {
	Store      store.Store
	Maint      *maintenance.Manager
	Rebalancer *scheduler.Rebalancer
	// Presences is the source of real-time NATS heartbeat data
	// (distinct from the DB view via Store.Nodes().ListAll()). Used by
	// /v1/admin/cluster/presences to let operators compare what each
	// node is broadcasting right now against the DB registry.
	Presences PresenceSource
}

// PresenceSource is implemented by *node.Runner. Narrow interface so
// tests can stub; the real implementation returns a snapshot of the
// in-memory heartbeat map keyed by NodeID.
type PresenceSource interface {
	LivePresences() []node.Presence
}

// NodePresence is the admin-API projection of node.Presence. Kept in
// openapi so the HTTP response shape stays stable even if the internal
// node.Presence struct evolves.
type NodePresence struct {
	NodeID     string    `json:"node_id"`
	Multiaddrs []string  `json:"multiaddrs"`
	Status     string    `json:"status"`
	SentAt     time.Time `json:"sent_at"`
	// AgeSeconds is derived server-side so clients don't have to
	// reason about clock skew — operators eyeballing the response can
	// see "last heard 12s ago" directly.
	AgeSeconds float64 `json:"age_seconds"`
}

// RegisterAdmin registers the admin operations on api under /admin/*:
// node drain/uncordon, maintenance on/off/status, sysadmin promotion,
// audit listing, token-denylist pruning, rebalance, cache stats and
// cluster presences.
//
// Every operation requires a sysadmin client context, with one exception:
// the audit listing is also readable by non-sysadmins, scoped to their own
// org. Orgadmins cannot drain nodes (that would be a cluster-level action).
func RegisterAdmin(api huma.API, deps AdminDeps) {
	huma.Register(api, huma.Operation{
		OperationID: "adminDrain",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/drain",
		Summary:     "Drain a cluster node",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Drain(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("drain", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "adminUncordon",
		Method:      "POST",
		Path:        "/admin/nodes/{node_id}/uncordon",
		Summary:     "Return a drained node to service",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		NodeID string `path:"node_id"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		nid, err := ids.ParseNode(in.NodeID)
		if err != nil {
			return nil, huma.Error400BadRequest("bad node id")
		}
		if err := deps.Store.Nodes().Uncordon(ctx, nid); err != nil {
			return nil, huma.Error500InternalServerError("uncordon", err)
		}
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// maintenanceStatusBody is the shared response body of the three
	// maintenance endpoints (on, off, status). The zero value means
	// "maintenance is off".
	//
	// NOTE(review): encoding/json's omitempty never omits a struct value
	// such as time.Time, so set_at/expires_at are presumably emitted as the
	// zero timestamp when unset — confirm against the JSON format huma is
	// configured with before relying on their absence.
	type maintenanceStatusBody struct {
		On        bool      `json:"on"`
		Reason    string    `json:"reason,omitempty"`
		SetBy     string    `json:"set_by,omitempty"`
		SetAt     time.Time `json:"set_at,omitempty"`
		ExpiresAt time.Time `json:"expires_at,omitempty"`
	}

	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOn",
		Method:      "POST",
		Path:        "/admin/maintenance/on",
		Summary:     "Enable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body struct {
			Reason string `json:"reason,omitempty"`
			// NOTE(review): time.Duration is an int64 of nanoseconds, so
			// presumably clients send ttl as a nanosecond count — confirm.
			TTL time.Duration `json:"ttl,omitempty"`
		}
	}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		f := maintenance.Flag{
			Reason: in.Body.Reason,
			SetBy:  cc.User.String(),
			SetAt:  time.Now().UTC(),
		}
		// A zero or negative TTL leaves ExpiresAt at its zero value.
		if in.Body.TTL > 0 {
			f.ExpiresAt = f.SetAt.Add(in.Body.TTL)
		}
		if err := deps.Maint.Set(ctx, f); err != nil {
			return nil, huma.Error500InternalServerError("maintenance on", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		out.Body = maintenanceStatusBody{
			On:        true,
			Reason:    f.Reason,
			SetBy:     f.SetBy,
			SetAt:     f.SetAt,
			ExpiresAt: f.ExpiresAt,
		}
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceOff",
		Method:      "POST",
		Path:        "/admin/maintenance/off",
		Summary:     "Disable cluster-wide maintenance",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		if err := deps.Maint.Clear(ctx); err != nil {
			return nil, huma.Error500InternalServerError("maintenance off", err)
		}
		// The zero-valued body reports On=false.
		out := &struct{ Body maintenanceStatusBody }{}
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "adminGrantSysadmin",
		Method:      "POST",
		Path:        "/admin/users/{email}/grant-sysadmin",
		Summary:     "Promote an existing user to sysadmin",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Email string `path:"email"`
	}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		user, err := deps.Store.Users().GetByEmail(ctx, in.Email)
		if err != nil {
			if errors.Is(err, store.ErrNotFound) {
				return nil, huma.Error404NotFound("user (must have logged in at least once via Authentik)")
			}
			return nil, huma.Error500InternalServerError("lookup", err)
		}
		if err := deps.Store.Users().PromoteSysadmin(ctx, user.ID); err != nil {
			return nil, huma.Error500InternalServerError("promote", err)
		}
		// Best-effort: the promotion has already been committed, so a failed
		// audit write must not turn a successful request into an error.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.grant-sysadmin",
			Target:      user.ID.String(),
			Result:      "ok",
			Detail:      map[string]any{"email": in.Email},
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "adminListAudit",
		Method:      "GET",
		Path:        "/admin/audit",
		Summary:     "Paged audit-log listing",
		Description: "sysadmin with no org_id sees every org; org members can only read their own org's audit",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		OrgID  string `query:"org_id"`
		Limit  int    `query:"limit" default:"50" minimum:"1" maximum:"1000"`
		Offset int    `query:"offset" default:"0" minimum:"0"`
	}) (*struct {
		Body struct {
			Results []auditBody `json:"results"`
		}
	}, error) {
		cc, err := auth.Require(ctx)
		if err != nil {
			return nil, huma.Error401Unauthorized("auth")
		}
		isSysadmin := cc.Role == "sysadmin"

		// scopeOrg left at its zero value is the "every org" selector: it is
		// what a sysadmin who passes no org_id hands to List below.
		var scopeOrg ids.OrgID
		switch {
		case in.OrgID != "":
			scopeOrg, err = ids.ParseOrg(in.OrgID)
			if err != nil {
				return nil, huma.Error400BadRequest("bad org id")
			}
			if !isSysadmin && scopeOrg != cc.Org {
				return nil, huma.Error403Forbidden("cannot view other org's audit")
			}
		case !isSysadmin:
			scopeOrg = cc.Org
		}

		// Fail closed: a non-sysadmin must never reach List with the
		// unscoped (zero) selector, which would happen if the caller's
		// context carried no org.
		var unscoped ids.OrgID
		if !isSysadmin && scopeOrg == unscoped {
			return nil, huma.Error403Forbidden("org membership required")
		}

		rows, err := deps.Store.Audit().List(ctx, scopeOrg, in.Limit, in.Offset)
		if err != nil {
			return nil, huma.Error500InternalServerError("list", err)
		}
		out := &struct {
			Body struct {
				Results []auditBody `json:"results"`
			}
		}{}
		// Pre-allocate so an empty page is encoded as [] rather than null,
		// matching the presences endpoint.
		out.Body.Results = make([]auditBody, 0, len(rows))
		for _, e := range rows {
			out.Body.Results = append(out.Body.Results, auditBody{
				ID:          e.ID,
				Action:      e.Action,
				Target:      e.Target,
				Result:      e.Result,
				Detail:      e.Detail,
				ActorUserID: ifUserID(e.ActorUserID),
				OrgID:       ifOrgID(e.OrgID),
				Created:     e.Created,
			})
		}
		return out, nil
	})

	huma.Register(api, huma.Operation{
		OperationID: "adminMaintenanceStatus",
		Method:      "GET",
		Path:        "/admin/maintenance",
		Summary:     "Report current cluster-maintenance state",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body maintenanceStatusBody
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		f, on, err := deps.Maint.IsOn(ctx)
		if err != nil {
			return nil, huma.Error500InternalServerError("maintenance status", err)
		}
		out := &struct{ Body maintenanceStatusBody }{}
		// Only populate the body when maintenance is on and a flag was
		// returned; otherwise the zero value reports On=false.
		if on && f != nil {
			out.Body = maintenanceStatusBody{
				On:        true,
				Reason:    f.Reason,
				SetBy:     f.SetBy,
				SetAt:     f.SetAt,
				ExpiresAt: f.ExpiresAt,
			}
		}
		return out, nil
	})

	// ---- prune-denylist ----------------------------------------------------

	huma.Register(api, huma.Operation{
		OperationID: "adminPruneTokenDenylist",
		Method:      "POST",
		Path:        "/admin/tokens/prune-denylist",
		Summary:     "Delete denylist rows whose expires_at has passed",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			OK bool `json:"ok"`
		}
	}, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		if err := deps.Store.Tokens().PruneDenylist(ctx); err != nil {
			return nil, huma.Error500InternalServerError("prune", err)
		}
		// Best-effort: the prune already ran; do not fail the request if the
		// audit row cannot be written.
		_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
			ActorUserID: &cc.User,
			Action:      "admin.prune-denylist",
			Target:      "token_denylist",
			Result:      "ok",
		})
		out := &struct {
			Body struct {
				OK bool `json:"ok"`
			}
		}{}
		out.Body.OK = true
		return out, nil
	})

	// ---- rebalance ---------------------------------------------------------

	type rebalanceReq struct {
		Apply bool `json:"apply,omitempty" doc:"when true, execute the moves; otherwise return a dry-run preview"`
	}
	type rebalanceResp struct {
		Body struct {
			Applied bool                    `json:"applied"`
			Moves   []scheduler.PlannedMove `json:"moves"`
		}
	}
	huma.Register(api, huma.Operation{
		OperationID: "adminRebalance",
		Method:      "POST",
		Path:        "/admin/rebalance",
		Summary:     "One-shot rebalance preview or apply (leader-loop keeps running regardless)",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, in *struct {
		Body rebalanceReq
	}) (*rebalanceResp, error) {
		cc, err := requireSysadminCC(ctx)
		if err != nil {
			return nil, err
		}
		if deps.Rebalancer == nil {
			return nil, huma.Error503ServiceUnavailable("rebalancer not available")
		}
		var moves []scheduler.PlannedMove
		if in.Body.Apply {
			moves, err = deps.Rebalancer.RunOnce(ctx)
		} else {
			moves, err = deps.Rebalancer.DryRun(ctx)
		}
		if err != nil {
			return nil, huma.Error500InternalServerError("rebalance", err)
		}
		// Only an applied run is audited; a dry-run preview is not.
		if in.Body.Apply {
			// Best-effort: the moves were already executed, so an audit
			// failure must not be reported as a rebalance failure.
			_ = deps.Store.Audit().Insert(ctx, &store.AuditEntry{
				ActorUserID: &cc.User,
				Action:      "admin.rebalance",
				Target:      "cluster",
				Result:      "ok",
				Detail:      map[string]any{"moves": len(moves)},
			})
		}
		// Encode "nothing to move" as [] rather than null.
		if moves == nil {
			moves = []scheduler.PlannedMove{}
		}
		out := &rebalanceResp{}
		out.Body.Applied = in.Body.Apply
		out.Body.Moves = moves
		return out, nil
	})

	// ---- cache-stats -------------------------------------------------------

	huma.Register(api, huma.Operation{
		OperationID: "adminCacheStats",
		Method:      "GET",
		Path:        "/admin/cache-stats",
		Summary:     "In-memory cache hit/miss/eviction counters per named cache",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Caches []cache.Stats `json:"caches"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		out := &struct {
			Body struct {
				Caches []cache.Stats `json:"caches"`
			}
		}{}
		out.Body.Caches = cache.AllStats()
		// Encode "no caches" as [] rather than null.
		if out.Body.Caches == nil {
			out.Body.Caches = []cache.Stats{}
		}
		return out, nil
	})

	// ---- cluster presences -------------------------------------------------
	//
	// Real-time heartbeat view, distinct from the DB registry. The DB
	// updates status via the stale-sweeper (leader-gated, fires every
	// downAfter interval); this endpoint shows what each peer is
	// broadcasting RIGHT NOW over NATS. Operators comparing the two
	// diagnose "the DB says X is up but nobody's heard from it" gaps.
	huma.Register(api, huma.Operation{
		OperationID: "adminClusterPresences",
		Method:      "GET",
		Path:        "/admin/cluster/presences",
		Summary:     "Real-time NATS heartbeat view of every peer this node has heard from",
		Security:    []map[string][]string{{"accessToken": {}}},
	}, func(ctx context.Context, _ *struct{}) (*struct {
		Body struct {
			Presences []NodePresence `json:"presences"`
		}
	}, error) {
		if err := requireSysadmin(ctx); err != nil {
			return nil, err
		}
		if deps.Presences == nil {
			return nil, huma.Error503ServiceUnavailable("presence source not wired")
		}
		// Capture "now" once so every row's age is measured against the
		// same instant.
		now := time.Now()
		raw := deps.Presences.LivePresences()
		out := &struct {
			Body struct {
				Presences []NodePresence `json:"presences"`
			}
		}{}
		out.Body.Presences = make([]NodePresence, 0, len(raw))
		for _, p := range raw {
			out.Body.Presences = append(out.Body.Presences, NodePresence{
				NodeID:     p.NodeID,
				Multiaddrs: p.Multiaddrs,
				Status:     p.Status,
				SentAt:     p.SentAt,
				AgeSeconds: now.Sub(p.SentAt).Seconds(),
			})
		}
		return out, nil
	})
}

// requireSysadmin rejects any request without a sysadmin client context.
// It returns the same errors as requireSysadminCC and discards the context.
func requireSysadmin(ctx context.Context) error {
	_, err := requireSysadminCC(ctx)
	return err
}

// requireSysadminCC returns the caller's client context when its role is
// "sysadmin". It returns a 401 error when auth.Require fails and a 403 error
// when the caller has any other role.
func requireSysadminCC(ctx context.Context) (*auth.ClientContext, error) {
	cc, err := auth.Require(ctx)
	if err != nil {
		return nil, huma.Error401Unauthorized("auth")
	}
	if cc.Role != "sysadmin" {
		return nil, huma.Error403Forbidden("sysadmin required")
	}
	return cc, nil
}