mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-06 03:50:46 +00:00
* rpc: implement the ADR 075 /events method (#7965) This method implements the eventlog extension interface to expose ABCI metadata to the log for query processing. Only the types that have ABCI events need to implement this. - Add an event log to the environment - Add a sketch of the handler method - Add an /events RPCFunc to the route map - Implement query logic - Subscribe to pubsub if confingured, handle termination * add MatchesEvents test * add TODO due to backport sequence Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
This commit is contained in:
@@ -95,6 +95,13 @@ func (q *Query) Matches(events map[string][]string) (bool, error) {
|
||||
return q.matchesEvents(ExpandEvents(events)), nil
|
||||
}
|
||||
|
||||
func (q *Query) MatchesEvents(events []types.Event) (bool, error) {
|
||||
if q == nil {
|
||||
return true, nil
|
||||
}
|
||||
return q.matchesEvents(events), nil
|
||||
}
|
||||
|
||||
// String matches part of the pubsub.Query interface.
|
||||
func (q *Query) String() string {
|
||||
if q == nil {
|
||||
|
||||
@@ -332,6 +332,16 @@ func TestCompiledMatches(t *testing.T) {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
|
||||
tc.s, tc.events, got, tc.matches)
|
||||
}
|
||||
|
||||
got, err = c.MatchesEvents(query.ExpandEvents(tc.events))
|
||||
if err != nil {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got error %v",
|
||||
tc.s, tc.events, err)
|
||||
}
|
||||
if got != tc.matches {
|
||||
t.Errorf("Query: %#q\nInput: %+v\nMatches: got %v, want %v",
|
||||
tc.s, tc.events, got, tc.matches)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
tmbytes "github.com/tendermint/tendermint/libs/bytes"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
service "github.com/tendermint/tendermint/libs/service"
|
||||
@@ -599,6 +600,17 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
|
||||
c.prt.RegisterOpDecoder(typ, dec)
|
||||
}
|
||||
|
||||
// TODO(creachadair): Remove this once the RPC clients support the new method.
|
||||
// This is just a placeholder to let things build during development.
|
||||
func (c *Client) Events(ctx *rpctypes.Context,
|
||||
filter *ctypes.EventFilter,
|
||||
maxItems int,
|
||||
before, after cursor.Cursor,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error) {
|
||||
return nil, errors.New("the /events method is not implemented")
|
||||
}
|
||||
|
||||
// SubscribeWS subscribes for events using the given query and remote address as
|
||||
// a subscriber, but does not verify responses (UNSAFE)!
|
||||
// TODO: verify data
|
||||
|
||||
36
node/node.go
36
node/node.go
@@ -21,10 +21,12 @@ import (
|
||||
cs "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/evidence"
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
@@ -210,6 +212,7 @@ type Node struct {
|
||||
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
eventLog *eventlog.Log
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for block-syncing
|
||||
@@ -1080,6 +1083,7 @@ func (n *Node) ConfigureRPC() error {
|
||||
BlockIndexer: n.blockIndexer,
|
||||
ConsensusReactor: n.consensusReactor,
|
||||
EventBus: n.eventBus,
|
||||
EventLog: n.eventLog,
|
||||
Mempool: n.mempool,
|
||||
|
||||
Logger: n.Logger.With("module", "rpc"),
|
||||
@@ -1116,6 +1120,38 @@ func (n *Node) startRPC() ([]net.Listener, error) {
|
||||
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
|
||||
// If the event log is enabled, subscribe to all events published to the
|
||||
// event bus, and forward them to the event log.
|
||||
if lg := n.eventLog; lg != nil {
|
||||
// TODO(creachadair): This is kind of a hack, ideally we'd share the
|
||||
// observer with the indexer, but it's tricky to plumb them together.
|
||||
// For now, use a "normal" subscription with a big buffer allowance.
|
||||
// The event log should always be able to keep up.
|
||||
const subscriberID = "event-log-subscriber"
|
||||
ctx := context.Background()
|
||||
sub, err := n.eventBus.Subscribe(ctx, subscriberID, query.All, 1<<16) // essentially "no limit"
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("event log subscribe: %w", err)
|
||||
}
|
||||
go func() {
|
||||
// N.B. Use background for unsubscribe, ctx is already terminated.
|
||||
defer n.eventBus.UnsubscribeAll(ctx, subscriberID) // nolint:errcheck
|
||||
for {
|
||||
msg := <-sub.Out()
|
||||
if err != nil {
|
||||
n.Logger.Error("Subscription terminated", "err", err)
|
||||
return
|
||||
}
|
||||
etype, ok := eventlog.FindType(query.ExpandEvents(msg.Events()))
|
||||
if ok {
|
||||
_ = lg.Add(etype, msg.Data())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
n.Logger.Info("Event log subscription enabled")
|
||||
}
|
||||
|
||||
// We may expose the RPC over both TCP and a Unix-domain socket.
|
||||
listeners := make([]net.Listener, len(listenAddrs))
|
||||
for i, listenAddr := range listenAddrs {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
@@ -92,6 +93,7 @@ type Environment struct {
|
||||
BlockIndexer indexer.BlockIndexer
|
||||
ConsensusReactor *consensus.Reactor
|
||||
EventBus *types.EventBus // thread safe
|
||||
EventLog *eventlog.Log
|
||||
Mempool mempl.Mempool
|
||||
|
||||
Logger log.Logger
|
||||
|
||||
@@ -2,10 +2,13 @@ package core
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
"github.com/tendermint/tendermint/internal/eventlog/cursor"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
@@ -128,3 +131,135 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
|
||||
}
|
||||
return &ctypes.ResultUnsubscribe{}, nil
|
||||
}
|
||||
|
||||
// Events applies a query to the event log. If an event log is not enabled,
|
||||
// Events reports an error. Otherwise, it filters the current contents of the
|
||||
// log to return matching events.
|
||||
//
|
||||
// Events returns up to maxItems of the newest eligible event items. An item is
|
||||
// eligible if it is older than before (or before is zero), it is newer than
|
||||
// after (or after is zero), and its data matches the filter. A nil filter
|
||||
// matches all event data.
|
||||
//
|
||||
// If before is zero and no eligible event items are available, Events waits
|
||||
// for up to waitTime for a matching item to become available. The wait is
|
||||
// terminated early if ctx ends.
|
||||
//
|
||||
// If maxItems ≤ 0, a default positive number of events is chosen. The values
|
||||
// of maxItems and waitTime may be capped to sensible internal maxima without
|
||||
// reporting an error to the caller.
|
||||
func Events(ctx context.Context,
|
||||
filter *ctypes.EventFilter,
|
||||
maxItems int,
|
||||
before, after cursor.Cursor,
|
||||
waitTime time.Duration,
|
||||
) (*ctypes.ResultEvents, error) {
|
||||
if env.EventLog == nil {
|
||||
return nil, errors.New("the event log is not enabled")
|
||||
}
|
||||
|
||||
// Parse and validate parameters.
|
||||
if maxItems <= 0 {
|
||||
maxItems = 10
|
||||
} else if maxItems > 100 {
|
||||
maxItems = 100
|
||||
}
|
||||
|
||||
const maxWaitTime = 30 * time.Second
|
||||
if waitTime > maxWaitTime {
|
||||
waitTime = maxWaitTime
|
||||
}
|
||||
|
||||
query := tmquery.All
|
||||
if filter != nil && filter.Query != "" {
|
||||
q, err := tmquery.New(filter.Query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid filter query: %w", err)
|
||||
}
|
||||
query = q
|
||||
}
|
||||
|
||||
var info eventlog.Info
|
||||
var items []*eventlog.Item
|
||||
var err error
|
||||
accept := func(itm *eventlog.Item) error {
|
||||
// N.B. We accept up to one item more than requested, so we can tell how
|
||||
// to set the "more" flag in the response.
|
||||
if len(items) > maxItems {
|
||||
return eventlog.ErrStopScan
|
||||
}
|
||||
match, err := query.MatchesEvents(itm.Events)
|
||||
if err != nil {
|
||||
return fmt.Errorf("matches failed: %v", err)
|
||||
}
|
||||
if cursorInRange(itm.Cursor, before, after) && match {
|
||||
items = append(items, itm)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
if waitTime > 0 && before.IsZero() {
|
||||
ctx, cancel := context.WithTimeout(ctx, waitTime)
|
||||
defer cancel()
|
||||
|
||||
// Long poll. The loop here is because new items may not match the query,
|
||||
// and we want to keep waiting until we have relevant results (or time out).
|
||||
cur := after
|
||||
for len(items) == 0 {
|
||||
info, err = env.EventLog.WaitScan(ctx, cur, accept)
|
||||
if err != nil {
|
||||
// Don't report a timeout as a request failure.
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
}
|
||||
cur = info.Newest
|
||||
}
|
||||
} else {
|
||||
// Quick poll, return only what is already available.
|
||||
info, err = env.EventLog.Scan(accept)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
more := len(items) > maxItems
|
||||
if more {
|
||||
items = items[:len(items)-1]
|
||||
}
|
||||
enc, err := marshalItems(items)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ctypes.ResultEvents{
|
||||
Items: enc,
|
||||
More: more,
|
||||
Oldest: cursorString(info.Oldest),
|
||||
Newest: cursorString(info.Newest),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func cursorString(c cursor.Cursor) string {
|
||||
if c.IsZero() {
|
||||
return ""
|
||||
}
|
||||
return c.String()
|
||||
}
|
||||
|
||||
func cursorInRange(c, before, after cursor.Cursor) bool {
|
||||
return (before.IsZero() || c.Before(before)) && (after.IsZero() || after.Before(c))
|
||||
}
|
||||
|
||||
func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) {
|
||||
out := make([]*ctypes.EventItem, len(items))
|
||||
for i, itm := range items {
|
||||
v, err := json.Marshal(itm.Data)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encoding event data: %w", err)
|
||||
}
|
||||
out[i] = &ctypes.EventItem{Cursor: itm.Cursor.String(), Event: itm.Type}
|
||||
out[i].Data = v
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@@ -8,7 +8,9 @@ import (
|
||||
|
||||
// Routes is a map of available routes.
|
||||
var Routes = map[string]*rpc.RPCFunc{
|
||||
// subscribe/unsubscribe are reserved for websocket events.
|
||||
// Event subscription. Note that subscribe, unsubscribe, and
|
||||
// unsubscribe_all are only available via the websocket endpoint.
|
||||
"events": rpc.NewRPCFunc(Events, "filter,maxItems,before,after,waitTime"),
|
||||
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
|
||||
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
|
||||
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),
|
||||
|
||||
@@ -254,3 +254,64 @@ type ResultEvent struct {
|
||||
Data types.TMEventData `json:"data"`
|
||||
Events map[string][]string `json:"events"`
|
||||
}
|
||||
|
||||
// RequestEvents is the argument for the "/events" RPC endpoint.
|
||||
type RequestEvents struct {
|
||||
// Optional filter spec. If nil or empty, all items are eligible.
|
||||
Filter *EventFilter `json:"filter"`
|
||||
|
||||
// The maximum number of eligible items to return.
|
||||
// If zero or negative, the server will report a default number.
|
||||
MaxItems int `json:"max_items"`
|
||||
|
||||
// Return only items after this cursor. If empty, the limit is just
|
||||
// before the the beginning of the event log.
|
||||
After string `json:"after"`
|
||||
|
||||
// Return only items before this cursor. If empty, the limit is just
|
||||
// after the head of the event log.
|
||||
Before string `json:"before"`
|
||||
|
||||
// Wait for up to this long for events to be available.
|
||||
WaitTime time.Duration `json:"wait_time"`
|
||||
}
|
||||
|
||||
// An EventFilter specifies which events are selected by an /events request.
|
||||
type EventFilter struct {
|
||||
Query string `json:"query"`
|
||||
}
|
||||
|
||||
// ResultEvents is the response from the "/events" RPC endpoint.
|
||||
type ResultEvents struct {
|
||||
// The items matching the request parameters, from newest
|
||||
// to oldest, if any were available within the timeout.
|
||||
Items []*EventItem `json:"items"`
|
||||
|
||||
// This is true if there is at least one older matching item
|
||||
// available in the log that was not returned.
|
||||
More bool `json:"more"`
|
||||
|
||||
// The cursor of the oldest item in the log at the time of this reply,
|
||||
// or "" if the log is empty.
|
||||
Oldest string `json:"oldest"`
|
||||
|
||||
// The cursor of the newest item in the log at the time of this reply,
|
||||
// or "" if the log is empty.
|
||||
Newest string `json:"newest"`
|
||||
}
|
||||
|
||||
type EventItem struct {
|
||||
// The cursor of this item.
|
||||
Cursor string `json:"cursor"`
|
||||
|
||||
// The event label of this item (for example, "Vote").
|
||||
Event string `json:"event,omitempty"`
|
||||
|
||||
// The encoded event data for this item. The content is a JSON object with
|
||||
// the following structure:
|
||||
//
|
||||
// <json-encoded-value>
|
||||
//
|
||||
// The known type tags are defined by the tendermint/types package.
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
@@ -68,6 +69,11 @@ type EventDataNewBlock struct {
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataNewBlock) ABCIEvents() []abci.Event {
|
||||
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
|
||||
}
|
||||
|
||||
type EventDataNewBlockHeader struct {
|
||||
Header Header `json:"header"`
|
||||
|
||||
@@ -76,6 +82,11 @@ type EventDataNewBlockHeader struct {
|
||||
ResultEndBlock abci.ResponseEndBlock `json:"result_end_block"`
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataNewBlockHeader) ABCIEvents() []abci.Event {
|
||||
return append(e.ResultBeginBlock.Events, e.ResultEndBlock.Events...)
|
||||
}
|
||||
|
||||
type EventDataNewEvidence struct {
|
||||
Evidence Evidence `json:"evidence"`
|
||||
|
||||
@@ -87,6 +98,15 @@ type EventDataTx struct {
|
||||
abci.TxResult
|
||||
}
|
||||
|
||||
// ABCIEvents implements the eventlog.ABCIEventer interface.
|
||||
func (e EventDataTx) ABCIEvents() []abci.Event {
|
||||
base := []abci.Event{
|
||||
eventWithAttr(TxHashKey, fmt.Sprintf("%X", Tx(e.Tx).Hash())),
|
||||
eventWithAttr(TxHeightKey, fmt.Sprintf("%d", e.Height)),
|
||||
}
|
||||
return append(base, e.Result.Events...)
|
||||
}
|
||||
|
||||
// NOTE: This goes into the replay WAL
|
||||
type EventDataRoundState struct {
|
||||
Height int64 `json:"height"`
|
||||
@@ -181,3 +201,16 @@ type BlockEventPublisher interface {
|
||||
type TxEventPublisher interface {
|
||||
PublishEventTx(EventDataTx) error
|
||||
}
|
||||
|
||||
// eventWithAttr constructs a single abci.Event with a single attribute.
|
||||
// The type of the event and the name of the attribute are obtained by
|
||||
// splitting the event type on period (e.g., "foo.bar").
|
||||
func eventWithAttr(etype, value string) abci.Event {
|
||||
parts := strings.SplitN(etype, ".", 2)
|
||||
return abci.Event{
|
||||
Type: parts[0],
|
||||
Attributes: []abci.EventAttribute{{
|
||||
Key: parts[1], Value: value,
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,22 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// Verify that the event data types satisfy their shared interface.
|
||||
// TODO: add EventDataBlockSyncStatus and EventDataStateSyncStatus
|
||||
// when backport #6700 and #6755.
|
||||
var (
|
||||
_ TMEventData = EventDataCompleteProposal{}
|
||||
_ TMEventData = EventDataNewBlock{}
|
||||
_ TMEventData = EventDataNewBlockHeader{}
|
||||
_ TMEventData = EventDataNewEvidence{}
|
||||
_ TMEventData = EventDataNewRound{}
|
||||
_ TMEventData = EventDataRoundState{}
|
||||
_ TMEventData = EventDataTx{}
|
||||
_ TMEventData = EventDataValidatorSetUpdates{}
|
||||
_ TMEventData = EventDataVote{}
|
||||
_ TMEventData = EventDataString("")
|
||||
)
|
||||
|
||||
func TestQueryTxFor(t *testing.T) {
|
||||
tx := Tx("foo")
|
||||
assert.Equal(t,
|
||||
|
||||
Reference in New Issue
Block a user