Compare commits

...

10 Commits

Author SHA1 Message Date
mmsqe
5eac8d7618 rpc: enable the ADR 075 event log by default in new configs (#8572) (#9856) 2022-12-07 22:30:45 -05:00
mmsqe
1b1ba41720 backport: set a minimum long-polling interval for Events in rpc (#8050) (#9768)
* rpc: set a minimum long-polling interval for Events (#8050)

Since the goal of reading events at the head of the event log is to satisfy a
subscription style interface, there is no point in allowing head polling with
no wait interval. The pagination case already bypasses long polling, so the
extra option is unneessary.

Set a minimum default long-polling interval for the head case.
Add a test for minimum delay.

* fix doc

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-12-07 18:53:39 -05:00
mmsqe
544a38da7b Add command-line tool to manually subscribe to an event stream. (#8015) (#9718)
This tool is an aid to debugging, and demonstrates the API of the eventstream
helper package. It subscribes to the event stream of a running node with the
ADR 075 event log enabled, and writes matching events to stdout as JSON.

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-11-25 23:17:56 +01:00
mmsqe
892c765a5c rpc/client: add eventstream helper (#7987) (#9659)
This allows the caller to stream events. It handles the bookkeeping for cursors
and pagination, and delivers items to a callback.

Handle missed items by reporting a structured error. The caller can use the
Reset method to "catch up" to head after this happens.

Add a manual test CLI to probe a running node. Requires the node to be
configured with the event log settings.

Add a unit test that scripts input to the stream to exercise it.

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-11-17 18:25:05 +01:00
mmsqe
d6d3c172da rpc/client: rewrite the WaitForOneEvent helper (#7986) (#9550)
Update usage in tests.

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-11-03 10:19:41 +01:00
mmsqe
96dd4d08c3 backport: add Events method to the client interface (#7982) (#9519)
* rpc/client: add Events method to the client interface (#7982)

- Update documentation to deprecate the old methods.
- Add Events methods to HTTP, WS, and Local clients.
- Add Events method to the light client wrapper.
- Rename legacy events client to SubscriptionClient.

* Apply suggestions from code review

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-10-13 18:34:10 +02:00
mmsqe
c3cc94a0e0 backport: hook up eventlog and eventlog metrics (#7981) (#9510)
* node: hook up eventlog and eventlog metrics (#7981)

* align no metrics for event log

* make use of metricsgen

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-10-06 19:38:36 +02:00
mmsqe
d47b675cda backport: implement the ADR 075 /events method (#7965) (#9497)
* rpc: implement the ADR 075 /events method (#7965)

This method implements the eventlog extension interface to expose ABCI metadata
to the log for query processing. Only the types that have ABCI events need to
implement this.

- Add an event log to the environment
- Add a sketch of the handler method
- Add an /events RPCFunc to the route map
- Implement query logic
- Subscribe to pubsub if confingured, handle termination

* add MatchesEvents test

* add TODO due to backport sequence

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-10-04 22:52:27 +02:00
mmsqe
069e402aa1 backport: add event subscription options and defaults config (#7930) (#9491)
* config: add event subscription options and defaults (#7930)

* Apply suggestions from code review

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-09-28 16:27:19 +02:00
mmsqe
f1a57adee4 Feature/adr075 backport eventlog (#9470)
* rpc: implement the eventlog defined by ADR 075 (#7825)

Implement the basic cursor and eventlog types described in ADR 075.  Handle
encoding and decoding as strings for compatibility with JSON.

- Add unit tests for the required order and synchronization properties.
- Add hooks for metrics, with one value to be expanded later.
- Update ADR 075 to match the specifics of the implementation so far.

* fix event type

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2022-09-23 15:52:50 +02:00
32 changed files with 2036 additions and 81 deletions

View File

@@ -36,7 +36,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
### BREAKING CHANGES
- CLI/RPC/Config
- [config] \#9491 Add new event subscription options and defaults. (@creachadair)
- [config] \#9259 Rename the fastsync section and the fast_sync key blocksync and block_sync respectively
- [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair)
- Apps
- [abci/counter] \#6684 Delete counter example app

View File

@@ -382,6 +382,33 @@ type RPCConfig struct {
// predictability in subscription behavior.
CloseOnSlowClient bool `mapstructure:"experimental_close_on_slow_client"`
// If true, disable the websocket interface to the RPC service. This has
// the effect of disabling the /subscribe, /unsubscribe, and /unsubscribe_all
// methods for event subscription.
//
// EXPERIMENTAL: This setting will be removed in Tendermint v0.37.
ExperimentalDisableWebsocket bool `mapstructure:"experimental-disable-websocket"`
// The time window size for the event log. All events up to this long before
// the latest (up to EventLogMaxItems) will be available for subscribers to
// fetch via the /events method. If 0 (the default) the event log and the
// /events RPC method are disabled.
EventLogWindowSize time.Duration `mapstructure:"event-log-window-size"`
// The maxiumum number of events that may be retained by the event log. If
// this value is 0, no upper limit is set. Otherwise, items in excess of
// this number will be discarded from the event log.
//
// Warning: This setting is a safety valve. Setting it too low may cause
// subscribers to miss events. Try to choose a value higher than the
// maximum worst-case expected event load within the chosen window size in
// ordinary operation.
//
// For example, if the window size is 10 minutes and the node typically
// averages 1000 events per ten minutes, but with occasional known spikes of
// up to 2000, choose a value > 2000.
EventLogMaxItems int `mapstructure:"event-log-max-items"`
// How long to wait for a tx to be committed during /broadcast_tx_commit
// WARNING: Using a value larger than 10s will result in increasing the
// global HTTP write timeout, which applies to all connections and endpoints.
@@ -429,11 +456,16 @@ func DefaultRPCConfig() *RPCConfig {
Unsafe: false,
MaxOpenConnections: 900,
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
// Settings for event subscription.
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
ExperimentalDisableWebsocket: false, // compatible with TM v0.35 and earlier
EventLogWindowSize: 30 * time.Second,
EventLogMaxItems: 0,
TimeoutBroadcastTxCommit: 10 * time.Second,
MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
@@ -479,6 +511,12 @@ func (cfg *RPCConfig) ValidateBasic() error {
cfg.SubscriptionBufferSize,
)
}
if cfg.EventLogWindowSize < 0 {
return errors.New("event-log-window-size must not be negative")
}
if cfg.EventLogMaxItems < 0 {
return errors.New("event-log-max-items must not be negative")
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}

View File

@@ -233,6 +233,33 @@ experimental_websocket_write_buffer_size = {{ .RPC.WebSocketWriteBufferSize }}
# predictability in subscription behavior.
experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }}
# If true, disable the websocket interface to the RPC service. This has
# the effect of disabling the /subscribe, /unsubscribe, and /unsubscribe_all
# methods for event subscription.
#
# EXPERIMENTAL: This setting will be removed in Tendermint v0.37.
experimental-disable-websocket = {{ .RPC.ExperimentalDisableWebsocket }}
# The time window size for the event log. All events up to this long before
# the latest (up to EventLogMaxItems) will be available for subscribers to
# fetch via the /events method. If 0 (the default) the event log and the
# /events RPC method are disabled.
event-log-window-size = "{{ .RPC.EventLogWindowSize }}"
# The maxiumum number of events that may be retained by the event log. If
# this value is 0, no upper limit is set. Otherwise, items in excess of
# this number will be discarded from the event log.
#
# Warning: This setting is a safety valve. Setting it too low may cause
# subscribers to miss events. Try to choose a value higher than the
# maximum worst-case expected event load within the chosen window size in
# ordinary operation.
#
# For example, if the window size is 10 minutes and the node typically
# averages 1000 events per ten minutes, but with occasional known spikes of
# up to 2000, choose a value > 2000.
event-log-max-items = {{ .RPC.EventLogMaxItems }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
# WARNING: Using a value larger than 10s will result in increasing the
# global HTTP write timeout, which applies to all connections and endpoints.

View File

@@ -0,0 +1,100 @@
// Package cursor implements time-ordered item cursors for an event log.
package cursor
import (
"errors"
"fmt"
"strconv"
"strings"
"time"
)
// A Source produces cursors based on a time index generator and a sequence
// counter. A zero-valued Source is ready for use with defaults as described.
type Source struct {
// This function is called to produce the current time index.
// If nil, it defaults to time.Now().UnixNano().
TimeIndex func() int64
// The current counter value used for sequence number generation. It is
// incremented in-place each time a cursor is generated.
Counter int64
}
func (s *Source) timeIndex() int64 {
if s.TimeIndex == nil {
return time.Now().UnixNano()
}
return s.TimeIndex()
}
func (s *Source) nextCounter() int64 {
s.Counter++
return s.Counter
}
// Cursor produces a fresh cursor from s at the current time index and counter.
func (s *Source) Cursor() Cursor {
return Cursor{
timestamp: uint64(s.timeIndex()),
sequence: uint16(s.nextCounter() & 0xffff),
}
}
// A Cursor is a unique identifier for an item in a time-ordered event log.
// It is safe to copy and compare cursors by value.
type Cursor struct {
timestamp uint64 // ns since Unix epoch
sequence uint16 // sequence number
}
// Before reports whether c is prior to o in time ordering. This comparison
// ignores sequence numbers.
func (c Cursor) Before(o Cursor) bool { return c.timestamp < o.timestamp }
// Diff returns the time duration between c and o. The duration is negative if
// c is before o in time order.
func (c Cursor) Diff(o Cursor) time.Duration {
return time.Duration(c.timestamp) - time.Duration(o.timestamp)
}
// IsZero reports whether c is the zero cursor.
func (c Cursor) IsZero() bool { return c == Cursor{} }
// MarshalText implements the encoding.TextMarshaler interface.
// A zero cursor marshals as "", otherwise the format used by the String method.
func (c Cursor) MarshalText() ([]byte, error) {
if c.IsZero() {
return nil, nil
}
return []byte(c.String()), nil
}
// UnmarshalText implements the encoding.TextUnmarshaler interface.
// An empty text unmarshals without error to a zero cursor.
func (c *Cursor) UnmarshalText(data []byte) error {
if len(data) == 0 {
*c = Cursor{} // set zero
return nil
}
ps := strings.SplitN(string(data), "-", 2)
if len(ps) != 2 {
return errors.New("invalid cursor format")
}
ts, err := strconv.ParseUint(ps[0], 16, 64)
if err != nil {
return fmt.Errorf("invalid timestamp: %w", err)
}
sn, err := strconv.ParseUint(ps[1], 16, 16)
if err != nil {
return fmt.Errorf("invalid sequence: %w", err)
}
c.timestamp = ts
c.sequence = uint16(sn)
return nil
}
// String returns a printable text representation of a cursor.
func (c Cursor) String() string {
return fmt.Sprintf("%016x-%04x", c.timestamp, c.sequence)
}

View File

@@ -0,0 +1,141 @@
package cursor_test
import (
"fmt"
"testing"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
)
func mustParse(t *testing.T, s string) cursor.Cursor {
t.Helper()
var c cursor.Cursor
if err := c.UnmarshalText([]byte(s)); err != nil {
t.Fatalf("Unmarshal %q: unexpected error: %v", s, err)
}
return c
}
func TestSource_counter(t *testing.T) {
src := &cursor.Source{
TimeIndex: func() int64 { return 255 },
}
for i := 1; i <= 5; i++ {
want := fmt.Sprintf("00000000000000ff-%04x", i)
got := src.Cursor().String()
if got != want {
t.Errorf("Cursor %d: got %q, want %q", i, got, want)
}
}
}
func TestSource_timeIndex(t *testing.T) {
times := []int64{0, 1, 100, 65535, 0x76543210fecdba98}
src := &cursor.Source{
TimeIndex: func() int64 {
out := times[0]
times = append(times[1:], out)
return out
},
Counter: 160,
}
results := []string{
"0000000000000000-00a1",
"0000000000000001-00a2",
"0000000000000064-00a3",
"000000000000ffff-00a4",
"76543210fecdba98-00a5",
}
for i, want := range results {
if got := src.Cursor().String(); got != want {
t.Errorf("Cursor %d: got %q, want %q", i+1, got, want)
}
}
}
func TestCursor_roundTrip(t *testing.T) {
const text = `0123456789abcdef-fce9`
c := mustParse(t, text)
if got := c.String(); got != text {
t.Errorf("Wrong string format: got %q, want %q", got, text)
}
cmp, err := c.MarshalText()
if err != nil {
t.Fatalf("Marshal %+v failed: %v", c, err)
}
if got := string(cmp); got != text {
t.Errorf("Wrong text format: got %q, want %q", got, text)
}
}
func TestCursor_ordering(t *testing.T) {
// Condition: text1 precedes text2 in time order.
// Condition: text2 has an earlier sequence than text1.
const zero = ""
const text1 = "0000000012345678-0005"
const text2 = "00000000fecdeba9-0002"
zc := mustParse(t, zero)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
// Confirm for all pairs that string order respects time order.
pairs := []struct {
t1, t2 string
c1, c2 cursor.Cursor
}{
{zero, zero, zc, zc},
{zero, text1, zc, c1},
{zero, text2, zc, c2},
{text1, zero, c1, zc},
{text1, text1, c1, c1},
{text1, text2, c1, c2},
{text2, zero, c2, zc},
{text2, text1, c2, c1},
{text2, text2, c2, c2},
}
for _, pair := range pairs {
want := pair.t1 < pair.t2
if got := pair.c1.Before(pair.c2); got != want {
t.Errorf("(%s).Before(%s): got %v, want %v", pair.t1, pair.t2, got, want)
}
}
}
func TestCursor_IsZero(t *testing.T) {
tests := []struct {
text string
want bool
}{
{"", true},
{"0000000000000000-0000", true},
{"0000000000000001-0000", false},
{"0000000000000000-0001", false},
{"0000000000000001-0001", false},
}
for _, test := range tests {
c := mustParse(t, test.text)
if got := c.IsZero(); got != test.want {
t.Errorf("IsZero(%q): got %v, want %v", test.text, got, test.want)
}
}
}
func TestCursor_Diff(t *testing.T) {
const time1 = 0x1ac0193001
const time2 = 0x0ac0193001
text1 := fmt.Sprintf("%016x-0001", time1)
text2 := fmt.Sprintf("%016x-0005", time2)
want := time.Duration(time1 - time2)
c1 := mustParse(t, text1)
c2 := mustParse(t, text2)
got := c1.Diff(c2)
if got != want {
t.Fatalf("Diff %q - %q: got %v, want %v", text1, text2, got, want)
}
}

View File

@@ -0,0 +1,217 @@
// Package eventlog defines a reverse time-ordered log of events over a sliding
// window of time before the most recent item in the log.
//
// New items are added to the head of the log (the newest end), and items that
// fall outside the designated window are pruned from its tail (the oldest).
// Items within the log are indexed by lexicographically-ordered cursors.
package eventlog
import (
"context"
"errors"
"sync"
"time"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// A Log is a reverse time-ordered log of events in a sliding window of time
// before the newest item. Use Add to add new items to the front (head) of the
// log, and Scan or WaitScan to traverse the current contents of the log.
//
// After construction, a *Log is safe for concurrent access by one writer and
// any number of readers.
type Log struct {
// These values do not change after construction.
windowSize time.Duration
maxItems int
metrics *Metrics
// Protects access to the fields below. Lock to modify the values of these
// fields, or to read or snapshot the values.
mu sync.Mutex
numItems int // total number of items in the log
oldestCursor cursor.Cursor // cursor of the oldest item
head *logEntry // pointer to the newest item
ready chan struct{} // closed when head changes
source cursor.Source // generator of cursors
}
// New constructs a new empty log with the given settings.
func New(opts LogSettings) (*Log, error) {
if opts.WindowSize <= 0 {
return nil, errors.New("window size must be positive")
}
lg := &Log{
windowSize: opts.WindowSize,
maxItems: opts.MaxItems,
metrics: NopMetrics(),
ready: make(chan struct{}),
source: opts.Source,
}
if opts.Metrics != nil {
lg.metrics = opts.Metrics
}
return lg, nil
}
// Add adds a new item to the front of the log. If necessary, the log is pruned
// to fit its constraints on size and age. Add blocks until both steps are done.
//
// Any error reported by Add arises from pruning; the new item was added to the
// log regardless whether an error occurs.
func (lg *Log) Add(etype string, data types.TMEventData) error {
lg.mu.Lock()
head := &logEntry{
item: newItem(lg.source.Cursor(), etype, data),
next: lg.head,
}
lg.numItems++
lg.updateHead(head)
size := lg.numItems
age := head.item.Cursor.Diff(lg.oldestCursor)
// If the log requires pruning, do the pruning step outside the lock. This
// permits readers to continue to make progress while we're working.
lg.mu.Unlock()
return lg.checkPrune(head, size, age)
}
// Scan scans the current contents of the log, calling f with each item until
// all items are visited or f reports an error. If f returns ErrStopScan, Scan
// returns nil, otherwise it returns the error reported by f.
//
// The Info value returned is valid even if Scan reports an error.
func (lg *Log) Scan(f func(*Item) error) (Info, error) {
return lg.scanState(lg.state(), f)
}
// WaitScan blocks until the cursor of the frontmost log item is different from
// c, then executes a Scan on the contents of the log. If ctx ends before the
// head is updated, WaitScan returns an error without calling f.
//
// The Info value returned is valid even if WaitScan reports an error.
func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) {
st := lg.state()
for st.head == nil || st.head.item.Cursor == c {
var err error
st, err = lg.waitStateChange(ctx)
if err != nil {
return st.info(), err
}
}
return lg.scanState(st, f)
}
// Info returns the current state of the log.
func (lg *Log) Info() Info { return lg.state().info() }
// ErrStopScan is returned by a Scan callback to signal that scanning should be
// terminated without error.
var ErrStopScan = errors.New("stop scanning")
// ErrLogPruned is returned by Add to signal that at least some events within
// the time window were discarded by pruning in excess of the size limit.
// This error may be wrapped, use errors.Is to test for it.
var ErrLogPruned = errors.New("log pruned")
// LogSettings configure the construction of an event log.
type LogSettings struct {
// The size of the time window measured in time before the newest item.
// This value must be positive.
WindowSize time.Duration
// The maximum number of items that will be retained in memory within the
// designated time window. A value ≤ 0 imposes no limit, otherwise items in
// excess of this number will be dropped from the log.
MaxItems int
// The cursor source to use for log entries. If not set, use wallclock time.
Source cursor.Source
// If non-nil, exported metrics to update. If nil, metrics are discarded.
Metrics *Metrics
}
// Info records the current state of the log at the time of a scan operation.
type Info struct {
Oldest cursor.Cursor // the cursor of the oldest item in the log
Newest cursor.Cursor // the cursor of the newest item in the log
Size int // the number of items in the log
}
// logState is a snapshot of the state of the log.
type logState struct {
oldest cursor.Cursor
newest cursor.Cursor
size int
head *logEntry
}
func (st logState) info() Info {
return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
}
// state returns a snapshot of the current log contents. The caller may freely
// traverse the internal structure of the list without locking, provided it
// does not modify either the entries or their items.
func (lg *Log) state() logState {
lg.mu.Lock()
defer lg.mu.Unlock()
if lg.head == nil {
return logState{} // empty
}
return logState{
oldest: lg.oldestCursor,
newest: lg.head.item.Cursor,
size: lg.numItems,
head: lg.head,
}
}
// waitStateChange blocks until either ctx ends or the head of the log is
// modified, then returns the state of the log. An error is reported only if
// ctx terminates before head changes.
func (lg *Log) waitStateChange(ctx context.Context) (logState, error) {
lg.mu.Lock()
ch := lg.ready // capture
lg.mu.Unlock()
select {
case <-ctx.Done():
return lg.state(), ctx.Err()
case <-ch:
return lg.state(), nil
}
}
// scanState scans the contents of the log at st. See the Scan method for a
// description of the callback semantics.
func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) {
info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
for cur := st.head; cur != nil; cur = cur.next {
if err := f(cur.item); err != nil {
if errors.Is(err, ErrStopScan) {
return info, nil
}
return info, err
}
}
return info, nil
}
// updateHead replaces the current head with newHead, signals any waiters, and
// resets the wait signal. The caller must hold log.mu exclusively.
func (lg *Log) updateHead(newHead *logEntry) {
lg.head = newHead
close(lg.ready) // signal
lg.ready = make(chan struct{})
}
// A logEntry is the backbone of the event log queue. Entries are not mutated
// after construction, so it is safe to read item and next without locking.
type logEntry struct {
item *Item
next *logEntry
}

View File

@@ -0,0 +1,222 @@
package eventlog_test
import (
"context"
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// fakeTime is a fake clock to use to control cursor assignment.
// The timeIndex method reports the current "time" and advance manually updates
// the apparent time.
type fakeTime struct{ now int64 }
func newFakeTime(init int64) *fakeTime { return &fakeTime{now: init} }
func (f *fakeTime) timeIndex() int64 { return f.now }
func (f *fakeTime) advance(d time.Duration) { f.now += int64(d) }
// eventData is a placeholder event data implementation for testing.
type eventData string
func (eventData) TypeTag() string { return "eventData" }
func TestNewError(t *testing.T) {
lg, err := eventlog.New(eventlog.LogSettings{})
if err == nil {
t.Fatalf("New: got %+v, wanted error", lg)
} else {
t.Logf("New: got expected error: %v", err)
}
}
func TestPruneTime(t *testing.T) {
clk := newFakeTime(0)
// Construct a log with a 60-second time window.
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
Source: cursor.Source{
TimeIndex: clk.timeIndex,
},
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add events up to the time window, at seconds 0, 15, 30, 45, 60.
// None of these should be pruned (yet).
var want []string // cursor strings
for i := 1; i <= 5; i++ {
want = append(want, fmt.Sprintf("%016x-%04x", clk.timeIndex(), i))
mustAdd(t, lg, "test-event", eventData("whatever"))
clk.advance(15 * time.Second)
}
// time now: 75 sec.
// Verify that all the events we added are present.
got := cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors before pruning: (-want, +got)\n%s", diff)
}
// Add an event past the end of the window at second 90, and verify that
// this triggered an age-based prune of the oldest events (0, 15) that are
// outside the 60-second window.
clk.advance(15 * time.Second) // time now: 90 sec.
want = append(want[2:], fmt.Sprintf("%016x-%04x", clk.timeIndex(), 6))
mustAdd(t, lg, "test-event", eventData("extra"))
got = cursors(t, lg)
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Cursors after pruning: (-want, +got)\n%s", diff)
}
}
// Run a publisher and concurrent subscribers to tickle the race detector with
// concurrent add and scan operations.
func TestConcurrent(t *testing.T) {
defer leaktest.Check(t)
if testing.Short() {
t.Skip("Skipping concurrency exercise because -short is set")
}
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 30 * time.Second,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
// Publisher: Add events and handle expirations.
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
_ = lg.Add("test-event", eventData(t.Format(time.RFC3339Nano)))
tick.Reset(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}
}()
// Subscribers: Wait for new events at the head of the queue. This
// simulates the typical operation of a subscriber by waiting for the head
// cursor to change and then scanning down toward the unconsumed item.
const numSubs = 16
for i := 0; i < numSubs; i++ {
task := i
wg.Add(1)
go func() {
defer wg.Done()
tick := time.NewTimer(0)
var cur cursor.Cursor
for {
// Simulate the subscriber being busy with other things.
select {
case <-ctx.Done():
return
case <-tick.C:
tick.Reset(time.Duration(rand.Intn(150)) * time.Millisecond)
}
// Wait for new data to arrive.
info, err := lg.WaitScan(ctx, cur, func(itm *eventlog.Item) error {
if itm.Cursor == cur {
return eventlog.ErrStopScan
}
return nil
})
if err != nil {
if !errors.Is(err, context.Canceled) {
t.Errorf("Wait scan for task %d failed: %v", task, err)
}
return
}
cur = info.Newest
}
}()
}
time.AfterFunc(2*time.Second, cancel)
wg.Wait()
}
func TestPruneSize(t *testing.T) {
const maxItems = 25
lg, err := eventlog.New(eventlog.LogSettings{
WindowSize: 60 * time.Second,
MaxItems: maxItems,
})
if err != nil {
t.Fatalf("New unexpectedly failed: %v", err)
}
// Add a lot of items to the log and verify that we never exceed the
// specified cap.
for i := 0; i < 60; i++ {
mustAdd(t, lg, "test-event", eventData(strconv.Itoa(i+1)))
if got := lg.Info().Size; got > maxItems {
t.Errorf("After add %d: log size is %d, want ≤ %d", i+1, got, maxItems)
}
}
}
// mustAdd adds a single event to lg. If Add reports an error other than for
// pruning, the test fails; otherwise the error is returned.
func mustAdd(t *testing.T, lg *eventlog.Log, etype string, data types.TMEventData) {
t.Helper()
err := lg.Add(etype, data)
if err != nil && !errors.Is(err, eventlog.ErrLogPruned) {
t.Fatalf("Add %q failed: %v", etype, err)
}
}
// cursors extracts the cursors from lg in ascending order of time.
func cursors(t *testing.T, lg *eventlog.Log) []string {
t.Helper()
var cursors []string
if _, err := lg.Scan(func(itm *eventlog.Item) error {
cursors = append(cursors, itm.Cursor.String())
return nil
}); err != nil {
t.Fatalf("Scan failed: %v", err)
}
reverse(cursors) // put in forward-time order for comparison
return cursors
}
func reverse(ss []string) {
for i, j := 0, len(ss)-1; i < j; {
ss[i], ss[j] = ss[j], ss[i]
i++
j--
}
}

78
internal/eventlog/item.go Normal file
View File

@@ -0,0 +1,78 @@
package eventlog
import (
"strings"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/types"
)
// Cached constants for the pieces of reserved event names.
var (
tmTypeTag string
tmTypeKey string
)
func init() {
parts := strings.SplitN(types.EventTypeKey, ".", 2)
if len(parts) != 2 {
panic("invalid event type key: " + types.EventTypeKey)
}
tmTypeTag = parts[0]
tmTypeKey = parts[1]
}
// ABCIEventer is an optional extension interface that may be implemented by
// event data types, to expose ABCI metadata to the event log. If an event item
// does not implement this interface, it is presumed to have no ABCI metadata.
type ABCIEventer interface {
// Return any ABCI events metadata the receiver contains.
// The reported slice must not contain a type (tm.event) record, since some
// events share the same structure among different event types.
ABCIEvents() []abci.Event
}
// An Item is a single event item.
type Item struct {
Cursor cursor.Cursor
Type string
Data types.TMEventData
Events []abci.Event
}
// newItem constructs a new item with the specified cursor, type, and data.
func newItem(cursor cursor.Cursor, etype string, data types.TMEventData) *Item {
return &Item{Cursor: cursor, Type: etype, Data: data, Events: makeEvents(etype, data)}
}
// makeEvents returns a slice of ABCI events comprising the type tag along with
// any internal events exported by the data value.
func makeEvents(etype string, data types.TMEventData) []abci.Event {
base := []abci.Event{{
Type: tmTypeTag,
Attributes: []abci.EventAttribute{{
Key: tmTypeKey, Value: etype,
}},
}}
if evt, ok := data.(ABCIEventer); ok {
return append(base, evt.ABCIEvents()...)
}
return base
}
// FindType reports whether events contains a tm.event event, and if so returns
// its value, which is the type of the underlying event item.
func FindType(events []abci.Event) (string, bool) {
for _, evt := range events {
if evt.Type != tmTypeTag {
continue
}
for _, attr := range evt.Attributes {
if attr.Key == tmTypeKey {
return attr.Value, true
}
}
}
return "", false
}

View File

@@ -0,0 +1,30 @@
// Code generated by metricsgen. DO NOT EDIT.
package eventlog
import (
"github.com/go-kit/kit/metrics/discard"
prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
labels := []string{}
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
return &Metrics{
numItems: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "num_items",
Help: "Number of items currently resident in the event log.",
}, labels).With(labelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
numItems: discard.NewGauge(),
}
}

View File

@@ -0,0 +1,14 @@
package eventlog
import "github.com/go-kit/kit/metrics"
const MetricsSubsystem = "eventlog"
//go:generate go run ../../scripts/metricsgen -struct=Metrics
// Metrics define the metrics exported by the eventlog package.
type Metrics struct {
// Number of items currently resident in the event log.
numItems metrics.Gauge
}

111
internal/eventlog/prune.go Normal file
View File

@@ -0,0 +1,111 @@
package eventlog
import (
"time"
)
// checkPrune checks whether the log has exceeded its boundaries of size or
// age, and if so prunes the log and updates the head.
func (lg *Log) checkPrune(head *logEntry, size int, age time.Duration) error {
// To avoid potentially re-pruning for every event, don't trigger an age
// prune until we're at least this far beyond the designated size.
const windowSlop = 30 * time.Second
if age < (lg.windowSize+windowSlop) && (lg.maxItems <= 0 || size <= lg.maxItems) {
lg.metrics.numItems.Set(float64(lg.numItems))
return nil // no pruning is needed
}
var newState logState
var err error
switch {
case lg.maxItems > 0 && size > lg.maxItems:
// We exceeded the size cap. In this case, age does not matter: count off
// the newest items and drop the unconsumed tail. Note that we prune by a
// fraction rather than an absolute amount so that we only have to prune
// for size occasionally.
// TODO(creachadair): We may want to spill dropped events to secondary
// storage rather than dropping them. The size cap is meant as a safety
// valve against unexpected extremes, but if a network has "expected"
// spikes that nevertheless exceed any safe buffer size (e.g., Osmosis
// epochs), we may want to have a fallback so that we don't lose events
// that would otherwise fall within the window.
newSize := 3 * size / 4
newState, err = lg.pruneSize(head, newSize)
default:
// We did not exceed the size cap, but some items are too old.
newState = lg.pruneAge(head)
}
// Note that when we update the head after pruning, we do not need to signal
// any waiters; pruning never adds new material to the log so anyone waiting
// should continue doing so until a subsequent Add occurs.
lg.mu.Lock()
defer lg.mu.Unlock()
lg.numItems = newState.size
lg.metrics.numItems.Set(float64(newState.size))
lg.oldestCursor = newState.oldest
lg.head = newState.head
return err
}
// pruneSize returns a new log state by pruning head to newSize.
// Precondition: newSize ≤ len(head).
func (lg *Log) pruneSize(head *logEntry, newSize int) (logState, error) {
// Special case for size 0 to simplify the logic below.
if newSize == 0 {
return logState{}, ErrLogPruned // drop everything
}
// Initialize: New head has the same item as the old head.
first := &logEntry{item: head.item} // new head
last := first // new tail (last copied cons)
cur := head.next
for i := 1; i < newSize; i++ {
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
cur = cur.next
}
var err error
if head.item.Cursor.Diff(last.item.Cursor) <= lg.windowSize {
err = ErrLogPruned
}
return logState{
oldest: last.item.Cursor,
newest: first.item.Cursor,
size: newSize,
head: first,
}, err
}
// pruneAge returns a new log state by pruning items older than the window
// prior to the head element.
func (lg *Log) pruneAge(head *logEntry) logState {
first := &logEntry{item: head.item}
last := first
size := 1
for cur := head.next; cur != nil; cur = cur.next {
diff := head.item.Cursor.Diff(cur.item.Cursor)
if diff > lg.windowSize {
break // all remaining items are older than the window
}
cp := &logEntry{item: cur.item}
last.next = cp
last = cp
size++
}
return logState{
oldest: last.item.Cursor,
newest: first.item.Cursor,
size: size,
head: first,
}
}

View File

@@ -95,6 +95,13 @@ func (q *Query) Matches(events map[string][]string) (bool, error) {
return q.matchesEvents(ExpandEvents(events)), nil
}
func (q *Query) MatchesEvents(events []types.Event) (bool, error) {
if q == nil {
return true, nil
}
return q.matchesEvents(events), nil
}
// String matches part of the pubsub.Query interface.
func (q *Query) String() string {
if q == nil {

View File

@@ -332,6 +332,16 @@ func TestCompiledMatches(t *testing.T) {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
tc.s, tc.events, got, tc.matches)
}
got, err = c.MatchesEvents(query.ExpandEvents(tc.events))
if err != nil {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v",
tc.s, tc.events, err)
}
if got != tc.matches {
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
tc.s, tc.events, got, tc.matches)
}
})
}
}

View File

@@ -1,6 +1,8 @@
package proxy
import (
"time"
"github.com/tendermint/tendermint/libs/bytes"
lrpc "github.com/tendermint/tendermint/light/rpc"
rpcclient "github.com/tendermint/tendermint/rpc/client"
@@ -12,7 +14,9 @@ import (
func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
return map[string]*rpcserver.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events.
// Event subscription. Note that subscribe, unsubscribe, and
// unsubscribe_all are only available via the websocket endpoint.
"events": rpcserver.NewRPCFunc(makeEventsSearchFunc(c), "filter,maxItems,before,after,waitTime"),
"subscribe": rpcserver.NewWSRPCFunc(c.SubscribeWS, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(c.UnsubscribeWS, "query"),
"unsubscribe_all": rpcserver.NewWSRPCFunc(c.UnsubscribeAllWS, ""),
@@ -300,3 +304,31 @@ func makeBroadcastEvidenceFunc(c *lrpc.Client) rpcBroadcastEvidenceFunc {
return c.BroadcastEvidence(ctx.Context(), ev)
}
}
type rpcEventsSearchFunc func(
ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error)
func makeEventsSearchFunc(c *lrpc.Client) rpcEventsSearchFunc {
return func(
ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
return c.Events(ctx.Context(), &ctypes.RequestEvents{
Filter: &ctypes.EventFilter{
Query: filter,
},
MaxItems: maxItems,
WaitTime: waitTime,
Before: before,
After: after,
})
}
}

View File

@@ -572,11 +572,11 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
}
func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error {
return c.next.Unsubscribe(ctx, subscriber, query)
return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck
}
func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error {
return c.next.UnsubscribeAll(ctx, subscriber)
return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck
}
func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) {
@@ -599,6 +599,10 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
c.prt.RegisterOpDecoder(typ, dec)
}
func (c *Client) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
return c.next.Events(ctx, req)
}
// SubscribeWS subscribes for events using the given query and remote address as
// a subscriber, but does not verify responses (UNSAFE)!
// TODO: verify data
@@ -631,7 +635,7 @@ func (c *Client) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul
// UnsubscribeWS calls original client's Unsubscribe using remote address as a
// subscriber.
func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) //nolint:staticcheck
if err != nil {
return nil, err
}
@@ -641,7 +645,7 @@ func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Res
// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
// as a subscriber.
func (c *Client) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) //nolint:staticcheck
if err != nil {
return nil, err
}

View File

@@ -21,10 +21,12 @@ import (
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/internal/eventlog"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/light"
mempl "github.com/tendermint/tendermint/mempool"
@@ -113,20 +115,21 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
}
// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
type MetricsProvider func(chainID string) (*cs.Metrics, *eventlog.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
return func(chainID string) (*cs.Metrics, *eventlog.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
eventlog.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
return cs.NopMetrics(), eventlog.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
}
}
@@ -210,6 +213,7 @@ type Node struct {
// services
eventBus *types.EventBus // pub/sub for services
eventLog *eventlog.Log
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for block-syncing
@@ -724,7 +728,7 @@ func NewNode(config *cfg.Config,
return nil, err
}
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
csMetrics, eventLogMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
@@ -741,6 +745,19 @@ func NewNode(config *cfg.Config,
return nil, err
}
var eventLog *eventlog.Log
if w := config.RPC.EventLogWindowSize; w > 0 {
var err error
eventLog, err = eventlog.New(eventlog.LogSettings{
WindowSize: w,
MaxItems: config.RPC.EventLogMaxItems,
Metrics: eventLogMetrics,
})
if err != nil {
return nil, fmt.Errorf("initializing event log: %w", err)
}
}
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
@@ -924,6 +941,7 @@ func NewNode(config *cfg.Config,
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
eventLog: eventLog,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)
@@ -1080,6 +1098,7 @@ func (n *Node) ConfigureRPC() error {
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
EventLog: n.eventLog,
Mempool: n.mempool,
Logger: n.Logger.With("module", "rpc"),
@@ -1116,25 +1135,61 @@ func (n *Node) startRPC() ([]net.Listener, error) {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
// we may expose the rpc over both a unix and tcp socket
// If the event log is enabled, subscribe to all events published to the
// event bus, and forward them to the event log.
if lg := n.eventLog; lg != nil {
// TODO(creachadair): This is kind of a hack, ideally we'd share the
// observer with the indexer, but it's tricky to plumb them together.
// For now, use a "normal" subscription with a big buffer allowance.
// The event log should always be able to keep up.
const subscriberID = "event-log-subscriber"
ctx := context.Background()
sub, err := n.eventBus.Subscribe(ctx, subscriberID, query.All, 1<<16) // essentially "no limit"
if err != nil {
return nil, fmt.Errorf("event log subscribe: %w", err)
}
go func() {
// N.B. Use background for unsubscribe, ctx is already terminated.
defer n.eventBus.UnsubscribeAll(ctx, subscriberID) // nolint:errcheck
for {
msg := <-sub.Out()
if err != nil {
n.Logger.Error("Subscription terminated", "err", err)
return
}
etype, ok := eventlog.FindType(query.ExpandEvents(msg.Events()))
if ok {
_ = lg.Add(etype, msg.Data())
}
}
}()
n.Logger.Info("Event log subscription enabled")
}
// We may expose the RPC over both TCP and a Unix-domain socket.
listeners := make([]net.Listener, len(listenAddrs))
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}),
rpcserver.ReadLimit(config.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
if n.config.RPC.ExperimentalDisableWebsocket {
rpcLogger.Info("Disabling websocket endpoints (experimental-disable-websocket=true)")
} else {
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}),
rpcserver.ReadLimit(config.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
}
listener, err := rpcserver.Listen(
listenAddr,
config,

View File

@@ -41,12 +41,12 @@ func TestHeaderEvents(t *testing.T) {
}
})
}
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
ectx, cancel := context.WithTimeout(ctx, waitForEventTimeout)
defer cancel()
query := types.QueryForEvent(types.EventNewBlockHeader).String()
var evt types.EventDataNewBlockHeader
err := client.WaitForOneEvent(ectx, c, query, &evt)
require.Nil(t, err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(t, ok, "%d: %#v", i, evt)
// TODO: more checks...
})
}
@@ -141,17 +141,15 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) {
}
}()
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout)
// Wait for the transaction we sent to be confirmed.
query := fmt.Sprintf(`tm.event = '%s' AND tx.hash = '%X'`, types.EventTx, types.Tx(tx).Hash())
var evt types.EventDataTx
err := client.WaitForOneEvent(ctx, c, query, &evt)
require.Nil(t, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(t, ok)
// make sure this is the proper tx
require.EqualValues(t, tx, txe.Tx)
require.True(t, txe.Result.IsOK())
require.EqualValues(t, tx, evt.Tx)
require.True(t, evt.Result.IsOK())
})
}
}

