Files
anchorage/internal/pkg/cache/cache.go
William Gill 12bf35caf8 anchorage v1.0 initial tree
Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.

Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:

  * admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
    prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
  * TTL leader election via NATS KV, fence tokens, JetStream dedup
  * rebalancer (plan/apply split), reconciler, requeue sweeper
  * ristretto caches with NATS-backed cross-node invalidation
    (placements live-nodes + token denylist)
  * maintenance watchdog for stuck cluster-pause flag
  * Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
  * rate limiting: session (10/min) + anonymous global (120/min)
  * integration tests: rebalance, refcount multi-org, RLS belt
  * goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea

Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
2026-04-16 18:13:36 -05:00

195 lines
6.0 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package cache gives anchorage a small layer of bounded in-memory caches
// on top of ristretto. Every cache has a byte budget and a default TTL,
// so no matter how hot traffic gets the RSS cost is known in advance.
//
// Values are cached per-node. Cross-node consistency is handled by the
// invalidation bus (see invalidate.go) which publishes a NATS message
// whenever a writer mutates the underlying record; subscribers on other
// nodes drop their cached entry and the next read re-fetches from Postgres.
package cache
import (
"errors"
"sync"
"time"
"github.com/dgraph-io/ristretto/v2"
)
// Options configures a single cache.
type Options struct {
	// Name is used in log messages and metric labels; should be short and
	// lowercase (e.g., "org/byid", "pin/byid").
	Name string

	// MaxCost is the byte budget. Once the internal cost estimate exceeds
	// this, ristretto evicts entries using admission-controlled TinyLFU.
	// New rejects a non-positive MaxCost.
	MaxCost int64

	// NumCounters is ristretto's frequency-estimator size. ~10x the expected
	// entry count is a good default; New derives a fallback from MaxCost
	// when this is left zero.
	NumCounters int64

	// DefaultTTL is applied when callers pass a zero TTL to Set. Zero here
	// means "no default TTL" — entries live until evicted.
	DefaultTTL time.Duration
}
// Cache is a typed, bounded in-memory cache.
//
// Keys must satisfy ristretto's Key constraint (string, int, byte slice,
// etc.). anchorage conventionally uses strings for keys — typeid strings,
// compound keys like "<orgID>:<pinID>".
type Cache[K ristretto.Key, V any] struct {
	// opts retains the construction options: Name feeds Stats()/Name(),
	// DefaultTTL feeds Set's TTL fallback.
	opts Options
	// r is the underlying ristretto cache holding the entries.
	r *ristretto.Cache[K, V]
}
// New returns a Cache backed by ristretto and registers it in the global
// stats registry (see Register / AllStats).
//
// MaxCost must be > 0 or an error is returned. A non-positive NumCounters
// is replaced by a heuristic derived from MaxCost (MaxCost/64, floored at
// 1000) — roughly the "10x expected entries" ristretto's docs recommend,
// assuming entries average ~64 bytes.
func New[K ristretto.Key, V any](opts Options) (*Cache[K, V], error) {
	if opts.MaxCost <= 0 {
		return nil, errors.New("cache: MaxCost must be > 0")
	}
	if opts.NumCounters <= 0 {
		// ~10x a rough estimate of entry count. Ristretto docs recommend
		// 10x expected-items; we derive a safe minimum from MaxCost.
		// Using <= 0 (not just == 0) means a negative value from a bad
		// config falls back to the heuristic instead of erroring inside
		// ristretto.NewCache.
		opts.NumCounters = opts.MaxCost / 64
		if opts.NumCounters < 1000 {
			opts.NumCounters = 1000
		}
	}
	r, err := ristretto.NewCache(&ristretto.Config[K, V]{
		MaxCost:     opts.MaxCost,
		NumCounters: opts.NumCounters,
		BufferItems: 64,
		// Metrics has a small per-op cost; worth paying for the
		// /v1/admin/cache-stats endpoint + any future /metrics wiring.
		Metrics: true,
	})
	if err != nil {
		return nil, err
	}
	c := &Cache[K, V]{opts: opts, r: r}
	Register(c)
	return c, nil
}
// Stats is a point-in-time snapshot of a cache's counters. Emitted by
// /v1/admin/cache-stats. Fields are monotonic counters or instantaneous
// gauges — differences between successive snapshots give rates.
type Stats struct {
	// Name identifies the cache (Options.Name, or the provider's own label).
	Name string `json:"name"`
	// Hits and Misses count lookup outcomes.
	Hits   uint64 `json:"hits"`
	Misses uint64 `json:"misses"`
	// KeysAdded / KeysEvicted count entries admitted and evicted.
	KeysAdded   uint64 `json:"keys_added"`
	KeysEvicted uint64 `json:"keys_evicted"`
	// CostAdded / CostEvicted track the byte-cost totals for the same events.
	CostAdded   uint64 `json:"cost_added"`
	CostEvicted uint64 `json:"cost_evicted"`
}
// Stats returns a snapshot of this cache's current counters. If ristretto
// metrics are unavailable, only the Name field is populated.
func (c *Cache[K, V]) Stats() Stats {
	snap := Stats{Name: c.opts.Name}
	if m := c.r.Metrics; m != nil {
		snap.Hits = m.Hits()
		snap.Misses = m.Misses()
		snap.KeysAdded = m.KeysAdded()
		snap.KeysEvicted = m.KeysEvicted()
		snap.CostAdded = m.CostAdded()
		snap.CostEvicted = m.CostEvicted()
	}
	return snap
}
// StatsProvider is implemented by anything that can expose a Stats
// snapshot — both *Cache[K,V] and package-external caches (e.g.,
// pin.CachedLiveNodes) which keep their own counters.
type StatsProvider interface {
	// Stats returns a point-in-time snapshot of the provider's counters.
	Stats() Stats
}
// Global stats registry. Every cache.New() call registers itself;
// external providers (pin.CachedLiveNodes) register explicitly via
// cache.Register(provider). The admin cache-stats endpoint reads
// AllStats() to produce its response.
var registry struct {
	// mu guards providers; RWMutex because AllStats reads are the
	// common operation while Register happens mostly at startup.
	mu        sync.RWMutex
	providers []StatsProvider
}
// Register adds a StatsProvider to the global registry. Safe to call
// from multiple goroutines; idempotent-ish (dedup is caller's concern —
// most caches are constructed once during app startup).
func Register(p StatsProvider) {
	registry.mu.Lock()
	defer registry.mu.Unlock()
	registry.providers = append(registry.providers, p)
}
// AllStats returns snapshots for every registered cache. Order is
// registration order; callers that care about stable ordering should
// sort by Stats.Name.
func AllStats() []Stats {
	registry.mu.RLock()
	defer registry.mu.RUnlock()
	snapshots := make([]Stats, len(registry.providers))
	for i, p := range registry.providers {
		snapshots[i] = p.Stats()
	}
	return snapshots
}
// ResetRegistry clears the global registry. Tests call this between
// runs; not for production use.
func ResetRegistry() {
	registry.mu.Lock()
	defer registry.mu.Unlock()
	registry.providers = nil
}
// Get returns the cached value and whether it was present.
func (c *Cache[K, V]) Get(k K) (V, bool) {
	v, ok := c.r.Get(k)
	return v, ok
}
// Set stores v with a cost estimate and TTL. ttl == 0 falls back to
// opts.DefaultTTL; a non-positive cost is replaced with a nominal cost
// of 1, so the entry still counts toward (but barely dents) the byte
// budget. Returns ristretto's admission result: false means the entry
// was dropped rather than queued.
//
// Ristretto's Set is asynchronous: the value may not be queryable
// immediately. Call Wait() in tests that need strict read-your-writes.
func (c *Cache[K, V]) Set(k K, v V, cost int64, ttl time.Duration) bool {
	if ttl == 0 {
		ttl = c.opts.DefaultTTL
	}
	if cost <= 0 {
		// Minimal placeholder cost; callers with a real size estimate
		// should pass it so eviction pressure stays meaningful.
		cost = 1
	}
	return c.r.SetWithTTL(k, v, cost, ttl)
}
// Delete removes k from the cache if present.
func (c *Cache[K, V]) Delete(k K) { c.r.Del(k) }
// Wait blocks until pending async Sets are visible to Get. Intended for
// tests and for the writer-inline update pattern (see invalidate.go).
func (c *Cache[K, V]) Wait() { c.r.Wait() }
// Close releases ristretto's background goroutines. Only called at
// process shutdown.
func (c *Cache[K, V]) Close() { c.r.Close() }
// Name returns the cache's configured name. Used by the invalidation
// bus to form subject names.
func (c *Cache[K, V]) Name() string {
	return c.opts.Name
}