Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
219 lines
6.3 KiB
Go
219 lines
6.3 KiB
Go
// Package node owns an anchorage instance's identity and heartbeat loop.
|
|
//
|
|
// On boot, Register creates or refreshes the node's row in the nodes table.
|
|
// Start then kicks off two loops:
|
|
//
|
|
// - Heartbeat publisher: pushes node.heartbeat.<id> every HeartbeatInterval.
|
|
// The payload is a tiny Presence struct so listeners can update the
|
|
// in-memory live-node cache without a Postgres read.
|
|
// - Heartbeat consumer + stale sweeper: subscribes to node.heartbeat.*
|
|
// to keep the live-node cache fresh, and on the leader node runs a
|
|
// periodic MarkStaleDown to flip neglected rows to status='down'.
|
|
package node
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
|
|
"anchorage/internal/pkg/ids"
|
|
"anchorage/internal/pkg/store"
|
|
)
|
|
|
|
// HeartbeatSubjectPrefix is the per-node presence topic. Listeners use
|
|
// node.heartbeat.* to catch every node's beat.
|
|
const HeartbeatSubjectPrefix = "node.heartbeat"
|
|
|
|
// Presence is the body of a heartbeat message. Kept small so the
|
|
// per-5s message traffic is negligible.
|
|
type Presence struct {
|
|
NodeID string `json:"node_id"`
|
|
Multiaddrs []string `json:"multiaddrs"`
|
|
Status string `json:"status"`
|
|
SentAt time.Time `json:"sent_at"`
|
|
}
|
|
|
|
// Options configures a Runner.
|
|
type Options struct {
|
|
NodeID ids.NodeID
|
|
DisplayName string
|
|
Multiaddrs []string
|
|
RPCURL string
|
|
HeartbeatInterval time.Duration
|
|
DownAfter time.Duration
|
|
// MarkStaleDownEnabled must be true on the elected leader so exactly
|
|
// one node flips stale rows to down. Followers call Start with it
|
|
// false and the leader package toggles it on promotion.
|
|
MarkStaleDownEnabled bool
|
|
}
|
|
|
|
// Runner owns the heartbeat publisher and consumer.
|
|
type Runner struct {
|
|
opts Options
|
|
nc *nats.Conn
|
|
store store.Store
|
|
|
|
mu sync.Mutex
|
|
presence map[ids.NodeID]Presence // latest seen per node
|
|
sweepActive bool // toggled by SetSweepEnabled (leader gate)
|
|
}
|
|
|
|
// NewRunner constructs a runner. Register/Start must be called to wire it up.
|
|
func NewRunner(nc *nats.Conn, s store.Store, opts Options) (*Runner, error) {
|
|
if opts.HeartbeatInterval <= 0 {
|
|
return nil, fmt.Errorf("node: HeartbeatInterval must be > 0")
|
|
}
|
|
if opts.DownAfter <= opts.HeartbeatInterval {
|
|
return nil, fmt.Errorf("node: DownAfter must exceed HeartbeatInterval")
|
|
}
|
|
return &Runner{
|
|
opts: opts,
|
|
nc: nc,
|
|
store: s,
|
|
presence: map[ids.NodeID]Presence{},
|
|
sweepActive: opts.MarkStaleDownEnabled,
|
|
}, nil
|
|
}
|
|
|
|
// SetSweepEnabled toggles the stale-node sweeper loop at runtime. The
|
|
// composition root flips it on in leader.OnPromote and off in OnDemote
|
|
// so exactly one node in the cluster flips neglected rows to 'down'.
|
|
func (r *Runner) SetSweepEnabled(enabled bool) {
|
|
r.mu.Lock()
|
|
r.sweepActive = enabled
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
func (r *Runner) sweepEnabled() bool {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
return r.sweepActive
|
|
}
|
|
|
|
// Register writes (or refreshes) the node's row in the nodes table.
|
|
//
|
|
// If the row already exists with status='drained', it is preserved — an
|
|
// operator-initiated drain must not be undone by a bounce.
|
|
func (r *Runner) Register(ctx context.Context) error {
|
|
return r.store.Nodes().Upsert(ctx, &store.Node{
|
|
ID: r.opts.NodeID,
|
|
DisplayName: r.opts.DisplayName,
|
|
Multiaddrs: r.opts.Multiaddrs,
|
|
RPCURL: r.opts.RPCURL,
|
|
Status: store.NodeStatusUp,
|
|
})
|
|
}
|
|
|
|
// Start runs the heartbeat loops until ctx is cancelled.
|
|
func (r *Runner) Start(ctx context.Context) error {
|
|
// Consumer first so we don't miss our own initial beat.
|
|
if err := r.startConsumer(ctx); err != nil {
|
|
return fmt.Errorf("node: start consumer: %w", err)
|
|
}
|
|
|
|
beatTicker := time.NewTicker(r.opts.HeartbeatInterval)
|
|
defer beatTicker.Stop()
|
|
// Sweep ticker runs unconditionally; each tick checks sweepEnabled()
|
|
// so leader promotion / demotion can toggle the behavior live.
|
|
sweepTicker := time.NewTicker(r.opts.HeartbeatInterval)
|
|
defer sweepTicker.Stop()
|
|
|
|
// Fire once immediately so the node shows up before the first tick.
|
|
r.beat(ctx)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-beatTicker.C:
|
|
r.beat(ctx)
|
|
case <-sweepTicker.C:
|
|
if r.sweepEnabled() {
|
|
r.sweepStale(ctx)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// LivePresences returns a snapshot of every node currently considered
|
|
// live (last heartbeat within DownAfter).
|
|
func (r *Runner) LivePresences() []Presence {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
cutoff := time.Now().Add(-r.opts.DownAfter)
|
|
out := make([]Presence, 0, len(r.presence))
|
|
for _, p := range r.presence {
|
|
if p.SentAt.After(cutoff) {
|
|
out = append(out, p)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (r *Runner) beat(ctx context.Context) {
|
|
presence := Presence{
|
|
NodeID: r.opts.NodeID.String(),
|
|
Multiaddrs: r.opts.Multiaddrs,
|
|
Status: store.NodeStatusUp,
|
|
SentAt: time.Now().UTC(),
|
|
}
|
|
b, err := json.Marshal(presence)
|
|
if err != nil {
|
|
slog.Warn("node: marshal presence", "err", err)
|
|
return
|
|
}
|
|
subject := fmt.Sprintf("%s.%s", HeartbeatSubjectPrefix, r.opts.NodeID.String())
|
|
if err := r.nc.Publish(subject, b); err != nil {
|
|
slog.Warn("node: publish heartbeat", "err", err, "subject", subject)
|
|
return
|
|
}
|
|
if err := r.store.Nodes().TouchHeartbeat(ctx, r.opts.NodeID); err != nil {
|
|
slog.Warn("node: touch heartbeat in store", "err", err)
|
|
}
|
|
}
|
|
|
|
func (r *Runner) startConsumer(ctx context.Context) error {
|
|
sub, err := r.nc.Subscribe(HeartbeatSubjectPrefix+".*", func(m *nats.Msg) {
|
|
var p Presence
|
|
if err := json.Unmarshal(m.Data, &p); err != nil {
|
|
slog.Warn("node: malformed heartbeat", "err", err)
|
|
return
|
|
}
|
|
id, err := ids.ParseNode(p.NodeID)
|
|
if err != nil {
|
|
// A peer sent us a garbage node_id; drop rather than cache
|
|
// under the zero NodeID key (which would collide across
|
|
// malformed senders and produce confusing logs).
|
|
slog.Warn("node: heartbeat with unparsable node id", "node_id", p.NodeID, "err", err)
|
|
return
|
|
}
|
|
r.mu.Lock()
|
|
r.presence[id] = p
|
|
r.mu.Unlock()
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
<-ctx.Done()
|
|
_ = sub.Unsubscribe()
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (r *Runner) sweepStale(ctx context.Context) {
|
|
downed, err := r.store.Nodes().MarkStaleDown(ctx, r.opts.DownAfter)
|
|
if err != nil {
|
|
slog.Warn("node: mark stale down", "err", err)
|
|
return
|
|
}
|
|
for _, id := range downed {
|
|
slog.Info("node: marked down", "node_id", id)
|
|
}
|
|
}
|