Greenfield Go multi-tenant IPFS Pinning Service wire-compatible with the
IPFS Pinning Services API spec. Paired 1:1 with Kubo over localhost RPC,
clustered via embedded NATS JetStream, Postgres source-of-truth with
RLS-enforced tenancy, Fiber + huma v2 for the HTTP surface, Authentik
OIDC for session login with kid-rotated HS256 JWT API tokens.
Feature-complete against the 22-milestone build plan, including the
ship-it v1.0 gap items:
* admin CLIs: drain/uncordon, maintenance, mint-token, rotate-key,
prune-denylist, rebalance --dry-run, cache-stats, cluster-presences
* TTL leader election via NATS KV, fence tokens, JetStream dedup
* rebalancer (plan/apply split), reconciler, requeue sweeper
* ristretto caches with NATS-backed cross-node invalidation
(placements live-nodes + token denylist)
* maintenance watchdog for stuck cluster-pause flag
* Prometheus /metrics with CIDR ACL, HTTP/pin/scheduler/cache gauges
* rate limiting: session (10/min) + anonymous global (120/min)
* integration tests: rebalance, refcount multi-org, RLS belt
* goreleaser (tar + deb/rpm/apk + Alpine Docker) targeting Gitea
Stack: Cobra/Viper, Fiber v2 + huma v2, embedded NATS JetStream,
pgx/sqlc/golang-migrate, ristretto, TypeID, prometheus/client_golang,
testcontainers-go.
195 lines · 6.0 KiB · Go
// Package cache gives anchorage a small layer of bounded in-memory caches
// on top of ristretto. Every cache has a byte budget and a default TTL,
// so no matter how hot traffic gets the RSS cost is known in advance.
//
// Values are cached per-node. Cross-node consistency is handled by the
// invalidation bus (see invalidate.go) which publishes a NATS message
// whenever a writer mutates the underlying record; subscribers on other
// nodes drop their cached entry and the next read re-fetches from Postgres.
package cache

