diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 43949e484..85c3f4d5a 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -38,6 +38,7 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - CLI/RPC/Config - [config] \#9491 Add new event subscription options and defaults. (@creachadair) - [config] \#9259 Rename the fastsync section and the fast_sync key blocksync and block_sync respectively + - [rpc] \#7982 Add new Events interface and deprecate Subscribe. (@creachadair) - Apps - [abci/counter] \#6684 Delete counter example app diff --git a/light/proxy/routes.go b/light/proxy/routes.go index c97a91dfd..22f248c96 100644 --- a/light/proxy/routes.go +++ b/light/proxy/routes.go @@ -1,6 +1,8 @@ package proxy import ( + "time" + "github.com/tendermint/tendermint/libs/bytes" lrpc "github.com/tendermint/tendermint/light/rpc" rpcclient "github.com/tendermint/tendermint/rpc/client" @@ -12,7 +14,9 @@ import ( func RPCRoutes(c *lrpc.Client) map[string]*rpcserver.RPCFunc { return map[string]*rpcserver.RPCFunc{ - // Subscribe/unsubscribe are reserved for websocket events. + // Event subscription. Note that subscribe, unsubscribe, and + // unsubscribe_all are only available via the websocket endpoint. + "events": rpcserver.NewRPCFunc(makeEventsSearchFunc(c), "filter,maxItems,before,after,waitTime"), "subscribe": rpcserver.NewWSRPCFunc(c.SubscribeWS, "query"), "unsubscribe": rpcserver.NewWSRPCFunc(c.UnsubscribeWS, "query"), "unsubscribe_all": rpcserver.NewWSRPCFunc(c.UnsubscribeAllWS, ""), @@ -300,3 +304,31 @@ func makeBroadcastEvidenceFunc(c *lrpc.Client) rpcBroadcastEvidenceFunc { return c.BroadcastEvidence(ctx.Context(), ev) } } + +type rpcEventsSearchFunc func( + ctx *rpctypes.Context, + filter string, + maxItems int, + before, after string, + waitTime time.Duration, +) (*ctypes.ResultEvents, error) + +func makeEventsSearchFunc(c *lrpc.Client) rpcEventsSearchFunc { + return func( + ctx *rpctypes.Context, + filter string, + maxItems int, + before, after string, + waitTime time.Duration, + ) (*ctypes.ResultEvents, error) { + return c.Events(ctx.Context(), &ctypes.RequestEvents{ + Filter: &ctypes.EventFilter{ + Query: filter, + }, + MaxItems: maxItems, + WaitTime: waitTime, + Before: before, + After: after, + }) + } +} diff --git a/light/rpc/client.go b/light/rpc/client.go index 234b89ca5..40aa14b46 100644 --- a/light/rpc/client.go +++ b/light/rpc/client.go @@ -12,7 +12,6 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" - "github.com/tendermint/tendermint/internal/eventlog/cursor" tmbytes "github.com/tendermint/tendermint/libs/bytes" tmmath "github.com/tendermint/tendermint/libs/math" service "github.com/tendermint/tendermint/libs/service" @@ -573,11 +572,11 @@ func (c *Client) Subscribe(ctx context.Context, subscriber, query string, } func (c *Client) Unsubscribe(ctx context.Context, subscriber, query string) error { - return c.next.Unsubscribe(ctx, subscriber, query) + return c.next.Unsubscribe(ctx, subscriber, query) //nolint:staticcheck } func (c *Client) UnsubscribeAll(ctx context.Context, subscriber string) error { - return c.next.UnsubscribeAll(ctx, subscriber) + return c.next.UnsubscribeAll(ctx, subscriber) //nolint:staticcheck } func (c *Client) updateLightClientIfNeededTo(ctx context.Context, height *int64) (*types.LightBlock, error) { @@ -600,15 +599,8 @@ func (c *Client) RegisterOpDecoder(typ string, dec merkle.OpDecoder) { c.prt.RegisterOpDecoder(typ, dec) } -// TODO(creachadair): Remove this once the RPC clients support the new method. -// This is just a placeholder to let things build during development. -func (c *Client) Events(ctx *rpctypes.Context, - filter *ctypes.EventFilter, - maxItems int, - before, after cursor.Cursor, - waitTime time.Duration, -) (*ctypes.ResultEvents, error) { - return nil, errors.New("the /events method is not implemented") +func (c *Client) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) { + return c.next.Events(ctx, req) } // SubscribeWS subscribes for events using the given query and remote address as @@ -643,7 +635,7 @@ func (c *Client) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul // UnsubscribeWS calls original client's Unsubscribe using remote address as a // subscriber. func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { - err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) + err := c.next.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) //nolint:staticcheck if err != nil { return nil, err } @@ -653,7 +645,7 @@ func (c *Client) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Res // UnsubscribeAllWS calls original client's UnsubscribeAll using remote address // as a subscriber. func (c *Client) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { - err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) + err := c.next.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) //nolint:staticcheck if err != nil { return nil, err } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 8014d875a..9c4d3ad15 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -57,7 +57,7 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error { // when the timeout duration has expired. // // This handles subscribing and unsubscribing under the hood -func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { +func WaitForOneEvent(c SubscriptionClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { const subscriber = "helpers" ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 8af6b3ee7..5afc85b0e 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -363,6 +363,20 @@ func (c *baseRPCClient) ConsensusParams( return result, nil } +func (c *baseRPCClient) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) { + result := new(ctypes.ResultEvents) + if _, err := c.caller.Call(ctx, "events", map[string]interface{}{ + "filter": req.Filter.Query, + "maxItems": req.MaxItems, + "before": req.Before, + "after": req.After, + "waitTime": req.WaitTime, + }, result); err != nil { + return nil, err + } + return result, nil +} + func (c *baseRPCClient) Health(ctx context.Context) (*ctypes.ResultHealth, error) { result := new(ctypes.ResultHealth) _, err := c.caller.Call(ctx, "health", map[string]interface{}{}, result) @@ -597,7 +611,7 @@ func (c *baseRPCClient) BroadcastEvidence( var errNotRunning = errors.New("client is not running. Use .Start() method to start") -// WSEvents is a wrapper around WSClient, which implements EventsClient. +// WSEvents is a wrapper around WSClient, which implements SubscriptionClient. type WSEvents struct { service.BaseService remote string @@ -608,6 +622,8 @@ type WSEvents struct { subscriptions map[string]chan ctypes.ResultEvent // query -> chan } +var _ rpcclient.SubscriptionClient = (*WSEvents)(nil) + func newWSEvents(remote, endpoint string) (*WSEvents, error) { w := &WSEvents{ endpoint: endpoint, @@ -647,7 +663,7 @@ func (w *WSEvents) OnStop() { } } -// Subscribe implements EventsClient by using WSClient to subscribe given +// Subscribe implements SubscriptionClient by using WSClient to subscribe given // subscriber to query. By default, returns a channel with cap=1. Error is // returned if it fails to subscribe. // @@ -680,7 +696,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, return outc, nil } -// Unsubscribe implements EventsClient by using WSClient to unsubscribe given +// Unsubscribe implements SubscriptionClient by using WSClient to unsubscribe given // subscriber from query. // // It returns an error if WSEvents is not running. @@ -703,7 +719,7 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er return nil } -// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe +// UnsubscribeAll implements SubscriptionClient by using WSClient to unsubscribe // given subscriber from all the queries. // // It returns an error if WSEvents is not running. diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 92783634c..a2eabc8b3 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -35,12 +35,13 @@ type Client interface { service.Service ABCIClient EventsClient + EvidenceClient HistoryClient + MempoolClient NetworkClient SignClient StatusClient - EvidenceClient - MempoolClient + SubscriptionClient } // ABCIClient groups together the functionality that principally affects the @@ -115,20 +116,40 @@ type NetworkClient interface { Health(context.Context) (*ctypes.ResultHealth, error) } -// EventsClient is reactive, you can subscribe to any message, given the proper -// string. see tendermint/types/events.go +// EventsClient exposes the methods to retrieve events from the consensus engine. type EventsClient interface { - // Subscribe subscribes given subscriber to query. Returns a channel with - // cap=1 onto which events are published. An error is returned if it fails to - // subscribe. outCapacity can be used optionally to set capacity for the - // channel. Channel is never closed to prevent accidental reads. + // Events fetches a batch of events from the server matching the given query + // and time range. + Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) +} + +// TODO(creachadair): This interface should be removed once the streaming event +// interface is removed in Tendermint v0.39. +type SubscriptionClient interface { + // Subscribe issues a subscription request for the given subscriber ID and + // query. This method does not block: If subscription fails, it reports an + // error, and if subscription succeeds it returns a channel that delivers + // matching events until the subscription is stopped. The channel is never + // closed; the client is responsible for knowing when no further data will + // be sent. + // + // The context only governs the initial subscription, it does not control + // the lifetime of the channel. To cancel a subscription call Unsubscribe or + // UnsubscribeAll. // // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe // or UnsubscribeAll. Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) // Unsubscribe unsubscribes given subscriber from query. + // + // Deprecated: This method will be removed in Tendermint v0.37, use Events + // instead. Unsubscribe(ctx context.Context, subscriber, query string) error + // UnsubscribeAll unsubscribes given subscriber from all the queries. + // + // Deprecated: This method will be removed in Tendermint v0.37, use Events + // instead. UnsubscribeAll(ctx context.Context, subscriber string) error } diff --git a/rpc/client/local/local.go b/rpc/client/local/local.go index ca324dee0..057623616 100644 --- a/rpc/client/local/local.go +++ b/rpc/client/local/local.go @@ -127,6 +127,10 @@ func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.Res return core.ConsensusParams(c.ctx, height) } +func (c *Local) Events(ctx context.Context, req *ctypes.RequestEvents) (*ctypes.ResultEvents, error) { + return core.Events(c.ctx, req.Filter.Query, req.MaxItems, req.Before, req.After, req.WaitTime) +} + func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) { return core.Health(c.ctx) } diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index ec3a358cd..63d1a6dd0 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -41,6 +41,7 @@ type Client struct { client.EvidenceClient client.MempoolClient service.Service + client.SubscriptionClient } var _ client.Client = Client{} diff --git a/rpc/core/events.go b/rpc/core/events.go index 582a5d1af..3d2ce477d 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -148,7 +148,25 @@ func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { // If maxItems ≤ 0, a default positive number of events is chosen. The values // of maxItems and waitTime may be capped to sensible internal maxima without // reporting an error to the caller. -func Events(ctx context.Context, +func Events(ctx *rpctypes.Context, + filter string, + maxItems int, + before, after string, + waitTime time.Duration, +) (*ctypes.ResultEvents, error) { + var curBefore, curAfter cursor.Cursor + if err := curBefore.UnmarshalText([]byte(before)); err != nil { + return nil, err + } + if err := curAfter.UnmarshalText([]byte(after)); err != nil { + return nil, err + } + return EventsWithContext(ctx.Context(), &ctypes.EventFilter{ + Query: filter, + }, maxItems, curBefore, curAfter, waitTime) +} + +func EventsWithContext(ctx context.Context, filter *ctypes.EventFilter, maxItems int, before, after cursor.Cursor, @@ -254,6 +272,7 @@ func cursorInRange(c, before, after cursor.Cursor) bool { func marshalItems(items []*eventlog.Item) ([]*ctypes.EventItem, error) { out := make([]*ctypes.EventItem, len(items)) for i, itm := range items { + // FIXME: align usage after remove type-tag v, err := json.Marshal(itm.Data) if err != nil { return nil, fmt.Errorf("encoding event data: %w", err) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index ebf2e14d5..7de49bfc1 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -97,6 +97,7 @@ func createConfig() *cfg.Config { tm, rpc, grpc := makeAddrs() c.P2P.ListenAddress = tm c.RPC.ListenAddress = rpc + c.RPC.EventLogWindowSize = 5 * time.Minute c.RPC.CORSAllowedOrigins = []string{"https://tendermint.com/"} c.RPC.GRPCListenAddress = grpc return c