mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-29 20:06:56 +00:00
rpc/client/http: drop endpoint arg from New and add WSOptions (#6176)
also - replace `MaxReconnectAttempts`, `ReadWait`, `WriteWait` and `PingPeriod` options with `WSOptions` in `WSClient` (rpc/jsonrpc/client/ws_client.go). - set default write wait to 10s for `WSClient`(rpc/jsonrpc/client/ws_client.go) - unexpose `WSEvents`(rpc/client/http.go) Closes #6162
This commit is contained in:
@@ -39,6 +39,9 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
- [all] \#6077 Change spelling from British English to American (@cmwaters)
|
||||
- Rename "Subscription.Cancelled()" to "Subscription.Canceled()" in libs/pubsub
|
||||
- Rename "behaviour" pkg to "behavior" and internalized it in blockchain v2
|
||||
- [rpc/client/http] \#6176 Remove `endpoint` arg from `New`, `NewWithTimeout` and `NewWithClient` (@melekes)
|
||||
- [rpc/client/http] \#6176 Unexpose `WSEvents` (@melekes)
|
||||
- [rpc/jsonrpc/client/ws_client] \#6176 `NewWS` no longer accepts options (use `NewWSWithOptions` and `OnReconnect` funcs to configure the client) (@melekes)
|
||||
|
||||
- Blockchain Protocol
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ func dumpCmdHandler(_ *cobra.Command, args []string) error {
|
||||
}
|
||||
}
|
||||
|
||||
rpc, err := rpchttp.New(nodeRPCAddr, "/websocket")
|
||||
rpc, err := rpchttp.New(nodeRPCAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new http client: %w", err)
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ go-routine state, and the node's WAL and config information. This aggregated dat
|
||||
is packaged into a compressed archive.
|
||||
|
||||
Example:
|
||||
$ tendermint debug 34255 /path/to/tm-debug.zip`,
|
||||
$ tendermint debug kill 34255 /path/to/tm-debug.zip`,
|
||||
Args: cobra.ExactArgs(2),
|
||||
RunE: killCmdHandler,
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func killCmdHandler(cmd *cobra.Command, args []string) error {
|
||||
return errors.New("invalid output file")
|
||||
}
|
||||
|
||||
rpc, err := rpchttp.New(nodeRPCAddr, "/websocket")
|
||||
rpc, err := rpchttp.New(nodeRPCAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create new http client: %w", err)
|
||||
}
|
||||
|
||||
@@ -217,7 +217,7 @@ func runProxy(cmd *cobra.Command, args []string) error {
|
||||
cfg.WriteTimeout = config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
|
||||
rpcClient, err := rpchttp.NewWithTimeout(primaryAddr, "/websocket", cfg.WriteTimeout)
|
||||
rpcClient, err := rpchttp.NewWithTimeout(primaryAddr, cfg.WriteTimeout)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create http client for %s: %w", primaryAddr, err)
|
||||
}
|
||||
|
||||
@@ -52,7 +52,7 @@ func NewWithOptions(chainID, remote string, options Options) (provider.Provider,
|
||||
remote = "http://" + remote
|
||||
}
|
||||
|
||||
httpClient, err := rpchttp.NewWithTimeout(remote, "/websocket", options.Timeout)
|
||||
httpClient, err := rpchttp.NewWithTimeout(remote, options.Timeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -54,7 +54,7 @@ func TestProvider(t *testing.T) {
|
||||
chainID := genDoc.ChainID
|
||||
t.Log("chainID:", chainID)
|
||||
|
||||
c, err := rpchttp.New(rpcAddr, "/websocket")
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
require.Nil(t, err)
|
||||
|
||||
p := lighthttp.NewWithClient(chainID, c)
|
||||
|
||||
@@ -20,7 +20,7 @@ func ExampleHTTP_simple() {
|
||||
|
||||
// Create our RPC client
|
||||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
|
||||
c, err := rpchttp.New(rpcAddr, "/websocket")
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
if err != nil {
|
||||
log.Fatal(err) //nolint:gocritic
|
||||
}
|
||||
@@ -72,7 +72,7 @@ func ExampleHTTP_batching() {
|
||||
|
||||
// Create our RPC client
|
||||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
|
||||
c, err := rpchttp.New(rpcAddr, "/websocket")
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -2,17 +2,11 @@ package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/bytes"
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
rpcclient "github.com/tendermint/tendermint/rpc/client"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
|
||||
@@ -39,7 +33,7 @@ the example for more details.
|
||||
|
||||
Example:
|
||||
|
||||
c, err := New("http://192.168.1.10:26657", "/websocket")
|
||||
c, err := New("http://192.168.1.10:26657")
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
@@ -63,7 +57,7 @@ type HTTP struct {
|
||||
rpc *jsonrpcclient.Client
|
||||
|
||||
*baseRPCClient
|
||||
*WSEvents
|
||||
*wsEvents
|
||||
}
|
||||
|
||||
// BatchHTTP provides the same interface as `HTTP`, but allows for batching of
|
||||
@@ -105,50 +99,58 @@ var _ rpcClient = (*baseRPCClient)(nil)
|
||||
//-----------------------------------------------------------------------------
|
||||
// HTTP
|
||||
|
||||
// New takes a remote endpoint in the form <protocol>://<host>:<port> and
|
||||
// the websocket path (which always seems to be "/websocket")
|
||||
// An error is returned on invalid remote. The function panics when remote is nil.
|
||||
func New(remote, wsEndpoint string) (*HTTP, error) {
|
||||
httpClient, err := jsonrpcclient.DefaultHTTPClient(remote)
|
||||
// New takes a remote endpoint in the form <protocol>://<host>:<port>. An error
|
||||
// is returned on invalid remote.
|
||||
func New(remote string) (*HTTP, error) {
|
||||
c, err := jsonrpcclient.DefaultHTTPClient(remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewWithClient(remote, wsEndpoint, httpClient)
|
||||
return NewWithClient(remote, c)
|
||||
}
|
||||
|
||||
// NewWithTimeout does the same thing as New, except you can set a Timeout for
|
||||
// http.Client. A Timeout of zero means no timeout.
|
||||
func NewWithTimeout(remote, wsEndpoint string, timeout time.Duration) (*HTTP, error) {
|
||||
httpClient, err := jsonrpcclient.DefaultHTTPClient(remote)
|
||||
func NewWithTimeout(remote string, t time.Duration) (*HTTP, error) {
|
||||
c, err := jsonrpcclient.DefaultHTTPClient(remote)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
httpClient.Timeout = timeout
|
||||
return NewWithClient(remote, wsEndpoint, httpClient)
|
||||
c.Timeout = t
|
||||
return NewWithClient(remote, c)
|
||||
}
|
||||
|
||||
// NewWithClient allows for setting a custom http client (See New).
|
||||
// An error is returned on invalid remote. The function panics when remote is nil.
|
||||
func NewWithClient(remote, wsEndpoint string, client *http.Client) (*HTTP, error) {
|
||||
if client == nil {
|
||||
panic("nil http.Client provided")
|
||||
// NewWithClient allows you to set a custom http client. An error is returned
|
||||
// on invalid remote. The function panics when client is nil.
|
||||
func NewWithClient(remote string, c *http.Client) (*HTTP, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
}
|
||||
return NewWithClientAndWSOptions(remote, c, DefaultWSOptions())
|
||||
}
|
||||
|
||||
rc, err := jsonrpcclient.NewWithHTTPClient(remote, client)
|
||||
// NewWithClientAndWSOptions allows you to set a custom http client and
|
||||
// WebSocket options. An error is returned on invalid remote. The function
|
||||
// panics when client is nil.
|
||||
func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*HTTP, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
}
|
||||
rpc, err := jsonrpcclient.NewWithHTTPClient(remote, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wsEvents, err := newWSEvents(remote, wsEndpoint)
|
||||
wsEvents, err := newWsEvents(remote, wso)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient := &HTTP{
|
||||
rpc: rc,
|
||||
rpc: rpc,
|
||||
remote: remote,
|
||||
baseRPCClient: &baseRPCClient{caller: rc},
|
||||
WSEvents: wsEvents,
|
||||
baseRPCClient: &baseRPCClient{caller: rpc},
|
||||
wsEvents: wsEvents,
|
||||
}
|
||||
|
||||
return httpClient, nil
|
||||
@@ -158,7 +160,7 @@ var _ rpcclient.Client = (*HTTP)(nil)
|
||||
|
||||
// SetLogger sets a logger.
|
||||
func (c *HTTP) SetLogger(l log.Logger) {
|
||||
c.WSEvents.SetLogger(l)
|
||||
c.wsEvents.SetLogger(l)
|
||||
}
|
||||
|
||||
// Remote returns the remote network address in a string form.
|
||||
@@ -525,206 +527,3 @@ func (c *baseRPCClient) BroadcastEvidence(
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// WSEvents
|
||||
|
||||
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
|
||||
|
||||
// WSEvents is a wrapper around WSClient, which implements EventsClient.
|
||||
type WSEvents struct {
|
||||
service.BaseService
|
||||
remote string
|
||||
endpoint string
|
||||
ws *jsonrpcclient.WSClient
|
||||
|
||||
mtx tmsync.RWMutex
|
||||
subscriptions map[string]chan ctypes.ResultEvent // query -> chan
|
||||
}
|
||||
|
||||
func newWSEvents(remote, endpoint string) (*WSEvents, error) {
|
||||
w := &WSEvents{
|
||||
endpoint: endpoint,
|
||||
remote: remote,
|
||||
subscriptions: make(map[string]chan ctypes.ResultEvent),
|
||||
}
|
||||
w.BaseService = *service.NewBaseService(nil, "WSEvents", w)
|
||||
|
||||
var err error
|
||||
w.ws, err = jsonrpcclient.NewWS(w.remote, w.endpoint, jsonrpcclient.OnReconnect(func() {
|
||||
// resubscribe immediately
|
||||
w.redoSubscriptionsAfter(0 * time.Second)
|
||||
}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.ws.SetLogger(w.Logger)
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OnStart implements service.Service by starting WSClient and event loop.
|
||||
func (w *WSEvents) OnStart() error {
|
||||
if err := w.ws.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go w.eventListener()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service by stopping WSClient.
|
||||
func (w *WSEvents) OnStop() {
|
||||
if err := w.ws.Stop(); err != nil {
|
||||
w.Logger.Error("Can't stop ws client", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe implements EventsClient by using WSClient to subscribe given
|
||||
// subscriber to query. By default, it returns a channel with cap=1. Error is
|
||||
// returned if it fails to subscribe.
|
||||
//
|
||||
// When reading from the channel, keep in mind there's a single events loop, so
|
||||
// if you don't read events for this subscription fast enough, other
|
||||
// subscriptions will slow down in effect.
|
||||
//
|
||||
// The channel is never closed to prevent clients from seeing an erroneous
|
||||
// event.
|
||||
//
|
||||
// It returns an error if WSEvents is not running.
|
||||
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
|
||||
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
|
||||
|
||||
if !w.IsRunning() {
|
||||
return nil, errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.Subscribe(ctx, query); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
outCap := 1
|
||||
if len(outCapacity) > 0 {
|
||||
outCap = outCapacity[0]
|
||||
}
|
||||
|
||||
outc := make(chan ctypes.ResultEvent, outCap)
|
||||
w.mtx.Lock()
|
||||
// subscriber param is ignored because Tendermint will override it with
|
||||
// remote IP anyway.
|
||||
w.subscriptions[query] = outc
|
||||
w.mtx.Unlock()
|
||||
|
||||
return outc, nil
|
||||
}
|
||||
|
||||
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
|
||||
// subscriber from query.
|
||||
//
|
||||
// It returns an error if WSEvents is not running.
|
||||
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
|
||||
if !w.IsRunning() {
|
||||
return errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.Unsubscribe(ctx, query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
_, ok := w.subscriptions[query]
|
||||
if ok {
|
||||
delete(w.subscriptions, query)
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
|
||||
// given subscriber from all the queries.
|
||||
//
|
||||
// It returns an error if WSEvents is not running.
|
||||
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
|
||||
if !w.IsRunning() {
|
||||
return errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.UnsubscribeAll(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
w.subscriptions = make(map[string]chan ctypes.ResultEvent)
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// After being reconnected, it is necessary to redo subscription to server
|
||||
// otherwise no data will be automatically received.
|
||||
func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
for q := range w.subscriptions {
|
||||
err := w.ws.Subscribe(ctx, q)
|
||||
if err != nil {
|
||||
w.Logger.Error("failed to resubscribe", "query", q, "err", err)
|
||||
delete(w.subscriptions, q)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isErrAlreadySubscribed(err error) bool {
|
||||
return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
|
||||
}
|
||||
|
||||
func (w *WSEvents) eventListener() {
|
||||
for {
|
||||
select {
|
||||
case resp, ok := <-w.ws.ResponsesCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if resp.Error != nil {
|
||||
w.Logger.Error("WS error", "err", resp.Error.Error())
|
||||
// Error can be ErrAlreadySubscribed or max client (subscriptions per
|
||||
// client) reached or Tendermint exited.
|
||||
// We can ignore ErrAlreadySubscribed, but need to retry in other
|
||||
// cases.
|
||||
if !isErrAlreadySubscribed(resp.Error) {
|
||||
// Resubscribe after 1 second to give Tendermint time to restart (if
|
||||
// crashed).
|
||||
w.redoSubscriptionsAfter(1 * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
result := new(ctypes.ResultEvent)
|
||||
err := tmjson.Unmarshal(resp.Result, result)
|
||||
if err != nil {
|
||||
w.Logger.Error("failed to unmarshal response", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
w.mtx.RLock()
|
||||
out, ok := w.subscriptions[result.Query]
|
||||
w.mtx.RUnlock()
|
||||
if ok {
|
||||
select {
|
||||
case out <- *result:
|
||||
case <-w.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-w.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
249
rpc/client/http/ws.go
Normal file
249
rpc/client/http/ws.go
Normal file
@@ -0,0 +1,249 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
tmsync "github.com/tendermint/tendermint/libs/sync"
|
||||
rpcclient "github.com/tendermint/tendermint/rpc/client"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client"
|
||||
)
|
||||
|
||||
var errNotRunning = errors.New("client is not running. Use .Start() method to start")
|
||||
|
||||
// WSOptions for the WS part of the HTTP client.
|
||||
type WSOptions struct {
|
||||
Path string // path (e.g. "/ws")
|
||||
|
||||
jsonrpcclient.WSOptions // WSClient options
|
||||
}
|
||||
|
||||
// DefaultWSOptions returns default WS options.
|
||||
// See jsonrpcclient.DefaultWSOptions.
|
||||
func DefaultWSOptions() WSOptions {
|
||||
return WSOptions{
|
||||
Path: "/websocket",
|
||||
WSOptions: jsonrpcclient.DefaultWSOptions(),
|
||||
}
|
||||
}
|
||||
|
||||
// Validate performs a basic validation of WSOptions.
|
||||
func (wso WSOptions) Validate() error {
|
||||
if len(wso.Path) <= 1 {
|
||||
return errors.New("empty Path")
|
||||
}
|
||||
if wso.Path[0] != '/' {
|
||||
return errors.New("leading slash is missing in Path")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// wsEvents is a wrapper around WSClient, which implements EventsClient.
|
||||
type wsEvents struct {
|
||||
service.BaseService
|
||||
ws *jsonrpcclient.WSClient
|
||||
|
||||
mtx tmsync.RWMutex
|
||||
subscriptions map[string]chan ctypes.ResultEvent // query -> chan
|
||||
}
|
||||
|
||||
var _ rpcclient.EventsClient = (*wsEvents)(nil)
|
||||
|
||||
func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) {
|
||||
// validate options
|
||||
if err := wso.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("invalid WSOptions: %w", err)
|
||||
}
|
||||
|
||||
w := &wsEvents{
|
||||
subscriptions: make(map[string]chan ctypes.ResultEvent),
|
||||
}
|
||||
w.BaseService = *service.NewBaseService(nil, "wsEvents", w)
|
||||
|
||||
var err error
|
||||
w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't create WS client: %w", err)
|
||||
}
|
||||
w.ws.OnReconnect(func() {
|
||||
// resubscribe immediately
|
||||
w.redoSubscriptionsAfter(0 * time.Second)
|
||||
})
|
||||
w.ws.SetLogger(w.Logger)
|
||||
|
||||
return w, nil
|
||||
}
|
||||
|
||||
// OnStart implements service.Service by starting WSClient and event loop.
|
||||
func (w *wsEvents) OnStart() error {
|
||||
if err := w.ws.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go w.eventListener()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service by stopping WSClient.
|
||||
func (w *wsEvents) OnStop() {
|
||||
if err := w.ws.Stop(); err != nil {
|
||||
w.Logger.Error("Can't stop ws client", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe implements EventsClient by using WSClient to subscribe given
|
||||
// subscriber to query. By default, it returns a channel with cap=1. Error is
|
||||
// returned if it fails to subscribe.
|
||||
//
|
||||
// When reading from the channel, keep in mind there's a single events loop, so
|
||||
// if you don't read events for this subscription fast enough, other
|
||||
// subscriptions will slow down in effect.
|
||||
//
|
||||
// The channel is never closed to prevent clients from seeing an erroneous
|
||||
// event.
|
||||
//
|
||||
// It returns an error if wsEvents is not running.
|
||||
func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string,
|
||||
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
|
||||
|
||||
if !w.IsRunning() {
|
||||
return nil, errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.Subscribe(ctx, query); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
outCap := 1
|
||||
if len(outCapacity) > 0 {
|
||||
outCap = outCapacity[0]
|
||||
}
|
||||
|
||||
outc := make(chan ctypes.ResultEvent, outCap)
|
||||
w.mtx.Lock()
|
||||
// subscriber param is ignored because Tendermint will override it with
|
||||
// remote IP anyway.
|
||||
w.subscriptions[query] = outc
|
||||
w.mtx.Unlock()
|
||||
|
||||
return outc, nil
|
||||
}
|
||||
|
||||
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given
|
||||
// subscriber from query.
|
||||
//
|
||||
// It returns an error if wsEvents is not running.
|
||||
func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
|
||||
if !w.IsRunning() {
|
||||
return errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.Unsubscribe(ctx, query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
_, ok := w.subscriptions[query]
|
||||
if ok {
|
||||
delete(w.subscriptions, query)
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
|
||||
// given subscriber from all the queries.
|
||||
//
|
||||
// It returns an error if wsEvents is not running.
|
||||
func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
|
||||
if !w.IsRunning() {
|
||||
return errNotRunning
|
||||
}
|
||||
|
||||
if err := w.ws.UnsubscribeAll(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
w.subscriptions = make(map[string]chan ctypes.ResultEvent)
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// After being reconnected, it is necessary to redo subscription to server
|
||||
// otherwise no data will be automatically received.
|
||||
func (w *wsEvents) redoSubscriptionsAfter(d time.Duration) {
|
||||
time.Sleep(d)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
for q := range w.subscriptions {
|
||||
err := w.ws.Subscribe(ctx, q)
|
||||
if err != nil {
|
||||
w.Logger.Error("failed to resubscribe", "query", q, "err", err)
|
||||
delete(w.subscriptions, q)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func isErrAlreadySubscribed(err error) bool {
|
||||
return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error())
|
||||
}
|
||||
|
||||
func (w *wsEvents) eventListener() {
|
||||
for {
|
||||
select {
|
||||
case resp, ok := <-w.ws.ResponsesCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if resp.Error != nil {
|
||||
w.Logger.Error("WS error", "err", resp.Error.Error())
|
||||
// Error can be ErrAlreadySubscribed or max client (subscriptions per
|
||||
// client) reached or Tendermint exited.
|
||||
// We can ignore ErrAlreadySubscribed, but need to retry in other
|
||||
// cases.
|
||||
if !isErrAlreadySubscribed(resp.Error) {
|
||||
// Resubscribe after 1 second to give Tendermint time to restart (if
|
||||
// crashed).
|
||||
w.redoSubscriptionsAfter(1 * time.Second)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
result := new(ctypes.ResultEvent)
|
||||
err := tmjson.Unmarshal(resp.Result, result)
|
||||
if err != nil {
|
||||
w.Logger.Error("failed to unmarshal response", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
w.mtx.RLock()
|
||||
out, ok := w.subscriptions[result.Query]
|
||||
w.mtx.RUnlock()
|
||||
if ok {
|
||||
select {
|
||||
case out <- *result:
|
||||
case <-w.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
case <-w.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,7 +32,7 @@ var (
|
||||
|
||||
func getHTTPClient() *rpchttp.HTTP {
|
||||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
|
||||
c, err := rpchttp.New(rpcAddr, "/websocket")
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func getHTTPClient() *rpchttp.HTTP {
|
||||
|
||||
func getHTTPClientWithTimeout(timeout time.Duration) *rpchttp.HTTP {
|
||||
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
|
||||
c, err := rpchttp.NewWithTimeout(rpcAddr, "/websocket", timeout)
|
||||
c, err := rpchttp.NewWithTimeout(rpcAddr, timeout)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func GetClients() []client.Client {
|
||||
|
||||
func TestNilCustomHTTPClient(t *testing.T) {
|
||||
require.Panics(t, func() {
|
||||
_, _ = rpchttp.NewWithClient("http://example.com", "/websocket", nil)
|
||||
_, _ = rpchttp.NewWithClient("http://example.com", nil)
|
||||
})
|
||||
require.Panics(t, func() {
|
||||
_, _ = rpcclient.NewWithHTTPClient("http://example.com", nil)
|
||||
@@ -73,7 +73,7 @@ func TestNilCustomHTTPClient(t *testing.T) {
|
||||
|
||||
func TestCustomHTTPClient(t *testing.T) {
|
||||
remote := rpctest.GetConfig().RPC.ListenAddress
|
||||
c, err := rpchttp.NewWithClient(remote, "/websocket", http.DefaultClient)
|
||||
c, err := rpchttp.NewWithClient(remote, http.DefaultClient)
|
||||
require.Nil(t, err)
|
||||
status, err := c.Status(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -121,12 +121,12 @@ func New(remote string) (*Client, error) {
|
||||
return NewWithHTTPClient(remote, httpClient)
|
||||
}
|
||||
|
||||
// NewWithHTTPClient returns a Client pointed at the given
|
||||
// address using a custom http client. An error is returned on invalid remote.
|
||||
// The function panics when remote is nil.
|
||||
func NewWithHTTPClient(remote string, client *http.Client) (*Client, error) {
|
||||
if client == nil {
|
||||
panic("nil http.Client provided")
|
||||
// NewWithHTTPClient returns a Client pointed at the given address using a
|
||||
// custom http client. An error is returned on invalid remote. The function
|
||||
// panics when client is nil.
|
||||
func NewWithHTTPClient(remote string, c *http.Client) (*Client, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
}
|
||||
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
@@ -144,7 +144,7 @@ func NewWithHTTPClient(remote string, client *http.Client) (*Client, error) {
|
||||
address: address,
|
||||
username: username,
|
||||
password: password,
|
||||
client: client,
|
||||
client: c,
|
||||
}
|
||||
|
||||
return rpcClient, nil
|
||||
|
||||
@@ -18,12 +18,23 @@ import (
|
||||
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxReconnectAttempts = 25
|
||||
defaultWriteWait = 0
|
||||
defaultReadWait = 0
|
||||
defaultPingPeriod = 0
|
||||
)
|
||||
// WSOptions for WSClient.
|
||||
type WSOptions struct {
|
||||
MaxReconnectAttempts uint // maximum attempts to reconnect
|
||||
ReadWait time.Duration // deadline for any read op
|
||||
WriteWait time.Duration // deadline for any write op
|
||||
PingPeriod time.Duration // frequency with which pings are sent
|
||||
}
|
||||
|
||||
// DefaultWSOptions returns default WS options.
|
||||
func DefaultWSOptions() WSOptions {
|
||||
return WSOptions{
|
||||
MaxReconnectAttempts: 10, // first: 2 sec, last: 17 min.
|
||||
WriteWait: 10 * time.Second,
|
||||
ReadWait: 0,
|
||||
PingPeriod: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// WSClient is a JSON-RPC client, which uses WebSocket for communication with
|
||||
// the remote server.
|
||||
@@ -50,7 +61,7 @@ type WSClient struct { // nolint: maligned
|
||||
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
||||
|
||||
// Maximum reconnect attempts (0 or greater; default: 25).
|
||||
maxReconnectAttempts int
|
||||
maxReconnectAttempts uint
|
||||
|
||||
// Support both ws and wss protocols
|
||||
protocol string
|
||||
@@ -79,11 +90,15 @@ type WSClient struct { // nolint: maligned
|
||||
PingPongLatencyTimer metrics.Timer
|
||||
}
|
||||
|
||||
// NewWS returns a new client. See the commentary on the func(*WSClient)
|
||||
// functions for a detailed description of how to configure ping period and
|
||||
// pong wait time. The endpoint argument must begin with a `/`.
|
||||
// An error is returned on invalid remote. The function panics when remote is nil.
|
||||
func NewWS(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient, error) {
|
||||
// NewWS returns a new client. The endpoint argument must begin with a `/`. An
|
||||
// error is returned on invalid remote.
|
||||
// It uses DefaultWSOptions.
|
||||
func NewWS(remoteAddr, endpoint string) (*WSClient, error) {
|
||||
return NewWSWithOptions(remoteAddr, endpoint, DefaultWSOptions())
|
||||
}
|
||||
|
||||
// NewWSWithOptions allows you to provide custom WSOptions.
|
||||
func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, error) {
|
||||
parsedURL, err := newParsedURL(remoteAddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -104,59 +119,23 @@ func NewWS(remoteAddr, endpoint string, options ...func(*WSClient)) (*WSClient,
|
||||
Endpoint: endpoint,
|
||||
PingPongLatencyTimer: metrics.NewTimer(),
|
||||
|
||||
maxReconnectAttempts: defaultMaxReconnectAttempts,
|
||||
readWait: defaultReadWait,
|
||||
writeWait: defaultWriteWait,
|
||||
pingPeriod: defaultPingPeriod,
|
||||
maxReconnectAttempts: opts.MaxReconnectAttempts,
|
||||
readWait: opts.ReadWait,
|
||||
writeWait: opts.WriteWait,
|
||||
pingPeriod: opts.PingPeriod,
|
||||
protocol: parsedURL.Scheme,
|
||||
|
||||
// sentIDs: make(map[types.JSONRPCIntID]bool),
|
||||
}
|
||||
c.BaseService = *service.NewBaseService(nil, "WSClient", c)
|
||||
for _, option := range options {
|
||||
option(c)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func MaxReconnectAttempts(max int) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.maxReconnectAttempts = max
|
||||
}
|
||||
}
|
||||
|
||||
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func ReadWait(readWait time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.readWait = readWait
|
||||
}
|
||||
}
|
||||
|
||||
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.writeWait = writeWait
|
||||
}
|
||||
}
|
||||
|
||||
// PingPeriod sets the duration for sending websocket pings.
|
||||
// It should only be used in the constructor - not Goroutine-safe.
|
||||
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.pingPeriod = pingPeriod
|
||||
}
|
||||
}
|
||||
|
||||
// OnReconnect sets the callback, which will be called every time after
|
||||
// successful reconnect.
|
||||
func OnReconnect(cb func()) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.onReconnect = cb
|
||||
}
|
||||
// Could only be set before Start.
|
||||
func (c *WSClient) OnReconnect(cb func()) {
|
||||
c.onReconnect = cb
|
||||
}
|
||||
|
||||
// String returns WS client full address.
|
||||
@@ -275,7 +254,7 @@ func (c *WSClient) dial() error {
|
||||
// reconnect tries to redial up to maxReconnectAttempts with exponential
|
||||
// backoff.
|
||||
func (c *WSClient) reconnect() error {
|
||||
attempt := 0
|
||||
attempt := uint(0)
|
||||
|
||||
c.mtx.Lock()
|
||||
c.reconnecting = true
|
||||
@@ -288,7 +267,7 @@ func (c *WSClient) reconnect() error {
|
||||
|
||||
for {
|
||||
jitter := time.Duration(tmrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
|
||||
backoffDuration := jitter + ((1 << uint(attempt)) * time.Second)
|
||||
backoffDuration := jitter + ((1 << attempt) * time.Second)
|
||||
|
||||
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
|
||||
time.Sleep(backoffDuration)
|
||||
|
||||
@@ -340,7 +340,7 @@ paths:
|
||||
import rpchttp "github.com/tendermint/rpc/client/http"
|
||||
import "github.com/tendermint/tendermint/types"
|
||||
|
||||
client := rpchttp.New("tcp:0.0.0.0:26657", "/websocket")
|
||||
client := rpchttp.New("tcp://0.0.0.0:26657")
|
||||
err := client.Start()
|
||||
if err != nil {
|
||||
handle error
|
||||
@@ -397,7 +397,7 @@ paths:
|
||||
operationId: unsubscribe
|
||||
description: |
|
||||
```go
|
||||
client := rpchttp.New("tcp:0.0.0.0:26657", "/websocket")
|
||||
client := rpchttp.New("tcp://0.0.0.0:26657")
|
||||
err := client.Start()
|
||||
if err != nil {
|
||||
handle error
|
||||
@@ -1221,7 +1221,7 @@ components:
|
||||
example: "5576458aef205977e18fd50b274e9b5d9014525a"
|
||||
listen_addr:
|
||||
type: string
|
||||
example: "tcp:0.0.0.0:26656"
|
||||
example: "tcp://0.0.0.0:26656"
|
||||
network:
|
||||
type: string
|
||||
example: "cosmoshub-2"
|
||||
@@ -1242,7 +1242,7 @@ components:
|
||||
example: "on"
|
||||
rpc_address:
|
||||
type: string
|
||||
example: "tcp:0.0.0.0:26657"
|
||||
example: "tcp://0.0.0.0:26657"
|
||||
SyncInfo:
|
||||
type: object
|
||||
properties:
|
||||
|
||||
@@ -194,7 +194,7 @@ func rpcClient(server string) (*rpchttp.HTTP, error) {
|
||||
if !strings.Contains(server, "://") {
|
||||
server = "http://" + server
|
||||
}
|
||||
c, err := rpchttp.New(server, "/websocket")
|
||||
c, err := rpchttp.New(server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -473,7 +473,7 @@ func (n Node) AddressRPC() string {
|
||||
|
||||
// Client returns an RPC client for a node.
|
||||
func (n Node) Client() (*rpchttp.HTTP, error) {
|
||||
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort), "/websocket")
|
||||
return rpchttp.New(fmt.Sprintf("http://127.0.0.1:%v", n.ProxyPort))
|
||||
}
|
||||
|
||||
// keyGenerator generates pseudorandom Ed25519 keys based on a seed.
|
||||
|
||||
Reference in New Issue
Block a user