rpc: implement the eventlog defined by ADR 075

Implement the basic cursor and eventlog types described in ADR 075.  Handle
encoding and decoding as strings for compatibility with JSON.

- Add unit tests for the required order and synchronization properties.
- Add hooks for metrics, with one value to be expanded later.
This commit is contained in:
M. J. Fromberger
2022-02-06 11:11:43 -08:00
parent cc18f87000
commit 2c0e6cde6c
7 changed files with 906 additions and 0 deletions

View File

@@ -0,0 +1,100 @@
// Package cursor implements time-ordered item cursors for an event log.
package cursor
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
)
// A Source produces cursors based on a time index generator and a sequence
// counter. A zero-valued Source is ready for use with defaults as described.
type Source struct {
// This function is called to produce the current time index.
// If nil, it defaults to time.Now().UnixNano().
TimeIndex func() int64
// The current counter value used for sequence number generation. It is
// incremented in-place each time a cursor is generated.
Counter int64
}
func (s *Source) timeIndex() int64 {
if s.TimeIndex == nil {
return time.Now().UnixNano()
}
return s.TimeIndex()
}
func (s *Source) nextCounter() int64 {
s.Counter++
return s.Counter
}
// Cursor produces a fresh cursor from s at the current time index and counter.
func (s *Source) Cursor() Cursor {
return Cursor{
timestamp: uint64(s.timeIndex()),
sequence: uint16(s.nextCounter() & 65535),
}
}
// A Cursor is a unique identifier for an item in a time-ordered event log.
// It is safe to copy and compare cursors by value.
type Cursor struct {
timestamp uint64 // ns since Unix epoch
sequence uint16 // sequence number
}
// Before reports whether c is prior to o in time ordering. This comparison
// ignores sequence numbers.
func (c Cursor) Before(o Cursor) bool { return c.timestamp < o.timestamp }
// Diff returns the time duration between c and o. The duration is negative if
// c is before o in time order.
func (c Cursor) Diff(o Cursor) time.Duration {
return time.Duration(c.timestamp) - time.Duration(o.timestamp)
}
// IsZero reports whether c is the zero cursor.
func (c Cursor) IsZero() bool { return c == Cursor{} }
// MarshalText implements the encoding.TextMarshaler interface.
// A zero cursor marshals as "", otherwise the format used by the String method.
func (c Cursor) MarshalText() ([]byte, error) {
if c.IsZero() {
return nil, nil
}
return []byte(c.String()), nil
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// An empty text unmarshals without error to a zero cursor.
func (c *Cursor) UnmarshalText(data []byte) error {
if len(data) == 0 {
*c = Cursor{} // set zero
return nil
}
ps := strings.SplitN(string(data), "-", 2)
if len(ps) != 2 {
return errors.New("invalid cursor format")
}
ts, err := strconv.ParseUint(ps[0], 16, 64)
if err != nil {
return fmt.Errorf("invalid timestamp: %w", err)
}
sn, err := strconv.ParseUint(ps[1], 16, 16)
if err != nil {
return fmt.Errorf("invalid sequence: %w", err)
}
c.timestamp = ts
c.sequence = uint16(sn)
return nil
}
// String returns a printable text representation of a cursor.
func (c Cursor) String() string {
return fmt.Sprintf("%016x-%04x", c.timestamp, c.sequence)
}

View File

@@ -0,0 +1,141 @@
package cursor_test
import (
"fmt"
"testing"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
)
func mustParse(t *testing.T, s string) cursor.Cursor {
t.Helper()
var c cursor.Cursor
if err := c.UnmarshalText([]byte(s)); err != nil {
t.Fatalf("Unmarshal %q: unexpected error: %v", s, err)
}
return c
}
func TestSource_counter(t *testing.T) {
src := &cursor.Source{
TimeIndex: func() int64 { return 255 },
}
for i := 1; i <= 5; i++ {
want := fmt.Sprintf("00000000000000ff-%04x", i)
got := src.Cursor().String()
if got != want {
t.Errorf("Cursor %d: got %q, want %q", i, got, want)
}
}
}
func TestSource_timeIndex(t *testing.T) {
times := []int64{0, 1, 100, 65535, 0x76543210fecdba98}
src := &cursor.Source{
TimeIndex: func() int64 {
out := times[0]
times = append(times[1:], out)
return out
},
Counter: 160,
}
results := []string{
"0000000000000000-00a1",
"0000000000000001-00a2",
"0000000000000064-00a3",
"000000000000ffff-00a4",
"76543210fecdba98-00a5",
}
for i, want := range results {
if got := src.Cursor().String(); got != want {
t.Errorf("Cursor %d: got %q, want %q", i+1, got, want)
}
}
}
func TestCursor_roundTrip(t *testing.T) {
const text = `0123456789abcdef-fce9`
c := mustParse(t, text)
if got := c.String(); got != text {
t.Errorf("Wrong string format: got %q, want %q", got, text)
}
cmp, err := c.MarshalText()
if err != nil {
t.Fatalf("Marshal %+v failed: %v", c, err)
}
if got := string(cmp); got != text {
t.Errorf("Wrong text format: got %q, want %q", got, text)
}
}
func TestCursor_ordering(t *testing.T) {
// Condition: text1 precedes text2 in time order.
// Condition: text2 has an earlier sequence than text1.
const zero = ""
const text1 = "0000000012345678-0005"
const text2 = "00000000fecdeba9-0002"
zc := mustParse(t, zero)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
// Confirm for all pairs that string order respects time order.
pairs := []struct {
t1, t2 string
c1, c2 cursor.Cursor
}{
{zero, zero, zc, zc},
{zero, text1, zc, c1},
{zero, text2, zc, c2},
{text1, zero, c1, zc},
{text1, text1, c1, c1},
{text1, text2, c1, c2},
{text2, zero, c2, zc},
{text2, text1, c2, c1},
{text2, text2, c2, c2},
}
for _, pair := range pairs {
want := pair.t1 < pair.t2
if got := pair.c1.Before(pair.c2); got != want {
t.Errorf("(%s).Before(%s): got %v, want %v", pair.t1, pair.t2, got, want)
}
}
}
func TestCursor_IsZero(t *testing.T) {
tests := []struct {
text string
want bool
}{
{"", true},
{"0000000000000000-0000", true},
{"0000000000000001-0000", false},
{"0000000000000000-0001", false},
{"0000000000000001-0001", false},
}
for _, test := range tests {
c := mustParse(t, test.text)
if got := c.IsZero(); got != test.want {
t.Errorf("IsZero(%q): got %v, want %v", test.text, got, test.want)
}
}
}
func TestCursor_Diff(t *testing.T) {
const time1 = 0x1ac0193001
const time2 = 0x0ac0193001
text1 := fmt.Sprintf("%016x-0001", time1)
text2 := fmt.Sprintf("%016x-0005", time2)
want := time.Duration(time1 - time2)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
got := c1.Diff(c2)
if got != want {
t.Fatalf("Diff %q - %q: got %v, want %v", text1, text2, got, want)
}
}

View File

@@ -0,0 +1,217 @@
// Package eventlog defines a reverse time-ordered log of events over a sliding
// window of time before the most recent item in the log.
//
// New items are added to the head of the log (the newest end), and items that
// fall outside the designated window are pruned from its tail (the oldest).
// Items within the log are indexed by lexicographically-ordered cursors.
package eventlog
import (
"context"
"errors"
"sync"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// A Log is a reverse time-ordered log of events in a sliding window of time
// before the newest item. Use Add to add new items to the front (head) of the
// log, and Scan or WaitScan to traverse the current contents of the log.
//
// After construction, a *Log is safe for concurrent access by one writer and
// any number of readers.
type Log struct {
// These values do not change after construction.
windowSize time.Duration
maxItems int
numItemsGauge gauge
// Protects access to the fields below. Lock to modify the values of these
// fields, or to read or snapshot the values.
mu sync.Mutex
numItems int // total number of items in the log
oldestCursor cursor.Cursor // cursor of the oldest item
head *logEntry // pointer to the newest item
ready chan struct{} // closed when head changes
source cursor.Source // generator of cursors
}
// New constructs a new empty log with the given settings.
func New(opts LogSettings) (*Log, error) {
if opts.WindowSize <= 0 {
return nil, errors.New("window size must be positive")
}
lg := &Log{
windowSize: opts.WindowSize,
maxItems: opts.MaxItems,
numItemsGauge: discard{},
ready: make(chan struct{}),
source: opts.Source,
}
if opts.Metrics != nil {
lg.numItemsGauge = opts.Metrics.numItemsGauge
}
return lg, nil
}
// Add adds a new item to the front of the log. If necessary, the log is pruned
// to fit its constraints on size and age. Add blocks until both steps are done.
//
// Any error reported by Add arises from pruning; the new item was added to the
// log regardless whether an error occurs.
func (lg *Log) Add(etype string, data types.EventData) error {
lg.mu.Lock()
head := &logEntry{
item: newItem(lg.source.Cursor(), etype, data),
next: lg.head,
}
lg.numItems++
lg.updateHead(head)
size := lg.numItems
age := head.item.Cursor.Diff(lg.oldestCursor)
lg.mu.Unlock()
// If the log requires pruning, do the pruning step outside the lock. This
// permits readers to continue to make progress while we're working.
return lg.checkPrune(head, size, age)
}
// Scan scans the current contents of the log, calling f with each item until
// all items are visited or f reports an error. If f returns ErrStopScan, Scan
// returns nil, otherwise it returns the error reported by f.
//
// The Info value returned is valid even if Scan reports an error.
func (lg *Log) Scan(f func(*Item) error) (Info, error) {
return lg.scanState(lg.state(), f)
}
// WaitScan blocks until the cursor of the frontmost log item is different from
// c, then executes a Scan on the contents of the log. If ctx ends before the
// head is updated, WaitScan returns an error without calling f.
//
// The Info value returned is valid even if WaitScan reports an error.
func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) {
st := lg.state()
for st.head == nil || st.head.item.Cursor == c {
var err error
st, err = lg.waitStateChange(ctx)
if err != nil {
return st.info(), err
}
}
return lg.scanState(st, f)
}
// Info returns the current state of the log.
func (lg *Log) Info() Info { return lg.state().info() }
// ErrStopScan is returned by a Scan callback to signal that scanning should be
// terminated without error.
var ErrStopScan = errors.New("stop scanning")
// ErrLogPruned is returned by Add to signal that at least some events within
// the time window were discarded by pruning in excess of the size limit.
// This error may be wrapped, use errors.Is to test for it.
var ErrLogPruned = errors.New("log pruned")
// LogSettings configure the construction of an event log.
type LogSettings struct {
// The size of the time window measured in time before the newest item.
// This value must be positive.
WindowSize time.Duration
// The maximum number of items that will be retained in memory within the
// designated time window. A value ≤ 0 imposes no limit, otherwise items in
// excess of this number will be dropped from the log.
MaxItems int
// The cursor source to use for log entries. If not set, use wallclock time.
Source cursor.Source
// If non-nil, exported metrics to update. If nil, metrics are discarded.
Metrics *Metrics
}
// Info records the current state of the log at the time of a scan operation.
type Info struct {
Oldest cursor.Cursor // the cursor of the oldest item in the log
Newest cursor.Cursor // the cursor of the newest item in the log
Size int // the number of items in the log
}
// logState is a snapshot of the state of the log.
type logState struct {
oldest cursor.Cursor
newest cursor.Cursor
size int
head *logEntry
}
func (st logState) info() Info {
return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
}
// state returns a snapshot of the current log contents. The caller may freely
// traverse the internal structure of the list without locking, provided it
// does not modify either the entries or their items.
func (lg *Log) state() logState {
lg.mu.Lock()
defer lg.mu.Unlock()
if lg.head == nil {
return logState{} // empty
}
return logState{
oldest: lg.oldestCursor,
newest: lg.head.item.Cursor,
size: lg.numItems,
head: lg.head,
}
}
// waitStateChange blocks until either ctx ends or the head of the log is
// modified, then returns the statee of the log. An error is reported only if
// ctx terminates before head changes.
func (lg *Log) waitStateChange(ctx context.Context) (logState, error) {
lg.mu.Lock()
ch := lg.ready // capture
lg.mu.Unlock()
select {
case <-ctx.Done():
return lg.state(), ctx.Err()
case <-ch:
return lg.state(), nil
}
}
// scanState scans the contents of the log at st. See the Scan method for a
// description of the callback semantics.
func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) {
info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
for cur := st.head; cur != nil; cur = cur.next {
if err := f(cur.item); err != nil {
if errors.Is(err, ErrStopScan) {
return info, nil
}
return info, err
}
}
return info, nil
}
// updateHead replaces the current head with newHead, signals any waiters, and
// resets the wait signal. The caller must hold log.mu exclusively.
func (lg *Log) updateHead(newHead *logEntry) {
lg.head = newHead
close(lg.ready) // signal
lg.ready = make(chan struct{})
}
// A logEntry is the backbone of the event log queue. Entries are not mutated
// after construction, so it is safe to read item and next without locking.
type logEntry struct {
item *Item
next *logEntry
}

