diff --git a/libs/pubsub/query/query.go b/libs/pubsub/query/query.go index 715b749f4..371ec86ef 100644 --- a/libs/pubsub/query/query.go +++ b/libs/pubsub/query/query.go @@ -95,6 +95,13 @@ func (q *Query) Matches(events map[string][]string) (bool, error) { return q.matchesEvents(ExpandEvents(events)), nil } +func (q *Query) MatchesEvents(events []types.Event) (bool, error) { + if q == nil { + return true, nil + } + return q.matchesEvents(events), nil +} + // String matches part of the pubsub.Query interface. func (q *Query) String() string { if q == nil { diff --git a/libs/pubsub/query/query_test.go b/libs/pubsub/query/query_test.go index 71a05e239..7d067c09a 100644 --- a/libs/pubsub/query/query_test.go +++ b/libs/pubsub/query/query_test.go @@ -332,6 +332,16 @@ func TestCompiledMatches(t *testing.T) { t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v", tc.s, tc.events, got, tc.matches) } + + got, err = c.MatchesEvents(query.ExpandEvents(tc.events)) + if err != nil { + t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v", + tc.s, tc.events, err) + } + if got != tc.matches { + t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v", + tc.s, tc.events, got, tc.matches) + } }) } } diff --git a/light/rpc/client.go b/light/rpc/client.go index 920641295..234b89ca5 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -12,6 +12,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" + "github.com/tendermint/tendermint/internal/eventlog/cursor" tmbytes "github.com/tendermint/tendermint/libs/bytes" tmmath "github.com/tendermint/tendermint/libs/math" service "github.com/tendermint/tendermint/libs/service" @@ -599,6 +600,17 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) { c.prt.RegisterOpDecoder(typ, dec) } +// TODO(creachadair): Remove this once the RPC clients support the new method. +// This is just a placeholder to let things build during development. +func (c *Client) Events(ctx *rpctypes.Context, + filter *ctypes.EventFilter, + maxItems int, + before, after cursor.Cursor, + waitTime time.Duration, +) (*ctypes.ResultEvents, error) { + return nil, errors.New("the /events method is not implemented") +} + // SubscribeWS subscribes for events using the given query and remote address as // a subscriber, but does not verify responses (UNSAFE)! // TODO: verify data diff --git a/node/node.go b/node/node.go index 4047ec096..6c3821aa1 100644 --- a/node/node.go +++ b/node/node.go @@ -21,10 +21,12 @@ import ( cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" + "github.com/tendermint/tendermint/internal/eventlog" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" @@ -210,6 +212,7 @@ type Node struct { // services eventBus *types.EventBus // pub/sub for services + eventLog *eventlog.Log stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk bcReactor p2p.Reactor // for block-syncing @@ -1080,6 +1083,7 @@ func (n *Node) ConfigureRPC() error { BlockIndexer: n.blockIndexer, ConsensusReactor: n.consensusReactor, EventBus: n.eventBus, + EventLog: n.eventLog, Mempool: n.mempool, Logger: n.Logger.With("module", "rpc"), @@ -1116,6 +1120,38 @@ func (n *Node) startRPC() ([]net.Listener, error) { config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second } + // If the event log is enabled, subscribe to all events published to the + // event bus, and forward them to the event log. + if lg := n.eventLog; lg != nil { + // TODO(creachadair): This is kind of a hack, ideally we'd share the + // observer with the indexer, but it's tricky to plumb them together. + // For now, use a "normal" subscription with a big buffer allowance. + // The event log should always be able to keep up. + const subscriberID = "event-log-subscriber" + ctx := context.Background() + sub, err := n.eventBus.Subscribe(ctx, subscriberID, query.All, 1<<16) // essentially "no limit" + if err != nil { + return nil, fmt.Errorf("event log subscribe: %w", err) + } + go func() { + // N.B. Use background for unsubscribe, ctx is already terminated. + defer n.eventBus.UnsubscribeAll(ctx, subscriberID) // nolint:errcheck + for { + msg := <-sub.Out() + if err != nil { + n.Logger.Error("Subscription terminated", "err", err) + return + } + etype, ok := eventlog.FindType(query.ExpandEvents(msg.Events())) + if ok { + _ = lg.Add(etype, msg.Data()) + } + } + }() + + n.Logger.Info("Event log subscription enabled") + } + // We may expose the RPC over both TCP and a Unix-domain socket. listeners := make([]net.Listener, len(listenAddrs)) for i, listenAddr := range listenAddrs { diff --git a/rpc/core/env.go b/rpc/core/env.go index e92319937..5a80f8fd9 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -8,6 +8,7 @@ import ( cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/internal/eventlog" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" mempl "github.com/tendermint/tendermint/mempool" @@ -92,6 +93,7 @@ type Environment struct { BlockIndexer indexer.BlockIndexer ConsensusReactor *consensus.Reactor EventBus *types.EventBus // thread safe + EventLog *eventlog.Log Mempool mempl.Mempool Logger log.Logger diff --git a/rpc/core/events.go b/rpc/core/events.go index e8d977363..582a5d1af 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -2,10 +2,13 @@ package core import ( "context" + "encoding/json" "errors" "fmt" "time" + "github.com/tendermint/tendermint/internal/eventlog" + "github.com/tendermint/tendermint/internal/eventlog/cursor" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -128,3 +131,135 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { } return &ctypes.ResultUnsubscribe{}, nil } + +// Events applies a query to the event log. If an event log is not enabled, +// Events reports an error. Otherwise, it filters the current contents of the +// log to return matching events. +// +// Events returns up to maxItems of the newest eligible event items. An item is +// eligible if it is older than before (or before is zero), it is newer than +// after (or after is zero), and its data matches the filter. A nil filter +// matches all event data. +// +// If before is zero and no eligible event items are available, Events waits +// for up to waitTime for a matching item to become available. The wait is +// terminated early if ctx ends. +// +// If maxItems ≤ 0, a default positive number of events is chosen. The values +// of maxItems and waitTime may be capped to sensible internal maxima without +// reporting an error to the caller. +func Events(ctx context.Context, + filter *ctypes.EventFilter, + maxItems int, + before, after cursor.Cursor, + waitTime time.Duration, +) (*ctypes.ResultEvents, error) { + if env.EventLog == nil { + return nil, errors.New("the event log is not enabled") + } + + // Parse and validate parameters. + if maxItems <= 0 { + maxItems = 10 + } else if maxItems > 100 { + maxItems = 100 + } + + const maxWaitTime = 30 * time.Second + if waitTime > maxWaitTime { + waitTime = maxWaitTime + } + + query := tmquery.All + if filter != nil && filter.Query != "" { + q, err := tmquery.New(filter.Query) + if err != nil { + return nil, fmt.Errorf("invalid filter query: %w", err) + } + query = q + } + + var info eventlog.Info + var items []*eventlog.Item + var err error + accept := func(itm *eventlog.Item) error { + // N.B. We accept up to one item more than requested, so we can tell how + // to set the "more" flag in the response. + if len(items) > maxItems { + return eventlog.ErrStopScan + } + match, err := query.MatchesEvents(itm.Events) + if err != nil { + return fmt.Errorf("matches failed: %v", err) + } + if cursorInRange(itm.Cursor, before, after) && match { + items = append(items, itm) + } + return nil + } + + if waitTime > 0 && before.IsZero() { + ctx, cancel := context.WithTimeout(ctx, waitTime) + defer cancel() + + // Long poll. The loop here is because new items may not match the query, + // and we want to keep waiting until we have relevant results (or time out). + cur := after + for len(items) == 0 { + info, err = env.EventLog.WaitScan(ctx, cur, accept) + if err != nil { + // Don't report a timeout as a request failure. + if errors.Is(err, context.DeadlineExceeded) { + err = nil + } + break + } + cur = info.Newest + } + } else { + // Quick poll, return only what is already available. + info, err = env.EventLog.Scan(accept) + } + if err != nil { + return nil, err + } + + more := len(items) > maxItems + if more { + items = items[:len(items)-1] + } + enc, err := marshalItems(items) + if err != nil { + return nil, err + } + return &ctypes.ResultEvents{ + Items: enc, + More: more, + Oldest: cursorString(info.Oldest), + Newest: cursorString(info.Newest), + }, nil +} + +func cursorString(c cursor.Cursor) string { + if c.IsZero() { + return "" + } + return c.String() +} + +func cursorInRange(c, before, after cursor.Cursor) bool { + return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c)) +} + +func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) { + out := make([]*ctypes.EventItem, len(items)) + for i, itm := range items { + v, err := json.Marshal(itm.Data) + if err != nil { + return nil, fmt.Errorf("encoding event data: %w", err) + } + out[i] = &ctypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type} + out[i].Data = v + } + return out, nil +} diff --git a/rpc/core/routes.go b/rpc/core/routes.go index fe2d17e8b..94f5ebab4 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -8,7 +8,9 @@ import ( // Routes is a map of available routes. var Routes = map[string]*rpc.RPCFunc{ - // subscribe/unsubscribe are reserved for websocket events. + // Event subscription. Note that subscribe, unsubscribe, and + // unsubscribe_all are only available via the websocket endpoint. + "events": rpc.NewRPCFunc(Events, "filter,maxItems,before,after,waitTime"), "subscribe": rpc.NewWSRPCFunc(Subscribe, "query"), "unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"), "unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""), diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index 6da818890..02fc1a70a 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -254,3 +254,64 @@ type ResultEvent struct { Data types.TMEventData `json:"data"` Events map[string][]string `json:"events"` } + +// RequestEvents is the argument for the "/events" RPC endpoint. +type RequestEvents struct { + // Optional filter spec. If nil or empty, all items are eligible. + Filter *EventFilter `json:"filter"` + + // The maximum number of eligible items to return. + // If zero or negative, the server will report a default number. + MaxItems int `json:"max_items"` + + // Return only items after this cursor. If empty, the limit is just + // before the the beginning of the event log. + After string `json:"after"` + + // Return only items before this cursor. If empty, the limit is just + // after the head of the event log. + Before string `json:"before"` + + // Wait for up to this long for events to be available. + WaitTime time.Duration `json:"wait_time"` +} + +// An EventFilter specifies which events are selected by an /events request. +type EventFilter struct { + Query string `json:"query"` +} + +// ResultEvents is the response from the "/events" RPC endpoint. +type ResultEvents struct { + // The items matching the request parameters, from newest + // to oldest, if any were available within the timeout. + Items []*EventItem `json:"items"` + + // This is true if there is at least one older matching item + // available in the log that was not returned. + More bool `json:"more"` + + // The cursor of the oldest item in the log at the time of this reply, + // or "" if the log is empty. + Oldest string `json:"oldest"` + + // The cursor of the newest item in the log at the time of this reply, + // or "" if the log is empty. + Newest string `json:"newest"` +} + +type EventItem struct { + // The cursor of this item. + Cursor string `json:"cursor"` + + // The event label of this item (for example, "Vote"). + Event string `json:"event,omitempty"` + + // The encoded event data for this item. The content is a JSON object with + // the following structure: + // + // + // + // The known type tags are defined by the tendermint/types package. + Data json.RawMessage `json:"data"` +} diff --git a/types/events.go b/types/events.go index ae6c8637b..de32f5388 100644 --- a/types/events.go +++ b/types/events.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "strings" abci "github.com/tendermint/tendermint/abci/types" tmjson "github.com/tendermint/tendermint/libs/json" @@ -68,6 +69,11 @@ type EventDataNewBlock struct { ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataNewBlock) ABCIEvents() []abci.Event { + return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...) +} + type EventDataNewBlockHeader struct { Header Header `json:"header"` @@ -76,6 +82,11 @@ type EventDataNewBlockHeader struct { ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"` } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event { + return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...) +} + type EventDataNewEvidence struct { Evidence Evidence `json:"evidence"` @@ -87,6 +98,15 @@ type EventDataTx struct { abci.TxResult } +// ABCIEvents implements the eventlog.ABCIEventer interface. +func (e EventDataTx) ABCIEvents() []abci.Event { + base := []abci.Event{ + eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())), + eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)), + } + return append(base, e.Result.Events...) +} + // NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int64 `json:"height"` @@ -181,3 +201,16 @@ type BlockEventPublisher interface { type TxEventPublisher interface { PublishEventTx(EventDataTx) error } + +// eventWithAttr constructs a single abci.Event with a single attribute. +// The type of the event and the name of the attribute are obtained by +// splitting the event type on period (e.g., "foo.bar"). +func eventWithAttr(etype, value string) abci.Event { + parts := strings.SplitN(etype, ".", 2) + return abci.Event{ + Type: parts[0], + Attributes: []abci.EventAttribute{{ + Key: parts[1], Value: value, + }}, + } +} diff --git a/types/events_test.go b/types/events_test.go index e4479d3ab..96c3500d6 100644 --- a/types/events_test.go +++ b/types/events_test.go @@ -7,6 +7,22 @@ import ( "github.com/stretchr/testify/assert" ) +// Verify that the event data types satisfy their shared interface. +// TODO: add EventDataBlockSyncStatus and EventDataStateSyncStatus +// when backport #6700 and #6755. +var ( + _ TMEventData = EventDataCompleteProposal{} + _ TMEventData = EventDataNewBlock{} + _ TMEventData = EventDataNewBlockHeader{} + _ TMEventData = EventDataNewEvidence{} + _ TMEventData = EventDataNewRound{} + _ TMEventData = EventDataRoundState{} + _ TMEventData = EventDataTx{} + _ TMEventData = EventDataValidatorSetUpdates{} + _ TMEventData = EventDataVote{} + _ TMEventData = EventDataString("") +) + func TestQueryTxFor(t *testing.T) { tx := Tx("foo") assert.Equal(t,