// Package node owns an anchorage instance's identity and heartbeat loop. // // On boot, Register creates or refreshes the node's row in the nodes table. // Start then kicks off two loops: // // - Heartbeat publisher: pushes node.heartbeat. every HeartbeatInterval. // The payload is a tiny Presence struct so listeners can update the // in-memory live-node cache without a Postgres read. // - Heartbeat consumer + stale sweeper: subscribes to node.heartbeat.* // to keep the live-node cache fresh, and on the leader node runs a // periodic MarkStaleDown to flip neglected rows to status='down'. package node import ( "context" "encoding/json" "fmt" "log/slog" "sync" "time" "github.com/nats-io/nats.go" "anchorage/internal/pkg/ids" "anchorage/internal/pkg/store" ) // HeartbeatSubjectPrefix is the per-node presence topic. Listeners use // node.heartbeat.* to catch every node's beat. const HeartbeatSubjectPrefix = "node.heartbeat" // Presence is the body of a heartbeat message. Kept small so the // per-5s message traffic is negligible. type Presence struct { NodeID string `json:"node_id"` Multiaddrs []string `json:"multiaddrs"` Status string `json:"status"` SentAt time.Time `json:"sent_at"` } // Options configures a Runner. type Options struct { NodeID ids.NodeID DisplayName string Multiaddrs []string RPCURL string HeartbeatInterval time.Duration DownAfter time.Duration // MarkStaleDownEnabled must be true on the elected leader so exactly // one node flips stale rows to down. Followers call Start with it // false and the leader package toggles it on promotion. MarkStaleDownEnabled bool } // Runner owns the heartbeat publisher and consumer. type Runner struct { opts Options nc *nats.Conn store store.Store mu sync.Mutex presence map[ids.NodeID]Presence // latest seen per node sweepActive bool // toggled by SetSweepEnabled (leader gate) } // NewRunner constructs a runner. Register/Start must be called to wire it up. func NewRunner(nc *nats.Conn, s store.Store, opts Options) (*Runner, error) { if opts.HeartbeatInterval <= 0 { return nil, fmt.Errorf("node: HeartbeatInterval must be > 0") } if opts.DownAfter <= opts.HeartbeatInterval { return nil, fmt.Errorf("node: DownAfter must exceed HeartbeatInterval") } return &Runner{ opts: opts, nc: nc, store: s, presence: map[ids.NodeID]Presence{}, sweepActive: opts.MarkStaleDownEnabled, }, nil } // SetSweepEnabled toggles the stale-node sweeper loop at runtime. The // composition root flips it on in leader.OnPromote and off in OnDemote // so exactly one node in the cluster flips neglected rows to 'down'. func (r *Runner) SetSweepEnabled(enabled bool) { r.mu.Lock() r.sweepActive = enabled r.mu.Unlock() } func (r *Runner) sweepEnabled() bool { r.mu.Lock() defer r.mu.Unlock() return r.sweepActive } // Register writes (or refreshes) the node's row in the nodes table. // // If the row already exists with status='drained', it is preserved — an // operator-initiated drain must not be undone by a bounce. func (r *Runner) Register(ctx context.Context) error { return r.store.Nodes().Upsert(ctx, &store.Node{ ID: r.opts.NodeID, DisplayName: r.opts.DisplayName, Multiaddrs: r.opts.Multiaddrs, RPCURL: r.opts.RPCURL, Status: store.NodeStatusUp, }) } // Start runs the heartbeat loops until ctx is cancelled. func (r *Runner) Start(ctx context.Context) error { // Consumer first so we don't miss our own initial beat. if err := r.startConsumer(ctx); err != nil { return fmt.Errorf("node: start consumer: %w", err) } beatTicker := time.NewTicker(r.opts.HeartbeatInterval) defer beatTicker.Stop() // Sweep ticker runs unconditionally; each tick checks sweepEnabled() // so leader promotion / demotion can toggle the behavior live. sweepTicker := time.NewTicker(r.opts.HeartbeatInterval) defer sweepTicker.Stop() // Fire once immediately so the node shows up before the first tick. r.beat(ctx) for { select { case <-ctx.Done(): return ctx.Err() case <-beatTicker.C: r.beat(ctx) case <-sweepTicker.C: if r.sweepEnabled() { r.sweepStale(ctx) } } } } // LivePresences returns a snapshot of every node currently considered // live (last heartbeat within DownAfter). func (r *Runner) LivePresences() []Presence { r.mu.Lock() defer r.mu.Unlock() cutoff := time.Now().Add(-r.opts.DownAfter) out := make([]Presence, 0, len(r.presence)) for _, p := range r.presence { if p.SentAt.After(cutoff) { out = append(out, p) } } return out } func (r *Runner) beat(ctx context.Context) { presence := Presence{ NodeID: r.opts.NodeID.String(), Multiaddrs: r.opts.Multiaddrs, Status: store.NodeStatusUp, SentAt: time.Now().UTC(), } b, err := json.Marshal(presence) if err != nil { slog.Warn("node: marshal presence", "err", err) return } subject := fmt.Sprintf("%s.%s", HeartbeatSubjectPrefix, r.opts.NodeID.String()) if err := r.nc.Publish(subject, b); err != nil { slog.Warn("node: publish heartbeat", "err", err, "subject", subject) return } if err := r.store.Nodes().TouchHeartbeat(ctx, r.opts.NodeID); err != nil { slog.Warn("node: touch heartbeat in store", "err", err) } } func (r *Runner) startConsumer(ctx context.Context) error { sub, err := r.nc.Subscribe(HeartbeatSubjectPrefix+".*", func(m *nats.Msg) { var p Presence if err := json.Unmarshal(m.Data, &p); err != nil { slog.Warn("node: malformed heartbeat", "err", err) return } id, err := ids.ParseNode(p.NodeID) if err != nil { // A peer sent us a garbage node_id; drop rather than cache // under the zero NodeID key (which would collide across // malformed senders and produce confusing logs). slog.Warn("node: heartbeat with unparsable node id", "node_id", p.NodeID, "err", err) return } r.mu.Lock() r.presence[id] = p r.mu.Unlock() }) if err != nil { return err } go func() { <-ctx.Done() _ = sub.Unsubscribe() }() return nil } func (r *Runner) sweepStale(ctx context.Context) { downed, err := r.store.Nodes().MarkStaleDown(ctx, r.opts.DownAfter) if err != nil { slog.Warn("node: mark stale down", "err", err) return } for _, id := range downed { slog.Info("node: marked down", "node_id", id) } }