anchorage/internal/pkg/node/node.go
William Gill · 12bf35caf8 · anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
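
As a rough illustration of the kid-rotated token scheme, a minimal sketch
using github.com/golang-jwt/jwt/v5; the library choice, claim names, and
mintToken helper are assumptions, since the commit does not name the JWT
dependency:

    package main

    import (
        "time"

        "github.com/golang-jwt/jwt/v5"
    )

    // mintToken is illustrative only; anchorage's real claims and helper
    // names are not shown in this commit.
    func mintToken(kid string, secret []byte, orgID string) (string, error) {
        t := jwt.NewWithClaims(jwt.SigningMethodHS256, jwt.MapClaims{
            "org": orgID,
            "iat": time.Now().Unix(),
            "exp": time.Now().Add(30 * 24 * time.Hour).Unix(),
        })
        // The kid header lets verifiers pick the matching HMAC secret after
        // a rotate-key, so tokens minted under the old key stay verifiable.
        t.Header["kid"] = kid
        return t.SignedString(secret)
    }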

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
    (sketched below this list)
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
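
A minimal sketch of the TTL election pattern named above, using the nats.go
KV API; the bucket name, key, and timings are illustrative, not anchorage's
real values:

    import (
        "fmt"
        "time"

        "github.com/nats-io/nats.go"
    )

    // tryLead claims leadership by creating a key in a TTL'd KV bucket,
    // then renews it; the revision doubles as a fence token.
    func tryLead(nc *nats.Conn, nodeID string) error {
        js, err := nc.JetStream()
        if err != nil {
            return err
        }
        kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
            Bucket: "anchorage-leader", // hypothetical bucket name
            TTL:    15 * time.Second,   // key expires unless renewed
        })
        if err != nil {
            return err
        }
        // Create succeeds only while no live leader key exists.
        rev, err := kv.Create("leader", []byte(nodeID))
        if err != nil {
            return fmt.Errorf("follower: %w", err)
        }
        // Renew well inside the TTL; a stale ex-leader's Update fails on
        // revision mismatch once another node has re-claimed the key.
        for {
            time.Sleep(5 * time.Second)
            if rev, err = kv.Update("leader", []byte(nodeID), rev); err != nil {
                return err // leadership lost
            }
        }
    }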

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

// Package node owns an anchorage instance's identity and heartbeat loop.
//
// On boot, Register creates or refreshes the node's row in the nodes table.
// Start then kicks off two loops:
//
//   - Heartbeat publisher: pushes node.heartbeat.<id> every HeartbeatInterval.
//     The payload is a tiny Presence struct so listeners can update the
//     in-memory live-node cache without a Postgres read.
//   - Heartbeat consumer + stale sweeper: subscribes to node.heartbeat.*
//     to keep the live-node cache fresh, and on the leader node runs a
//     periodic MarkStaleDown to flip neglected rows to status='down'.
package node

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "sync"
    "time"

    "github.com/nats-io/nats.go"

    "anchorage/internal/pkg/ids"
    "anchorage/internal/pkg/store"
)

// HeartbeatSubjectPrefix is the per-node presence topic. Listeners use
// node.heartbeat.* to catch every node's beat.
const HeartbeatSubjectPrefix = "node.heartbeat"

// Presence is the body of a heartbeat message. Kept small so the
// per-5s message traffic is negligible.
type Presence struct {
    NodeID     string    `json:"node_id"`
    Multiaddrs []string  `json:"multiaddrs"`
    Status     string    `json:"status"`
    SentAt     time.Time `json:"sent_at"`
}

// Options configures a Runner.
type Options struct {
    NodeID            ids.NodeID
    DisplayName       string
    Multiaddrs        []string
    RPCURL            string
    HeartbeatInterval time.Duration
    DownAfter         time.Duration

    // MarkStaleDownEnabled must be true on the elected leader so exactly
    // one node flips stale rows to down. Followers call Start with it
    // false and the leader package toggles it on promotion.
    MarkStaleDownEnabled bool
}

// Runner owns the heartbeat publisher and consumer.
type Runner struct {
    opts  Options
    nc    *nats.Conn
    store store.Store

    mu          sync.Mutex
    presence    map[ids.NodeID]Presence // latest seen per node
    sweepActive bool                    // toggled by SetSweepEnabled (leader gate)
}

// NewRunner constructs a runner. Register/Start must be called to wire it up.
func NewRunner(nc *nats.Conn, s store.Store, opts Options) (*Runner, error) {
    if opts.HeartbeatInterval <= 0 {
        return nil, fmt.Errorf("node: HeartbeatInterval must be > 0")
    }
    if opts.DownAfter <= opts.HeartbeatInterval {
        return nil, fmt.Errorf("node: DownAfter must exceed HeartbeatInterval")
    }
    return &Runner{
        opts:        opts,
        nc:          nc,
        store:       s,
        presence:    map[ids.NodeID]Presence{},
        sweepActive: opts.MarkStaleDownEnabled,
    }, nil
}

// SetSweepEnabled toggles the stale-node sweeper loop at runtime. The
// composition root flips it on in leader.OnPromote and off in OnDemote
// so exactly one node in the cluster flips neglected rows to 'down'.
func (r *Runner) SetSweepEnabled(enabled bool) {
    r.mu.Lock()
    r.sweepActive = enabled
    r.mu.Unlock()
}

// sweepEnabled reports whether the stale sweep should run on this tick,
// reading the leader gate under the mutex.
func (r *Runner) sweepEnabled() bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.sweepActive
}

// Register writes (or refreshes) the node's row in the nodes table.
//
// If the row already exists with status='drained', it is preserved; an
// operator-initiated drain must not be undone by a bounce.
func (r *Runner) Register(ctx context.Context) error {
    return r.store.Nodes().Upsert(ctx, &store.Node{
        ID:          r.opts.NodeID,
        DisplayName: r.opts.DisplayName,
        Multiaddrs:  r.opts.Multiaddrs,
        RPCURL:      r.opts.RPCURL,
        Status:      store.NodeStatusUp,
    })
}

// Start runs the heartbeat loops until ctx is cancelled.
func (r *Runner) Start(ctx context.Context) error {
    // Consumer first so we don't miss our own initial beat.
    if err := r.startConsumer(ctx); err != nil {
        return fmt.Errorf("node: start consumer: %w", err)
    }

    beatTicker := time.NewTicker(r.opts.HeartbeatInterval)
    defer beatTicker.Stop()

    // Sweep ticker runs unconditionally; each tick checks sweepEnabled()
    // so leader promotion / demotion can toggle the behavior live.
    sweepTicker := time.NewTicker(r.opts.HeartbeatInterval)
    defer sweepTicker.Stop()

    // Fire once immediately so the node shows up before the first tick.
    r.beat(ctx)

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-beatTicker.C:
            r.beat(ctx)
        case <-sweepTicker.C:
            if r.sweepEnabled() {
                r.sweepStale(ctx)
            }
        }
    }
}

// LivePresences returns a snapshot of every node currently considered
// live (last heartbeat within DownAfter).
func (r *Runner) LivePresences() []Presence {
    r.mu.Lock()
    defer r.mu.Unlock()

    cutoff := time.Now().Add(-r.opts.DownAfter)
    out := make([]Presence, 0, len(r.presence))
    for _, p := range r.presence {
        if p.SentAt.After(cutoff) {
            out = append(out, p)
        }
    }
    return out
}

// beat publishes this node's Presence on its heartbeat subject, then
// touches the row's last-heartbeat timestamp in Postgres. Failures are
// logged and skipped; the next tick retries.
func (r *Runner) beat(ctx context.Context) {
    presence := Presence{
        NodeID:     r.opts.NodeID.String(),
        Multiaddrs: r.opts.Multiaddrs,
        Status:     store.NodeStatusUp,
        SentAt:     time.Now().UTC(),
    }
    b, err := json.Marshal(presence)
    if err != nil {
        slog.Warn("node: marshal presence", "err", err)
        return
    }

    subject := fmt.Sprintf("%s.%s", HeartbeatSubjectPrefix, r.opts.NodeID.String())
    if err := r.nc.Publish(subject, b); err != nil {
        slog.Warn("node: publish heartbeat", "err", err, "subject", subject)
        return
    }
    if err := r.store.Nodes().TouchHeartbeat(ctx, r.opts.NodeID); err != nil {
        slog.Warn("node: touch heartbeat in store", "err", err)
    }
}

// startConsumer subscribes to every node's heartbeat subject and caches
// the latest Presence per node. The subscription is torn down when ctx
// is cancelled.
func (r *Runner) startConsumer(ctx context.Context) error {
    sub, err := r.nc.Subscribe(HeartbeatSubjectPrefix+".*", func(m *nats.Msg) {
        var p Presence
        if err := json.Unmarshal(m.Data, &p); err != nil {
            slog.Warn("node: malformed heartbeat", "err", err)
            return
        }
        id, err := ids.ParseNode(p.NodeID)
        if err != nil {
            // A peer sent us a garbage node_id; drop rather than cache
            // under the zero NodeID key (which would collide across
            // malformed senders and produce confusing logs).
            slog.Warn("node: heartbeat with unparsable node id", "node_id", p.NodeID, "err", err)
            return
        }
        r.mu.Lock()
        r.presence[id] = p
        r.mu.Unlock()
    })
    if err != nil {
        return err
    }
    go func() {
        <-ctx.Done()
        _ = sub.Unsubscribe()
    }()
    return nil
}

// sweepStale flips rows whose last heartbeat is older than DownAfter to
// status='down'. Only the elected leader runs this (see SetSweepEnabled).
func (r *Runner) sweepStale(ctx context.Context) {
    downed, err := r.store.Nodes().MarkStaleDown(ctx, r.opts.DownAfter)
    if err != nil {
        slog.Warn("node: mark stale down", "err", err)
        return
    }
    for _, id := range downed {
        slog.Info("node: marked down", "node_id", id)
    }
}
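
For orientation, a minimal composition-root sketch of how this package is
wired up; leader.OnPromote / OnDemote are the hook names mentioned in the
SetSweepEnabled comment, but their signatures, the import paths, and the
runNode helper are assumptions:

    package main

    import (
        "context"
        "time"

        "github.com/nats-io/nats.go"

        "anchorage/internal/pkg/ids"
        "anchorage/internal/pkg/leader" // hypothetical import path
        "anchorage/internal/pkg/node"
        "anchorage/internal/pkg/store"
    )

    // runNode is hypothetical wiring, not anchorage's actual main.
    func runNode(ctx context.Context, nc *nats.Conn, st store.Store, id ids.NodeID) error {
        r, err := node.NewRunner(nc, st, node.Options{
            NodeID:            id,
            DisplayName:       "node-a",
            HeartbeatInterval: 5 * time.Second,
            DownAfter:         20 * time.Second, // must exceed HeartbeatInterval
        })
        if err != nil {
            return err
        }
        if err := r.Register(ctx); err != nil {
            return err
        }
        // Leader election gates the sweeper so exactly one node marks peers
        // down; callback-style hook signatures are assumed here.
        leader.OnPromote(func() { r.SetSweepEnabled(true) })
        leader.OnDemote(func() { r.SetSweepEnabled(false) })
        return r.Start(ctx)
    }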