View File

@@ -0,0 +1,194 @@
// Package eventstream implements a convenience client for the Events method
// of the Tendermint RPC service, allowing clients to observe a resumable
// stream of events matching a query.
package eventstream
import (
"context"
"errors"
"fmt"
"time"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)
// Client is the subset of the RPC client interface consumed by Stream.
type Client interface {
Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
}
// ErrStopRunning is returned by a Run callback to signal that no more events
// are wanted and that Run should return.
var ErrStopRunning = errors.New("stop accepting events")
// A Stream cpatures the state of a streaming event subscription.
type Stream struct {
filter *coretypes.EventFilter // the query being streamed
batchSize int // request batch size
newestSeen string // from the latest item matching our query
waitTime time.Duration // the long-polling interval
client Client
}
// New constructs a new stream for the given query and options.
// If opts == nil, the stream uses default values as described by
// StreamOptions. This function will panic if cli == nil.
func New(cli Client, query string, opts *StreamOptions) *Stream {
if cli == nil {
panic("eventstream: nil client")
}
return &Stream{
filter: &coretypes.EventFilter{Query: query},
batchSize: opts.batchSize(),
newestSeen: opts.resumeFrom(),
waitTime: opts.waitTime(),
client: cli,
}
}
// Run polls the service for events matching the query, and calls accept for
// each such event. Run handles pagination transparently, and delivers events
// to accept in order of publication.
//
// Run continues until ctx ends or accept reports an error. If accept returns
// ErrStopRunning, Run returns nil; otherwise Run returns the error reported by
// accept or ctx. Run also returns an error if the server reports an error
// from the Events method.
//
// If the stream falls behind the event log on the server, Run will stop and
// report an error of concrete type *MissedItemsError. Call Reset to reset the
// stream to the head of the log, and call Run again to resume.
func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error {
for {
items, err := s.fetchPages(ctx)
if err != nil {
return err
}
// Deliver events from the current batch to the receiver. We visit the
// batch in reverse order so the receiver sees them in forward order.
for i := len(items) - 1; i >= 0; i-- {
if err := ctx.Err(); err != nil {
return err
}
itm := items[i]
err := accept(itm)
if itm.Cursor > s.newestSeen {
s.newestSeen = itm.Cursor // update the latest delivered
}
if errors.Is(err, ErrStopRunning) {
return nil
} else if err != nil {
return err
}
}
}
}
// Reset updates the stream's current cursor position to the head of the log.
// This method may safely be called only when Run is not executing.
func (s *Stream) Reset() { s.newestSeen = "" }
// fetchPages fetches the next batch of matching results. If there are multiple
// pages, all the matching pages are retrieved. An error is reported if the
// current scan position falls out of the event log window.
func (s *Stream) fetchPages(ctx context.Context) ([]*coretypes.EventItem, error) {
var pageCursor string // if non-empty, page through items before this
var items []*coretypes.EventItem
// Fetch the next paginated batch of matching responses.
for {
rsp, err := s.client.Events(ctx, &coretypes.RequestEvents{
Filter: s.filter,
MaxItems: s.batchSize,
After: s.newestSeen,
Before: pageCursor,
WaitTime: s.waitTime,
})
if err != nil {
return nil, err
}
// If the oldest item in the log is newer than our most recent item,
// it means we might have missed some events matching our query.
if s.newestSeen != "" && s.newestSeen < rsp.Oldest {
return nil, &MissedItemsError{
Query: s.filter.Query,
NewestSeen: s.newestSeen,
OldestPresent: rsp.Oldest,
}
}
items = append(items, rsp.Items...)
if rsp.More {
// There are more results matching this request, leave the baseline
// where it is and set the page cursor so that subsequent requests
// will get the next chunk.
pageCursor = items[len(items)-1].Cursor
} else if len(items) != 0 {
// We got everything matching so far.
return items, nil
}
}
}
// StreamOptions are optional settings for a Stream value. A nil *StreamOptions
// is ready for use and provides default values as described.
type StreamOptions struct {
// How many items to request per call to the service. The stream may pin
// this value to a minimum default batch size.
BatchSize int
// If set, resume streaming from this cursor. Typically this is set to the
// cursor of the most recently-received matching value. If empty, streaming
// begins at the head of the log (the default).
ResumeFrom string
// Specifies the long poll interval. The stream may pin this value to a
// minimum default poll interval.
WaitTime time.Duration
}
func (o *StreamOptions) batchSize() int {
const minBatchSize = 16
if o == nil || o.BatchSize < minBatchSize {
return minBatchSize
}
return o.BatchSize
}
func (o *StreamOptions) resumeFrom() string {
if o == nil {
return ""
}
return o.ResumeFrom
}
func (o *StreamOptions) waitTime() time.Duration {
const minWaitTime = 5 * time.Second
if o == nil || o.WaitTime < minWaitTime {
return minWaitTime
}
return o.WaitTime
}
// MissedItemsError is an error that indicates the stream missed (lost) some
// number of events matching the specified query.
type MissedItemsError struct {
// The cursor of the newest matching item the stream has observed.
NewestSeen string
// The oldest cursor in the log at the point the miss was detected.
// Any matching events between NewestSeen and OldestPresent are lost.
OldestPresent string
// The active query.
Query string
}
// Error satisfies the error interface.
func (e *MissedItemsError) Error() string {
return fmt.Sprintf("missed events matching %q between %q and %q",
e.Query, e.NewestSeen, e.OldestPresent)
}

