package cache

import (
	"context"
	"fmt"
	"log/slog"

	"github.com/nats-io/nats.go"
)

// InvalidateSubjectPrefix is the subject prefix for cache invalidation
// events. The full subject is
//
//	cache.invalidate.<entity>.<id>
//
// where <entity> is "org", "node", "token", etc. Writers publish to the
// specific subject after a successful Postgres commit; every node's
// subscriber wildcards the prefix and drops matching cached entries.
const InvalidateSubjectPrefix = "cache.invalidate"

// Invalidator publishes cache-invalidation events across the cluster.
//
// Each writer holds one Invalidator shared across all caches; after a
// successful mutation it calls Emit("org", orgID) and every node sees
// the message within a NATS RTT (typically <1ms in-cluster).
type Invalidator struct {
	nc *nats.Conn
}

// NewInvalidator wraps a connected nats.Conn.
func NewInvalidator(nc *nats.Conn) *Invalidator {
	return &Invalidator{nc: nc}
}

// Emit publishes cache.invalidate.<entity>.<id>. The payload is empty —
// subscribers only need the subject to know which key to drop.
//
// entity and id are validated before publishing: neither may be empty or
// contain whitespace, control bytes, or the NATS wildcards '*' and '>', and
// id must be a single token (no '.'). Without this check a bad value would
// produce a subject that no cache.invalidate.<entity>.* subscription
// matches, so the invalidation would be lost without any error.
//
// Errors are returned rather than swallowed so the caller can decide
// whether to retry or log. For most write paths a best-effort log-only
// handling is fine because the cache entry will TTL out regardless.
//
// A nil Invalidator, or one without a connection, is a no-op that returns
// nil; validation is skipped in that mode.
func (i *Invalidator) Emit(entity, id string) error {
	if i == nil || i.nc == nil {
		return nil // no-op in tests / standalone mode
	}
	if err := checkSubjectPart(entity, true); err != nil {
		return fmt.Errorf("invalidate: entity %q: %w", entity, err)
	}
	if err := checkSubjectPart(id, false); err != nil {
		return fmt.Errorf("invalidate: id %q: %w", id, err)
	}
	subject := fmt.Sprintf("%s.%s.%s", InvalidateSubjectPrefix, entity, id)
	return i.nc.Publish(subject, nil)
}

// checkSubjectPart reports why s cannot be embedded in a publish subject,
// or nil if it can. When allowDots is false s must be exactly one token;
// when true s may span several '.'-separated tokens, none of them empty.
func checkSubjectPart(s string, allowDots bool) error {
	if s == "" {
		return fmt.Errorf("must not be empty")
	}
	tokenLen := 0 // bytes seen in the token currently being scanned
	for i := 0; i < len(s); i++ {
		switch c := s[i]; {
		case c == '.':
			if !allowDots {
				return fmt.Errorf("must be a single token, found '.' at byte %d", i)
			}
			if tokenLen == 0 {
				return fmt.Errorf("empty token at byte %d", i)
			}
			tokenLen = 0
		case c == '*' || c == '>':
			return fmt.Errorf("wildcard %q at byte %d", c, i)
		case c <= ' ':
			return fmt.Errorf("whitespace or control byte %#x at byte %d", c, i)
		default:
			tokenLen++
		}
	}
	if tokenLen == 0 {
		return fmt.Errorf("trailing '.' leaves an empty token")
	}
	return nil
}

// Subscriber wires a NATS subscription to a cache's Delete method.
//
// The returned subscription must be drained / unsubscribed at shutdown.
type Subscriber interface {
	Unsubscribe() error
}

// WatchEntity starts a subscription that calls onInvalidate(id) for every
// message arriving on cache.invalidate.<entity>.*.
//
// Intended to be called once per cache per node during startup.
func WatchEntity(ctx context.Context, nc *nats.Conn, entity string, onInvalidate func(id string)) (Subscriber, error) { subject := fmt.Sprintf("%s.%s.*", InvalidateSubjectPrefix, entity) sub, err := nc.Subscribe(subject, func(m *nats.Msg) { // Subject is "cache.invalidate.." — the ID is the // trailing token. tokens := splitSubject(m.Subject) if len(tokens) < 4 { slog.Warn("invalidate: malformed subject", "subject", m.Subject) return } onInvalidate(tokens[len(tokens)-1]) }) if err != nil { return nil, fmt.Errorf("subscribe %s: %w", subject, err) } // Tie unsubscription to ctx cancellation so the caller doesn't have // to plumb sub.Unsubscribe() manually if they're already managing // lifecycles via context. go func() { <-ctx.Done() _ = sub.Unsubscribe() }() return sub, nil } func splitSubject(s string) []string { out := make([]string, 0, 4) start := 0 for i := 0; i < len(s); i++ { if s[i] == '.' { out = append(out, s[start:i]) start = i + 1 } } out = append(out, s[start:]) return out }