import (
	"errors"
	"sync"
	"time"

	"github.com/dgraph-io/ristretto/v2"
)
// Options configures a single cache. All fields are read once by New;
// mutating an Options value after construction has no effect.
type Options struct {
	// Name is used in log messages and metric labels; should be short and
	// lowercase (e.g., "org/byid", "pin/byid"). It is also exposed via
	// Cache.Name for the invalidation bus.
	Name string
	// MaxCost is the byte budget. Once the internal cost estimate exceeds
	// this, ristretto evicts entries using admission-controlled TinyLFU.
	// New rejects values <= 0.
	MaxCost int64
	// NumCounters is ristretto's frequency-estimator size. ~10x the expected
	// entry count is a good default. When zero, New derives a value from
	// MaxCost instead.
	NumCounters int64
	// DefaultTTL is applied when callers pass a zero TTL to Set. Zero here
	// means "no default TTL" — entries live until evicted.
	DefaultTTL time.Duration
}
// Cache is a typed, bounded in-memory cache.
//
// Keys must satisfy ristretto's Key constraint (string, int, byte slice,
// etc.). anchorage conventionally uses strings for keys — typeid strings,
// compound keys like "<orgID>:<pinID>".
//
// A Cache must be constructed with New; the zero value is unusable.
type Cache[K ristretto.Key, V any] struct {
	// opts retains the construction options; Name and DefaultTTL are read
	// at runtime by Stats/Name and Set respectively.
	opts Options
	// r is the underlying ristretto cache that holds the entries.
	r *ristretto.Cache[K, V]
}
// New returns a Cache backed by ristretto.
|
||
//
|
||
// MaxCost must be > 0; NumCounters defaults to 10×expected-entries if zero
|
||
// (falls back to a safe minimum otherwise).
|
||
func New[K ristretto.Key, V any](opts Options) (*Cache[K, V], error) {
|
||
if opts.MaxCost <= 0 {
|
||
return nil, errors.New("cache: MaxCost must be > 0")
|
||
}
|
||
if opts.NumCounters == 0 {
|
||
// ~10× a rough estimate of entry count. Ristretto docs recommend
|
||
// 10× expected-items; we derive a safe minimum from MaxCost.
|
||
opts.NumCounters = opts.MaxCost / 64
|
||
if opts.NumCounters < 1000 {
|
||
opts.NumCounters = 1000
|
||
}
|
||
}
|
||
|
||
r, err := ristretto.NewCache(&ristretto.Config[K, V]{
|
||
MaxCost: opts.MaxCost,
|
||
NumCounters: opts.NumCounters,
|
||
BufferItems: 64,
|
||
// Metrics has a small per-op cost; worth paying for the
|
||
// /v1/admin/cache-stats endpoint + any future /metrics wiring.
|
||
Metrics: true,
|
||
})
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
c := &Cache[K, V]{opts: opts, r: r}
|
||
Register(c)
|
||
return c, nil
|
||
}
|
||
|
||
// Stats is a point-in-time snapshot of a cache's counters. Emitted by
// /v1/admin/cache-stats. Fields are monotonic counters or instantaneous
// gauges — differences between successive snapshots give rates.
type Stats struct {
	// Name is the cache's configured Options.Name.
	Name string `json:"name"`
	// Hits / Misses count Get operations that did / did not find an entry.
	Hits   uint64 `json:"hits"`
	Misses uint64 `json:"misses"`
	// KeysAdded / KeysEvicted count entries admitted to and evicted from
	// the cache.
	KeysAdded   uint64 `json:"keys_added"`
	KeysEvicted uint64 `json:"keys_evicted"`
	// CostAdded / CostEvicted track the cost (byte-budget) totals for the
	// corresponding key counters.
	CostAdded   uint64 `json:"cost_added"`
	CostEvicted uint64 `json:"cost_evicted"`
}
// Stats returns a snapshot of this cache's current counters.
|
||
func (c *Cache[K, V]) Stats() Stats {
|
||
m := c.r.Metrics
|
||
if m == nil {
|
||
return Stats{Name: c.opts.Name}
|
||
}
|
||
return Stats{
|
||
Name: c.opts.Name,
|
||
Hits: m.Hits(),
|
||
Misses: m.Misses(),
|
||
KeysAdded: m.KeysAdded(),
|
||
KeysEvicted: m.KeysEvicted(),
|
||
CostAdded: m.CostAdded(),
|
||
CostEvicted: m.CostEvicted(),
|
||
}
|
||
}
|
||
|
||
// StatsProvider is implemented by anything that can expose a Stats
// snapshot — both *Cache[K,V] and package-external caches (e.g.,
// pin.CachedLiveNodes) which keep their own counters.
type StatsProvider interface {
	// Stats returns a point-in-time snapshot of the provider's counters.
	Stats() Stats
}
// Global stats registry. Every cache.New() call registers itself;
// external providers (pin.CachedLiveNodes) register explicitly via
// cache.Register(provider). The admin cache-stats endpoint reads
// AllStats() to produce its response.
var registry struct {
	// mu guards providers; reads (AllStats) take the read lock, writes
	// (Register, ResetRegistry) take the write lock.
	mu        sync.RWMutex
	providers []StatsProvider
}
// Register adds a StatsProvider to the global registry. Safe to call
|
||
// from multiple goroutines; idempotent-ish (dedup is caller's concern —
|
||
// most caches are constructed once during app startup).
|
||
func Register(p StatsProvider) {
|
||
registry.mu.Lock()
|
||
registry.providers = append(registry.providers, p)
|
||
registry.mu.Unlock()
|
||
}
|
||
|
||
// AllStats returns snapshots for every registered cache. Order is
|
||
// registration order; callers that care about stable ordering should
|
||
// sort by Stats.Name.
|
||
func AllStats() []Stats {
|
||
registry.mu.RLock()
|
||
defer registry.mu.RUnlock()
|
||
out := make([]Stats, 0, len(registry.providers))
|
||
for _, p := range registry.providers {
|
||
out = append(out, p.Stats())
|
||
}
|
||
return out
|
||
}
|
||
|
||
// ResetRegistry clears the global registry. Tests call this between
|
||
// runs; not for production use.
|
||
func ResetRegistry() {
|
||
registry.mu.Lock()
|
||
registry.providers = nil
|
||
registry.mu.Unlock()
|
||
}
|
||
|
||
// Get returns the cached value and whether it was present.
|
||
func (c *Cache[K, V]) Get(k K) (V, bool) {
|
||
return c.r.Get(k)
|
||
}
|
||
|
||
// Set stores v with a cost estimate and TTL. ttl == 0 falls back to
|
||
// opts.DefaultTTL; cost == 0 uses the length of the key's string form
|
||
// as a (very rough) estimate.
|
||
//
|
||
// Ristretto's Set is asynchronous: the value may not be queryable
|
||
// immediately. Call Wait() in tests that need strict read-your-writes.
|
||
func (c *Cache[K, V]) Set(k K, v V, cost int64, ttl time.Duration) bool {
|
||
if ttl == 0 {
|
||
ttl = c.opts.DefaultTTL
|
||
}
|
||
if cost <= 0 {
|
||
cost = 1
|
||
}
|
||
return c.r.SetWithTTL(k, v, cost, ttl)
|
||
}
|
||
|
||
// Delete removes k from the cache if present; absent keys are a no-op in
// practice since there is nothing to drop. Used when an invalidation
// message arrives for a record this node has cached.
func (c *Cache[K, V]) Delete(k K) {
	c.r.Del(k)
}
// Wait blocks until pending async Sets are visible to Get. Intended for
// tests and for the writer-inline update pattern (see invalidate.go).
func (c *Cache[K, V]) Wait() {
	c.r.Wait()
}
// Close releases ristretto's background goroutines. Only called at
// process shutdown; the cache must not be used afterwards.
func (c *Cache[K, V]) Close() {
	c.r.Close()
}
// Name returns the cache's configured name. Used by the invalidation
|
||
// bus to form subject names.
|
||
func (c *Cache[K, V]) Name() string { return c.opts.Name }
|