View File

@@ -0,0 +1,287 @@
package eventstream_test
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/google/go-cmp/cmp"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
"github.com/tendermint/tendermint/rpc/client/eventstream"
rpccore "github.com/tendermint/tendermint/rpc/core"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
func TestStream_filterOrder(t *testing.T) {
defer leaktest.Check(t)
s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
WindowSize: 30 * time.Second,
}, nil)
// Verify that events are delivered in forward time order (i.e., that the
// stream unpacks the pages correctly) and that events not matching the
// query (here, type="bad") are skipped.
//
// The minimum batch size is 16 and half the events we publish match, so we
// publish > 32 items (> 16 good) to ensure we exercise paging.
etype := [2]string{"good", "bad"}
var items []testItem
for i := 0; i < 40; i++ {
s.advance(100 * time.Millisecond)
text := fmt.Sprintf("item%d", i)
cur := s.publish(etype[i%2], text)
// Even-numbered items match the target type.
if i%2 == 0 {
items = append(items, makeTestItem(cur, text))
}
}
s.start()
for _, itm := range items {
s.mustItem(t, itm)
}
s.stopWait()
}
func TestStream_lostItem(t *testing.T) {
defer leaktest.Check(t)
s := newStreamTester(t, ``, eventlog.LogSettings{
WindowSize: 30 * time.Second,
}, nil)
// Publish an item and let the client observe it.
cur := s.publish("ok", "whatever")
s.start()
s.mustItem(t, makeTestItem(cur, "whatever"))
s.stopWait()
// Time passes, and cur expires out of the window.
s.advance(50 * time.Second)
next1 := s.publish("ok", "more stuff")
s.advance(15 * time.Second)
next2 := s.publish("ok", "still more stuff")
// At this point, the oldest item in the log is newer than the point at
// which we continued, we should get an error.
s.start()
var missed *eventstream.MissedItemsError
if err := s.mustError(t); !errors.As(err, &missed) {
t.Errorf("Wrong error: got %v, want %T", err, missed)
} else {
t.Logf("Correctly reported missed item: %v", missed)
}
// If we reset the stream and continue from head, we should catch up.
s.stopWait()
s.stream.Reset()
s.start()
s.mustItem(t, makeTestItem(next1, "more stuff"))
s.mustItem(t, makeTestItem(next2, "still more stuff"))
s.stopWait()
}
func TestMinPollTime(t *testing.T) {
defer leaktest.Check(t)
s := newStreamTester(t, ``, eventlog.LogSettings{
WindowSize: 30 * time.Second,
}, nil)
s.publish("bad", "whatever")
// Waiting for an item on a log with no matching events incurs a minimum
// wait time and reports no events.
ctx := context.Background()
filter := &coretypes.EventFilter{Query: `tm.event = 'good'`}
var zero cursor.Cursor
t.Run("NoneMatch", func(t *testing.T) {
start := time.Now()
// Request a very short delay, and affirm we got the server's minimum.
rsp, err := s.Events(ctx, &coretypes.RequestEvents{
Filter: filter,
MaxItems: 1,
After: zero.String(),
Before: zero.String(),
WaitTime: 10 * time.Millisecond,
})
if err != nil {
t.Fatalf("Events failed: %v", err)
} else if elapsed := time.Since(start); elapsed < time.Second {
t.Errorf("Events returned too quickly: got %v, wanted 1s", elapsed)
} else if len(rsp.Items) != 0 {
t.Errorf("Events returned %d items, expected none", len(rsp.Items))
}
})
s.publish("good", "whatever")
// Waiting for an available matching item incurs no delay.
t.Run("SomeMatch", func(t *testing.T) {
start := time.Now()
// Request a long-ish delay and affirm we don't block for it.
// Check for this by ensuring we return sooner than the minimum delay,
// since we don't know the exact timing.
rsp, err := s.Events(ctx, &coretypes.RequestEvents{
Filter: filter,
MaxItems: 1,
After: zero.String(),
Before: zero.String(),
WaitTime: 10 * time.Second,
})
if err != nil {
t.Fatalf("Events failed: %v", err)
} else if elapsed := time.Since(start); elapsed > 500*time.Millisecond {
t.Errorf("Events returned too slowly: got %v, wanted immediate", elapsed)
} else if len(rsp.Items) == 0 {
t.Error("Events returned no items, wanted at least 1")
}
})
}
// testItem is a wrapper for comparing item results in a friendly output format
// for the cmp package.
type testItem struct {
Cursor string
Data string
// N.B. Fields exported to simplify use in cmp.
}
func makeTestItem(cur, data string) testItem {
return testItem{
Cursor: cur,
Data: fmt.Sprintf(`%q`, data),
}
}
// streamTester is a simulation harness for an eventstream.Stream. It simulates
// the production service by plumbing an event log into a stub RPC environment,
// into which the test can publish events and advance the perceived time to
// exercise various cases of the stream.
type streamTester struct {
log *eventlog.Log
env *rpccore.Environment
clock int64
index int64
stream *eventstream.Stream
errc chan error
recv chan *coretypes.EventItem
stop func()
}
func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
t.Helper()
s := new(streamTester)
// Plumb a time source controlled by the tester into the event log.
logOpts.Source = cursor.Source{
TimeIndex: s.timeNow,
}
lg, err := eventlog.New(logOpts)
if err != nil {
t.Fatalf("Creating event log: %v", err)
}
s.log = lg
s.env = &rpccore.Environment{EventLog: lg}
s.stream = eventstream.New(s, query, streamOpts)
rpccore.SetEnvironment(s.env)
return s
}
// start starts the stream receiver, which runs until it it terminated by
// calling stop.
func (s *streamTester) start() {
ctx, cancel := context.WithCancel(context.Background())
s.errc = make(chan error, 1)
s.recv = make(chan *coretypes.EventItem)
s.stop = cancel
go func() {
defer close(s.errc)
s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error {
select {
case <-ctx.Done():
return ctx.Err()
case s.recv <- itm:
return nil
}
})
}()
}
// publish adds a single event to the event log at the present moment.
func (s *streamTester) publish(etype, payload string) string {
_ = s.log.Add(etype, types.EventDataString(payload))
s.index++
return fmt.Sprintf("%016x-%04x", s.clock, s.index)
}
// wait blocks until either an item is received or the runner stops.
func (s *streamTester) wait() (*coretypes.EventItem, error) {
select {
case itm := <-s.recv:
return itm, nil
case err := <-s.errc:
return nil, err
}
}
// mustItem waits for an item and fails if either an error occurs or the item
// does not match want.
func (s *streamTester) mustItem(t *testing.T, want testItem) {
t.Helper()
itm, err := s.wait()
if err != nil {
t.Fatalf("Receive: got error %v, want item %v", err, want)
}
got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("Item: (-want, +got)\n%s", diff)
}
}
// mustError waits for an error and fails if an item is returned.
func (s *streamTester) mustError(t *testing.T) error {
t.Helper()
itm, err := s.wait()
if err == nil {
t.Fatalf("Receive: got item %v, want error", itm)
}
return err
}
// stopWait stops the runner and waits for it to terminate.
func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck
// timeNow reports the current simulated time index.
func (s *streamTester) timeNow() int64 { return s.clock }
// advance moves the simulated time index.
func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) }
// Events implements the eventstream.Client interface by delegating to a stub
// environment as if it were a local RPC client. This works because the Events
// method only requires the event log, the other fields are unused.
func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
var before, after cursor.Cursor
if err := before.UnmarshalText([]byte(req.Before)); err != nil {
return nil, err
}
if err := after.UnmarshalText([]byte(req.After)); err != nil {
return nil, err
}
return rpccore.EventsWithContext(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
}