View File

@@ -0,0 +1,220 @@
package eventlog_test
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// fakeTime is a fake clock to use to control cursor assignment.
// The timeIndex method reports the current "time" and advance manually updates
// the apparent time.
type fakeTime struct{ now int64 }
func newFakeTime(init int64) *fakeTime { return &fakeTime{now: init} }
func (f *fakeTime) timeIndex() int64 { return f.now }
func (f *fakeTime) advance(d time.Duration) { f.now += int64(d) }
// eventData is a placeholder event data implementation for testing.
type eventData string
func (eventData) TypeTag() string { return "eventData" }
func TestNewError(t *testing.T) {
lg, err := eventlog.New(eventlog.LogSettings{})
if err == nil {
t.Fatalf("New: got %+v, wanted error", lg)
} else {
t.Logf("New: got expected error: %v", err)
}
}
func TestPruneTime(t *testing.T) {
clk := newFakeTime(0)
// Construct a log with a 60-second time window.
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
Source: cursor.Source{
TimeIndex: clk.timeIndex,
},
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add events up to the time window, at seconds 0, 15, 30, 45, 60.
// None of these should be pruned (yet).
var want []string // cursor strings
for i := 1; i <= 5; i++ {
want = append(want, fmt.Sprintf("%016x-%04x", clk.timeIndex(), i))
mustAdd(t, lg, "test-event", eventData("whatever"))
clk.advance(15 * time.Second)
}
// time now: 75 sec.
// Verify that all the events we added are present.
got := cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors before pruning: (-want, +got)\n%s", diff)
}
// Add an event past the end of the window at second 90, and verify that
// this triggered an age-based prune of the oldest events (0, 15) that are
// outside the 60-second window.
clk.advance(15 * time.Second) // time now: 90 sec.
want = append(want[2:], fmt.Sprintf("%016x-%04x", clk.timeIndex(), 6))
mustAdd(t, lg, "test-event", eventData("extra"))
got = cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors after pruning: (-want, +got)\n%s", diff)
}
}
// Run a publisher and concurrent subscribers to tickle the race detector with
// concurrent add and scan operations.
func TestConcurrent(t *testing.T) {
if testing.Short() {
t.Skip("Skipping concurrency exercise because -short is set")
}
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 30 * time.Second,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
// Publisher: Add events and handle expirations.
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
_ = lg.Add("test-event", eventData(t.Format(time.RFC3339Nano)))
tick.Reset(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}
}()
// Subscribers: Wait for new events at the head of the queue. This
// simulates the typical operation of a subscriber by waiting for the head
// cursor to change and then scanning down toward the unconsumed item.
const numSubs = 16
for i := 0; i < numSubs; i++ {
task := i
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
var cur cursor.Cursor
for {
// Simulate the subscriber being busy with other things.
select {
case <-ctx.Done():
return
case <-tick.C:
tick.Reset(time.Duration(rand.Intn(150)) * time.Millisecond)
}
// Wait for new data to arrive.
info, err := lg.WaitScan(ctx, cur, func(itm *eventlog.Item) error {
if itm.Cursor == cur {
return eventlog.ErrStopScan
}
return nil
})
if err != nil {
if !errors.Is(err, context.Canceled) {
t.Errorf("Wait scan for task %d failed: %v", task, err)
}
return
}
cur = info.Newest
}
}()
}
time.AfterFunc(2*time.Second, cancel)
wg.Wait()
}
func TestPruneSize(t *testing.T) {
const maxItems = 25
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
MaxItems: maxItems,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add a lot of items to the log and verify that we never exceed the
// specified cap.
for i := 0; i < 60; i++ {
mustAdd(t, lg, "test-event", eventData(strconv.Itoa(i+1)))
if got := lg.Info().Size; got > maxItems {
t.Errorf("After add %d: log size is %d, want ≤ %d", i+1, got, maxItems)
}
}
}
// mustAdd adds a single event to lg. If Add reports an error other than for
// pruning, the test fails; otherwise the error is returned.
func mustAdd(t *testing.T, lg *eventlog.Log, etype string, data types.EventData) {
t.Helper()
err := lg.Add(etype, data)
if err != nil && !errors.Is(err, eventlog.ErrLogPruned) {
t.Fatalf("Add %q failed: %v", etype, err)
}
}
// cursors extracts the cursors from lg in ascending order of time.
func cursors(t *testing.T, lg *eventlog.Log) []string {
t.Helper()
var cursors []string
if _, err := lg.Scan(func(itm *eventlog.Item) error {
cursors = append(cursors, itm.Cursor.String())
return nil
}); err != nil {
t.Fatalf("Scan failed: %v", err)
}
reverse(cursors) // put in forward-time order for comparison
return cursors
}
func reverse(ss []string) {
for i, j := 0, len(ss)-1; i < j; {
ss[i], ss[j] = ss[j], ss[i]
i++
j--
}
}

