Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, grant-sysadmin,
rotate-key, prune-denylist, rebalance (dry-run by default; --apply to
execute), cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
266 lines
8.5 KiB
Go
266 lines
8.5 KiB
Go
package cmd
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
)
|
|
|
|
// newAdminCmd groups operational commands (drain, maintenance, ...).
|
|
//
|
|
// Every subcommand is a thin HTTP client against the running
|
|
// anchorage's /v1/admin/* endpoints. The CLI never opens its own NATS
|
|
// server or database handle — that keeps the "two anchorages with one
|
|
// DataDir" footgun from being easy to trip over.
|
|
func newAdminCmd(flags *globalFlags) *cobra.Command {
|
|
admin := &cobra.Command{
|
|
Use: "admin",
|
|
Short: "Operational commands (drain, maintenance mode, ...)",
|
|
}
|
|
admin.PersistentFlags().StringVar(&flags.adminURL, "server", defaultAdminURL(), "base URL of a running anchorage instance (env: ANCHORAGE_ADMIN_URL)")
|
|
admin.PersistentFlags().StringVar(&flags.adminToken, "token", os.Getenv("ANCHORAGE_ADMIN_TOKEN"), "sysadmin API token (env: ANCHORAGE_ADMIN_TOKEN)")
|
|
|
|
admin.AddCommand(newDrainCmd(flags))
|
|
admin.AddCommand(newUncordonCmd(flags))
|
|
admin.AddCommand(newMaintenanceCmd(flags))
|
|
admin.AddCommand(newMintTokenCmd(flags))
|
|
admin.AddCommand(newGrantSysadminCmd(flags))
|
|
admin.AddCommand(newRotateKeyCmd(flags))
|
|
admin.AddCommand(newPruneDenylistCmd(flags))
|
|
admin.AddCommand(newRebalanceCmd(flags))
|
|
admin.AddCommand(newCacheStatsCmd(flags))
|
|
admin.AddCommand(newClusterPresencesCmd(flags))
|
|
return admin
|
|
}
|
|
|
|
func newClusterPresencesCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "cluster-presences",
|
|
Short: "Real-time NATS heartbeat view from the server you're pointed at",
|
|
Long: `Returns the in-memory heartbeat map maintained by the anchorage
|
|
instance answering the request. Each entry is a peer's most recent
|
|
node.heartbeat.<id> broadcast plus its age in seconds.
|
|
|
|
Distinct from the DB 'nodes' table: this is the real-time NATS view,
|
|
updated every heartbeat interval, while the DB status is only flipped
|
|
to 'down' by the leader's stale sweeper every downAfter seconds.
|
|
Operators diagnosing "the DB says X is up but nobody's heard from it"
|
|
should compare the two.`,
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
return adminGET(cmd.OutOrStdout(), flags, "/v1/admin/cluster/presences")
|
|
},
|
|
}
|
|
}
|
|
|
|
func newPruneDenylistCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "prune-denylist",
|
|
Short: "Delete denylist rows whose expires_at has passed",
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/tokens/prune-denylist", nil)
|
|
},
|
|
}
|
|
}
|
|
|
|
func newRebalanceCmd(flags *globalFlags) *cobra.Command {
|
|
var apply bool
|
|
c := &cobra.Command{
|
|
Use: "rebalance",
|
|
Short: "Preview (default) or execute (--apply) a one-shot rebalance pass",
|
|
Long: `Walks the current placements table and reports what the rebalancer
|
|
would move to restore replica counts on nodes that transitioned to
|
|
down / drained. Useful for catch-up after a cluster-maintenance window.
|
|
|
|
Default is a dry-run: no placements are moved. Pass --apply to execute.
|
|
The scheduled leader loop keeps running regardless.`,
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
body := map[string]any{"apply": apply}
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/rebalance", body)
|
|
},
|
|
}
|
|
c.Flags().BoolVar(&apply, "apply", false, "execute the moves (default is a dry-run preview)")
|
|
return c
|
|
}
|
|
|
|
func newCacheStatsCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "cache-stats",
|
|
Short: "Report per-cache hits / misses / evictions",
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
return adminGET(cmd.OutOrStdout(), flags, "/v1/admin/cache-stats")
|
|
},
|
|
}
|
|
}
|
|
|
|
func newGrantSysadminCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "grant-sysadmin <email>",
|
|
Short: "Promote a previously-logged-in user to sysadmin",
|
|
Long: `Promotes an existing anchorage user (identified by the email Authentik
|
|
puts in the id_token) to sysadmin. The user must have completed at
|
|
least one OIDC login before this command can find them; it is not the
|
|
first-boot bootstrap path (that's bootstrap.sysadmins in config).`,
|
|
Args: cobra.ExactArgs(1),
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
path := "/v1/admin/users/" + url.PathEscape(args[0]) + "/grant-sysadmin"
|
|
return adminPOST(cmd.OutOrStdout(), flags, path, nil)
|
|
},
|
|
}
|
|
}
|
|
|
|
// defaultAdminURL returns the base URL used when --server is not given:
// the value of ANCHORAGE_ADMIN_URL when that variable is set to a
// non-empty string, otherwise "http://localhost:8080".
func defaultAdminURL() string {
	const fallback = "http://localhost:8080"

	// An unset variable and one set to "" are treated the same way.
	fromEnv, ok := os.LookupEnv("ANCHORAGE_ADMIN_URL")
	if !ok || fromEnv == "" {
		return fallback
	}
	return fromEnv
}
|
|
|
|
func newDrainCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "drain <nodeID>",
|
|
Short: "Mark a node drained (removes it from placement, keeps heartbeats)",
|
|
Args: cobra.ExactArgs(1),
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/nodes/"+args[0]+"/drain", nil)
|
|
},
|
|
}
|
|
}
|
|
|
|
func newUncordonCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "uncordon <nodeID>",
|
|
Short: "Clear drained status and rejoin the work queue",
|
|
Args: cobra.ExactArgs(1),
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/nodes/"+args[0]+"/uncordon", nil)
|
|
},
|
|
}
|
|
}
|
|
|
|
func newMaintenanceCmd(flags *globalFlags) *cobra.Command {
|
|
maint := &cobra.Command{
|
|
Use: "maintenance",
|
|
Short: "Cluster-wide maintenance mode",
|
|
}
|
|
maint.AddCommand(newMaintenanceOnCmd(flags))
|
|
maint.AddCommand(newMaintenanceOffCmd(flags))
|
|
maint.AddCommand(newMaintenanceStatusCmd(flags))
|
|
return maint
|
|
}
|
|
|
|
func newMaintenanceOnCmd(flags *globalFlags) *cobra.Command {
|
|
var reason string
|
|
var ttl time.Duration
|
|
c := &cobra.Command{
|
|
Use: "on",
|
|
Short: "Enable cluster-wide maintenance (pauses rebalancer + sweeper)",
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
body := map[string]any{}
|
|
if reason != "" {
|
|
body["reason"] = reason
|
|
}
|
|
if ttl > 0 {
|
|
// encode duration as nanoseconds for Go's JSON decoder
|
|
body["ttl"] = int64(ttl)
|
|
}
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/maintenance/on", body)
|
|
},
|
|
}
|
|
c.Flags().StringVar(&reason, "reason", "", "free-form reason (audit record)")
|
|
c.Flags().DurationVar(&ttl, "ttl", 0, "auto-disable after this duration (e.g. 30m)")
|
|
return c
|
|
}
|
|
|
|
func newMaintenanceOffCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "off",
|
|
Short: "Disable cluster-wide maintenance",
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
return adminPOST(cmd.OutOrStdout(), flags, "/v1/admin/maintenance/off", nil)
|
|
},
|
|
}
|
|
}
|
|
|
|
func newMaintenanceStatusCmd(flags *globalFlags) *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "status",
|
|
Short: "Print current cluster-maintenance state",
|
|
Args: cobra.NoArgs,
|
|
RunE: func(cmd *cobra.Command, _ []string) error {
|
|
return adminGET(cmd.OutOrStdout(), flags, "/v1/admin/maintenance")
|
|
},
|
|
}
|
|
}
|
|
|
|
func adminPOST(out io.Writer, flags *globalFlags, path string, body any) error {
|
|
return adminCall(out, flags, http.MethodPost, path, body)
|
|
}
|
|
|
|
func adminGET(out io.Writer, flags *globalFlags, path string) error {
|
|
return adminCall(out, flags, http.MethodGet, path, nil)
|
|
}
|
|
|
|
func adminCall(out io.Writer, flags *globalFlags, method, path string, body any) error {
|
|
base, err := url.Parse(flags.adminURL)
|
|
if err != nil {
|
|
return fmt.Errorf("admin: bad --server url: %w", err)
|
|
}
|
|
u := base.JoinPath(path)
|
|
|
|
var rdr io.Reader
|
|
if body != nil {
|
|
b, err := json.Marshal(body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rdr = bytes.NewReader(b)
|
|
}
|
|
req, err := http.NewRequest(method, u.String(), rdr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if flags.adminToken == "" {
|
|
return fmt.Errorf("admin: --token (or ANCHORAGE_ADMIN_TOKEN) is required — mint a sysadmin token first")
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+flags.adminToken)
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return fmt.Errorf("admin: %s: %w", u, err)
|
|
}
|
|
defer resp.Body.Close()
|
|
b, _ := io.ReadAll(resp.Body)
|
|
if resp.StatusCode >= 400 {
|
|
return fmt.Errorf("admin: %s %s → %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(b)))
|
|
}
|
|
// Pretty-print JSON; fall back to raw for empty/non-JSON bodies.
|
|
if len(bytes.TrimSpace(b)) > 0 {
|
|
var pretty bytes.Buffer
|
|
if err := json.Indent(&pretty, b, "", " "); err == nil {
|
|
fmt.Fprintln(out, pretty.String())
|
|
return nil
|
|
}
|
|
fmt.Fprintln(out, string(b))
|
|
return nil
|
|
}
|
|
fmt.Fprintf(out, "admin: %s %s OK\n", method, path)
|
|
return nil
|
|
}
|