View File

@@ -2,10 +2,11 @@ package client
import (
"context"
"errors"
"encoding/json"
"fmt"
"time"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
)
@@ -52,32 +53,24 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
return nil
}
// WaitForOneEvent subscribes to a websocket event for the given
// event time and returns upon receiving it one time, or
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// register for the next event of this type
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String())
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}
// make sure to unregister after the test is over
defer func() {
if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil {
panic(deferErr)
// WaitForOneEvent waits for the first event matching the given query on c, or
// until ctx ends. It reports an error if ctx ends before a matching event is
// received.
func WaitForOneEvent(ctx context.Context, c EventsClient, query string, evt types.TMEventData) error {
for {
rsp, err := c.Events(ctx, &coretypes.RequestEvents{
Filter: &coretypes.EventFilter{Query: query},
MaxItems: 1,
WaitTime: 10 * time.Second, // duration doesn't matter, limited by ctx timeout
})
if err != nil {
return err
} else if len(rsp.Items) == 0 {
continue // continue polling until ctx expires
}
}()
select {
case event := <-eventCh:
return event.Data, nil
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
if err := json.Unmarshal(rsp.Items[0].Data, evt); err != nil {
return err
}
return nil
}
}

View File

@@ -363,6 +363,20 @@ func (c *baseRPCClient) ConsensusParams(
return result, nil
}
func (c *baseRPCClient) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
result := new(ctypes.ResultEvents)
if _, err := c.caller.Call(ctx, "events", map[string]interface{}{
"filter": req.Filter.Query,
"maxItems": req.MaxItems,
"before": req.Before,
"after": req.After,
"waitTime": req.WaitTime,
}, result); err != nil {
return nil, err
}
return result, nil
}
func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
result := new(ctypes.ResultHealth)
_, err := c.caller.Call(ctx, "health", map[string]interface{}{}, result)
@@ -597,7 +611,7 @@ func (c *baseRPCClient) BroadcastEvidence(
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
// WSEvents is a wrapper around WSClient, which implements EventsClient.
// WSEvents is a wrapper around WSClient, which implements SubscriptionClient.
type WSEvents struct {
service.BaseService
remote string
@@ -608,6 +622,8 @@ type WSEvents struct {
subscriptions map[string]chan ctypes.ResultEvent // query -> chan
}
var _ rpcclient.SubscriptionClient = (*WSEvents)(nil)
func newWSEvents(remote, endpoint string) (*WSEvents, error) {
w := &WSEvents{
endpoint: endpoint,
@@ -647,7 +663,7 @@ func (w *WSEvents) OnStop() {
}
}
// Subscribe implements EventsClient by using WSClient to subscribe given
// Subscribe implements SubscriptionClient by using WSClient to subscribe given
// subscriber to query. By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
//
@@ -680,7 +696,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
return outc, nil
}
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe given
// subscriber from query.
//
// It returns an error if WSEvents is not running.
@@ -703,7 +719,7 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
return nil
}
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// UnsubscribeAll implements SubscriptionClient by using WSClient to unsubscribe
// given subscriber from all the queries.
//
// It returns an error if WSEvents is not running.

View File

@@ -35,12 +35,13 @@ type Client interface {
service.Service
ABCIClient
EventsClient
EvidenceClient
HistoryClient
MempoolClient
NetworkClient
SignClient
StatusClient
EvidenceClient
MempoolClient
SubscriptionClient
}
// ABCIClient groups together the functionality that principally affects the
@@ -115,20 +116,40 @@ type NetworkClient interface {
Health(context.Context) (*ctypes.ResultHealth, error)
}
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
// EventsClient exposes the methods to retrieve events from the consensus engine.
type EventsClient interface {
// Subscribe subscribes given subscriber to query. Returns a channel with
// cap=1 onto which events are published. An error is returned if it fails to
// subscribe. outCapacity can be used optionally to set capacity for the
// channel. Channel is never closed to prevent accidental reads.
// Events fetches a batch of events from the server matching the given query
// and time range.
Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error)
}
// TODO(creachadair): This interface should be removed once the streaming event
// interface is removed in Tendermint v0.39.
type SubscriptionClient interface {
// Subscribe issues a subscription request for the given subscriber ID and
// query. This method does not block: If subscription fails, it reports an
// error, and if subscription succeeds it returns a channel that delivers
// matching events until the subscription is stopped. The channel is never
// closed; the client is responsible for knowing when no further data will
// be sent.
//
// The context only governs the initial subscription, it does not control
// the lifetime of the channel. To cancel a subscription call Unsubscribe or
// UnsubscribeAll.
//
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll.
Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
// Unsubscribe unsubscribes given subscriber from query.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
UnsubscribeAll(ctx context.Context, subscriber string) error
}

View File

@@ -127,6 +127,10 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Res
return core.ConsensusParams(c.ctx, height)
}
func (c *Local) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
return core.Events(c.ctx, req.Filter.Query, req.MaxItems, req.Before, req.After, req.WaitTime)
}
func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
return core.Health(c.ctx)
}