78
internal/eventlog/item.go Normal file
View File

@@ -0,0 +1,78 @@
package eventlog
import (
"strings"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// Cached constants for the pieces of reserved event names.
var (
tmTypeTag string
tmTypeKey string
)
func init() {
parts := strings.SplitN(types.EventTypeKey, ".", 2)
if len(parts) != 2 {
panic("invalid event type key: " + types.EventTypeKey)
}
tmTypeTag = parts[0]
tmTypeKey = parts[1]
}
// ABCIEventer is an optional extension interface that may be implemented by
// event data types, to expose ABCI metadata to the event log. If an event item
// does not implement this interface, it is presumed to have no ABCI metadata.
type ABCIEventer interface {
// Return any ABCI events metadata the receiver contains.
// The reported slice must not contain a type (tm.event) record, since some
// events share the same structure among different event types.
ABCIEvents() []abci.Event
}
// An Item is a single event item.
type Item struct {
Cursor cursor.Cursor
Type string
Data types.EventData
Events []abci.Event
}
// newItem constructs a new item with the specified cursor, type, and data.
func newItem(cursor cursor.Cursor, etype string, data types.EventData) *Item {
return &Item{Cursor: cursor, Type: etype, Data: data, Events: makeEvents(etype, data)}
}
// makeEvents returns a slice of ABCI events comprising the type tag along with
// any internal events exported by the data value.
func makeEvents(etype string, data types.EventData) []abci.Event {
base := []abci.Event{{
Type: tmTypeTag,
Attributes: []abci.EventAttribute{{
Key: tmTypeKey, Value: etype,
}},
}}
if evt, ok := data.(ABCIEventer); ok {
return append(base, evt.ABCIEvents()...)
}
return base
}
// FindType reports whether events contains a tm.event event, and if so returns
// its value, which is the type of the underlying event item.
func FindType(events []abci.Event) (string, bool) {
for _, evt := range events {
if evt.Type != tmTypeTag {
continue
}
for _, attr := range evt.Attributes {
if attr.Key == tmTypeKey {
return attr.Value, true
}
}
}
return "", false
}

View File

@@ -0,0 +1,39 @@
package eventlog
import (
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
// gauge is the subset of the Prometheus gauge interface used here.
type gauge interface {
Set(float64)
}
// Metrics define the metrics exported by the eventlog package.
type Metrics struct {
numItemsGauge gauge
}
// discard is a no-op implementation of the gauge interface.
type discard struct{}
func (discard) Set(float64) {}
const eventlogSubsystem = "eventlog"
// PrometheusMetrics returns a collection of eventlog metrics for Prometheus.
func PrometheusMetrics(ns string, fields ...string) *Metrics {
var labels []string
for i := 0; i < len(fields); i += 2 {
labels = append(labels, fields[i])
}
return &Metrics{
numItemsGauge: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: ns,
Subsystem: eventlogSubsystem,
Name: "num_items",
Help: "Number of items currently resident in the event log.",
}, labels).With(fields...),
}
}

