From e9e5026dacbc9609a4c313c939102dc6e5855138 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 25 Feb 2021 10:57:31 +0400 Subject: [PATCH] rpc/client/http: drop endpoint arg from New and add WSOptions (#6176) also - replace `MaxReconnectAttempts`, `ReadWait`, `WriteWait` and `PingPeriod` options with `WSOptions` in `WSClient` (rpc/jsonrpc/client/ws_client.go). - set default write wait to 10s for `WSClient`(rpc/jsonrpc/client/ws_client.go) - unexpose `WSEvents`(rpc/client/http.go) Closes #6162 --- CHANGELOG_PENDING.md | 3 + cmd/tendermint/commands/debug/dump.go | 2 +- cmd/tendermint/commands/debug/kill.go | 4 +- cmd/tendermint/commands/light.go | 2 +- light/provider/http/http.go | 2 +- light/provider/http/http_test.go | 2 +- rpc/client/examples_test.go | 4 +- rpc/client/http/http.go | 263 +++---------------------- rpc/client/http/ws.go | 249 +++++++++++++++++++++++ rpc/client/rpc_test.go | 8 +- rpc/jsonrpc/client/http_json_client.go | 14 +- rpc/jsonrpc/client/ws_client.go | 93 ++++----- rpc/openapi/openapi.yaml | 8 +- statesync/stateprovider.go | 2 +- test/e2e/pkg/testnet.go | 2 +- 15 files changed, 344 insertions(+), 314 deletions(-) create mode 100644 rpc/client/http/ws.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8abbc8082..016ab7e2f 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -39,6 +39,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi - [all] \#6077 Change spelling from British English to American (@cmwaters) - Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub - Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2 + - [rpc/client/http] \#6176 Remove `endpoint` arg from `New`, `NewWithTimeout` and `NewWithClient` (@melekes) + - [rpc/client/http] \#6176 Unexpose `WSEvents` (@melekes) + - [rpc/jsonrpc/client/ws_client] \#6176 `NewWS` no longer accepts options (use `NewWSWithOptions` and `OnReconnect` funcs to configure the client) (@melekes) - Blockchain Protocol diff --git a/cmd/tendermint/commands/debug/dump.go b/cmd/tendermint/commands/debug/dump.go index 678f70791..f99975a75 100644 --- a/cmd/tendermint/commands/debug/dump.go +++ b/cmd/tendermint/commands/debug/dump.go @@ -59,7 +59,7 @@ func dumpCmdHandler(_ *cobra.Command, args []string) error { } } - rpc, err := rpchttp.New(nodeRPCAddr, "/websocket") + rpc, err := rpchttp.New(nodeRPCAddr) if err != nil { return fmt.Errorf("failed to create new http client: %w", err) } diff --git a/cmd/tendermint/commands/debug/kill.go b/cmd/tendermint/commands/debug/kill.go index 9a12cc77a..bef534152 100644 --- a/cmd/tendermint/commands/debug/kill.go +++ b/cmd/tendermint/commands/debug/kill.go @@ -28,7 +28,7 @@ go-routine state, and the node's WAL and config information. This aggregated dat is packaged into a compressed archive. Example: -$ tendermint debug 34255 /path/to/tm-debug.zip`, +$ tendermint debug kill 34255 /path/to/tm-debug.zip`, Args: cobra.ExactArgs(2), RunE: killCmdHandler, } @@ -44,7 +44,7 @@ func killCmdHandler(cmd *cobra.Command, args []string) error { return errors.New("invalid output file") } - rpc, err := rpchttp.New(nodeRPCAddr, "/websocket") + rpc, err := rpchttp.New(nodeRPCAddr) if err != nil { return fmt.Errorf("failed to create new http client: %w", err) } diff --git a/cmd/tendermint/commands/light.go b/cmd/tendermint/commands/light.go index 1bcce8f5b..0264d3725 100644 --- a/cmd/tendermint/commands/light.go +++ b/cmd/tendermint/commands/light.go @@ -217,7 +217,7 @@ func runProxy(cmd *cobra.Command, args []string) error { cfg.WriteTimeout = config.RPC.TimeoutBroadcastTxCommit + 1*time.Second } - rpcClient, err := rpchttp.NewWithTimeout(primaryAddr, "/websocket", cfg.WriteTimeout) + rpcClient, err := rpchttp.NewWithTimeout(primaryAddr, cfg.WriteTimeout) if err != nil { return fmt.Errorf("failed to create http client for %s: %w", primaryAddr, err) } diff --git a/light/provider/http/http.go b/light/provider/http/http.go index eb451fa1d..f96a0edb5 100644 --- a/light/provider/http/http.go +++ b/light/provider/http/http.go @@ -52,7 +52,7 @@ func NewWithOptions(chainID, remote string, options Options) (provider.Provider, remote = "http://" + remote } - httpClient, err := rpchttp.NewWithTimeout(remote, "/websocket", options.Timeout) + httpClient, err := rpchttp.NewWithTimeout(remote, options.Timeout) if err != nil { return nil, err } diff --git a/light/provider/http/http_test.go b/light/provider/http/http_test.go index da6b19e9e..8716db327 100644 --- a/light/provider/http/http_test.go +++ b/light/provider/http/http_test.go @@ -54,7 +54,7 @@ func TestProvider(t *testing.T) { chainID := genDoc.ChainID t.Log("chainID:", chainID) - c, err := rpchttp.New(rpcAddr, "/websocket") + c, err := rpchttp.New(rpcAddr) require.Nil(t, err) p := lighthttp.NewWithClient(chainID, c) diff --git a/rpc/client/examples_test.go b/rpc/client/examples_test.go index 474aba1b6..b4653e524 100644 --- a/rpc/client/examples_test.go +++ b/rpc/client/examples_test.go @@ -20,7 +20,7 @@ func ExampleHTTP_simple() { // Create our RPC client rpcAddr := rpctest.GetConfig().RPC.ListenAddress - c, err := rpchttp.New(rpcAddr, "/websocket") + c, err := rpchttp.New(rpcAddr) if err != nil { log.Fatal(err) //nolint:gocritic } @@ -72,7 +72,7 @@ func ExampleHTTP_batching() { // Create our RPC client rpcAddr := rpctest.GetConfig().RPC.ListenAddress - c, err := rpchttp.New(rpcAddr, "/websocket") + c, err := rpchttp.New(rpcAddr) if err != nil { log.Fatal(err) } diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index 5f79d6a8d..7009fad01 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -2,17 +2,11 @@ package http import ( "context" - "errors" "net/http" - "strings" "time" "github.com/tendermint/tendermint/libs/bytes" - tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - "github.com/tendermint/tendermint/libs/service" - tmsync "github.com/tendermint/tendermint/libs/sync" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -39,7 +33,7 @@ the example for more details. Example: - c, err := New("http://192.168.1.10:26657", "/websocket") + c, err := New("http://192.168.1.10:26657") if err != nil { // handle error } @@ -63,7 +57,7 @@ type HTTP struct { rpc *jsonrpcclient.Client *baseRPCClient - *WSEvents + *wsEvents } // BatchHTTP provides the same interface as `HTTP`, but allows for batching of @@ -105,50 +99,58 @@ var _ rpcClient = (*baseRPCClient)(nil) //----------------------------------------------------------------------------- // HTTP -// New takes a remote endpoint in the form ://: and -// the websocket path (which always seems to be "/websocket") -// An error is returned on invalid remote. The function panics when remote is nil. -func New(remote, wsEndpoint string) (*HTTP, error) { - httpClient, err := jsonrpcclient.DefaultHTTPClient(remote) +// New takes a remote endpoint in the form ://:. An error +// is returned on invalid remote. +func New(remote string) (*HTTP, error) { + c, err := jsonrpcclient.DefaultHTTPClient(remote) if err != nil { return nil, err } - return NewWithClient(remote, wsEndpoint, httpClient) + return NewWithClient(remote, c) } // NewWithTimeout does the same thing as New, except you can set a Timeout for // http.Client. A Timeout of zero means no timeout. -func NewWithTimeout(remote, wsEndpoint string, timeout time.Duration) (*HTTP, error) { - httpClient, err := jsonrpcclient.DefaultHTTPClient(remote) +func NewWithTimeout(remote string, t time.Duration) (*HTTP, error) { + c, err := jsonrpcclient.DefaultHTTPClient(remote) if err != nil { return nil, err } - httpClient.Timeout = timeout - return NewWithClient(remote, wsEndpoint, httpClient) + c.Timeout = t + return NewWithClient(remote, c) } -// NewWithClient allows for setting a custom http client (See New). -// An error is returned on invalid remote. The function panics when remote is nil. -func NewWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) { - if client == nil { - panic("nil http.Client provided") +// NewWithClient allows you to set a custom http client. An error is returned +// on invalid remote. The function panics when client is nil. +func NewWithClient(remote string, c *http.Client) (*HTTP, error) { + if c == nil { + panic("nil http.Client") } + return NewWithClientAndWSOptions(remote, c, DefaultWSOptions()) +} - rc, err := jsonrpcclient.NewWithHTTPClient(remote, client) +// NewWithClientAndWSOptions allows you to set a custom http client and +// WebSocket options. An error is returned on invalid remote. The function +// panics when client is nil. +func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*HTTP, error) { + if c == nil { + panic("nil http.Client") + } + rpc, err := jsonrpcclient.NewWithHTTPClient(remote, c) if err != nil { return nil, err } - wsEvents, err := newWSEvents(remote, wsEndpoint) + wsEvents, err := newWsEvents(remote, wso) if err != nil { return nil, err } httpClient := &HTTP{ - rpc: rc, + rpc: rpc, remote: remote, - baseRPCClient: &baseRPCClient{caller: rc}, - WSEvents: wsEvents, + baseRPCClient: &baseRPCClient{caller: rpc}, + wsEvents: wsEvents, } return httpClient, nil @@ -158,7 +160,7 @@ var _ rpcclient.Client = (*HTTP)(nil) // SetLogger sets a logger. func (c *HTTP) SetLogger(l log.Logger) { - c.WSEvents.SetLogger(l) + c.wsEvents.SetLogger(l) } // Remote returns the remote network address in a string form. @@ -525,206 +527,3 @@ func (c *baseRPCClient) BroadcastEvidence( } return result, nil } - -//----------------------------------------------------------------------------- -// WSEvents - -var errNotRunning = errors.New("client is not running. Use .Start() method to start") - -// WSEvents is a wrapper around WSClient, which implements EventsClient. -type WSEvents struct { - service.BaseService - remote string - endpoint string - ws *jsonrpcclient.WSClient - - mtx tmsync.RWMutex - subscriptions map[string]chan ctypes.ResultEvent // query -> chan -} - -func newWSEvents(remote, endpoint string) (*WSEvents, error) { - w := &WSEvents{ - endpoint: endpoint, - remote: remote, - subscriptions: make(map[string]chan ctypes.ResultEvent), - } - w.BaseService = *service.NewBaseService(nil, "WSEvents", w) - - var err error - w.ws, err = jsonrpcclient.NewWS(w.remote, w.endpoint, jsonrpcclient.OnReconnect(func() { - // resubscribe immediately - w.redoSubscriptionsAfter(0 * time.Second) - })) - if err != nil { - return nil, err - } - w.ws.SetLogger(w.Logger) - - return w, nil -} - -// OnStart implements service.Service by starting WSClient and event loop. -func (w *WSEvents) OnStart() error { - if err := w.ws.Start(); err != nil { - return err - } - - go w.eventListener() - - return nil -} - -// OnStop implements service.Service by stopping WSClient. -func (w *WSEvents) OnStop() { - if err := w.ws.Stop(); err != nil { - w.Logger.Error("Can't stop ws client", "err", err) - } -} - -// Subscribe implements EventsClient by using WSClient to subscribe given -// subscriber to query. By default, it returns a channel with cap=1. Error is -// returned if it fails to subscribe. -// -// When reading from the channel, keep in mind there's a single events loop, so -// if you don't read events for this subscription fast enough, other -// subscriptions will slow down in effect. -// -// The channel is never closed to prevent clients from seeing an erroneous -// event. -// -// It returns an error if WSEvents is not running. -func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, - outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { - - if !w.IsRunning() { - return nil, errNotRunning - } - - if err := w.ws.Subscribe(ctx, query); err != nil { - return nil, err - } - - outCap := 1 - if len(outCapacity) > 0 { - outCap = outCapacity[0] - } - - outc := make(chan ctypes.ResultEvent, outCap) - w.mtx.Lock() - // subscriber param is ignored because Tendermint will override it with - // remote IP anyway. - w.subscriptions[query] = outc - w.mtx.Unlock() - - return outc, nil -} - -// Unsubscribe implements EventsClient by using WSClient to unsubscribe given -// subscriber from query. -// -// It returns an error if WSEvents is not running. -func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { - if !w.IsRunning() { - return errNotRunning - } - - if err := w.ws.Unsubscribe(ctx, query); err != nil { - return err - } - - w.mtx.Lock() - _, ok := w.subscriptions[query] - if ok { - delete(w.subscriptions, query) - } - w.mtx.Unlock() - - return nil -} - -// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe -// given subscriber from all the queries. -// -// It returns an error if WSEvents is not running. -func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { - if !w.IsRunning() { - return errNotRunning - } - - if err := w.ws.UnsubscribeAll(ctx); err != nil { - return err - } - - w.mtx.Lock() - w.subscriptions = make(map[string]chan ctypes.ResultEvent) - w.mtx.Unlock() - - return nil -} - -// After being reconnected, it is necessary to redo subscription to server -// otherwise no data will be automatically received. -func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) { - time.Sleep(d) - - ctx := context.Background() - - w.mtx.Lock() - defer w.mtx.Unlock() - for q := range w.subscriptions { - err := w.ws.Subscribe(ctx, q) - if err != nil { - w.Logger.Error("failed to resubscribe", "query", q, "err", err) - delete(w.subscriptions, q) - } - } -} - -func isErrAlreadySubscribed(err error) bool { - return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error()) -} - -func (w *WSEvents) eventListener() { - for { - select { - case resp, ok := <-w.ws.ResponsesCh: - if !ok { - return - } - - if resp.Error != nil { - w.Logger.Error("WS error", "err", resp.Error.Error()) - // Error can be ErrAlreadySubscribed or max client (subscriptions per - // client) reached or Tendermint exited. - // We can ignore ErrAlreadySubscribed, but need to retry in other - // cases. - if !isErrAlreadySubscribed(resp.Error) { - // Resubscribe after 1 second to give Tendermint time to restart (if - // crashed). - w.redoSubscriptionsAfter(1 * time.Second) - } - continue - } - - result := new(ctypes.ResultEvent) - err := tmjson.Unmarshal(resp.Result, result) - if err != nil { - w.Logger.Error("failed to unmarshal response", "err", err) - continue - } - - w.mtx.RLock() - out, ok := w.subscriptions[result.Query] - w.mtx.RUnlock() - if ok { - select { - case out <- *result: - case <-w.Quit(): - return - } - } - case <-w.Quit(): - return - } - } -} diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go new file mode 100644 index 000000000..54b75ee59 --- /dev/null +++ b/rpc/client/http/ws.go @@ -0,0 +1,249 @@ +package http + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + tmjson "github.com/tendermint/tendermint/libs/json" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/libs/service" + tmsync "github.com/tendermint/tendermint/libs/sync" + rpcclient "github.com/tendermint/tendermint/rpc/client" + ctypes "github.com/tendermint/tendermint/rpc/core/types" + jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" +) + +var errNotRunning = errors.New("client is not running. Use .Start() method to start") + +// WSOptions for the WS part of the HTTP client. +type WSOptions struct { + Path string // path (e.g. "/ws") + + jsonrpcclient.WSOptions // WSClient options +} + +// DefaultWSOptions returns default WS options. +// See jsonrpcclient.DefaultWSOptions. +func DefaultWSOptions() WSOptions { + return WSOptions{ + Path: "/websocket", + WSOptions: jsonrpcclient.DefaultWSOptions(), + } +} + +// Validate performs a basic validation of WSOptions. +func (wso WSOptions) Validate() error { + if len(wso.Path) <= 1 { + return errors.New("empty Path") + } + if wso.Path[0] != '/' { + return errors.New("leading slash is missing in Path") + } + + return nil +} + +// wsEvents is a wrapper around WSClient, which implements EventsClient. +type wsEvents struct { + service.BaseService + ws *jsonrpcclient.WSClient + + mtx tmsync.RWMutex + subscriptions map[string]chan ctypes.ResultEvent // query -> chan +} + +var _ rpcclient.EventsClient = (*wsEvents)(nil) + +func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { + // validate options + if err := wso.Validate(); err != nil { + return nil, fmt.Errorf("invalid WSOptions: %w", err) + } + + w := &wsEvents{ + subscriptions: make(map[string]chan ctypes.ResultEvent), + } + w.BaseService = *service.NewBaseService(nil, "wsEvents", w) + + var err error + w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions) + if err != nil { + return nil, fmt.Errorf("can't create WS client: %w", err) + } + w.ws.OnReconnect(func() { + // resubscribe immediately + w.redoSubscriptionsAfter(0 * time.Second) + }) + w.ws.SetLogger(w.Logger) + + return w, nil +} + +// OnStart implements service.Service by starting WSClient and event loop. +func (w *wsEvents) OnStart() error { + if err := w.ws.Start(); err != nil { + return err + } + + go w.eventListener() + + return nil +} + +// OnStop implements service.Service by stopping WSClient. +func (w *wsEvents) OnStop() { + if err := w.ws.Stop(); err != nil { + w.Logger.Error("Can't stop ws client", "err", err) + } +} + +// Subscribe implements EventsClient by using WSClient to subscribe given +// subscriber to query. By default, it returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// +// When reading from the channel, keep in mind there's a single events loop, so +// if you don't read events for this subscription fast enough, other +// subscriptions will slow down in effect. +// +// The channel is never closed to prevent clients from seeing an erroneous +// event. +// +// It returns an error if wsEvents is not running. +func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, + outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { + + if !w.IsRunning() { + return nil, errNotRunning + } + + if err := w.ws.Subscribe(ctx, query); err != nil { + return nil, err + } + + outCap := 1 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) + w.mtx.Lock() + // subscriber param is ignored because Tendermint will override it with + // remote IP anyway. + w.subscriptions[query] = outc + w.mtx.Unlock() + + return outc, nil +} + +// Unsubscribe implements EventsClient by using WSClient to unsubscribe given +// subscriber from query. +// +// It returns an error if wsEvents is not running. +func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { + if !w.IsRunning() { + return errNotRunning + } + + if err := w.ws.Unsubscribe(ctx, query); err != nil { + return err + } + + w.mtx.Lock() + _, ok := w.subscriptions[query] + if ok { + delete(w.subscriptions, query) + } + w.mtx.Unlock() + + return nil +} + +// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe +// given subscriber from all the queries. +// +// It returns an error if wsEvents is not running. +func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { + if !w.IsRunning() { + return errNotRunning + } + + if err := w.ws.UnsubscribeAll(ctx); err != nil { + return err + } + + w.mtx.Lock() + w.subscriptions = make(map[string]chan ctypes.ResultEvent) + w.mtx.Unlock() + + return nil +} + +// After being reconnected, it is necessary to redo subscription to server +// otherwise no data will be automatically received. +func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) { + time.Sleep(d) + + ctx := context.Background() + + w.mtx.Lock() + defer w.mtx.Unlock() + for q := range w.subscriptions { + err := w.ws.Subscribe(ctx, q) + if err != nil { + w.Logger.Error("failed to resubscribe", "query", q, "err", err) + delete(w.subscriptions, q) + } + } +} + +func isErrAlreadySubscribed(err error) bool { + return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error()) +} + +func (w *wsEvents) eventListener() { + for { + select { + case resp, ok := <-w.ws.ResponsesCh: + if !ok { + return + } + + if resp.Error != nil { + w.Logger.Error("WS error", "err", resp.Error.Error()) + // Error can be ErrAlreadySubscribed or max client (subscriptions per + // client) reached or Tendermint exited. + // We can ignore ErrAlreadySubscribed, but need to retry in other + // cases. + if !isErrAlreadySubscribed(resp.Error) { + // Resubscribe after 1 second to give Tendermint time to restart (if + // crashed). + w.redoSubscriptionsAfter(1 * time.Second) + } + continue + } + + result := new(ctypes.ResultEvent) + err := tmjson.Unmarshal(resp.Result, result) + if err != nil { + w.Logger.Error("failed to unmarshal response", "err", err) + continue + } + + w.mtx.RLock() + out, ok := w.subscriptions[result.Query] + w.mtx.RUnlock() + if ok { + select { + case out <- *result: + case <-w.Quit(): + return + } + } + case <-w.Quit(): + return + } + } +} diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 7232ed9dd..362503d99 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -32,7 +32,7 @@ var ( func getHTTPClient() *rpchttp.HTTP { rpcAddr := rpctest.GetConfig().RPC.ListenAddress - c, err := rpchttp.New(rpcAddr, "/websocket") + c, err := rpchttp.New(rpcAddr) if err != nil { panic(err) } @@ -42,7 +42,7 @@ func getHTTPClient() *rpchttp.HTTP { func getHTTPClientWithTimeout(timeout time.Duration) *rpchttp.HTTP { rpcAddr := rpctest.GetConfig().RPC.ListenAddress - c, err := rpchttp.NewWithTimeout(rpcAddr, "/websocket", timeout) + c, err := rpchttp.NewWithTimeout(rpcAddr, timeout) if err != nil { panic(err) } @@ -64,7 +64,7 @@ func GetClients() []client.Client { func TestNilCustomHTTPClient(t *testing.T) { require.Panics(t, func() { - _, _ = rpchttp.NewWithClient("http://example.com", "/websocket", nil) + _, _ = rpchttp.NewWithClient("http://example.com", nil) }) require.Panics(t, func() { _, _ = rpcclient.NewWithHTTPClient("http://example.com", nil) @@ -73,7 +73,7 @@ func TestNilCustomHTTPClient(t *testing.T) { func TestCustomHTTPClient(t *testing.T) { remote := rpctest.GetConfig().RPC.ListenAddress - c, err := rpchttp.NewWithClient(remote, "/websocket", http.DefaultClient) + c, err := rpchttp.NewWithClient(remote, http.DefaultClient) require.Nil(t, err) status, err := c.Status(context.Background()) require.NoError(t, err) diff --git a/rpc/jsonrpc/client/http_json_client.go b/rpc/jsonrpc/client/http_json_client.go index 479aa7266..7f6a9a74a 100644 --- a/rpc/jsonrpc/client/http_json_client.go +++ b/rpc/jsonrpc/client/http_json_client.go @@ -121,12 +121,12 @@ func New(remote string) (*Client, error) { return NewWithHTTPClient(remote, httpClient) } -// NewWithHTTPClient returns a Client pointed at the given -// address using a custom http client. An error is returned on invalid remote. -// The function panics when remote is nil. -func NewWithHTTPClient(remote string, client *http.Client) (*Client, error) { - if client == nil { - panic("nil http.Client provided") +// NewWithHTTPClient returns a Client pointed at the given address using a +// custom http client. An error is returned on invalid remote. The function +// panics when client is nil. +func NewWithHTTPClient(remote string, c *http.Client) (*Client, error) { + if c == nil { + panic("nil http.Client") } parsedURL, err := newParsedURL(remote) @@ -144,7 +144,7 @@ func NewWithHTTPClient(remote string, client *http.Client) (*Client, error) { address: address, username: username, password: password, - client: client, + client: c, } return rpcClient, nil diff --git a/rpc/jsonrpc/client/ws_client.go b/rpc/jsonrpc/client/ws_client.go index c30e4e1d9..f28f1bee6 100644 --- a/rpc/jsonrpc/client/ws_client.go +++ b/rpc/jsonrpc/client/ws_client.go @@ -18,12 +18,23 @@ import ( types "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) -const ( - defaultMaxReconnectAttempts = 25 - defaultWriteWait = 0 - defaultReadWait = 0 - defaultPingPeriod = 0 -) +// WSOptions for WSClient. +type WSOptions struct { + MaxReconnectAttempts uint // maximum attempts to reconnect + ReadWait time.Duration // deadline for any read op + WriteWait time.Duration // deadline for any write op + PingPeriod time.Duration // frequency with which pings are sent +} + +// DefaultWSOptions returns default WS options. +func DefaultWSOptions() WSOptions { + return WSOptions{ + MaxReconnectAttempts: 10, // first: 2 sec, last: 17 min. + WriteWait: 10 * time.Second, + ReadWait: 0, + PingPeriod: 0, + } +} // WSClient is a JSON-RPC client, which uses WebSocket for communication with // the remote server. @@ -50,7 +61,7 @@ type WSClient struct { // nolint: maligned readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine // Maximum reconnect attempts (0 or greater; default: 25). - maxReconnectAttempts int + maxReconnectAttempts uint // Support both ws and wss protocols protocol string @@ -79,11 +90,15 @@ type WSClient struct { // nolint: maligned PingPongLatencyTimer metrics.Timer } -// NewWS returns a new client. See the commentary on the func(*WSClient) -// functions for a detailed description of how to configure ping period and -// pong wait time. The endpoint argument must begin with a `/`. -// An error is returned on invalid remote. The function panics when remote is nil. -func NewWS(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient, error) { +// NewWS returns a new client. The endpoint argument must begin with a `/`. An +// error is returned on invalid remote. +// It uses DefaultWSOptions. +func NewWS(remoteAddr, endpoint string) (*WSClient, error) { + return NewWSWithOptions(remoteAddr, endpoint, DefaultWSOptions()) +} + +// NewWSWithOptions allows you to provide custom WSOptions. +func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, error) { parsedURL, err := newParsedURL(remoteAddr) if err != nil { return nil, err @@ -104,59 +119,23 @@ func NewWS(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient, Endpoint: endpoint, PingPongLatencyTimer: metrics.NewTimer(), - maxReconnectAttempts: defaultMaxReconnectAttempts, - readWait: defaultReadWait, - writeWait: defaultWriteWait, - pingPeriod: defaultPingPeriod, + maxReconnectAttempts: opts.MaxReconnectAttempts, + readWait: opts.ReadWait, + writeWait: opts.WriteWait, + pingPeriod: opts.PingPeriod, protocol: parsedURL.Scheme, // sentIDs: make(map[types.JSONRPCIntID]bool), } c.BaseService = *service.NewBaseService(nil, "WSClient", c) - for _, option := range options { - option(c) - } return c, nil } -// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error. -// It should only be used in the constructor and is not Goroutine-safe. -func MaxReconnectAttempts(max int) func(*WSClient) { - return func(c *WSClient) { - c.maxReconnectAttempts = max - } -} - -// ReadWait sets the amount of time to wait before a websocket read times out. -// It should only be used in the constructor and is not Goroutine-safe. -func ReadWait(readWait time.Duration) func(*WSClient) { - return func(c *WSClient) { - c.readWait = readWait - } -} - -// WriteWait sets the amount of time to wait before a websocket write times out. -// It should only be used in the constructor and is not Goroutine-safe. -func WriteWait(writeWait time.Duration) func(*WSClient) { - return func(c *WSClient) { - c.writeWait = writeWait - } -} - -// PingPeriod sets the duration for sending websocket pings. -// It should only be used in the constructor - not Goroutine-safe. -func PingPeriod(pingPeriod time.Duration) func(*WSClient) { - return func(c *WSClient) { - c.pingPeriod = pingPeriod - } -} - // OnReconnect sets the callback, which will be called every time after // successful reconnect. -func OnReconnect(cb func()) func(*WSClient) { - return func(c *WSClient) { - c.onReconnect = cb - } +// Could only be set before Start. +func (c *WSClient) OnReconnect(cb func()) { + c.onReconnect = cb } // String returns WS client full address. @@ -275,7 +254,7 @@ func (c *WSClient) dial() error { // reconnect tries to redial up to maxReconnectAttempts with exponential // backoff. func (c *WSClient) reconnect() error { - attempt := 0 + attempt := uint(0) c.mtx.Lock() c.reconnecting = true @@ -288,7 +267,7 @@ func (c *WSClient) reconnect() error { for { jitter := time.Duration(tmrand.Float64() * float64(time.Second)) // 1s == (1e9 ns) - backoffDuration := jitter + ((1 << uint(attempt)) * time.Second) + backoffDuration := jitter + ((1 << attempt) * time.Second) c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration) time.Sleep(backoffDuration) diff --git a/rpc/openapi/openapi.yaml b/rpc/openapi/openapi.yaml index ed7771ba1..e5985d080 100644 --- a/rpc/openapi/openapi.yaml +++ b/rpc/openapi/openapi.yaml @@ -340,7 +340,7 @@ paths: import rpchttp "github.com/tendermint/rpc/client/http" import "github.com/tendermint/tendermint/types" - client := rpchttp.New("tcp:0.0.0.0:26657", "/websocket") + client := rpchttp.New("tcp://0.0.0.0:26657") err := client.Start() if err != nil { handle error @@ -397,7 +397,7 @@ paths: operationId: unsubscribe description: | ```go - client := rpchttp.New("tcp:0.0.0.0:26657", "/websocket") + client := rpchttp.New("tcp://0.0.0.0:26657") err := client.Start() if err != nil { handle error @@ -1221,7 +1221,7 @@ components: example: "5576458aef205977e18fd50b274e9b5d9014525a" listen_addr: type: string - example: "tcp:0.0.0.0:26656" + example: "tcp://0.0.0.0:26656" network: type: string example: "cosmoshub-2" @@ -1242,7 +1242,7 @@ components: example: "on" rpc_address: type: string - example: "tcp:0.0.0.0:26657" + example: "tcp://0.0.0.0:26657" SyncInfo: type: object properties: diff --git a/statesync/stateprovider.go b/statesync/stateprovider.go index cd833578e..e5a12b97f 100644 --- a/statesync/stateprovider.go +++ b/statesync/stateprovider.go @@ -194,7 +194,7 @@ func rpcClient(server string) (*rpchttp.HTTP, error) { if !strings.Contains(server, "://") { server = "http://" + server } - c, err := rpchttp.New(server, "/websocket") + c, err := rpchttp.New(server) if err != nil { return nil, err } diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 14418d6db..52ff83ba3 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -473,7 +473,7 @@ func (n Node) AddressRPC() string { // Client returns an RPC client for a node. func (n Node) Client() (*rpchttp.HTTP, error) { - return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort), "/websocket") + return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort)) } // keyGenerator generates pseudorandom Ed25519 keys based on a seed.