View File

@@ -41,6 +41,7 @@ type Client struct {
client.EvidenceClient
client.MempoolClient
service.Service
client.SubscriptionClient
}
var _ client.Client = Client{}

View File

@@ -8,6 +8,7 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/eventlog"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
mempl "github.com/tendermint/tendermint/mempool"
@@ -92,6 +93,7 @@ type Environment struct {
BlockIndexer indexer.BlockIndexer
ConsensusReactor *consensus.Reactor
EventBus *types.EventBus // thread safe
EventLog *eventlog.Log
Mempool mempl.Mempool
Logger log.Logger

View File

@@ -2,10 +2,13 @@ package core
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@@ -128,3 +131,157 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
}
return &ctypes.ResultUnsubscribe{}, nil
}
// Events applies a query to the event log. If an event log is not enabled,
// Events reports an error. Otherwise, it filters the current contents of the
// log to return matching events.
//
// Events returns up to maxItems of the newest eligible event items. An item is
// eligible if it is older than before (or before is zero), it is newer than
// after (or after is zero), and its data matches the filter. A nil filter
// matches all event data.
//
// If before is zero and no eligible event items are available, Events waits
// for up to waitTime for a matching item to become available. The wait is
// terminated early if ctx ends.
//
// If maxItems ≤ 0, a default positive number of events is chosen. The values
// of maxItems and waitTime may be capped to sensible internal maxima without
// reporting an error to the caller.
func Events(ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
var curBefore, curAfter cursor.Cursor
if err := curBefore.UnmarshalText([]byte(before)); err != nil {
return nil, err
}
if err := curAfter.UnmarshalText([]byte(after)); err != nil {
return nil, err
}
return EventsWithContext(ctx.Context(), &ctypes.EventFilter{
Query: filter,
}, maxItems, curBefore, curAfter, waitTime)
}
func EventsWithContext(ctx context.Context,
filter *ctypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
if env.EventLog == nil {
return nil, errors.New("the event log is not enabled")
}
// Parse and validate parameters.
if maxItems <= 0 {
maxItems = 10
} else if maxItems > 100 {
maxItems = 100
}
const minWaitTime = 1 * time.Second
const maxWaitTime = 30 * time.Second
if waitTime < minWaitTime {
waitTime = minWaitTime
} else if waitTime > maxWaitTime {
waitTime = maxWaitTime
}
query := tmquery.All
if filter != nil && filter.Query != "" {
q, err := tmquery.New(filter.Query)
if err != nil {
return nil, fmt.Errorf("invalid filter query: %w", err)
}
query = q
}
var info eventlog.Info
var items []*eventlog.Item
var err error
accept := func(itm *eventlog.Item) error {
// N.B. We accept up to one item more than requested, so we can tell how
// to set the "more" flag in the response.
if len(items) > maxItems || itm.Cursor.Before(after) {
return eventlog.ErrStopScan
}
match, err := query.MatchesEvents(itm.Events)
if err != nil {
return fmt.Errorf("matches failed: %v", err)
}
if cursorInRange(itm.Cursor, before, after) && match {
items = append(items, itm)
}
return nil
}
if before.IsZero() {
ctx, cancel := context.WithTimeout(ctx, waitTime)
defer cancel()
// Long poll. The loop here is because new items may not match the query,
// and we want to keep waiting until we have relevant results (or time out).
cur := after
for len(items) == 0 {
info, err = env.EventLog.WaitScan(ctx, cur, accept)
if err != nil {
// Don't report a timeout as a request failure.
if errors.Is(err, context.DeadlineExceeded) {
err = nil
}
break
}
cur = info.Newest
}
} else {
// Quick poll, return only what is already available.
info, err = env.EventLog.Scan(accept)
}
if err != nil {
return nil, err
}
more := len(items) > maxItems
if more {
items = items[:len(items)-1]
}
enc, err := marshalItems(items)
if err != nil {
return nil, err
}
return &ctypes.ResultEvents{
Items: enc,
More: more,
Oldest: cursorString(info.Oldest),
Newest: cursorString(info.Newest),
}, nil
}
func cursorString(c cursor.Cursor) string {
if c.IsZero() {
return ""
}
return c.String()
}
func cursorInRange(c, before, after cursor.Cursor) bool {
return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
}
func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) {
out := make([]*ctypes.EventItem, len(items))
for i, itm := range items {
// FIXME: align usage after remove type-tag
v, err := json.Marshal(itm.Data)
if err != nil {
return nil, fmt.Errorf("encoding event data: %w", err)
}
out[i] = &ctypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
out[i].Data = v
}
return out, nil
}

