backport: add Events method to the client interface (#7982) (#9519)

* rpc/client: add Events method to the client interface (#7982)

- Update documentation to deprecate the old methods.
- Add Events methods to HTTP, WS, and Local clients.
- Add Events method to the light client wrapper.
- Rename legacy events client to SubscriptionClient.

* Apply suggestions from code review

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
This commit is contained in:
mmsqe
2022-10-14 00:34:10 +08:00
committed by GitHub
parent c3cc94a0e0
commit 96dd4d08c3
10 changed files with 116 additions and 29 deletions

View File

@@ -38,6 +38,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
- CLI/RPC/Config
- [config] \#9491 Add new event subscription options and defaults. (@creachadair)
- [config] \#9259 Rename the fastsync section and the fast_sync key blocksync and block_sync respectively
- [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair)
- Apps
- [abci/counter] \#6684 Delete counter example app

View File

@@ -1,6 +1,8 @@
package proxy
import (
"time"
"github.com/tendermint/tendermint/libs/bytes"
lrpc "github.com/tendermint/tendermint/light/rpc"
rpcclient "github.com/tendermint/tendermint/rpc/client"
@@ -12,7 +14,9 @@ import (
func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc {
return map[string]*rpcserver.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events.
// Event subscription. Note that subscribe, unsubscribe, and
// unsubscribe_all are only available via the websocket endpoint.
"events": rpcserver.NewRPCFunc(makeEventsSearchFunc(c), "filter,maxItems,before,after,waitTime"),
"subscribe": rpcserver.NewWSRPCFunc(c.SubscribeWS, "query"),
"unsubscribe": rpcserver.NewWSRPCFunc(c.UnsubscribeWS, "query"),
"unsubscribe_all": rpcserver.NewWSRPCFunc(c.UnsubscribeAllWS, ""),
@@ -300,3 +304,31 @@ func makeBroadcastEvidenceFunc(c *lrpc.Client) rpcBroadcastEvidenceFunc {
return c.BroadcastEvidence(ctx.Context(), ev)
}
}
type rpcEventsSearchFunc func(
ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error)
func makeEventsSearchFunc(c *lrpc.Client) rpcEventsSearchFunc {
return func(
ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
return c.Events(ctx.Context(), &ctypes.RequestEvents{
Filter: &ctypes.EventFilter{
Query: filter,
},
MaxItems: maxItems,
WaitTime: waitTime,
Before: before,
After: after,
})
}
}

View File

@@ -12,7 +12,6 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/eventlog/cursor"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
tmmath "github.com/tendermint/tendermint/libs/math"
service "github.com/tendermint/tendermint/libs/service"
@@ -573,11 +572,11 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string,
}
func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error {
return c.next.Unsubscribe(ctx, subscriber, query)
return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck
}
func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error {
return c.next.UnsubscribeAll(ctx, subscriber)
return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck
}
func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) {
@@ -600,15 +599,8 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) {
c.prt.RegisterOpDecoder(typ, dec)
}
// TODO(creachadair): Remove this once the RPC clients support the new method.
// This is just a placeholder to let things build during development.
func (c *Client) Events(ctx *rpctypes.Context,
filter *ctypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
return nil, errors.New("the /events method is not implemented")
func (c *Client) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
return c.next.Events(ctx, req)
}
// SubscribeWS subscribes for events using the given query and remote address as
@@ -643,7 +635,7 @@ func (c *Client) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul
// UnsubscribeWS calls original client's Unsubscribe using remote address as a
// subscriber.
func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query)
err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) //nolint:staticcheck
if err != nil {
return nil, err
}
@@ -653,7 +645,7 @@ func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Res
// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address
// as a subscriber.
func (c *Client) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr())
err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) //nolint:staticcheck
if err != nil {
return nil, err
}

View File

