From 2c0e6cde6c0030e54d35eca77f7fd4b8ad1000da Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Sun, 6 Feb 2022 11:11:43 -0800 Subject: [PATCH] rpc: implement the eventlog defined by ADR 075 Implement the basic cursor and eventlog types described in ADR 075. Handle encoding and decoding as strings for compatibility with JSON. - Add unit tests for the required order and synchronization properties. - Add hooks for metrics, with one value to be expanded later. --- internal/eventlog/cursor/cursor.go | 100 +++++++++++ internal/eventlog/cursor/cursor_test.go | 141 +++++++++++++++ internal/eventlog/eventlog.go | 217 +++++++++++++++++++++++ internal/eventlog/eventlog_test.go | 220 ++++++++++++++++++++++++ internal/eventlog/item.go | 78 +++++++++ internal/eventlog/metrics.go | 39 +++++ internal/eventlog/prune.go | 111 ++++++++++++ 7 files changed, 906 insertions(+) create mode 100644 internal/eventlog/cursor/cursor.go create mode 100644 internal/eventlog/cursor/cursor_test.go create mode 100644 internal/eventlog/eventlog.go create mode 100644 internal/eventlog/eventlog_test.go create mode 100644 internal/eventlog/item.go create mode 100644 internal/eventlog/metrics.go create mode 100644 internal/eventlog/prune.go diff --git a/internal/eventlog/cursor/cursor.go b/internal/eventlog/cursor/cursor.go new file mode 100644 index 000000000..9e17ceb45 --- /dev/null +++ b/internal/eventlog/cursor/cursor.go @@ -0,0 +1,100 @@ +// Package cursor implements time-ordered item cursors for an event log. +package cursor + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" +) + +// A Source produces cursors based on a time index generator and a sequence +// counter. A zero-valued Source is ready for use with defaults as described. +type Source struct { + // This function is called to produce the current time index. + // If nil, it defaults to time.Now().UnixNano(). + TimeIndex func() int64 + + // The current counter value used for sequence number generation. 
It is + // incremented in-place each time a cursor is generated. + Counter int64 +} + +func (s *Source) timeIndex() int64 { + if s.TimeIndex == nil { + return time.Now().UnixNano() + } + return s.TimeIndex() +} + +func (s *Source) nextCounter() int64 { + s.Counter++ + return s.Counter +} + +// Cursor produces a fresh cursor from s at the current time index and counter. +func (s *Source) Cursor() Cursor { + return Cursor{ + timestamp: uint64(s.timeIndex()), + sequence: uint16(s.nextCounter() & 65535), + } +} + +// A Cursor is a unique identifier for an item in a time-ordered event log. +// It is safe to copy and compare cursors by value. +type Cursor struct { + timestamp uint64 // ns since Unix epoch + sequence uint16 // sequence number +} + +// Before reports whether c is prior to o in time ordering. This comparison +// ignores sequence numbers. +func (c Cursor) Before(o Cursor) bool { return c.timestamp < o.timestamp } + +// Diff returns the time duration between c and o. The duration is negative if +// c is before o in time order. +func (c Cursor) Diff(o Cursor) time.Duration { + return time.Duration(c.timestamp) - time.Duration(o.timestamp) +} + +// IsZero reports whether c is the zero cursor. +func (c Cursor) IsZero() bool { return c == Cursor{} } + +// MarshalText implements the encoding.TextMarshaler interface. +// A zero cursor marshals as "", otherwise the format used by the String method. +func (c Cursor) MarshalText() ([]byte, error) { + if c.IsZero() { + return nil, nil + } + return []byte(c.String()), nil +} + +// UnmarshalText implements the encoding.TextUnmarshaler interface. +// An empty text unmarshals without error to a zero cursor. 
+func (c *Cursor) UnmarshalText(data []byte) error { + if len(data) == 0 { + *c = Cursor{} // set zero + return nil + } + ps := strings.SplitN(string(data), "-", 2) + if len(ps) != 2 { + return errors.New("invalid cursor format") + } + ts, err := strconv.ParseUint(ps[0], 16, 64) + if err != nil { + return fmt.Errorf("invalid timestamp: %w", err) + } + sn, err := strconv.ParseUint(ps[1], 16, 16) + if err != nil { + return fmt.Errorf("invalid sequence: %w", err) + } + c.timestamp = ts + c.sequence = uint16(sn) + return nil +} + +// String returns a printable text representation of a cursor. +func (c Cursor) String() string { + return fmt.Sprintf("%016x-%04x", c.timestamp, c.sequence) +} diff --git a/internal/eventlog/cursor/cursor_test.go b/internal/eventlog/cursor/cursor_test.go new file mode 100644 index 000000000..31701ddf7 --- /dev/null +++ b/internal/eventlog/cursor/cursor_test.go @@ -0,0 +1,141 @@ +package cursor_test + +import ( + "fmt" + "testing" + "time" + + "github.com/tendermint/tendermint/internal/eventlog/cursor" +) + +func mustParse(t *testing.T, s string) cursor.Cursor { + t.Helper() + var c cursor.Cursor + if err := c.UnmarshalText([]byte(s)); err != nil { + t.Fatalf("Unmarshal %q: unexpected error: %v", s, err) + } + return c +} + +func TestSource_counter(t *testing.T) { + src := &cursor.Source{ + TimeIndex: func() int64 { return 255 }, + } + for i := 1; i <= 5; i++ { + want := fmt.Sprintf("00000000000000ff-%04x", i) + got := src.Cursor().String() + if got != want { + t.Errorf("Cursor %d: got %q, want %q", i, got, want) + } + } +} + +func TestSource_timeIndex(t *testing.T) { + times := []int64{0, 1, 100, 65535, 0x76543210fecdba98} + src := &cursor.Source{ + TimeIndex: func() int64 { + out := times[0] + times = append(times[1:], out) + return out + }, + Counter: 160, + } + results := []string{ + "0000000000000000-00a1", + "0000000000000001-00a2", + "0000000000000064-00a3", + "000000000000ffff-00a4", + "76543210fecdba98-00a5", + } + for i, want := 
range results { + if got := src.Cursor().String(); got != want { + t.Errorf("Cursor %d: got %q, want %q", i+1, got, want) + } + } +} + +func TestCursor_roundTrip(t *testing.T) { + const text = `0123456789abcdef-fce9` + + c := mustParse(t, text) + if got := c.String(); got != text { + t.Errorf("Wrong string format: got %q, want %q", got, text) + } + cmp, err := c.MarshalText() + if err != nil { + t.Fatalf("Marshal %+v failed: %v", c, err) + } + if got := string(cmp); got != text { + t.Errorf("Wrong text format: got %q, want %q", got, text) + } +} + +func TestCursor_ordering(t *testing.T) { + // Condition: text1 precedes text2 in time order. + // Condition: text2 has an earlier sequence than text1. + const zero = "" + const text1 = "0000000012345678-0005" + const text2 = "00000000fecdeba9-0002" + + zc := mustParse(t, zero) + c1 := mustParse(t, text1) + c2 := mustParse(t, text2) + + // Confirm for all pairs that string order respects time order. + pairs := []struct { + t1, t2 string + c1, c2 cursor.Cursor + }{ + {zero, zero, zc, zc}, + {zero, text1, zc, c1}, + {zero, text2, zc, c2}, + {text1, zero, c1, zc}, + {text1, text1, c1, c1}, + {text1, text2, c1, c2}, + {text2, zero, c2, zc}, + {text2, text1, c2, c1}, + {text2, text2, c2, c2}, + } + for _, pair := range pairs { + want := pair.t1 < pair.t2 + if got := pair.c1.Before(pair.c2); got != want { + t.Errorf("(%s).Before(%s): got %v, want %v", pair.t1, pair.t2, got, want) + } + } +} + +func TestCursor_IsZero(t *testing.T) { + tests := []struct { + text string + want bool + }{ + {"", true}, + {"0000000000000000-0000", true}, + {"0000000000000001-0000", false}, + {"0000000000000000-0001", false}, + {"0000000000000001-0001", false}, + } + for _, test := range tests { + c := mustParse(t, test.text) + if got := c.IsZero(); got != test.want { + t.Errorf("IsZero(%q): got %v, want %v", test.text, got, test.want) + } + } +} + +func TestCursor_Diff(t *testing.T) { + const time1 = 0x1ac0193001 + const time2 = 0x0ac0193001 + + 
text1 := fmt.Sprintf("%016x-0001", time1) + text2 := fmt.Sprintf("%016x-0005", time2) + want := time.Duration(time1 - time2) + + c1 := mustParse(t, text1) + c2 := mustParse(t, text2) + + got := c1.Diff(c2) + if got != want { + t.Fatalf("Diff %q - %q: got %v, want %v", text1, text2, got, want) + } +} diff --git a/internal/eventlog/eventlog.go b/internal/eventlog/eventlog.go new file mode 100644 index 000000000..a33464f5d --- /dev/null +++ b/internal/eventlog/eventlog.go @@ -0,0 +1,217 @@ +// Package eventlog defines a reverse time-ordered log of events over a sliding +// window of time before the most recent item in the log. +// +// New items are added to the head of the log (the newest end), and items that +// fall outside the designated window are pruned from its tail (the oldest). +// Items within the log are indexed by lexicographically-ordered cursors. +package eventlog + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/tendermint/tendermint/internal/eventlog/cursor" + "github.com/tendermint/tendermint/types" +) + +// A Log is a reverse time-ordered log of events in a sliding window of time +// before the newest item. Use Add to add new items to the front (head) of the +// log, and Scan or WaitScan to traverse the current contents of the log. +// +// After construction, a *Log is safe for concurrent access by one writer and +// any number of readers. +type Log struct { + // These values do not change after construction. + windowSize time.Duration + maxItems int + numItemsGauge gauge + + // Protects access to the fields below. Lock to modify the values of these + // fields, or to read or snapshot the values. + mu sync.Mutex + + numItems int // total number of items in the log + oldestCursor cursor.Cursor // cursor of the oldest item + head *logEntry // pointer to the newest item + ready chan struct{} // closed when head changes + source cursor.Source // generator of cursors +} + +// New constructs a new empty log with the given settings. 
+func New(opts LogSettings) (*Log, error) { + if opts.WindowSize <= 0 { + return nil, errors.New("window size must be positive") + } + lg := &Log{ + windowSize: opts.WindowSize, + maxItems: opts.MaxItems, + numItemsGauge: discard{}, + ready: make(chan struct{}), + source: opts.Source, + } + if opts.Metrics != nil { + lg.numItemsGauge = opts.Metrics.numItemsGauge + } + return lg, nil +} + +// Add adds a new item to the front of the log. If necessary, the log is pruned +// to fit its constraints on size and age. Add blocks until both steps are done. +// +// Any error reported by Add arises from pruning; the new item was added to the +// log regardless whether an error occurs. +func (lg *Log) Add(etype string, data types.EventData) error { + lg.mu.Lock() + head := &logEntry{ + item: newItem(lg.source.Cursor(), etype, data), + next: lg.head, + } + lg.numItems++ + lg.updateHead(head) + size := lg.numItems + age := head.item.Cursor.Diff(lg.oldestCursor) + lg.mu.Unlock() + + // If the log requires pruning, do the pruning step outside the lock. This + // permits readers to continue to make progress while we're working. + return lg.checkPrune(head, size, age) +} + +// Scan scans the current contents of the log, calling f with each item until +// all items are visited or f reports an error. If f returns ErrStopScan, Scan +// returns nil, otherwise it returns the error reported by f. +// +// The Info value returned is valid even if Scan reports an error. +func (lg *Log) Scan(f func(*Item) error) (Info, error) { + return lg.scanState(lg.state(), f) +} + +// WaitScan blocks until the cursor of the frontmost log item is different from +// c, then executes a Scan on the contents of the log. If ctx ends before the +// head is updated, WaitScan returns an error without calling f. +// +// The Info value returned is valid even if WaitScan reports an error. 
+func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) { + st := lg.state() + for st.head == nil || st.head.item.Cursor == c { + var err error + st, err = lg.waitStateChange(ctx) + if err != nil { + return st.info(), err + } + } + return lg.scanState(st, f) +} + +// Info returns the current state of the log. +func (lg *Log) Info() Info { return lg.state().info() } + +// ErrStopScan is returned by a Scan callback to signal that scanning should be +// terminated without error. +var ErrStopScan = errors.New("stop scanning") + +// ErrLogPruned is returned by Add to signal that at least some events within +// the time window were discarded by pruning in excess of the size limit. +// This error may be wrapped, use errors.Is to test for it. +var ErrLogPruned = errors.New("log pruned") + +// LogSettings configure the construction of an event log. +type LogSettings struct { + // The size of the time window measured in time before the newest item. + // This value must be positive. + WindowSize time.Duration + + // The maximum number of items that will be retained in memory within the + // designated time window. A value ≤ 0 imposes no limit, otherwise items in + // excess of this number will be dropped from the log. + MaxItems int + + // The cursor source to use for log entries. If not set, use wallclock time. + Source cursor.Source + + // If non-nil, exported metrics to update. If nil, metrics are discarded. + Metrics *Metrics +} + +// Info records the current state of the log at the time of a scan operation. +type Info struct { + Oldest cursor.Cursor // the cursor of the oldest item in the log + Newest cursor.Cursor // the cursor of the newest item in the log + Size int // the number of items in the log +} + +// logState is a snapshot of the state of the log. 
+type logState struct { + oldest cursor.Cursor + newest cursor.Cursor + size int + head *logEntry +} + +func (st logState) info() Info { + return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size} +} + +// state returns a snapshot of the current log contents. The caller may freely +// traverse the internal structure of the list without locking, provided it +// does not modify either the entries or their items. +func (lg *Log) state() logState { + lg.mu.Lock() + defer lg.mu.Unlock() + if lg.head == nil { + return logState{} // empty + } + return logState{ + oldest: lg.oldestCursor, + newest: lg.head.item.Cursor, + size: lg.numItems, + head: lg.head, + } +} + +// waitStateChange blocks until either ctx ends or the head of the log is +// modified, then returns the state of the log. An error is reported only if +// ctx terminates before head changes. +func (lg *Log) waitStateChange(ctx context.Context) (logState, error) { + lg.mu.Lock() + ch := lg.ready // capture + lg.mu.Unlock() + select { + case <-ctx.Done(): + return lg.state(), ctx.Err() + case <-ch: + return lg.state(), nil + } +} + +// scanState scans the contents of the log at st. See the Scan method for a +// description of the callback semantics. +func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) { + info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size} + for cur := st.head; cur != nil; cur = cur.next { + if err := f(cur.item); err != nil { + if errors.Is(err, ErrStopScan) { + return info, nil + } + return info, err + } + } + return info, nil +} + +// updateHead replaces the current head with newHead, signals any waiters, and +// resets the wait signal. The caller must hold log.mu exclusively. +func (lg *Log) updateHead(newHead *logEntry) { + lg.head = newHead + close(lg.ready) // signal + lg.ready = make(chan struct{}) +} + +// A logEntry is the backbone of the event log queue. 
Entries are not mutated +// after construction, so it is safe to read item and next without locking. +type logEntry struct { + item *Item + next *logEntry +} diff --git a/internal/eventlog/eventlog_test.go b/internal/eventlog/eventlog_test.go new file mode 100644 index 000000000..ee9b727ec --- /dev/null +++ b/internal/eventlog/eventlog_test.go @@ -0,0 +1,220 @@ +package eventlog_test + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/tendermint/tendermint/internal/eventlog" + "github.com/tendermint/tendermint/internal/eventlog/cursor" + "github.com/tendermint/tendermint/types" +) + +// fakeTime is a fake clock to use to control cursor assignment. +// The timeIndex method reports the current "time" and advance manually updates +// the apparent time. +type fakeTime struct{ now int64 } + +func newFakeTime(init int64) *fakeTime { return &fakeTime{now: init} } + +func (f *fakeTime) timeIndex() int64 { return f.now } + +func (f *fakeTime) advance(d time.Duration) { f.now += int64(d) } + +// eventData is a placeholder event data implementation for testing. +type eventData string + +func (eventData) TypeTag() string { return "eventData" } + +func TestNewError(t *testing.T) { + lg, err := eventlog.New(eventlog.LogSettings{}) + if err == nil { + t.Fatalf("New: got %+v, wanted error", lg) + } else { + t.Logf("New: got expected error: %v", err) + } +} + +func TestPruneTime(t *testing.T) { + clk := newFakeTime(0) + + // Construct a log with a 60-second time window. + lg, err := eventlog.New(eventlog.LogSettings{ + WindowSize: 60 * time.Second, + Source: cursor.Source{ + TimeIndex: clk.timeIndex, + }, + }) + if err != nil { + t.Fatalf("New unexpectedly failed: %v", err) + } + + // Add events up to the time window, at seconds 0, 15, 30, 45, 60. + // None of these should be pruned (yet). 
+ var want []string // cursor strings + for i := 1; i <= 5; i++ { + want = append(want, fmt.Sprintf("%016x-%04x", clk.timeIndex(), i)) + mustAdd(t, lg, "test-event", eventData("whatever")) + clk.advance(15 * time.Second) + } + // time now: 75 sec. + + // Verify that all the events we added are present. + got := cursors(t, lg) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Cursors before pruning: (-want, +got)\n%s", diff) + } + + // Add an event past the end of the window at second 90, and verify that + // this triggered an age-based prune of the oldest events (0, 15) that are + // outside the 60-second window. + + clk.advance(15 * time.Second) // time now: 90 sec. + want = append(want[2:], fmt.Sprintf("%016x-%04x", clk.timeIndex(), 6)) + + mustAdd(t, lg, "test-event", eventData("extra")) + got = cursors(t, lg) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Cursors after pruning: (-want, +got)\n%s", diff) + } +} + +// Run a publisher and concurrent subscribers to tickle the race detector with +// concurrent add and scan operations. +func TestConcurrent(t *testing.T) { + if testing.Short() { + t.Skip("Skipping concurrency exercise because -short is set") + } + + lg, err := eventlog.New(eventlog.LogSettings{ + WindowSize: 30 * time.Second, + }) + if err != nil { + t.Fatalf("New unexpectedly failed: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var wg sync.WaitGroup + + // Publisher: Add events and handle expirations. + wg.Add(1) + go func() { + defer wg.Done() + + tick := time.NewTimer(0) + defer tick.Stop() + for { + select { + case <-ctx.Done(): + return + case t := <-tick.C: + _ = lg.Add("test-event", eventData(t.Format(time.RFC3339Nano))) + tick.Reset(time.Duration(rand.Intn(50)) * time.Millisecond) + } + } + }() + + // Subscribers: Wait for new events at the head of the queue. 
This + // simulates the typical operation of a subscriber by waiting for the head + // cursor to change and then scanning down toward the unconsumed item. + const numSubs = 16 + for i := 0; i < numSubs; i++ { + task := i + wg.Add(1) + go func() { + defer wg.Done() + + tick := time.NewTimer(0) + var cur cursor.Cursor + for { + // Simulate the subscriber being busy with other things. + select { + case <-ctx.Done(): + return + case <-tick.C: + tick.Reset(time.Duration(rand.Intn(150)) * time.Millisecond) + } + + // Wait for new data to arrive. + info, err := lg.WaitScan(ctx, cur, func(itm *eventlog.Item) error { + if itm.Cursor == cur { + return eventlog.ErrStopScan + } + return nil + }) + if err != nil { + if !errors.Is(err, context.Canceled) { + t.Errorf("Wait scan for task %d failed: %v", task, err) + } + return + } + cur = info.Newest + } + }() + } + + time.AfterFunc(2*time.Second, cancel) + wg.Wait() +} + +func TestPruneSize(t *testing.T) { + const maxItems = 25 + lg, err := eventlog.New(eventlog.LogSettings{ + WindowSize: 60 * time.Second, + MaxItems: maxItems, + }) + if err != nil { + t.Fatalf("New unexpectedly failed: %v", err) + } + + // Add a lot of items to the log and verify that we never exceed the + // specified cap. + for i := 0; i < 60; i++ { + mustAdd(t, lg, "test-event", eventData(strconv.Itoa(i+1))) + + if got := lg.Info().Size; got > maxItems { + t.Errorf("After add %d: log size is %d, want ≤ %d", i+1, got, maxItems) + } + } +} + +// mustAdd adds a single event to lg. If Add reports an error other than for +// pruning, the test fails; otherwise the error is ignored. +func mustAdd(t *testing.T, lg *eventlog.Log, etype string, data types.EventData) { + t.Helper() + err := lg.Add(etype, data) + if err != nil && !errors.Is(err, eventlog.ErrLogPruned) { + t.Fatalf("Add %q failed: %v", etype, err) + } +} + +// cursors extracts the cursors from lg in ascending order of time. 
+func cursors(t *testing.T, lg *eventlog.Log) []string { + t.Helper() + + var cursors []string + if _, err := lg.Scan(func(itm *eventlog.Item) error { + cursors = append(cursors, itm.Cursor.String()) + return nil + }); err != nil { + t.Fatalf("Scan failed: %v", err) + } + reverse(cursors) // put in forward-time order for comparison + return cursors +} + +func reverse(ss []string) { + for i, j := 0, len(ss)-1; i < j; { + ss[i], ss[j] = ss[j], ss[i] + i++ + j-- + } +} diff --git a/internal/eventlog/item.go b/internal/eventlog/item.go new file mode 100644 index 000000000..f1f43b46d --- /dev/null +++ b/internal/eventlog/item.go @@ -0,0 +1,78 @@ +package eventlog + +import ( + "strings" + + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/internal/eventlog/cursor" + "github.com/tendermint/tendermint/types" +) + +// Cached constants for the pieces of reserved event names. +var ( + tmTypeTag string + tmTypeKey string +) + +func init() { + parts := strings.SplitN(types.EventTypeKey, ".", 2) + if len(parts) != 2 { + panic("invalid event type key: " + types.EventTypeKey) + } + tmTypeTag = parts[0] + tmTypeKey = parts[1] +} + +// ABCIEventer is an optional extension interface that may be implemented by +// event data types, to expose ABCI metadata to the event log. If an event item +// does not implement this interface, it is presumed to have no ABCI metadata. +type ABCIEventer interface { + // Return any ABCI events metadata the receiver contains. + // The reported slice must not contain a type (tm.event) record, since some + // events share the same structure among different event types. + ABCIEvents() []abci.Event +} + +// An Item is a single event item. +type Item struct { + Cursor cursor.Cursor + Type string + Data types.EventData + Events []abci.Event +} + +// newItem constructs a new item with the specified cursor, type, and data. 
+func newItem(cursor cursor.Cursor, etype string, data types.EventData) *Item { + return &Item{Cursor: cursor, Type: etype, Data: data, Events: makeEvents(etype, data)} +} + +// makeEvents returns a slice of ABCI events comprising the type tag along with +// any internal events exported by the data value. +func makeEvents(etype string, data types.EventData) []abci.Event { + base := []abci.Event{{ + Type: tmTypeTag, + Attributes: []abci.EventAttribute{{ + Key: tmTypeKey, Value: etype, + }}, + }} + if evt, ok := data.(ABCIEventer); ok { + return append(base, evt.ABCIEvents()...) + } + return base +} + +// FindType reports whether events contains a tm.event event, and if so returns +// its value, which is the type of the underlying event item. +func FindType(events []abci.Event) (string, bool) { + for _, evt := range events { + if evt.Type != tmTypeTag { + continue + } + for _, attr := range evt.Attributes { + if attr.Key == tmTypeKey { + return attr.Value, true + } + } + } + return "", false +} diff --git a/internal/eventlog/metrics.go b/internal/eventlog/metrics.go new file mode 100644 index 000000000..cc319032e --- /dev/null +++ b/internal/eventlog/metrics.go @@ -0,0 +1,39 @@ +package eventlog + +import ( + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +// gauge is the subset of the Prometheus gauge interface used here. +type gauge interface { + Set(float64) +} + +// Metrics define the metrics exported by the eventlog package. +type Metrics struct { + numItemsGauge gauge +} + +// discard is a no-op implementation of the gauge interface. +type discard struct{} + +func (discard) Set(float64) {} + +const eventlogSubsystem = "eventlog" + +// PrometheusMetrics returns a collection of eventlog metrics for Prometheus. 
+func PrometheusMetrics(ns string, fields ...string) *Metrics { + var labels []string + for i := 0; i < len(fields); i += 2 { + labels = append(labels, fields[i]) + } + return &Metrics{ + numItemsGauge: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: ns, + Subsystem: eventlogSubsystem, + Name: "num_items", + Help: "Number of items currently resident in the event log.", + }, labels).With(fields...), + } +} diff --git a/internal/eventlog/prune.go b/internal/eventlog/prune.go new file mode 100644 index 000000000..8d806ca98 --- /dev/null +++ b/internal/eventlog/prune.go @@ -0,0 +1,111 @@ +package eventlog + +import ( + "time" +) + +// checkPrune checks whether the log has exceeded its boundaries of size or +// age, and if so prunes the log and updates the head. +func (lg *Log) checkPrune(head *logEntry, size int, age time.Duration) error { + // To avoid potentially re-pruning for every event, don't trigger an age + // prune until we're at least this far beyond the designated size. + const windowSlop = 30 * time.Second + + if age < (lg.windowSize+windowSlop) && (lg.maxItems <= 0 || size <= lg.maxItems) { + lg.numItemsGauge.Set(float64(lg.numItems)) + return nil // no pruning is needed + } + + var newState logState + var err error + + switch { + case lg.maxItems > 0 && size > lg.maxItems: + // We exceeded the size cap. In this case, age does not matter: count off + // the newest items and drop the unconsumed tail. Note that we prune by a + // fraction rather than an absolute amount so that we only have to prune + // for size occasionally. + + // TODO(creachadair): We may want to spill dropped events to secondary + // storage rather than dropping them. The size cap is meant as a safety + // valve against unexpected extremes, but if a network has "expected" + // spikes that nevertheless exceed any safe buffer size (e.g., Osmosis + // epochs), we may want to have a fallback so that we don't lose events + // that would otherwise fall within the window. 
+ newSize := 3 * size / 4 + newState, err = lg.pruneSize(head, newSize) + + default: + // We did not exceed the size cap, but some items are too old. + newState = lg.pruneAge(head) + } + + // Note that when we update the head after pruning, we do not need to signal + // any waiters; pruning never adds new material to the log so anyone waiting + // should continue doing so until a subsequent Add occurs. + lg.mu.Lock() + defer lg.mu.Unlock() + lg.numItems = newState.size + lg.numItemsGauge.Set(float64(newState.size)) + lg.oldestCursor = newState.oldest + lg.head = newState.head + return err +} + +// pruneSize returns a new log state by pruning head to newSize. +// Precondition: newSize ≤ len(head). +func (lg *Log) pruneSize(head *logEntry, newSize int) (logState, error) { + // Special case for size 0 to simplify the logic below. + if newSize == 0 { + return logState{}, ErrLogPruned // drop everything + } + + // Initialize: New head has the same item as the old head. + pruned := &logEntry{item: head.item} // new head + last := pruned // new tail (last copied cons) + + cur := head.next + for i := 1; i < newSize; i++ { + cp := &logEntry{item: cur.item} + last.next = cp + last = cp + + cur = cur.next + } + var err error + if head.item.Cursor.Diff(last.item.Cursor) <= lg.windowSize { + err = ErrLogPruned + } + + return logState{ + oldest: last.item.Cursor, + newest: pruned.item.Cursor, + size: newSize, + head: pruned, + }, err +} + +// pruneAge returns a new log state by pruning items older than the window +// prior to the head element. 
+func (lg *Log) pruneAge(head *logEntry) logState { + pruned := &logEntry{item: head.item} + last := pruned + + size := 1 + for cur := head.next; cur != nil; cur = cur.next { + diff := head.item.Cursor.Diff(cur.item.Cursor) + if diff > lg.windowSize { + break // all remaining items are older than the window + } + cp := &logEntry{item: cur.item} + last.next = cp + last = cp + size++ + } + return logState{ + oldest: last.item.Cursor, + newest: pruned.item.Cursor, + size: size, + head: pruned, + } +}