View File

@@ -8,7 +8,9 @@ import (
// Routes is a map of available routes.
var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
// Event subscription. Note that subscribe, unsubscribe, and
// unsubscribe_all are only available via the websocket endpoint.
"events": rpc.NewRPCFunc(Events, "filter,maxItems,before,after,waitTime"),
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),

View File

@@ -254,3 +254,64 @@ type ResultEvent struct {
Data types.TMEventData `json:"data"`
Events map[string][]string `json:"events"`
}
// RequestEvents is the argument for the "/events" RPC endpoint.
type RequestEvents struct {
// Optional filter spec. If nil or empty, all items are eligible.
Filter *EventFilter `json:"filter"`
// The maximum number of eligible items to return.
// If zero or negative, the server will report a default number.
MaxItems int `json:"maxItems"`
// Return only items after this cursor. If empty, the limit is just
// before the the beginning of the event log.
After string `json:"after"`
// Return only items before this cursor. If empty, the limit is just
// after the head of the event log.
Before string `json:"before"`
// Wait for up to this long for events to be available.
WaitTime time.Duration `json:"waitTime"`
}
// An EventFilter specifies which events are selected by an /events request.
type EventFilter struct {
Query string `json:"query"`
}
// ResultEvents is the response from the "/events" RPC endpoint.
type ResultEvents struct {
// The items matching the request parameters, from newest
// to oldest, if any were available within the timeout.
Items []*EventItem `json:"items"`
// This is true if there is at least one older matching item
// available in the log that was not returned.
More bool `json:"more"`
// The cursor of the oldest item in the log at the time of this reply,
// or "" if the log is empty.
Oldest string `json:"oldest"`
// The cursor of the newest item in the log at the time of this reply,
// or "" if the log is empty.
Newest string `json:"newest"`
}
type EventItem struct {
// The cursor of this item.
Cursor string `json:"cursor"`
// The event label of this item (for example, "Vote").
Event string `json:"event,omitempty"`
// The encoded event data for this item. The content is a JSON object with
// the following structure:
//
// <json-encoded-value>
//
// The known type tags are defined by the tendermint/types package.
Data json.RawMessage `json:"data"`
}