111
internal/eventlog/prune.go Normal file
View File

@@ -0,0 +1,111 @@
package eventlog
import (
"time"
)
// checkPrune checks whether the log has exceeded its boundaries of size or
// age, and if so prunes the log and updates the head.
func (lg *Log) checkPrune(head *logEntry, size int, age time.Duration) error {
// To avoid potentially re-pruning for every event, don't trigger an age
// prune until we're at least this far beyond the designated size.
const windowSlop = 30 * time.Second
if age < (lg.windowSize+windowSlop) && (lg.maxItems <= 0 || size <= lg.maxItems) {
lg.numItemsGauge.Set(float64(lg.numItems))
return nil // no pruning is needed
}
var newState logState
var err error
switch {
case lg.maxItems > 0 && size > lg.maxItems:
// We exceeded the size cap. In this case, age does not matter: count off
// the newest items and drop the unconsumed tail. Note that we prune by a
// fraction rather than an absolute amount so that we only have to prune
// for size occasionally.
// TODO(creachadair): We may want to spill dropped events to secondary
// storage rather than dropping them. The size cap is meant as a safety
// valve against unexpected extremes, but if a network has "expected"
// spikes that nevertheless exceed any safe buffer size (e.g., Osmosis
// epochs), we may want to have a fallback so that we don't lose events
// that would otherwise fall within the window.
newSize := 3 * size / 4
newState, err = lg.pruneSize(head, newSize)
default:
// We did not exceed the size cap, but some items are too old.
newState = lg.pruneAge(head)
}
// Note that when we update the head after pruning, we do not need to signal
// any waiters; pruning never adds new material to the log so anyone waiting
// should continue doing so until a subsequent Add occurs.
lg.mu.Lock()
defer lg.mu.Unlock()
lg.numItems = newState.size
lg.numItemsGauge.Set(float64(newState.size))
lg.oldestCursor = newState.oldest
lg.head = newState.head
return err
}
// pruneSize returns a new log state by pruning head to newSize.
// Precondition: newSize ≤ len(head).
func (lg *Log) pruneSize(head *logEntry, newSize int) (logState, error) {
// Special case for size 0 to simplify the logic below.
if newSize == 0 {
return logState{}, ErrLogPruned // drop everything
}
// Initialize: New head has the same item as the old head.
pruned := &logEntry{item: head.item} // new head
last := pruned // new tail (last copied cons)
cur := head.next
for i := 1; i < newSize; i++ {
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
cur = cur.next
}
var err error
if head.item.Cursor.Diff(last.item.Cursor) <= lg.windowSize {
err = ErrLogPruned
}
return logState{
oldest: last.item.Cursor,
newest: pruned.item.Cursor,
size: newSize,
head: pruned,
}, err
}
// pruneAge returns a new log state by pruning items older than the window
// prior to the head element.
func (lg *Log) pruneAge(head *logEntry) logState {
pruned := &logEntry{item: head.item}
last := pruned
size := 1
for cur := head.next; cur != nil; cur = cur.next {
diff := head.item.Cursor.Diff(cur.item.Cursor)
if diff > lg.windowSize {
break // all remaining items are older than the window
}
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
size++
}
return logState{
oldest: last.item.Cursor,
newest: pruned.item.Cursor,
size: size,
head: pruned,
}
}