// Package cache gives anchorage a small layer of bounded in-memory caches // on top of ristretto. Every cache has a byte budget and a default TTL, // so no matter how hot traffic gets the RSS cost is known in advance. // // Values are cached per-node. Cross-node consistency is handled by the // invalidation bus (see invalidate.go) which publishes a NATS message // whenever a writer mutates the underlying record; subscribers on other // nodes drop their cached entry and the next read re-fetches from Postgres. package cache import ( "errors" "sync" "time" "github.com/dgraph-io/ristretto/v2" ) // Options configures a single cache. type Options struct { // Name is used in log messages and metric labels; should be short and // lowercase (e.g., "org/byid", "pin/byid"). Name string // MaxCost is the byte budget. Once the internal cost estimate exceeds // this, ristretto evicts entries using admission-controlled TinyLFU. MaxCost int64 // NumCounters is ristretto's frequency-estimator size. ~10x the expected // entry count is a good default. NumCounters int64 // DefaultTTL is applied when callers set a zero TTL. Zero here means // "no default TTL" — entries live until evicted. DefaultTTL time.Duration } // Cache is a typed, bounded in-memory cache. // // Keys must satisfy ristretto's Key constraint (string, int, byte slice, // etc.). anchorage conventionally uses strings for keys — typeid strings, // compound keys like ":". type Cache[K ristretto.Key, V any] struct { opts Options r *ristretto.Cache[K, V] } // New returns a Cache backed by ristretto. // // MaxCost must be > 0; NumCounters defaults to 10×expected-entries if zero // (falls back to a safe minimum otherwise). func New[K ristretto.Key, V any](opts Options) (*Cache[K, V], error) { if opts.MaxCost <= 0 { return nil, errors.New("cache: MaxCost must be > 0") } if opts.NumCounters == 0 { // ~10× a rough estimate of entry count. Ristretto docs recommend // 10× expected-items; we derive a safe minimum from MaxCost. 
opts.NumCounters = opts.MaxCost / 64 if opts.NumCounters < 1000 { opts.NumCounters = 1000 } } r, err := ristretto.NewCache(&ristretto.Config[K, V]{ MaxCost: opts.MaxCost, NumCounters: opts.NumCounters, BufferItems: 64, // Metrics has a small per-op cost; worth paying for the // /v1/admin/cache-stats endpoint + any future /metrics wiring. Metrics: true, }) if err != nil { return nil, err } c := &Cache[K, V]{opts: opts, r: r} Register(c) return c, nil } // Stats is a point-in-time snapshot of a cache's counters. Emitted by // /v1/admin/cache-stats. Fields are monotonic counters or instantaneous // gauges — differences between successive snapshots give rates. type Stats struct { Name string `json:"name"` Hits uint64 `json:"hits"` Misses uint64 `json:"misses"` KeysAdded uint64 `json:"keys_added"` KeysEvicted uint64 `json:"keys_evicted"` CostAdded uint64 `json:"cost_added"` CostEvicted uint64 `json:"cost_evicted"` } // Stats returns a snapshot of this cache's current counters. func (c *Cache[K, V]) Stats() Stats { m := c.r.Metrics if m == nil { return Stats{Name: c.opts.Name} } return Stats{ Name: c.opts.Name, Hits: m.Hits(), Misses: m.Misses(), KeysAdded: m.KeysAdded(), KeysEvicted: m.KeysEvicted(), CostAdded: m.CostAdded(), CostEvicted: m.CostEvicted(), } } // StatsProvider is implemented by anything that can expose a Stats // snapshot — both *Cache[K,V] and package-external caches (e.g., // pin.CachedLiveNodes) which keep their own counters. type StatsProvider interface { Stats() Stats } // Global stats registry. Every cache.New() call registers itself; // external providers (pin.CachedLiveNodes) register explicitly via // cache.Register(provider). The admin cache-stats endpoint reads // AllStats() to produce its response. var registry struct { mu sync.RWMutex providers []StatsProvider } // Register adds a StatsProvider to the global registry. 
Safe to call // from multiple goroutines; idempotent-ish (dedup is caller's concern — // most caches are constructed once during app startup). func Register(p StatsProvider) { registry.mu.Lock() registry.providers = append(registry.providers, p) registry.mu.Unlock() } // AllStats returns snapshots for every registered cache. Order is // registration order; callers that care about stable ordering should // sort by Stats.Name. func AllStats() []Stats { registry.mu.RLock() defer registry.mu.RUnlock() out := make([]Stats, 0, len(registry.providers)) for _, p := range registry.providers { out = append(out, p.Stats()) } return out } // ResetRegistry clears the global registry. Tests call this between // runs; not for production use. func ResetRegistry() { registry.mu.Lock() registry.providers = nil registry.mu.Unlock() } // Get returns the cached value and whether it was present. func (c *Cache[K, V]) Get(k K) (V, bool) { return c.r.Get(k) } // Set stores v with a cost estimate and TTL. ttl == 0 falls back to // opts.DefaultTTL; cost == 0 uses the length of the key's string form // as a (very rough) estimate. // // Ristretto's Set is asynchronous: the value may not be queryable // immediately. Call Wait() in tests that need strict read-your-writes. func (c *Cache[K, V]) Set(k K, v V, cost int64, ttl time.Duration) bool { if ttl == 0 { ttl = c.opts.DefaultTTL } if cost <= 0 { cost = 1 } return c.r.SetWithTTL(k, v, cost, ttl) } // Delete removes k from the cache if present. func (c *Cache[K, V]) Delete(k K) { c.r.Del(k) } // Wait blocks until pending async Sets are visible to Get. Intended for // tests and for the writer-inline update pattern (see invalidate.go). func (c *Cache[K, V]) Wait() { c.r.Wait() } // Close releases ristretto's background goroutines. Only called at // process shutdown. func (c *Cache[K, V]) Close() { c.r.Close() } // Name returns the cache's configured name. Used by the invalidation // bus to form subject names. 
func (c *Cache[K, V]) Name() string {
	// Reads the Name field of the Options value stored on the cache.
	name := c.opts.Name
	return name
}