View File

@@ -97,6 +97,7 @@ func createConfig() *cfg.Config {
tm, rpc, grpc := makeAddrs()
c.P2P.ListenAddress = tm
c.RPC.ListenAddress = rpc
c.RPC.EventLogWindowSize = 5 * time.Minute
c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
c.RPC.GRPCListenAddress = grpc
return c

View File

@@ -0,0 +1,81 @@
// Program estream is a manual testing tool for polling the event stream
// of a running Tendermint consensus node.
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"github.com/tendermint/tendermint/rpc/client/eventstream"
rpcclient "github.com/tendermint/tendermint/rpc/client/http"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
)
var (
query = flag.String("query", "", "Filter query")
batchSize = flag.Int("batch", 0, "Batch size")
resumeFrom = flag.String("resume", "", "Resume cursor")
numItems = flag.Int("count", 0, "Number of items to read (0 to stream)")
waitTime = flag.Duration("poll", 0, "Long poll interval")
rpcAddr = flag.String("addr", "http://localhost:26657", "RPC service address")
)
func init() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, `Usage: %[1]s [options]
Connect to the Tendermint node whose RPC service is at -addr, and poll for events
matching the specified -query. If no query is given, all events are fetched.
The resulting event data are written to stdout as JSON.
Use -resume to pick up polling from a previously-reported event cursor.
Use -count to stop polling after a certain number of events has been reported.
Use -batch to override the default request batch size.
Use -poll to override the default long-polling interval.
Options:
`, filepath.Base(os.Args[0]))
flag.PrintDefaults()
}
}
func main() {
flag.Parse()
cli, err := rpcclient.New(*rpcAddr, "/websocket")
if err != nil {
log.Fatalf("RPC client: %v", err)
}
stream := eventstream.New(cli, *query, &eventstream.StreamOptions{
BatchSize: *batchSize,
ResumeFrom: *resumeFrom,
WaitTime: *waitTime,
})
// Shut down cleanly on SIGINT. Don't attempt clean shutdown for other
// fatal signals.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
var nr int
if err := stream.Run(ctx, func(itm *coretypes.EventItem) error {
nr++
bits, err := json.Marshal(itm)
if err != nil {
return err
}
fmt.Println(string(bits))
if *numItems > 0 && nr >= *numItems {
return eventstream.ErrStopRunning
}
return nil
}); err != nil {
log.Fatalf("Stream failed: %v", err)
}
}

