mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-21 12:12:50 +00:00
Compare commits
10 Commits
cal/node-c
...
feature/ad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5eac8d7618 | ||
|
|
1b1ba41720 | ||
|
|
544a38da7b | ||
|
|
892c765a5c | ||
|
|
d6d3c172da | ||
|
|
96dd4d08c3 | ||
|
|
c3cc94a0e0 | ||
|
|
d47b675cda | ||
|
|
069e402aa1 | ||
|
|
f1a57adee4 |
@@ -36,7 +36,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
### BREAKING CHANGES
|
||||
|
||||
- CLI/RPC/Config
|
||||
- [config] \#9491 Add new event subscription options and defaults. (@creachadair)
|
||||
- [config] \#9259 Rename the fastsync section and the fast_sync key blocksync and block_sync respectively
|
||||
- [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair)
|
||||
|
||||
- Apps
|
||||
- [abci/counter] \#6684 Delete counter example app
|
||||
|
||||
@@ -382,6 +382,33 @@ type RPCConfig struct {
|
||||
// predictability in subscription behavior.
|
||||
CloseOnSlowClient bool `mapstructure:"experimental_close_on_slow_client"`
|
||||
|
||||
// If true, disable the websocket interface to the RPC service. This has
|
||||
// the effect of disabling the /subscribe, /unsubscribe, and /unsubscribe_all
|
||||
// methods for event subscription.
|
||||
//
|
||||
// EXPERIMENTAL: This setting will be removed in Tendermint v0.37.
|
||||
ExperimentalDisableWebsocket bool `mapstructure:"experimental-disable-websocket"`
|
||||
|
||||
// The time window size for the event log. All events up to this long before
|
||||
// the latest (up to EventLogMaxItems) will be available for subscribers to
|
||||
// fetch via the /events method. If 0 (the default) the event log and the
|
||||
// /events RPC method are disabled.
|
||||
EventLogWindowSize time.Duration `mapstructure:"event-log-window-size"`
|
||||
|
||||
// The maxiumum number of events that may be retained by the event log. If
|
||||
// this value is 0, no upper limit is set. Otherwise, items in excess of
|
||||
// this number will be discarded from the event log.
|
||||
//
|
||||
// Warning: This setting is a safety valve. Setting it too low may cause
|
||||
// subscribers to miss events. Try to choose a value higher than the
|
||||
// maximum worst-case expected event load within the chosen window size in
|
||||
// ordinary operation.
|
||||
//
|
||||
// For example, if the window size is 10 minutes and the node typically
|
||||
// averages 1000 events per ten minutes, but with occasional known spikes of
|
||||
// up to 2000, choose a value > 2000.
|
||||
EventLogMaxItems int `mapstructure:"event-log-max-items"`
|
||||
|
||||
// How long to wait for a tx to be committed during /broadcast_tx_commit
|
||||
// WARNING: Using a value larger than 10s will result in increasing the
|
||||
// global HTTP write timeout, which applies to all connections and endpoints.
|
||||
@@ -429,11 +456,16 @@ func DefaultRPCConfig() *RPCConfig {
|
||||
Unsafe: false,
|
||||
MaxOpenConnections: 900,
|
||||
|
||||
MaxSubscriptionClients: 100,
|
||||
MaxSubscriptionsPerClient: 5,
|
||||
SubscriptionBufferSize: defaultSubscriptionBufferSize,
|
||||
TimeoutBroadcastTxCommit: 10 * time.Second,
|
||||
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
|
||||
SubscriptionBufferSize: defaultSubscriptionBufferSize,
|
||||
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
|
||||
// Settings for event subscription.
|
||||
MaxSubscriptionClients: 100,
|
||||
MaxSubscriptionsPerClient: 5,
|
||||
ExperimentalDisableWebsocket: false, // compatible with TM v0.35 and earlier
|
||||
EventLogWindowSize: 30 * time.Second,
|
||||
EventLogMaxItems: 0,
|
||||
|
||||
TimeoutBroadcastTxCommit: 10 * time.Second,
|
||||
|
||||
MaxBodyBytes: int64(1000000), // 1MB
|
||||
MaxHeaderBytes: 1 << 20, // same as the net/http default
|
||||
@@ -479,6 +511,12 @@ func (cfg *RPCConfig) ValidateBasic() error {
|
||||
cfg.SubscriptionBufferSize,
|
||||
)
|
||||
}
|
||||
if cfg.EventLogWindowSize < 0 {
|
||||
return errors.New("event-log-window-size must not be negative")
|
||||
}
|
||||
if cfg.EventLogMaxItems < 0 {
|
||||
return errors.New("event-log-max-items must not be negative")
|
||||
}
|
||||
if cfg.TimeoutBroadcastTxCommit < 0 {
|
||||
return errors.New("timeout_broadcast_tx_commit can't be negative")
|
||||
}
|
||||
|
||||
@@ -233,6 +233,33 @@ experimental_websocket_write_buffer_size = {{ .RPC.WebSocketWriteBufferSize }}
|
||||
# predictability in subscription behavior.
|
||||
experimental_close_on_slow_client = {{ .RPC.CloseOnSlowClient }}
|
||||
|
||||
# If true, disable the websocket interface to the RPC service. This has
|
||||
# the effect of disabling the /subscribe, /unsubscribe, and /unsubscribe_all
|
||||
# methods for event subscription.
|
||||
#
|
||||
# EXPERIMENTAL: This setting will be removed in Tendermint v0.37.
|
||||
experimental-disable-websocket = {{ .RPC.ExperimentalDisableWebsocket }}
|
||||
|
||||
# The time window size for the event log. All events up to this long before
|
||||
# the latest (up to EventLogMaxItems) will be available for subscribers to
|
||||
# fetch via the /events method. If 0 (the default) the event log and the
|
||||
# /events RPC method are disabled.
|
||||
event-log-window-size = "{{ .RPC.EventLogWindowSize }}"
|
||||
|
||||
# The maxiumum number of events that may be retained by the event log. If
|
||||
# this value is 0, no upper limit is set. Otherwise, items in excess of
|
||||
# this number will be discarded from the event log.
|
||||
#
|
||||
# Warning: This setting is a safety valve. Setting it too low may cause
|
||||
# subscribers to miss events. Try to choose a value higher than the
|
||||
# maximum worst-case expected event load within the chosen window size in
|
||||
# ordinary operation.
|
||||
#
|
||||
# For example, if the window size is 10 minutes and the node typically
|
||||
# averages 1000 events per ten minutes, but with occasional known spikes of
|
||||
# up to 2000, choose a value > 2000.
|
||||
event-log-max-items = {{ .RPC.EventLogMaxItems }}
|
||||
|
||||
# How long to wait for a tx to be committed during /broadcast_tx_commit.
|
||||
# WARNING: Using a value larger than 10s will result in increasing the
|
||||
# global HTTP write timeout, which applies to all connections and endpoints.
|
||||
|
||||
100
internal/eventlog/cursor/cursor.go
Normal file
100
internal/eventlog/cursor/cursor.go
Normal file
@@ -0,0 +1,100 @@
|
||||
// Package cursor implements time-ordered item cursors for an event log.
|
||||
package cursor
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// A Source produces cursors based on a time index generator and a sequence
|
||||
// counter. A zero-valued Source is ready for use with defaults as described.
|
||||
type Source struct {
|
||||
// This function is called to produce the current time index.
|
||||
// If nil, it defaults to time.Now().UnixNano().
|
||||
TimeIndex func() int64
|
||||
|
||||
// The current counter value used for sequence number generation. It is
|
||||
// incremented in-place each time a cursor is generated.
|
||||
Counter int64
|
||||
}
|
||||
|
||||
func (s *Source) timeIndex() int64 {
|
||||
if s.TimeIndex == nil {
|
||||
return time.Now().UnixNano()
|
||||
}
|
||||
return s.TimeIndex()
|
||||
}
|
||||
|
||||
func (s *Source) nextCounter() int64 {
|
||||
s.Counter++
|
||||
return s.Counter
|
||||
}
|
||||
|
||||
// Cursor produces a fresh cursor from s at the current time index and counter.
|
||||
func (s *Source) Cursor() Cursor {
|
||||
return Cursor{
|
||||
timestamp: uint64(s.timeIndex()),
|
||||
sequence: uint16(s.nextCounter() & 0xffff),
|
||||
}
|
||||
}
|
||||
|
||||
// A Cursor is a unique identifier for an item in a time-ordered event log.
|
||||
// It is safe to copy and compare cursors by value.
|
||||
type Cursor struct {
|
||||
timestamp uint64 // ns since Unix epoch
|
||||
sequence uint16 // sequence number
|
||||
}
|
||||
|
||||
// Before reports whether c is prior to o in time ordering. This comparison
|
||||
// ignores sequence numbers.
|
||||
func (c Cursor) Before(o Cursor) bool { return c.timestamp < o.timestamp }
|
||||
|
||||
// Diff returns the time duration between c and o. The duration is negative if
|
||||
// c is before o in time order.
|
||||
func (c Cursor) Diff(o Cursor) time.Duration {
|
||||
return time.Duration(c.timestamp) - time.Duration(o.timestamp)
|
||||
}
|
||||
|
||||
// IsZero reports whether c is the zero cursor.
|
||||
func (c Cursor) IsZero() bool { return c == Cursor{} }
|
||||
|
||||
// MarshalText implements the encoding.TextMarshaler interface.
|
||||
// A zero cursor marshals as "", otherwise the format used by the String method.
|
||||
func (c Cursor) MarshalText() ([]byte, error) {
|
||||
if c.IsZero() {
|
||||
return nil, nil
|
||||
}
|
||||
return []byte(c.String()), nil
|
||||
}
|
||||
|
||||
// UnmarshalText implements the encoding.TextUnmarshaler interface.
|
||||
// An empty text unmarshals without error to a zero cursor.
|
||||
func (c *Cursor) UnmarshalText(data []byte) error {
|
||||
if len(data) == 0 {
|
||||
*c = Cursor{} // set zero
|
||||
return nil
|
||||
}
|
||||
ps := strings.SplitN(string(data), "-", 2)
|
||||
if len(ps) != 2 {
|
||||
return errors.New("invalid cursor format")
|
||||
}
|
||||
ts, err := strconv.ParseUint(ps[0], 16, 64)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid timestamp: %w", err)
|
||||
}
|
||||
sn, err := strconv.ParseUint(ps[1], 16, 16)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid sequence: %w", err)
|
||||
}
|
||||
c.timestamp = ts
|
||||
c.sequence = uint16(sn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a printable text representation of a cursor.
|
||||
func (c Cursor) String() string {
|
||||
return fmt.Sprintf("%016x-%04x", c.timestamp, c.sequence)
|
||||
}
|
||||
141
internal/eventlog/cursor/cursor_test.go
Normal file
141
internal/eventlog/cursor/cursor_test.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package cursor_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
)
|
||||
|
||||
func mustParse(t *testing.T, s string) cursor.Cursor {
|
||||
t.Helper()
|
||||
var c cursor.Cursor
|
||||
if err := c.UnmarshalText([]byte(s)); err != nil {
|
||||
t.Fatalf("Unmarshal %q: unexpected error: %v", s, err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func TestSource_counter(t *testing.T) {
|
||||
src := &cursor.Source{
|
||||
TimeIndex: func() int64 { return 255 },
|
||||
}
|
||||
for i := 1; i <= 5; i++ {
|
||||
want := fmt.Sprintf("00000000000000ff-%04x", i)
|
||||
got := src.Cursor().String()
|
||||
if got != want {
|
||||
t.Errorf("Cursor %d: got %q, want %q", i, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSource_timeIndex(t *testing.T) {
|
||||
times := []int64{0, 1, 100, 65535, 0x76543210fecdba98}
|
||||
src := &cursor.Source{
|
||||
TimeIndex: func() int64 {
|
||||
out := times[0]
|
||||
times = append(times[1:], out)
|
||||
return out
|
||||
},
|
||||
Counter: 160,
|
||||
}
|
||||
results := []string{
|
||||
"0000000000000000-00a1",
|
||||
"0000000000000001-00a2",
|
||||
"0000000000000064-00a3",
|
||||
"000000000000ffff-00a4",
|
||||
"76543210fecdba98-00a5",
|
||||
}
|
||||
for i, want := range results {
|
||||
if got := src.Cursor().String(); got != want {
|
||||
t.Errorf("Cursor %d: got %q, want %q", i+1, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCursor_roundTrip(t *testing.T) {
|
||||
const text = `0123456789abcdef-fce9`
|
||||
|
||||
c := mustParse(t, text)
|
||||
if got := c.String(); got != text {
|
||||
t.Errorf("Wrong string format: got %q, want %q", got, text)
|
||||
}
|
||||
cmp, err := c.MarshalText()
|
||||
if err != nil {
|
||||
t.Fatalf("Marshal %+v failed: %v", c, err)
|
||||
}
|
||||
if got := string(cmp); got != text {
|
||||
t.Errorf("Wrong text format: got %q, want %q", got, text)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCursor_ordering(t *testing.T) {
|
||||
// Condition: text1 precedes text2 in time order.
|
||||
// Condition: text2 has an earlier sequence than text1.
|
||||
const zero = ""
|
||||
const text1 = "0000000012345678-0005"
|
||||
const text2 = "00000000fecdeba9-0002"
|
||||
|
||||
zc := mustParse(t, zero)
|
||||
c1 := mustParse(t, text1)
|
||||
c2 := mustParse(t, text2)
|
||||
|
||||
// Confirm for all pairs that string order respects time order.
|
||||
pairs := []struct {
|
||||
t1, t2 string
|
||||
c1, c2 cursor.Cursor
|
||||
}{
|
||||
{zero, zero, zc, zc},
|
||||
{zero, text1, zc, c1},
|
||||
{zero, text2, zc, c2},
|
||||
{text1, zero, c1, zc},
|
||||
{text1, text1, c1, c1},
|
||||
{text1, text2, c1, c2},
|
||||
{text2, zero, c2, zc},
|
||||
{text2, text1, c2, c1},
|
||||
{text2, text2, c2, c2},
|
||||
}
|
||||
for _, pair := range pairs {
|
||||
want := pair.t1 < pair.t2
|
||||
if got := pair.c1.Before(pair.c2); got != want {
|
||||
t.Errorf("(%s).Before(%s): got %v, want %v", pair.t1, pair.t2, got, want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCursor_IsZero(t *testing.T) {
|
||||
tests := []struct {
|
||||
text string
|
||||
want bool
|
||||
}{
|
||||
{"", true},
|
||||
{"0000000000000000-0000", true},
|
||||
{"0000000000000001-0000", false},
|
||||
{"0000000000000000-0001", false},
|
||||
{"0000000000000001-0001", false},
|
||||
}
|
||||
for _, test := range tests {
|
||||
c := mustParse(t, test.text)
|
||||
if got := c.IsZero(); got != test.want {
|
||||
t.Errorf("IsZero(%q): got %v, want %v", test.text, got, test.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCursor_Diff(t *testing.T) {
|
||||
const time1 = 0x1ac0193001
|
||||
const time2 = 0x0ac0193001
|
||||
|
||||
text1 := fmt.Sprintf("%016x-0001", time1)
|
||||
text2 := fmt.Sprintf("%016x-0005", time2)
|
||||
want := time.Duration(time1 - time2)
|
||||
|
||||
c1 := mustParse(t, text1)
|
||||
c2 := mustParse(t, text2)
|
||||
|
||||
got := c1.Diff(c2)
|
||||
if got != want {
|
||||
t.Fatalf("Diff %q - %q: got %v, want %v", text1, text2, got, want)
|
||||
}
|
||||
}
|
||||
217
internal/eventlog/eventlog.go
Normal file
217
internal/eventlog/eventlog.go
Normal file
@@ -0,0 +1,217 @@
|
||||
// Package eventlog defines a reverse time-ordered log of events over a sliding
|
||||
// window of time before the most recent item in the log.
|
||||
//
|
||||
// New items are added to the head of the log (the newest end), and items that
|
||||
// fall outside the designated window are pruned from its tail (the oldest).
|
||||
// Items within the log are indexed by lexicographically-ordered cursors.
|
||||
package eventlog
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// A Log is a reverse time-ordered log of events in a sliding window of time
|
||||
// before the newest item. Use Add to add new items to the front (head) of the
|
||||
// log, and Scan or WaitScan to traverse the current contents of the log.
|
||||
//
|
||||
// After construction, a *Log is safe for concurrent access by one writer and
|
||||
// any number of readers.
|
||||
type Log struct {
|
||||
// These values do not change after construction.
|
||||
windowSize time.Duration
|
||||
maxItems int
|
||||
metrics *Metrics
|
||||
|
||||
// Protects access to the fields below. Lock to modify the values of these
|
||||
// fields, or to read or snapshot the values.
|
||||
mu sync.Mutex
|
||||
|
||||
numItems int // total number of items in the log
|
||||
oldestCursor cursor.Cursor // cursor of the oldest item
|
||||
head *logEntry // pointer to the newest item
|
||||
ready chan struct{} // closed when head changes
|
||||
source cursor.Source // generator of cursors
|
||||
}
|
||||
|
||||
// New constructs a new empty log with the given settings.
|
||||
func New(opts LogSettings) (*Log, error) {
|
||||
if opts.WindowSize <= 0 {
|
||||
return nil, errors.New("window size must be positive")
|
||||
}
|
||||
lg := &Log{
|
||||
windowSize: opts.WindowSize,
|
||||
maxItems: opts.MaxItems,
|
||||
metrics: NopMetrics(),
|
||||
ready: make(chan struct{}),
|
||||
source: opts.Source,
|
||||
}
|
||||
if opts.Metrics != nil {
|
||||
lg.metrics = opts.Metrics
|
||||
}
|
||||
return lg, nil
|
||||
}
|
||||
|
||||
// Add adds a new item to the front of the log. If necessary, the log is pruned
|
||||
// to fit its constraints on size and age. Add blocks until both steps are done.
|
||||
//
|
||||
// Any error reported by Add arises from pruning; the new item was added to the
|
||||
// log regardless whether an error occurs.
|
||||
func (lg *Log) Add(etype string, data types.TMEventData) error {
|
||||
lg.mu.Lock()
|
||||
head := &logEntry{
|
||||
item: newItem(lg.source.Cursor(), etype, data),
|
||||
next: lg.head,
|
||||
}
|
||||
lg.numItems++
|
||||
lg.updateHead(head)
|
||||
size := lg.numItems
|
||||
age := head.item.Cursor.Diff(lg.oldestCursor)
|
||||
|
||||
// If the log requires pruning, do the pruning step outside the lock. This
|
||||
// permits readers to continue to make progress while we're working.
|
||||
lg.mu.Unlock()
|
||||
return lg.checkPrune(head, size, age)
|
||||
}
|
||||
|
||||
// Scan scans the current contents of the log, calling f with each item until
|
||||
// all items are visited or f reports an error. If f returns ErrStopScan, Scan
|
||||
// returns nil, otherwise it returns the error reported by f.
|
||||
//
|
||||
// The Info value returned is valid even if Scan reports an error.
|
||||
func (lg *Log) Scan(f func(*Item) error) (Info, error) {
|
||||
return lg.scanState(lg.state(), f)
|
||||
}
|
||||
|
||||
// WaitScan blocks until the cursor of the frontmost log item is different from
|
||||
// c, then executes a Scan on the contents of the log. If ctx ends before the
|
||||
// head is updated, WaitScan returns an error without calling f.
|
||||
//
|
||||
// The Info value returned is valid even if WaitScan reports an error.
|
||||
func (lg *Log) WaitScan(ctx context.Context, c cursor.Cursor, f func(*Item) error) (Info, error) {
|
||||
st := lg.state()
|
||||
for st.head == nil || st.head.item.Cursor == c {
|
||||
var err error
|
||||
st, err = lg.waitStateChange(ctx)
|
||||
if err != nil {
|
||||
return st.info(), err
|
||||
}
|
||||
}
|
||||
return lg.scanState(st, f)
|
||||
}
|
||||
|
||||
// Info returns the current state of the log.
|
||||
func (lg *Log) Info() Info { return lg.state().info() }
|
||||
|
||||
// ErrStopScan is returned by a Scan callback to signal that scanning should be
|
||||
// terminated without error.
|
||||
var ErrStopScan = errors.New("stop scanning")
|
||||
|
||||
// ErrLogPruned is returned by Add to signal that at least some events within
|
||||
// the time window were discarded by pruning in excess of the size limit.
|
||||
// This error may be wrapped, use errors.Is to test for it.
|
||||
var ErrLogPruned = errors.New("log pruned")
|
||||
|
||||
// LogSettings configure the construction of an event log.
|
||||
type LogSettings struct {
|
||||
// The size of the time window measured in time before the newest item.
|
||||
// This value must be positive.
|
||||
WindowSize time.Duration
|
||||
|
||||
// The maximum number of items that will be retained in memory within the
|
||||
// designated time window. A value ≤ 0 imposes no limit, otherwise items in
|
||||
// excess of this number will be dropped from the log.
|
||||
MaxItems int
|
||||
|
||||
// The cursor source to use for log entries. If not set, use wallclock time.
|
||||
Source cursor.Source
|
||||
|
||||
// If non-nil, exported metrics to update. If nil, metrics are discarded.
|
||||
Metrics *Metrics
|
||||
}
|
||||
|
||||
// Info records the current state of the log at the time of a scan operation.
|
||||
type Info struct {
|
||||
Oldest cursor.Cursor // the cursor of the oldest item in the log
|
||||
Newest cursor.Cursor // the cursor of the newest item in the log
|
||||
Size int // the number of items in the log
|
||||
}
|
||||
|
||||
// logState is a snapshot of the state of the log.
|
||||
type logState struct {
|
||||
oldest cursor.Cursor
|
||||
newest cursor.Cursor
|
||||
size int
|
||||
head *logEntry
|
||||
}
|
||||
|
||||
func (st logState) info() Info {
|
||||
return Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
|
||||
}
|
||||
|
||||
// state returns a snapshot of the current log contents. The caller may freely
|
||||
// traverse the internal structure of the list without locking, provided it
|
||||
// does not modify either the entries or their items.
|
||||
func (lg *Log) state() logState {
|
||||
lg.mu.Lock()
|
||||
defer lg.mu.Unlock()
|
||||
if lg.head == nil {
|
||||
return logState{} // empty
|
||||
}
|
||||
return logState{
|
||||
oldest: lg.oldestCursor,
|
||||
newest: lg.head.item.Cursor,
|
||||
size: lg.numItems,
|
||||
head: lg.head,
|
||||
}
|
||||
}
|
||||
|
||||
// waitStateChange blocks until either ctx ends or the head of the log is
|
||||
// modified, then returns the state of the log. An error is reported only if
|
||||
// ctx terminates before head changes.
|
||||
func (lg *Log) waitStateChange(ctx context.Context) (logState, error) {
|
||||
lg.mu.Lock()
|
||||
ch := lg.ready // capture
|
||||
lg.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return lg.state(), ctx.Err()
|
||||
case <-ch:
|
||||
return lg.state(), nil
|
||||
}
|
||||
}
|
||||
|
||||
// scanState scans the contents of the log at st. See the Scan method for a
|
||||
// description of the callback semantics.
|
||||
func (lg *Log) scanState(st logState, f func(*Item) error) (Info, error) {
|
||||
info := Info{Oldest: st.oldest, Newest: st.newest, Size: st.size}
|
||||
for cur := st.head; cur != nil; cur = cur.next {
|
||||
if err := f(cur.item); err != nil {
|
||||
if errors.Is(err, ErrStopScan) {
|
||||
return info, nil
|
||||
}
|
||||
return info, err
|
||||
}
|
||||
}
|
||||
return info, nil
|
||||
}
|
||||
|
||||
// updateHead replaces the current head with newHead, signals any waiters, and
|
||||
// resets the wait signal. The caller must hold log.mu exclusively.
|
||||
func (lg *Log) updateHead(newHead *logEntry) {
|
||||
lg.head = newHead
|
||||
close(lg.ready) // signal
|
||||
lg.ready = make(chan struct{})
|
||||
}
|
||||
|
||||
// A logEntry is the backbone of the event log queue. Entries are not mutated
|
||||
// after construction, so it is safe to read item and next without locking.
|
||||
type logEntry struct {
|
||||
item *Item
|
||||
next *logEntry
|
||||
}
|
||||
222
internal/eventlog/eventlog_test.go
Normal file
222
internal/eventlog/eventlog_test.go
Normal file
@@ -0,0 +1,222 @@
|
||||
package eventlog_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// fakeTime is a fake clock to use to control cursor assignment.
|
||||
// The timeIndex method reports the current "time" and advance manually updates
|
||||
// the apparent time.
|
||||
type fakeTime struct{ now int64 }
|
||||
|
||||
func newFakeTime(init int64) *fakeTime { return &fakeTime{now: init} }
|
||||
|
||||
func (f *fakeTime) timeIndex() int64 { return f.now }
|
||||
|
||||
func (f *fakeTime) advance(d time.Duration) { f.now += int64(d) }
|
||||
|
||||
// eventData is a placeholder event data implementation for testing.
|
||||
type eventData string
|
||||
|
||||
func (eventData) TypeTag() string { return "eventData" }
|
||||
|
||||
func TestNewError(t *testing.T) {
|
||||
lg, err := eventlog.New(eventlog.LogSettings{})
|
||||
if err == nil {
|
||||
t.Fatalf("New: got %+v, wanted error", lg)
|
||||
} else {
|
||||
t.Logf("New: got expected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPruneTime(t *testing.T) {
|
||||
clk := newFakeTime(0)
|
||||
|
||||
// Construct a log with a 60-second time window.
|
||||
lg, err := eventlog.New(eventlog.LogSettings{
|
||||
WindowSize: 60 * time.Second,
|
||||
Source: cursor.Source{
|
||||
TimeIndex: clk.timeIndex,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New unexpectedly failed: %v", err)
|
||||
}
|
||||
|
||||
// Add events up to the time window, at seconds 0, 15, 30, 45, 60.
|
||||
// None of these should be pruned (yet).
|
||||
var want []string // cursor strings
|
||||
for i := 1; i <= 5; i++ {
|
||||
want = append(want, fmt.Sprintf("%016x-%04x", clk.timeIndex(), i))
|
||||
mustAdd(t, lg, "test-event", eventData("whatever"))
|
||||
clk.advance(15 * time.Second)
|
||||
}
|
||||
// time now: 75 sec.
|
||||
|
||||
// Verify that all the events we added are present.
|
||||
got := cursors(t, lg)
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("Cursors before pruning: (-want, +got)\n%s", diff)
|
||||
}
|
||||
|
||||
// Add an event past the end of the window at second 90, and verify that
|
||||
// this triggered an age-based prune of the oldest events (0, 15) that are
|
||||
// outside the 60-second window.
|
||||
|
||||
clk.advance(15 * time.Second) // time now: 90 sec.
|
||||
want = append(want[2:], fmt.Sprintf("%016x-%04x", clk.timeIndex(), 6))
|
||||
|
||||
mustAdd(t, lg, "test-event", eventData("extra"))
|
||||
got = cursors(t, lg)
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("Cursors after pruning: (-want, +got)\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Run a publisher and concurrent subscribers to tickle the race detector with
|
||||
// concurrent add and scan operations.
|
||||
func TestConcurrent(t *testing.T) {
|
||||
defer leaktest.Check(t)
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping concurrency exercise because -short is set")
|
||||
}
|
||||
|
||||
lg, err := eventlog.New(eventlog.LogSettings{
|
||||
WindowSize: 30 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New unexpectedly failed: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Publisher: Add events and handle expirations.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
tick := time.NewTimer(0)
|
||||
defer tick.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case t := <-tick.C:
|
||||
_ = lg.Add("test-event", eventData(t.Format(time.RFC3339Nano)))
|
||||
tick.Reset(time.Duration(rand.Intn(50)) * time.Millisecond)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Subscribers: Wait for new events at the head of the queue. This
|
||||
// simulates the typical operation of a subscriber by waiting for the head
|
||||
// cursor to change and then scanning down toward the unconsumed item.
|
||||
const numSubs = 16
|
||||
for i := 0; i < numSubs; i++ {
|
||||
task := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
tick := time.NewTimer(0)
|
||||
var cur cursor.Cursor
|
||||
for {
|
||||
// Simulate the subscriber being busy with other things.
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-tick.C:
|
||||
tick.Reset(time.Duration(rand.Intn(150)) * time.Millisecond)
|
||||
}
|
||||
|
||||
// Wait for new data to arrive.
|
||||
info, err := lg.WaitScan(ctx, cur, func(itm *eventlog.Item) error {
|
||||
if itm.Cursor == cur {
|
||||
return eventlog.ErrStopScan
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
t.Errorf("Wait scan for task %d failed: %v", task, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
cur = info.Newest
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
time.AfterFunc(2*time.Second, cancel)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestPruneSize(t *testing.T) {
|
||||
const maxItems = 25
|
||||
lg, err := eventlog.New(eventlog.LogSettings{
|
||||
WindowSize: 60 * time.Second,
|
||||
MaxItems: maxItems,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New unexpectedly failed: %v", err)
|
||||
}
|
||||
|
||||
// Add a lot of items to the log and verify that we never exceed the
|
||||
// specified cap.
|
||||
for i := 0; i < 60; i++ {
|
||||
mustAdd(t, lg, "test-event", eventData(strconv.Itoa(i+1)))
|
||||
|
||||
if got := lg.Info().Size; got > maxItems {
|
||||
t.Errorf("After add %d: log size is %d, want ≤ %d", i+1, got, maxItems)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mustAdd adds a single event to lg. If Add reports an error other than for
|
||||
// pruning, the test fails; otherwise the error is returned.
|
||||
func mustAdd(t *testing.T, lg *eventlog.Log, etype string, data types.TMEventData) {
|
||||
t.Helper()
|
||||
err := lg.Add(etype, data)
|
||||
if err != nil && !errors.Is(err, eventlog.ErrLogPruned) {
|
||||
t.Fatalf("Add %q failed: %v", etype, err)
|
||||
}
|
||||
}
|
||||
|
||||
// cursors extracts the cursors from lg in ascending order of time.
|
||||
func cursors(t *testing.T, lg *eventlog.Log) []string {
|
||||
t.Helper()
|
||||
|
||||
var cursors []string
|
||||
if _, err := lg.Scan(func(itm *eventlog.Item) error {
|
||||
cursors = append(cursors, itm.Cursor.String())
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Scan failed: %v", err)
|
||||
}
|
||||
reverse(cursors) // put in forward-time order for comparison
|
||||
return cursors
|
||||
}
|
||||
|
||||
func reverse(ss []string) {
|
||||
for i, j := 0, len(ss)-1; i < j; {
|
||||
ss[i], ss[j] = ss[j], ss[i]
|
||||
i++
|
||||
j--
|
||||
}
|
||||
}
|
||||
78
internal/eventlog/item.go
Normal file
78
internal/eventlog/item.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package eventlog
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Cached constants for the pieces of reserved event names.
|
||||
var (
|
||||
tmTypeTag string
|
||||
tmTypeKey string
|
||||
)
|
||||
|
||||
func init() {
|
||||
parts := strings.SplitN(types.EventTypeKey, ".", 2)
|
||||
if len(parts) != 2 {
|
||||
panic("invalid event type key: " + types.EventTypeKey)
|
||||
}
|
||||
tmTypeTag = parts[0]
|
||||
tmTypeKey = parts[1]
|
||||
}
|
||||
|
||||
// ABCIEventer is an optional extension interface that may be implemented by
|
||||
// event data types, to expose ABCI metadata to the event log. If an event item
|
||||
// does not implement this interface, it is presumed to have no ABCI metadata.
|
||||
type ABCIEventer interface {
|
||||
// Return any ABCI events metadata the receiver contains.
|
||||
// The reported slice must not contain a type (tm.event) record, since some
|
||||
// events share the same structure among different event types.
|
||||
ABCIEvents() []abci.Event
|
||||
}
|
||||
|
||||
// An Item is a single event item.
|
||||
type Item struct {
|
||||
Cursor cursor.Cursor
|
||||
Type string
|
||||
Data types.TMEventData
|
||||
Events []abci.Event
|
||||
}
|
||||
|
||||
// newItem constructs a new item with the specified cursor, type, and data.
|
||||
func newItem(cursor cursor.Cursor, etype string, data types.TMEventData) *Item {
|
||||
return &Item{Cursor: cursor, Type: etype, Data: data, Events: makeEvents(etype, data)}
|
||||
}
|
||||
|
||||
// makeEvents returns a slice of ABCI events comprising the type tag along with
|
||||
// any internal events exported by the data value.
|
||||
func makeEvents(etype string, data types.TMEventData) []abci.Event {
|
||||
base := []abci.Event{{
|
||||
Type: tmTypeTag,
|
||||
Attributes: []abci.EventAttribute{{
|
||||
Key: tmTypeKey, Value: etype,
|
||||
}},
|
||||
}}
|
||||
if evt, ok := data.(ABCIEventer); ok {
|
||||
return append(base, evt.ABCIEvents()...)
|
||||
}
|
||||
return base
|
||||
}
|
||||
|
||||
// FindType reports whether events contains a tm.event event, and if so returns
|
||||
// its value, which is the type of the underlying event item.
|
||||
func FindType(events []abci.Event) (string, bool) {
|
||||
for _, evt := range events {
|
||||
if evt.Type != tmTypeTag {
|
||||
continue
|
||||
}
|
||||
for _, attr := range evt.Attributes {
|
||||
if attr.Key == tmTypeKey {
|
||||
return attr.Value, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", false
|
||||
}
|
||||
30
internal/eventlog/metrics.gen.go
Normal file
30
internal/eventlog/metrics.gen.go
Normal file
@@ -0,0 +1,30 @@
|
||||
// Code generated by metricsgen. DO NOT EDIT.
|
||||
|
||||
package eventlog
|
||||
|
||||
import (
|
||||
"github.com/go-kit/kit/metrics/discard"
|
||||
prometheus "github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
labels := []string{}
|
||||
for i := 0; i < len(labelsAndValues); i += 2 {
|
||||
labels = append(labels, labelsAndValues[i])
|
||||
}
|
||||
return &Metrics{
|
||||
numItems: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "num_items",
|
||||
Help: "Number of items currently resident in the event log.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
numItems: discard.NewGauge(),
|
||||
}
|
||||
}
|
||||
14
internal/eventlog/metrics.go
Normal file
14
internal/eventlog/metrics.go
Normal file
@@ -0,0 +1,14 @@
|
||||
package eventlog
|
||||
|
||||
import "github.com/go-kit/kit/metrics"
|
||||
|
||||
const MetricsSubsystem = "eventlog"
|
||||
|
||||
//go:generate go run ../../scripts/metricsgen -struct=Metrics
|
||||
|
||||
// Metrics define the metrics exported by the eventlog package.
|
||||
type Metrics struct {
|
||||
|
||||
// Number of items currently resident in the event log.
|
||||
numItems metrics.Gauge
|
||||
}
|
||||
111
internal/eventlog/prune.go
Normal file
111
internal/eventlog/prune.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package eventlog
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// checkPrune checks whether the log has exceeded its boundaries of size or
|
||||
// age, and if so prunes the log and updates the head.
|
||||
func (lg *Log) checkPrune(head *logEntry, size int, age time.Duration) error {
|
||||
// To avoid potentially re-pruning for every event, don't trigger an age
|
||||
// prune until we're at least this far beyond the designated size.
|
||||
const windowSlop = 30 * time.Second
|
||||
|
||||
if age < (lg.windowSize+windowSlop) && (lg.maxItems <= 0 || size <= lg.maxItems) {
|
||||
lg.metrics.numItems.Set(float64(lg.numItems))
|
||||
return nil // no pruning is needed
|
||||
}
|
||||
|
||||
var newState logState
|
||||
var err error
|
||||
|
||||
switch {
|
||||
case lg.maxItems > 0 && size > lg.maxItems:
|
||||
// We exceeded the size cap. In this case, age does not matter: count off
|
||||
// the newest items and drop the unconsumed tail. Note that we prune by a
|
||||
// fraction rather than an absolute amount so that we only have to prune
|
||||
// for size occasionally.
|
||||
|
||||
// TODO(creachadair): We may want to spill dropped events to secondary
|
||||
// storage rather than dropping them. The size cap is meant as a safety
|
||||
// valve against unexpected extremes, but if a network has "expected"
|
||||
// spikes that nevertheless exceed any safe buffer size (e.g., Osmosis
|
||||
// epochs), we may want to have a fallback so that we don't lose events
|
||||
// that would otherwise fall within the window.
|
||||
newSize := 3 * size / 4
|
||||
newState, err = lg.pruneSize(head, newSize)
|
||||
|
||||
default:
|
||||
// We did not exceed the size cap, but some items are too old.
|
||||
newState = lg.pruneAge(head)
|
||||
}
|
||||
|
||||
// Note that when we update the head after pruning, we do not need to signal
|
||||
// any waiters; pruning never adds new material to the log so anyone waiting
|
||||
// should continue doing so until a subsequent Add occurs.
|
||||
lg.mu.Lock()
|
||||
defer lg.mu.Unlock()
|
||||
lg.numItems = newState.size
|
||||
lg.metrics.numItems.Set(float64(newState.size))
|
||||
lg.oldestCursor = newState.oldest
|
||||
lg.head = newState.head
|
||||
return err
|
||||
}
|
||||
|
||||
// pruneSize returns a new log state by pruning head to newSize.
|
||||
// Precondition: newSize ≤ len(head).
|
||||
func (lg *Log) pruneSize(head *logEntry, newSize int) (logState, error) {
|
||||
// Special case for size 0 to simplify the logic below.
|
||||
if newSize == 0 {
|
||||
return logState{}, ErrLogPruned // drop everything
|
||||
}
|
||||
|
||||
// Initialize: New head has the same item as the old head.
|
||||
first := &logEntry{item: head.item} // new head
|
||||
last := first // new tail (last copied cons)
|
||||
|
||||
cur := head.next
|
||||
for i := 1; i < newSize; i++ {
|
||||
cp := &logEntry{item: cur.item}
|
||||
last.next = cp
|
||||
last = cp
|
||||
|
||||
cur = cur.next
|
||||
}
|
||||
var err error
|
||||
if head.item.Cursor.Diff(last.item.Cursor) <= lg.windowSize {
|
||||
err = ErrLogPruned
|
||||
}
|
||||
|
||||
return logState{
|
||||
oldest: last.item.Cursor,
|
||||
newest: first.item.Cursor,
|
||||
size: newSize,
|
||||
head: first,
|
||||
}, err
|
||||
}
|
||||
|
||||
// pruneAge returns a new log state by pruning items older than the window
|
||||
// prior to the head element.
|
||||
func (lg *Log) pruneAge(head *logEntry) logState {
|
||||
first := &logEntry{item: head.item}
|
||||
last := first
|
||||
|
||||
size := 1
|
||||
for cur := head.next; cur != nil; cur = cur.next {
|
||||
diff := head.item.Cursor.Diff(cur.item.Cursor)
|
||||
if diff > lg.windowSize {
|
||||
break // all remaining items are older than the window
|
||||
}
|
||||
cp := &logEntry{item: cur.item}
|
||||
last.next = cp
|
||||
last = cp
|
||||
size++
|
||||
}
|
||||
return logState{
|
||||
oldest: last.item.Cursor,
|
||||
newest: first.item.Cursor,
|
||||
size: size,
|
||||
head: first,
|
||||
}
|
||||
}
|
||||
@@ -95,6 +95,13 @@ func (q *Query) Matches(events map[string][]string) (bool, error) {
|
||||
return q.matchesEvents(ExpandEvents(events)), nil
|
||||
}
|
||||
|
||||
func (q *Query) MatchesEvents(events []types.Event) (bool, error) {
|
||||
if q == nil {
|
||||
return true, nil
|
||||
}
|
||||
return q.matchesEvents(events), nil
|
||||
}
|
||||
|
||||
// String matches part of the pubsub.Query interface.
|
||||
func (q *Query) String() string {
|
||||
if q == nil {
|
||||
|
||||
@@ -332,6 +332,16 @@ func TestCompiledMatches(t *testing.T) {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
|
||||
tc.s, tc.events, got, tc.matches)
|
||||
}
|
||||
|
||||
got, err = c.MatchesEvents(query.ExpandEvents(tc.events))
|
||||
if err != nil {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v",
|
||||
tc.s, tc.events, err)
|
||||
}
|
||||
if got != tc.matches {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
|
||||
tc.s, tc.events, got, tc.matches)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/bytes"
|
||||
lrpc "github.com/tendermint/tendermint/light/rpc"
|
||||
rpcclient "github.com/tendermint/tendermint/rpc/client"
|
||||
@@ -12,7 +14,9 @@ import (
|
||||
|
||||
func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
|
||||
return map[string]*rpcserver.RPCFunc{
|
||||
// Subscribe/unsubscribe are reserved for websocket events.
|
||||
// Event subscription. Note that subscribe, unsubscribe, and
|
||||
// unsubscribe_all are only available via the websocket endpoint.
|
||||
"events": rpcserver.NewRPCFunc(makeEventsSearchFunc(c), "filter,maxItems,before,after,waitTime"),
|
||||
"subscribe": rpcserver.NewWSRPCFunc(c.SubscribeWS, "query"),
|
||||
"unsubscribe": rpcserver.NewWSRPCFunc(c.UnsubscribeWS, "query"),
|
||||
"unsubscribe_all": rpcserver.NewWSRPCFunc(c.UnsubscribeAllWS, ""),
|
||||
@@ -300,3 +304,31 @@ func makeBroadcastEvidenceFunc(c *lrpc.Client) rpcBroadcastEvidenceFunc {
|
||||
return c.BroadcastEvidence(ctx.Context(), ev)
|
||||
}
|
||||
}
|
||||
|
||||
type rpcEventsSearchFunc func(
|
||||
ctx *rpctypes.Context,
|
||||
filter string,
|
||||
maxItems int,
|
||||
before, after string,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error)
|
||||
|
||||
func makeEventsSearchFunc(c *lrpc.Client) rpcEventsSearchFunc {
|
||||
return func(
|
||||
ctx *rpctypes.Context,
|
||||
filter string,
|
||||
maxItems int,
|
||||
before, after string,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error) {
|
||||
return c.Events(ctx.Context(), &ctypes.RequestEvents{
|
||||
Filter: &ctypes.EventFilter{
|
||||
Query: filter,
|
||||
},
|
||||
MaxItems: maxItems,
|
||||
WaitTime: waitTime,
|
||||
Before: before,
|
||||
After: after,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -572,11 +572,11 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
|
||||
}
|
||||
|
||||
func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error {
|
||||
return c.next.Unsubscribe(ctx, subscriber, query)
|
||||
return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck
|
||||
}
|
||||
|
||||
func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error {
|
||||
return c.next.UnsubscribeAll(ctx, subscriber)
|
||||
return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck
|
||||
}
|
||||
|
||||
func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) {
|
||||
@@ -599,6 +599,10 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
|
||||
c.prt.RegisterOpDecoder(typ, dec)
|
||||
}
|
||||
|
||||
func (c *Client) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
|
||||
return c.next.Events(ctx, req)
|
||||
}
|
||||
|
||||
// SubscribeWS subscribes for events using the given query and remote address as
|
||||
// a subscriber, but does not verify responses (UNSAFE)!
|
||||
// TODO: verify data
|
||||
@@ -631,7 +635,7 @@ func (c *Client) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul
|
||||
// UnsubscribeWS calls original client's Unsubscribe using remote address as a
|
||||
// subscriber.
|
||||
func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
|
||||
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
|
||||
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) //nolint:staticcheck
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -641,7 +645,7 @@ func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Res
|
||||
// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
|
||||
// as a subscriber.
|
||||
func (c *Client) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
|
||||
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
|
||||
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) //nolint:staticcheck
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
91
node/node.go
91
node/node.go
@@ -21,10 +21,12 @@ import (
|
||||
cs "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/evidence"
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
@@ -113,20 +115,21 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
|
||||
}
|
||||
|
||||
// MetricsProvider returns a consensus, p2p and mempool Metrics.
|
||||
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
|
||||
type MetricsProvider func(chainID string) (*cs.Metrics, *eventlog.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
|
||||
|
||||
// DefaultMetricsProvider returns Metrics build using Prometheus client library
|
||||
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
||||
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
||||
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
|
||||
return func(chainID string) (*cs.Metrics, *eventlog.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
|
||||
if config.Prometheus {
|
||||
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
eventlog.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
|
||||
}
|
||||
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
|
||||
return cs.NopMetrics(), eventlog.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -210,6 +213,7 @@ type Node struct {
|
||||
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
eventLog *eventlog.Log
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for block-syncing
|
||||
@@ -724,7 +728,7 @@ func NewNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
|
||||
csMetrics, eventLogMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics := metricsProvider(genDoc.ChainID)
|
||||
|
||||
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
|
||||
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, abciMetrics)
|
||||
@@ -741,6 +745,19 @@ func NewNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var eventLog *eventlog.Log
|
||||
if w := config.RPC.EventLogWindowSize; w > 0 {
|
||||
var err error
|
||||
eventLog, err = eventlog.New(eventlog.LogSettings{
|
||||
WindowSize: w,
|
||||
MaxItems: config.RPC.EventLogMaxItems,
|
||||
Metrics: eventLogMetrics,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("initializing event log: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
|
||||
genDoc.ChainID, dbProvider, eventBus, logger)
|
||||
if err != nil {
|
||||
@@ -924,6 +941,7 @@ func NewNode(config *cfg.Config,
|
||||
indexerService: indexerService,
|
||||
blockIndexer: blockIndexer,
|
||||
eventBus: eventBus,
|
||||
eventLog: eventLog,
|
||||
}
|
||||
node.BaseService = *service.NewBaseService(logger, "Node", node)
|
||||
|
||||
@@ -1080,6 +1098,7 @@ func (n *Node) ConfigureRPC() error {
|
||||
BlockIndexer: n.blockIndexer,
|
||||
ConsensusReactor: n.consensusReactor,
|
||||
EventBus: n.eventBus,
|
||||
EventLog: n.eventLog,
|
||||
Mempool: n.mempool,
|
||||
|
||||
Logger: n.Logger.With("module", "rpc"),
|
||||
@@ -1116,25 +1135,61 @@ func (n *Node) startRPC() ([]net.Listener, error) {
|
||||
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
|
||||
// we may expose the rpc over both a unix and tcp socket
|
||||
// If the event log is enabled, subscribe to all events published to the
|
||||
// event bus, and forward them to the event log.
|
||||
if lg := n.eventLog; lg != nil {
|
||||
// TODO(creachadair): This is kind of a hack, ideally we'd share the
|
||||
// observer with the indexer, but it's tricky to plumb them together.
|
||||
// For now, use a "normal" subscription with a big buffer allowance.
|
||||
// The event log should always be able to keep up.
|
||||
const subscriberID = "event-log-subscriber"
|
||||
ctx := context.Background()
|
||||
sub, err := n.eventBus.Subscribe(ctx, subscriberID, query.All, 1<<16) // essentially "no limit"
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("event log subscribe: %w", err)
|
||||
}
|
||||
go func() {
|
||||
// N.B. Use background for unsubscribe, ctx is already terminated.
|
||||
defer n.eventBus.UnsubscribeAll(ctx, subscriberID) // nolint:errcheck
|
||||
for {
|
||||
msg := <-sub.Out()
|
||||
if err != nil {
|
||||
n.Logger.Error("Subscription terminated", "err", err)
|
||||
return
|
||||
}
|
||||
etype, ok := eventlog.FindType(query.ExpandEvents(msg.Events()))
|
||||
if ok {
|
||||
_ = lg.Add(etype, msg.Data())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
n.Logger.Info("Event log subscription enabled")
|
||||
}
|
||||
|
||||
// We may expose the RPC over both TCP and a Unix-domain socket.
|
||||
listeners := make([]net.Listener, len(listenAddrs))
|
||||
for i, listenAddr := range listenAddrs {
|
||||
mux := http.NewServeMux()
|
||||
rpcLogger := n.Logger.With("module", "rpc-server")
|
||||
wmLogger := rpcLogger.With("protocol", "websocket")
|
||||
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
|
||||
rpcserver.OnDisconnect(func(remoteAddr string) {
|
||||
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
|
||||
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
|
||||
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
|
||||
}
|
||||
}),
|
||||
rpcserver.ReadLimit(config.MaxBodyBytes),
|
||||
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
|
||||
)
|
||||
wm.SetLogger(wmLogger)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
|
||||
if n.config.RPC.ExperimentalDisableWebsocket {
|
||||
rpcLogger.Info("Disabling websocket endpoints (experimental-disable-websocket=true)")
|
||||
} else {
|
||||
wmLogger := rpcLogger.With("protocol", "websocket")
|
||||
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
|
||||
rpcserver.OnDisconnect(func(remoteAddr string) {
|
||||
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
|
||||
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
|
||||
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
|
||||
}
|
||||
}),
|
||||
rpcserver.ReadLimit(config.MaxBodyBytes),
|
||||
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
|
||||
)
|
||||
wm.SetLogger(wmLogger)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
}
|
||||
listener, err := rpcserver.Listen(
|
||||
listenAddr,
|
||||
config,
|
||||
|
||||
@@ -41,12 +41,12 @@ func TestHeaderEvents(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
evtTyp := types.EventNewBlockHeader
|
||||
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
||||
ectx, cancel := context.WithTimeout(ctx, waitForEventTimeout)
|
||||
defer cancel()
|
||||
query := types.QueryForEvent(types.EventNewBlockHeader).String()
|
||||
var evt types.EventDataNewBlockHeader
|
||||
err := client.WaitForOneEvent(ectx, c, query, &evt)
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
_, ok := evt.(types.EventDataNewBlockHeader)
|
||||
require.True(t, ok, "%d: %#v", i, evt)
|
||||
// TODO: more checks...
|
||||
})
|
||||
}
|
||||
@@ -141,17 +141,15 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) {
|
||||
}
|
||||
}()
|
||||
|
||||
// and wait for confirmation
|
||||
evt, err := client.WaitForOneEvent(c, types.EventTx, waitForEventTimeout)
|
||||
// Wait for the transaction we sent to be confirmed.
|
||||
query := fmt.Sprintf(`tm.event = '%s' AND tx.hash = '%X'`, types.EventTx, types.Tx(tx).Hash())
|
||||
var evt types.EventDataTx
|
||||
err := client.WaitForOneEvent(ctx, c, query, &evt)
|
||||
require.Nil(t, err)
|
||||
|
||||
// and make sure it has the proper info
|
||||
txe, ok := evt.(types.EventDataTx)
|
||||
require.True(t, ok)
|
||||
|
||||
// make sure this is the proper tx
|
||||
require.EqualValues(t, tx, txe.Tx)
|
||||
require.True(t, txe.Result.IsOK())
|
||||
require.EqualValues(t, tx, evt.Tx)
|
||||
require.True(t, evt.Result.IsOK())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
194
rpc/client/eventstream/eventstream.go
Normal file
194
rpc/client/eventstream/eventstream.go
Normal file
@@ -0,0 +1,194 @@
|
||||
// Package eventstream implements a convenience client for the Events method
|
||||
// of the Tendermint RPC service, allowing clients to observe a resumable
|
||||
// stream of events matching a query.
|
||||
package eventstream
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
)
|
||||
|
||||
// Client is the subset of the RPC client interface consumed by Stream.
|
||||
type Client interface {
|
||||
Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error)
|
||||
}
|
||||
|
||||
// ErrStopRunning is returned by a Run callback to signal that no more events
|
||||
// are wanted and that Run should return.
|
||||
var ErrStopRunning = errors.New("stop accepting events")
|
||||
|
||||
// A Stream cpatures the state of a streaming event subscription.
|
||||
type Stream struct {
|
||||
filter *coretypes.EventFilter // the query being streamed
|
||||
batchSize int // request batch size
|
||||
newestSeen string // from the latest item matching our query
|
||||
waitTime time.Duration // the long-polling interval
|
||||
client Client
|
||||
}
|
||||
|
||||
// New constructs a new stream for the given query and options.
|
||||
// If opts == nil, the stream uses default values as described by
|
||||
// StreamOptions. This function will panic if cli == nil.
|
||||
func New(cli Client, query string, opts *StreamOptions) *Stream {
|
||||
if cli == nil {
|
||||
panic("eventstream: nil client")
|
||||
}
|
||||
return &Stream{
|
||||
filter: &coretypes.EventFilter{Query: query},
|
||||
batchSize: opts.batchSize(),
|
||||
newestSeen: opts.resumeFrom(),
|
||||
waitTime: opts.waitTime(),
|
||||
client: cli,
|
||||
}
|
||||
}
|
||||
|
||||
// Run polls the service for events matching the query, and calls accept for
|
||||
// each such event. Run handles pagination transparently, and delivers events
|
||||
// to accept in order of publication.
|
||||
//
|
||||
// Run continues until ctx ends or accept reports an error. If accept returns
|
||||
// ErrStopRunning, Run returns nil; otherwise Run returns the error reported by
|
||||
// accept or ctx. Run also returns an error if the server reports an error
|
||||
// from the Events method.
|
||||
//
|
||||
// If the stream falls behind the event log on the server, Run will stop and
|
||||
// report an error of concrete type *MissedItemsError. Call Reset to reset the
|
||||
// stream to the head of the log, and call Run again to resume.
|
||||
func (s *Stream) Run(ctx context.Context, accept func(*coretypes.EventItem) error) error {
|
||||
for {
|
||||
items, err := s.fetchPages(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Deliver events from the current batch to the receiver. We visit the
|
||||
// batch in reverse order so the receiver sees them in forward order.
|
||||
for i := len(items) - 1; i >= 0; i-- {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
itm := items[i]
|
||||
err := accept(itm)
|
||||
if itm.Cursor > s.newestSeen {
|
||||
s.newestSeen = itm.Cursor // update the latest delivered
|
||||
}
|
||||
if errors.Is(err, ErrStopRunning) {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset updates the stream's current cursor position to the head of the log.
|
||||
// This method may safely be called only when Run is not executing.
|
||||
func (s *Stream) Reset() { s.newestSeen = "" }
|
||||
|
||||
// fetchPages fetches the next batch of matching results. If there are multiple
|
||||
// pages, all the matching pages are retrieved. An error is reported if the
|
||||
// current scan position falls out of the event log window.
|
||||
func (s *Stream) fetchPages(ctx context.Context) ([]*coretypes.EventItem, error) {
|
||||
var pageCursor string // if non-empty, page through items before this
|
||||
var items []*coretypes.EventItem
|
||||
|
||||
// Fetch the next paginated batch of matching responses.
|
||||
for {
|
||||
rsp, err := s.client.Events(ctx, &coretypes.RequestEvents{
|
||||
Filter: s.filter,
|
||||
MaxItems: s.batchSize,
|
||||
After: s.newestSeen,
|
||||
Before: pageCursor,
|
||||
WaitTime: s.waitTime,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If the oldest item in the log is newer than our most recent item,
|
||||
// it means we might have missed some events matching our query.
|
||||
if s.newestSeen != "" && s.newestSeen < rsp.Oldest {
|
||||
return nil, &MissedItemsError{
|
||||
Query: s.filter.Query,
|
||||
NewestSeen: s.newestSeen,
|
||||
OldestPresent: rsp.Oldest,
|
||||
}
|
||||
}
|
||||
items = append(items, rsp.Items...)
|
||||
|
||||
if rsp.More {
|
||||
// There are more results matching this request, leave the baseline
|
||||
// where it is and set the page cursor so that subsequent requests
|
||||
// will get the next chunk.
|
||||
pageCursor = items[len(items)-1].Cursor
|
||||
} else if len(items) != 0 {
|
||||
// We got everything matching so far.
|
||||
return items, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StreamOptions are optional settings for a Stream value. A nil *StreamOptions
|
||||
// is ready for use and provides default values as described.
|
||||
type StreamOptions struct {
|
||||
// How many items to request per call to the service. The stream may pin
|
||||
// this value to a minimum default batch size.
|
||||
BatchSize int
|
||||
|
||||
// If set, resume streaming from this cursor. Typically this is set to the
|
||||
// cursor of the most recently-received matching value. If empty, streaming
|
||||
// begins at the head of the log (the default).
|
||||
ResumeFrom string
|
||||
|
||||
// Specifies the long poll interval. The stream may pin this value to a
|
||||
// minimum default poll interval.
|
||||
WaitTime time.Duration
|
||||
}
|
||||
|
||||
func (o *StreamOptions) batchSize() int {
|
||||
const minBatchSize = 16
|
||||
if o == nil || o.BatchSize < minBatchSize {
|
||||
return minBatchSize
|
||||
}
|
||||
return o.BatchSize
|
||||
}
|
||||
|
||||
func (o *StreamOptions) resumeFrom() string {
|
||||
if o == nil {
|
||||
return ""
|
||||
}
|
||||
return o.ResumeFrom
|
||||
}
|
||||
|
||||
func (o *StreamOptions) waitTime() time.Duration {
|
||||
const minWaitTime = 5 * time.Second
|
||||
if o == nil || o.WaitTime < minWaitTime {
|
||||
return minWaitTime
|
||||
}
|
||||
return o.WaitTime
|
||||
}
|
||||
|
||||
// MissedItemsError is an error that indicates the stream missed (lost) some
|
||||
// number of events matching the specified query.
|
||||
type MissedItemsError struct {
|
||||
// The cursor of the newest matching item the stream has observed.
|
||||
NewestSeen string
|
||||
|
||||
// The oldest cursor in the log at the point the miss was detected.
|
||||
// Any matching events between NewestSeen and OldestPresent are lost.
|
||||
OldestPresent string
|
||||
|
||||
// The active query.
|
||||
Query string
|
||||
}
|
||||
|
||||
// Error satisfies the error interface.
|
||||
func (e *MissedItemsError) Error() string {
|
||||
return fmt.Sprintf("missed events matching %q between %q and %q",
|
||||
e.Query, e.NewestSeen, e.OldestPresent)
|
||||
}
|
||||
287
rpc/client/eventstream/eventstream_test.go
Normal file
287
rpc/client/eventstream/eventstream_test.go
Normal file
@@ -0,0 +1,287 @@
|
||||
package eventstream_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
"github.com/tendermint/tendermint/rpc/client/eventstream"
|
||||
rpccore "github.com/tendermint/tendermint/rpc/core"
|
||||
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestStream_filterOrder(t *testing.T) {
|
||||
defer leaktest.Check(t)
|
||||
|
||||
s := newStreamTester(t, `tm.event = 'good'`, eventlog.LogSettings{
|
||||
WindowSize: 30 * time.Second,
|
||||
}, nil)
|
||||
|
||||
// Verify that events are delivered in forward time order (i.e., that the
|
||||
// stream unpacks the pages correctly) and that events not matching the
|
||||
// query (here, type="bad") are skipped.
|
||||
//
|
||||
// The minimum batch size is 16 and half the events we publish match, so we
|
||||
// publish > 32 items (> 16 good) to ensure we exercise paging.
|
||||
etype := [2]string{"good", "bad"}
|
||||
var items []testItem
|
||||
for i := 0; i < 40; i++ {
|
||||
s.advance(100 * time.Millisecond)
|
||||
text := fmt.Sprintf("item%d", i)
|
||||
cur := s.publish(etype[i%2], text)
|
||||
|
||||
// Even-numbered items match the target type.
|
||||
if i%2 == 0 {
|
||||
items = append(items, makeTestItem(cur, text))
|
||||
}
|
||||
}
|
||||
|
||||
s.start()
|
||||
for _, itm := range items {
|
||||
s.mustItem(t, itm)
|
||||
}
|
||||
s.stopWait()
|
||||
}
|
||||
|
||||
func TestStream_lostItem(t *testing.T) {
|
||||
defer leaktest.Check(t)
|
||||
|
||||
s := newStreamTester(t, ``, eventlog.LogSettings{
|
||||
WindowSize: 30 * time.Second,
|
||||
}, nil)
|
||||
|
||||
// Publish an item and let the client observe it.
|
||||
cur := s.publish("ok", "whatever")
|
||||
s.start()
|
||||
s.mustItem(t, makeTestItem(cur, "whatever"))
|
||||
s.stopWait()
|
||||
|
||||
// Time passes, and cur expires out of the window.
|
||||
s.advance(50 * time.Second)
|
||||
next1 := s.publish("ok", "more stuff")
|
||||
s.advance(15 * time.Second)
|
||||
next2 := s.publish("ok", "still more stuff")
|
||||
|
||||
// At this point, the oldest item in the log is newer than the point at
|
||||
// which we continued, we should get an error.
|
||||
s.start()
|
||||
var missed *eventstream.MissedItemsError
|
||||
if err := s.mustError(t); !errors.As(err, &missed) {
|
||||
t.Errorf("Wrong error: got %v, want %T", err, missed)
|
||||
} else {
|
||||
t.Logf("Correctly reported missed item: %v", missed)
|
||||
}
|
||||
|
||||
// If we reset the stream and continue from head, we should catch up.
|
||||
s.stopWait()
|
||||
s.stream.Reset()
|
||||
s.start()
|
||||
|
||||
s.mustItem(t, makeTestItem(next1, "more stuff"))
|
||||
s.mustItem(t, makeTestItem(next2, "still more stuff"))
|
||||
s.stopWait()
|
||||
}
|
||||
|
||||
func TestMinPollTime(t *testing.T) {
|
||||
defer leaktest.Check(t)
|
||||
|
||||
s := newStreamTester(t, ``, eventlog.LogSettings{
|
||||
WindowSize: 30 * time.Second,
|
||||
}, nil)
|
||||
|
||||
s.publish("bad", "whatever")
|
||||
|
||||
// Waiting for an item on a log with no matching events incurs a minimum
|
||||
// wait time and reports no events.
|
||||
ctx := context.Background()
|
||||
filter := &coretypes.EventFilter{Query: `tm.event = 'good'`}
|
||||
var zero cursor.Cursor
|
||||
|
||||
t.Run("NoneMatch", func(t *testing.T) {
|
||||
start := time.Now()
|
||||
|
||||
// Request a very short delay, and affirm we got the server's minimum.
|
||||
rsp, err := s.Events(ctx, &coretypes.RequestEvents{
|
||||
Filter: filter,
|
||||
MaxItems: 1,
|
||||
After: zero.String(),
|
||||
Before: zero.String(),
|
||||
WaitTime: 10 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Events failed: %v", err)
|
||||
} else if elapsed := time.Since(start); elapsed < time.Second {
|
||||
t.Errorf("Events returned too quickly: got %v, wanted 1s", elapsed)
|
||||
} else if len(rsp.Items) != 0 {
|
||||
t.Errorf("Events returned %d items, expected none", len(rsp.Items))
|
||||
}
|
||||
})
|
||||
|
||||
s.publish("good", "whatever")
|
||||
|
||||
// Waiting for an available matching item incurs no delay.
|
||||
t.Run("SomeMatch", func(t *testing.T) {
|
||||
start := time.Now()
|
||||
|
||||
// Request a long-ish delay and affirm we don't block for it.
|
||||
// Check for this by ensuring we return sooner than the minimum delay,
|
||||
// since we don't know the exact timing.
|
||||
rsp, err := s.Events(ctx, &coretypes.RequestEvents{
|
||||
Filter: filter,
|
||||
MaxItems: 1,
|
||||
After: zero.String(),
|
||||
Before: zero.String(),
|
||||
WaitTime: 10 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Events failed: %v", err)
|
||||
} else if elapsed := time.Since(start); elapsed > 500*time.Millisecond {
|
||||
t.Errorf("Events returned too slowly: got %v, wanted immediate", elapsed)
|
||||
} else if len(rsp.Items) == 0 {
|
||||
t.Error("Events returned no items, wanted at least 1")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// testItem is a wrapper for comparing item results in a friendly output format
|
||||
// for the cmp package.
|
||||
type testItem struct {
|
||||
Cursor string
|
||||
Data string
|
||||
|
||||
// N.B. Fields exported to simplify use in cmp.
|
||||
}
|
||||
|
||||
func makeTestItem(cur, data string) testItem {
|
||||
return testItem{
|
||||
Cursor: cur,
|
||||
Data: fmt.Sprintf(`%q`, data),
|
||||
}
|
||||
}
|
||||
|
||||
// streamTester is a simulation harness for an eventstream.Stream. It simulates
|
||||
// the production service by plumbing an event log into a stub RPC environment,
|
||||
// into which the test can publish events and advance the perceived time to
|
||||
// exercise various cases of the stream.
|
||||
type streamTester struct {
|
||||
log *eventlog.Log
|
||||
env *rpccore.Environment
|
||||
clock int64
|
||||
index int64
|
||||
stream *eventstream.Stream
|
||||
errc chan error
|
||||
recv chan *coretypes.EventItem
|
||||
stop func()
|
||||
}
|
||||
|
||||
func newStreamTester(t *testing.T, query string, logOpts eventlog.LogSettings, streamOpts *eventstream.StreamOptions) *streamTester {
|
||||
t.Helper()
|
||||
s := new(streamTester)
|
||||
|
||||
// Plumb a time source controlled by the tester into the event log.
|
||||
logOpts.Source = cursor.Source{
|
||||
TimeIndex: s.timeNow,
|
||||
}
|
||||
lg, err := eventlog.New(logOpts)
|
||||
if err != nil {
|
||||
t.Fatalf("Creating event log: %v", err)
|
||||
}
|
||||
s.log = lg
|
||||
s.env = &rpccore.Environment{EventLog: lg}
|
||||
s.stream = eventstream.New(s, query, streamOpts)
|
||||
rpccore.SetEnvironment(s.env)
|
||||
return s
|
||||
}
|
||||
|
||||
// start starts the stream receiver, which runs until it it terminated by
|
||||
// calling stop.
|
||||
func (s *streamTester) start() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.errc = make(chan error, 1)
|
||||
s.recv = make(chan *coretypes.EventItem)
|
||||
s.stop = cancel
|
||||
go func() {
|
||||
defer close(s.errc)
|
||||
s.errc <- s.stream.Run(ctx, func(itm *coretypes.EventItem) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case s.recv <- itm:
|
||||
return nil
|
||||
}
|
||||
})
|
||||
}()
|
||||
}
|
||||
|
||||
// publish adds a single event to the event log at the present moment.
|
||||
func (s *streamTester) publish(etype, payload string) string {
|
||||
_ = s.log.Add(etype, types.EventDataString(payload))
|
||||
s.index++
|
||||
return fmt.Sprintf("%016x-%04x", s.clock, s.index)
|
||||
}
|
||||
|
||||
// wait blocks until either an item is received or the runner stops.
|
||||
func (s *streamTester) wait() (*coretypes.EventItem, error) {
|
||||
select {
|
||||
case itm := <-s.recv:
|
||||
return itm, nil
|
||||
case err := <-s.errc:
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// mustItem waits for an item and fails if either an error occurs or the item
|
||||
// does not match want.
|
||||
func (s *streamTester) mustItem(t *testing.T, want testItem) {
|
||||
t.Helper()
|
||||
|
||||
itm, err := s.wait()
|
||||
if err != nil {
|
||||
t.Fatalf("Receive: got error %v, want item %v", err, want)
|
||||
}
|
||||
got := testItem{Cursor: itm.Cursor, Data: string(itm.Data)}
|
||||
if diff := cmp.Diff(want, got); diff != "" {
|
||||
t.Errorf("Item: (-want, +got)\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
// mustError waits for an error and fails if an item is returned.
|
||||
func (s *streamTester) mustError(t *testing.T) error {
|
||||
t.Helper()
|
||||
itm, err := s.wait()
|
||||
if err == nil {
|
||||
t.Fatalf("Receive: got item %v, want error", itm)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// stopWait stops the runner and waits for it to terminate.
|
||||
func (s *streamTester) stopWait() { s.stop(); s.wait() } //nolint:errcheck
|
||||
|
||||
// timeNow reports the current simulated time index.
|
||||
func (s *streamTester) timeNow() int64 { return s.clock }
|
||||
|
||||
// advance moves the simulated time index.
|
||||
func (s *streamTester) advance(d time.Duration) { s.clock += int64(d) }
|
||||
|
||||
// Events implements the eventstream.Client interface by delegating to a stub
|
||||
// environment as if it were a local RPC client. This works because the Events
|
||||
// method only requires the event log, the other fields are unused.
|
||||
func (s *streamTester) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
|
||||
var before, after cursor.Cursor
|
||||
if err := before.UnmarshalText([]byte(req.Before)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := after.UnmarshalText([]byte(req.After)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rpccore.EventsWithContext(ctx, req.Filter, req.MaxItems, before, after, req.WaitTime)
|
||||
}
|
||||
@@ -2,10 +2,11 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -52,32 +53,24 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForOneEvent subscribes to a websocket event for the given
|
||||
// event time and returns upon receiving it one time, or
|
||||
// when the timeout duration has expired.
|
||||
//
|
||||
// This handles subscribing and unsubscribing under the hood
|
||||
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
|
||||
const subscriber = "helpers"
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
|
||||
// register for the next event of this type
|
||||
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to subscribe: %w", err)
|
||||
}
|
||||
// make sure to unregister after the test is over
|
||||
defer func() {
|
||||
if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil {
|
||||
panic(deferErr)
|
||||
// WaitForOneEvent waits for the first event matching the given query on c, or
|
||||
// until ctx ends. It reports an error if ctx ends before a matching event is
|
||||
// received.
|
||||
func WaitForOneEvent(ctx context.Context, c EventsClient, query string, evt types.TMEventData) error {
|
||||
for {
|
||||
rsp, err := c.Events(ctx, &coretypes.RequestEvents{
|
||||
Filter: &coretypes.EventFilter{Query: query},
|
||||
MaxItems: 1,
|
||||
WaitTime: 10 * time.Second, // duration doesn't matter, limited by ctx timeout
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
} else if len(rsp.Items) == 0 {
|
||||
continue // continue polling until ctx expires
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
return event.Data, nil
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("timed out waiting for event")
|
||||
if err := json.Unmarshal(rsp.Items[0].Data, evt); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -363,6 +363,20 @@ func (c *baseRPCClient) ConsensusParams(
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
|
||||
result := new(ctypes.ResultEvents)
|
||||
if _, err := c.caller.Call(ctx, "events", map[string]interface{}{
|
||||
"filter": req.Filter.Query,
|
||||
"maxItems": req.MaxItems,
|
||||
"before": req.Before,
|
||||
"after": req.After,
|
||||
"waitTime": req.WaitTime,
|
||||
}, result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
|
||||
result := new(ctypes.ResultHealth)
|
||||
_, err := c.caller.Call(ctx, "health", map[string]interface{}{}, result)
|
||||
@@ -597,7 +611,7 @@ func (c *baseRPCClient) BroadcastEvidence(
|
||||
|
||||
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
|
||||
|
||||
// WSEvents is a wrapper around WSClient, which implements EventsClient.
|
||||
// WSEvents is a wrapper around WSClient, which implements SubscriptionClient.
|
||||
type WSEvents struct {
|
||||
service.BaseService
|
||||
remote string
|
||||
@@ -608,6 +622,8 @@ type WSEvents struct {
|
||||
subscriptions map[string]chan ctypes.ResultEvent // query -> chan
|
||||
}
|
||||
|
||||
var _ rpcclient.SubscriptionClient = (*WSEvents)(nil)
|
||||
|
||||
func newWSEvents(remote, endpoint string) (*WSEvents, error) {
|
||||
w := &WSEvents{
|
||||
endpoint: endpoint,
|
||||
@@ -647,7 +663,7 @@ func (w *WSEvents) OnStop() {
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe implements EventsClient by using WSClient to subscribe given
|
||||
// Subscribe implements SubscriptionClient by using WSClient to subscribe given
|
||||
// subscriber to query. By default, returns a channel with cap=1. Error is
|
||||
// returned if it fails to subscribe.
|
||||
//
|
||||
@@ -680,7 +696,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
|
||||
return outc, nil
|
||||
}
|
||||
|
||||
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
|
||||
// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe given
|
||||
// subscriber from query.
|
||||
//
|
||||
// It returns an error if WSEvents is not running.
|
||||
@@ -703,7 +719,7 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
|
||||
// UnsubscribeAll implements SubscriptionClient by using WSClient to unsubscribe
|
||||
// given subscriber from all the queries.
|
||||
//
|
||||
// It returns an error if WSEvents is not running.
|
||||
|
||||
@@ -35,12 +35,13 @@ type Client interface {
|
||||
service.Service
|
||||
ABCIClient
|
||||
EventsClient
|
||||
EvidenceClient
|
||||
HistoryClient
|
||||
MempoolClient
|
||||
NetworkClient
|
||||
SignClient
|
||||
StatusClient
|
||||
EvidenceClient
|
||||
MempoolClient
|
||||
SubscriptionClient
|
||||
}
|
||||
|
||||
// ABCIClient groups together the functionality that principally affects the
|
||||
@@ -115,20 +116,40 @@ type NetworkClient interface {
|
||||
Health(context.Context) (*ctypes.ResultHealth, error)
|
||||
}
|
||||
|
||||
// EventsClient is reactive, you can subscribe to any message, given the proper
|
||||
// string. see tendermint/types/events.go
|
||||
// EventsClient exposes the methods to retrieve events from the consensus engine.
|
||||
type EventsClient interface {
|
||||
// Subscribe subscribes given subscriber to query. Returns a channel with
|
||||
// cap=1 onto which events are published. An error is returned if it fails to
|
||||
// subscribe. outCapacity can be used optionally to set capacity for the
|
||||
// channel. Channel is never closed to prevent accidental reads.
|
||||
// Events fetches a batch of events from the server matching the given query
|
||||
// and time range.
|
||||
Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error)
|
||||
}
|
||||
|
||||
// TODO(creachadair): This interface should be removed once the streaming event
|
||||
// interface is removed in Tendermint v0.39.
|
||||
type SubscriptionClient interface {
|
||||
// Subscribe issues a subscription request for the given subscriber ID and
|
||||
// query. This method does not block: If subscription fails, it reports an
|
||||
// error, and if subscription succeeds it returns a channel that delivers
|
||||
// matching events until the subscription is stopped. The channel is never
|
||||
// closed; the client is responsible for knowing when no further data will
|
||||
// be sent.
|
||||
//
|
||||
// The context only governs the initial subscription, it does not control
|
||||
// the lifetime of the channel. To cancel a subscription call Unsubscribe or
|
||||
// UnsubscribeAll.
|
||||
//
|
||||
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
|
||||
// or UnsubscribeAll.
|
||||
Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
|
||||
// Unsubscribe unsubscribes given subscriber from query.
|
||||
//
|
||||
// Deprecated: This method will be removed in Tendermint v0.37, use Events
|
||||
// instead.
|
||||
Unsubscribe(ctx context.Context, subscriber, query string) error
|
||||
|
||||
// UnsubscribeAll unsubscribes given subscriber from all the queries.
|
||||
//
|
||||
// Deprecated: This method will be removed in Tendermint v0.37, use Events
|
||||
// instead.
|
||||
UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
|
||||
@@ -127,6 +127,10 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Res
|
||||
return core.ConsensusParams(c.ctx, height)
|
||||
}
|
||||
|
||||
func (c *Local) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
|
||||
return core.Events(c.ctx, req.Filter.Query, req.MaxItems, req.Before, req.After, req.WaitTime)
|
||||
}
|
||||
|
||||
func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
|
||||
return core.Health(c.ctx)
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ type Client struct {
|
||||
client.EvidenceClient
|
||||
client.MempoolClient
|
||||
service.Service
|
||||
client.SubscriptionClient
|
||||
}
|
||||
|
||||
var _ client.Client = Client{}
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
@@ -92,6 +93,7 @@ type Environment struct {
|
||||
BlockIndexer indexer.BlockIndexer
|
||||
ConsensusReactor *consensus.Reactor
|
||||
EventBus *types.EventBus // thread safe
|
||||
EventLog *eventlog.Log
|
||||
Mempool mempl.Mempool
|
||||
|
||||
Logger log.Logger
|
||||
|
||||
@@ -2,10 +2,13 @@ package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
@@ -128,3 +131,157 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
|
||||
}
|
||||
return &ctypes.ResultUnsubscribe{}, nil
|
||||
}
|
||||
|
||||
// Events applies a query to the event log. If an event log is not enabled,
|
||||
// Events reports an error. Otherwise, it filters the current contents of the
|
||||
// log to return matching events.
|
||||
//
|
||||
// Events returns up to maxItems of the newest eligible event items. An item is
|
||||
// eligible if it is older than before (or before is zero), it is newer than
|
||||
// after (or after is zero), and its data matches the filter. A nil filter
|
||||
// matches all event data.
|
||||
//
|
||||
// If before is zero and no eligible event items are available, Events waits
|
||||
// for up to waitTime for a matching item to become available. The wait is
|
||||
// terminated early if ctx ends.
|
||||
//
|
||||
// If maxItems ≤ 0, a default positive number of events is chosen. The values
|
||||
// of maxItems and waitTime may be capped to sensible internal maxima without
|
||||
// reporting an error to the caller.
|
||||
func Events(ctx *rpctypes.Context,
|
||||
filter string,
|
||||
maxItems int,
|
||||
before, after string,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error) {
|
||||
var curBefore, curAfter cursor.Cursor
|
||||
if err := curBefore.UnmarshalText([]byte(before)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := curAfter.UnmarshalText([]byte(after)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return EventsWithContext(ctx.Context(), &ctypes.EventFilter{
|
||||
Query: filter,
|
||||
}, maxItems, curBefore, curAfter, waitTime)
|
||||
}
|
||||
|
||||
func EventsWithContext(ctx context.Context,
|
||||
filter *ctypes.EventFilter,
|
||||
maxItems int,
|
||||
before, after cursor.Cursor,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error) {
|
||||
if env.EventLog == nil {
|
||||
return nil, errors.New("the event log is not enabled")
|
||||
}
|
||||
|
||||
// Parse and validate parameters.
|
||||
if maxItems <= 0 {
|
||||
maxItems = 10
|
||||
} else if maxItems > 100 {
|
||||
maxItems = 100
|
||||
}
|
||||
|
||||
const minWaitTime = 1 * time.Second
|
||||
const maxWaitTime = 30 * time.Second
|
||||
if waitTime < minWaitTime {
|
||||
waitTime = minWaitTime
|
||||
} else if waitTime > maxWaitTime {
|
||||
waitTime = maxWaitTime
|
||||
}
|
||||
|
||||
query := tmquery.All
|
||||
if filter != nil && filter.Query != "" {
|
||||
q, err := tmquery.New(filter.Query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid filter query: %w", err)
|
||||
}
|
||||
query = q
|
||||
}
|
||||
|
||||
var info eventlog.Info
|
||||
var items []*eventlog.Item
|
||||
var err error
|
||||
accept := func(itm *eventlog.Item) error {
|
||||
// N.B. We accept up to one item more than requested, so we can tell how
|
||||
// to set the "more" flag in the response.
|
||||
if len(items) > maxItems || itm.Cursor.Before(after) {
|
||||
return eventlog.ErrStopScan
|
||||
}
|
||||
match, err := query.MatchesEvents(itm.Events)
|
||||
if err != nil {
|
||||
return fmt.Errorf("matches failed: %v", err)
|
||||
}
|
||||
if cursorInRange(itm.Cursor, before, after) && match {
|
||||
items = append(items, itm)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if before.IsZero() {
|
||||
ctx, cancel := context.WithTimeout(ctx, waitTime)
|
||||
defer cancel()
|
||||
|
||||
// Long poll. The loop here is because new items may not match the query,
|
||||
// and we want to keep waiting until we have relevant results (or time out).
|
||||
cur := after
|
||||
for len(items) == 0 {
|
||||
info, err = env.EventLog.WaitScan(ctx, cur, accept)
|
||||
if err != nil {
|
||||
// Don't report a timeout as a request failure.
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
}
|
||||
cur = info.Newest
|
||||
}
|
||||
} else {
|
||||
// Quick poll, return only what is already available.
|
||||
info, err = env.EventLog.Scan(accept)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
more := len(items) > maxItems
|
||||
if more {
|
||||
items = items[:len(items)-1]
|
||||
}
|
||||
enc, err := marshalItems(items)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ctypes.ResultEvents{
|
||||
Items: enc,
|
||||
More: more,
|
||||
Oldest: cursorString(info.Oldest),
|
||||
Newest: cursorString(info.Newest),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func cursorString(c cursor.Cursor) string {
|
||||
if c.IsZero() {
|
||||
return ""
|
||||
}
|
||||
return c.String()
|
||||
}
|
||||
|
||||
func cursorInRange(c, before, after cursor.Cursor) bool {
|
||||
return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
|
||||
}
|
||||
|
||||
func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) {
|
||||
out := make([]*ctypes.EventItem, len(items))
|
||||
for i, itm := range items {
|
||||
// FIXME: align usage after remove type-tag
|
||||
v, err := json.Marshal(itm.Data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encoding event data: %w", err)
|
||||
}
|
||||
out[i] = &ctypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
|
||||
out[i].Data = v
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
|
||||
// Routes is a map of available routes.
|
||||
var Routes = map[string]*rpc.RPCFunc{
|
||||
// subscribe/unsubscribe are reserved for websocket events.
|
||||
// Event subscription. Note that subscribe, unsubscribe, and
|
||||
// unsubscribe_all are only available via the websocket endpoint.
|
||||
"events": rpc.NewRPCFunc(Events, "filter,maxItems,before,after,waitTime"),
|
||||
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
|
||||
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
|
||||
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),
|
||||
|
||||
@@ -254,3 +254,64 @@ type ResultEvent struct {
|
||||
Data types.TMEventData `json:"data"`
|
||||
Events map[string][]string `json:"events"`
|
||||
}
|
||||
|
||||
// RequestEvents is the argument for the "/events" RPC endpoint.
|
||||
type RequestEvents struct {
|
||||
// Optional filter spec. If nil or empty, all items are eligible.
|
||||
Filter *EventFilter `json:"filter"`
|
||||
|
||||
// The maximum number of eligible items to return.
|
||||
// If zero or negative, the server will report a default number.
|
||||
MaxItems int `json:"maxItems"`
|
||||
|
||||
// Return only items after this cursor. If empty, the limit is just
|
||||
// before the the beginning of the event log.
|
||||
After string `json:"after"`
|
||||
|
||||
// Return only items before this cursor. If empty, the limit is just
|
||||
// after the head of the event log.
|
||||
Before string `json:"before"`
|
||||
|
||||
// Wait for up to this long for events to be available.
|
||||
WaitTime time.Duration `json:"waitTime"`
|
||||
}
|
||||
|
||||
// An EventFilter specifies which events are selected by an /events request.
|
||||
type EventFilter struct {
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
||||
// ResultEvents is the response from the "/events" RPC endpoint.
|
||||
type ResultEvents struct {
|
||||
// The items matching the request parameters, from newest
|
||||
// to oldest, if any were available within the timeout.
|
||||
Items []*EventItem `json:"items"`
|
||||
|
||||
// This is true if there is at least one older matching item
|
||||
// available in the log that was not returned.
|
||||
More bool `json:"more"`
|
||||
|
||||
// The cursor of the oldest item in the log at the time of this reply,
|
||||
// or "" if the log is empty.
|
||||
Oldest string `json:"oldest"`
|
||||
|
||||
// The cursor of the newest item in the log at the time of this reply,
|
||||
// or "" if the log is empty.
|
||||
Newest string `json:"newest"`
|
||||
}
|
||||
|
||||
type EventItem struct {
|
||||
// The cursor of this item.
|
||||
Cursor string `json:"cursor"`
|
||||
|
||||
// The event label of this item (for example, "Vote").
|
||||
Event string `json:"event,omitempty"`
|
||||
|
||||
// The encoded event data for this item. The content is a JSON object with
|
||||
// the following structure:
|
||||
//
|
||||
// <json-encoded-value>
|
||||
//
|
||||
// The known type tags are defined by the tendermint/types package.
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
@@ -97,6 +97,7 @@ func createConfig() *cfg.Config {
|
||||
tm, rpc, grpc := makeAddrs()
|
||||
c.P2P.ListenAddress = tm
|
||||
c.RPC.ListenAddress = rpc
|
||||
c.RPC.EventLogWindowSize = 5 * time.Minute
|
||||
c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
|
||||
c.RPC.GRPCListenAddress = grpc
|
||||
return c
|
||||
|
||||
81
scripts/estream/estream.go
Normal file
81
scripts/estream/estream.go
Normal file
@@ -0,0 +1,81 @@
|
||||
// Program estream is a manual testing tool for polling the event stream
|
||||
// of a running Tendermint consensus node.
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/tendermint/tendermint/rpc/client/eventstream"
|
||||
rpcclient "github.com/tendermint/tendermint/rpc/client/http"
|
||||
coretypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
)
|
||||
|
||||
var (
|
||||
query = flag.String("query", "", "Filter query")
|
||||
batchSize = flag.Int("batch", 0, "Batch size")
|
||||
resumeFrom = flag.String("resume", "", "Resume cursor")
|
||||
numItems = flag.Int("count", 0, "Number of items to read (0 to stream)")
|
||||
waitTime = flag.Duration("poll", 0, "Long poll interval")
|
||||
rpcAddr = flag.String("addr", "http://localhost:26657", "RPC service address")
|
||||
)
|
||||
|
||||
func init() {
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, `Usage: %[1]s [options]
|
||||
|
||||
Connect to the Tendermint node whose RPC service is at -addr, and poll for events
|
||||
matching the specified -query. If no query is given, all events are fetched.
|
||||
The resulting event data are written to stdout as JSON.
|
||||
|
||||
Use -resume to pick up polling from a previously-reported event cursor.
|
||||
Use -count to stop polling after a certain number of events has been reported.
|
||||
Use -batch to override the default request batch size.
|
||||
Use -poll to override the default long-polling interval.
|
||||
|
||||
Options:
|
||||
`, filepath.Base(os.Args[0]))
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
cli, err := rpcclient.New(*rpcAddr, "/websocket")
|
||||
if err != nil {
|
||||
log.Fatalf("RPC client: %v", err)
|
||||
}
|
||||
stream := eventstream.New(cli, *query, &eventstream.StreamOptions{
|
||||
BatchSize: *batchSize,
|
||||
ResumeFrom: *resumeFrom,
|
||||
WaitTime: *waitTime,
|
||||
})
|
||||
|
||||
// Shut down cleanly on SIGINT. Don't attempt clean shutdown for other
|
||||
// fatal signals.
|
||||
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
|
||||
defer cancel()
|
||||
|
||||
var nr int
|
||||
if err := stream.Run(ctx, func(itm *coretypes.EventItem) error {
|
||||
nr++
|
||||
bits, err := json.Marshal(itm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fmt.Println(string(bits))
|
||||
if *numItems > 0 && nr >= *numItems {
|
||||
return eventstream.ErrStopRunning
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
log.Fatalf("Stream failed: %v", err)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
@@ -68,6 +69,11 @@ type EventDataNewBlock struct {
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataNewBlock) ABCIEvents() []abci.Event {
|
||||
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
|
||||
}
|
||||
|
||||
type EventDataNewBlockHeader struct {
|
||||
Header Header `json:"header"`
|
||||
|
||||
@@ -76,6 +82,11 @@ type EventDataNewBlockHeader struct {
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event {
|
||||
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
|
||||
}
|
||||
|
||||
type EventDataNewEvidence struct {
|
||||
Evidence Evidence `json:"evidence"`
|
||||
|
||||
@@ -87,6 +98,15 @@ type EventDataTx struct {
|
||||
abci.TxResult
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataTx) ABCIEvents() []abci.Event {
|
||||
base := []abci.Event{
|
||||
eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())),
|
||||
eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)),
|
||||
}
|
||||
return append(base, e.Result.Events...)
|
||||
}
|
||||
|
||||
// NOTE: This goes into the replay WAL
|
||||
type EventDataRoundState struct {
|
||||
Height int64 `json:"height"`
|
||||
@@ -181,3 +201,16 @@ type BlockEventPublisher interface {
|
||||
type TxEventPublisher interface {
|
||||
PublishEventTx(EventDataTx) error
|
||||
}
|
||||
|
||||
// eventWithAttr constructs a single abci.Event with a single attribute.
|
||||
// The type of the event and the name of the attribute are obtained by
|
||||
// splitting the event type on period (e.g., "foo.bar").
|
||||
func eventWithAttr(etype, value string) abci.Event {
|
||||
parts := strings.SplitN(etype, ".", 2)
|
||||
return abci.Event{
|
||||
Type: parts[0],
|
||||
Attributes: []abci.EventAttribute{{
|
||||
Key: parts[1], Value: value,
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,22 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Verify that the event data types satisfy their shared interface.
|
||||
// TODO: add EventDataBlockSyncStatus and EventDataStateSyncStatus
|
||||
// when backport #6700 and #6755.
|
||||
var (
|
||||
_ TMEventData = EventDataCompleteProposal{}
|
||||
_ TMEventData = EventDataNewBlock{}
|
||||
_ TMEventData = EventDataNewBlockHeader{}
|
||||
_ TMEventData = EventDataNewEvidence{}
|
||||
_ TMEventData = EventDataNewRound{}
|
||||
_ TMEventData = EventDataRoundState{}
|
||||
_ TMEventData = EventDataTx{}
|
||||
_ TMEventData = EventDataValidatorSetUpdates{}
|
||||
_ TMEventData = EventDataVote{}
|
||||
_ TMEventData = EventDataString("")
|
||||
)
|
||||
|
||||
func TestQueryTxFor(t *testing.T) {
|
||||
tx := Tx("foo")
|
||||
assert.Equal(t,
|
||||
|
||||
Reference in New Issue
Block a user