@@ -57,7 +57,7 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
func WaitForOneEvent(c SubscriptionClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

View File

@@ -363,6 +363,20 @@ func (c *baseRPCClient) ConsensusParams(
return result, nil
}
func (c *baseRPCClient) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
result := new(ctypes.ResultEvents)
if _, err := c.caller.Call(ctx, "events", map[string]interface{}{
"filter": req.Filter.Query,
"maxItems": req.MaxItems,
"before": req.Before,
"after": req.After,
"waitTime": req.WaitTime,
}, result); err != nil {
return nil, err
}
return result, nil
}
func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
result := new(ctypes.ResultHealth)
_, err := c.caller.Call(ctx, "health", map[string]interface{}{}, result)
@@ -597,7 +611,7 @@ func (c *baseRPCClient) BroadcastEvidence(
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
// WSEvents is a wrapper around WSClient, which implements EventsClient.
// WSEvents is a wrapper around WSClient, which implements SubscriptionClient.
type WSEvents struct {
service.BaseService
remote string
@@ -608,6 +622,8 @@ type WSEvents struct {
subscriptions map[string]chan ctypes.ResultEvent // query -> chan
}
var _ rpcclient.SubscriptionClient = (*WSEvents)(nil)
func newWSEvents(remote, endpoint string) (*WSEvents, error) {
w := &WSEvents{
endpoint: endpoint,
@@ -647,7 +663,7 @@ func (w *WSEvents) OnStop() {
}
}
// Subscribe implements EventsClient by using WSClient to subscribe given
// Subscribe implements SubscriptionClient by using WSClient to subscribe given
// subscriber to query. By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
//
@@ -680,7 +696,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
return outc, nil
}
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe given
// subscriber from query.
//
// It returns an error if WSEvents is not running.
@@ -703,7 +719,7 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
return nil
}
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// UnsubscribeAll implements SubscriptionClient by using WSClient to unsubscribe
// given subscriber from all the queries.
//
// It returns an error if WSEvents is not running.

View File

@@ -35,12 +35,13 @@ type Client interface {
service.Service
ABCIClient
EventsClient
EvidenceClient
HistoryClient
MempoolClient
NetworkClient
SignClient
StatusClient
EvidenceClient
MempoolClient
SubscriptionClient
}
// ABCIClient groups together the functionality that principally affects the
@@ -115,20 +116,40 @@ type NetworkClient interface {
Health(context.Context) (*ctypes.ResultHealth, error)
}
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
// EventsClient exposes the methods to retrieve events from the consensus engine.
type EventsClient interface {
// Subscribe subscribes given subscriber to query. Returns a channel with
// cap=1 onto which events are published. An error is returned if it fails to
// subscribe. outCapacity can be used optionally to set capacity for the
// channel. Channel is never closed to prevent accidental reads.
// Events fetches a batch of events from the server matching the given query
// and time range.
Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error)
}
// TODO(creachadair): This interface should be removed once the streaming event
// interface is removed in Tendermint v0.39.
type SubscriptionClient interface {
// Subscribe issues a subscription request for the given subscriber ID and
// query. This method does not block: If subscription fails, it reports an
// error, and if subscription succeeds it returns a channel that delivers
// matching events until the subscription is stopped. The channel is never
// closed; the client is responsible for knowing when no further data will
// be sent.
//
// The context only governs the initial subscription, it does not control
// the lifetime of the channel. To cancel a subscription call Unsubscribe or
// UnsubscribeAll.
//
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll.
Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
// Unsubscribe unsubscribes given subscriber from query.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries.
//
// Deprecated: This method will be removed in Tendermint v0.37, use Events
// instead.
UnsubscribeAll(ctx context.Context, subscriber string) error
}

View File

@@ -127,6 +127,10 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Res
return core.ConsensusParams(c.ctx, height)
}
func (c *Local) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) {
return core.Events(c.ctx, req.Filter.Query, req.MaxItems, req.Before, req.After, req.WaitTime)
}
func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
return core.Health(c.ctx)
}

View File

@@ -41,6 +41,7 @@ type Client struct {
client.EvidenceClient
client.MempoolClient
service.Service
client.SubscriptionClient
}
var _ client.Client = Client{}

View File

@@ -148,7 +148,25 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
// If maxItems ≤ 0, a default positive number of events is chosen. The values
// of maxItems and waitTime may be capped to sensible internal maxima without
// reporting an error to the caller.
func Events(ctx context.Context,
func Events(ctx *rpctypes.Context,
filter string,
maxItems int,
before, after string,
waitTime time.Duration,
) (*ctypes.ResultEvents, error) {
var curBefore, curAfter cursor.Cursor
if err := curBefore.UnmarshalText([]byte(before)); err != nil {
return nil, err
}
if err := curAfter.UnmarshalText([]byte(after)); err != nil {
return nil, err
}
return EventsWithContext(ctx.Context(), &ctypes.EventFilter{
Query: filter,
}, maxItems, curBefore, curAfter, waitTime)
}
func EventsWithContext(ctx context.Context,
filter *ctypes.EventFilter,
maxItems int,
before, after cursor.Cursor,
@@ -254,6 +272,7 @@ func cursorInRange(c, before, after cursor.Cursor) bool {
func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) {
out := make([]*ctypes.EventItem, len(items))
for i, itm := range items {
// FIXME: align usage after remove type-tag
v, err := json.Marshal(itm.Data)
if err != nil {
return nil, fmt.Errorf("encoding event data: %w", err)

View File

@@ -97,6 +97,7 @@ func createConfig() *cfg.Config {
tm, rpc, grpc := makeAddrs()
c.P2P.ListenAddress = tm
c.RPC.ListenAddress = rpc
c.RPC.EventLogWindowSize = 5 * time.Minute
c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"}
c.RPC.GRPCListenAddress = grpc
return c