View File

@@ -2,6 +2,7 @@ package types
import (
"fmt"
"strings"
abci "github.com/tendermint/tendermint/abci/types"
tmjson "github.com/tendermint/tendermint/libs/json"
@@ -68,6 +69,11 @@ type EventDataNewBlock struct {
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataNewBlock) ABCIEvents() []abci.Event {
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
}
type EventDataNewBlockHeader struct {
Header Header `json:"header"`
@@ -76,6 +82,11 @@ type EventDataNewBlockHeader struct {
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
}
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event {
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
}
type EventDataNewEvidence struct {
Evidence Evidence `json:"evidence"`
@@ -87,6 +98,15 @@ type EventDataTx struct {
abci.TxResult
}
// ABCIEvents implements the eventlog.ABCIEventer interface.
func (e EventDataTx) ABCIEvents() []abci.Event {
base := []abci.Event{
eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())),
eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)),
}
return append(base, e.Result.Events...)
}
// NOTE: This goes into the replay WAL
type EventDataRoundState struct {
Height int64 `json:"height"`
@@ -181,3 +201,16 @@ type BlockEventPublisher interface {
type TxEventPublisher interface {
PublishEventTx(EventDataTx) error
}
// eventWithAttr constructs a single abci.Event with a single attribute.
// The type of the event and the name of the attribute are obtained by
// splitting the event type on period (e.g., "foo.bar").
func eventWithAttr(etype, value string) abci.Event {
parts := strings.SplitN(etype, ".", 2)
return abci.Event{
Type: parts[0],
Attributes: []abci.EventAttribute{{
Key: parts[1], Value: value,
}},
}
}

View File

@@ -7,6 +7,22 @@ import (
"github.com/stretchr/testify/assert"
)
// Verify that the event data types satisfy their shared interface.
// TODO: add EventDataBlockSyncStatus and EventDataStateSyncStatus
// when backport #6700 and #6755.
var (
_ TMEventData = EventDataCompleteProposal{}
_ TMEventData = EventDataNewBlock{}
_ TMEventData = EventDataNewBlockHeader{}
_ TMEventData = EventDataNewEvidence{}
_ TMEventData = EventDataNewRound{}
_ TMEventData = EventDataRoundState{}
_ TMEventData = EventDataTx{}
_ TMEventData = EventDataValidatorSetUpdates{}
_ TMEventData = EventDataVote{}
_ TMEventData = EventDataString("")
)
func TestQueryTxFor(t *testing.T) {
tx := Tx("foo")
assert.Equal(t,