Files
tendermint/rpc/jsonrpc/client/ws_client.go
M. J. Fromberger 75b1b1d6c5 rpc: simplify the handling of JSON-RPC request and response IDs (#7738)
* rpc: simplify the handling of JSON-RPC request and response IDs

Replace the ID wrapper interface with plain JSON. Internally, the client
libraries use only integer IDs, and the server does not care about the ID
structure apart from checking its validity.

Basic structure of this change:

- Remove the jsonrpcid interface and its helpers.
- Unexport the ID field of request and response.
- Add helpers for constructing requests and responses.
- Fix up usage and tests.
2022-01-31 12:11:42 -08:00

508 lines
14 KiB
Go

package client
import (
"context"
"encoding/json"
"fmt"
mrand "math/rand"
"net"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
metrics "github.com/rcrowley/go-metrics"
tmclient "github.com/tendermint/tendermint/rpc/client"
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
)
// wsOptions carries optional settings for a websocket connection.
//
// NewWS copies each field into the corresponding unexported WSClient field;
// in this file only defaultWSOptions is ever passed through.
type wsOptions struct {
	MaxReconnectAttempts uint          // reconnect gives up once its attempt counter exceeds this value
	ReadWait             time.Duration // deadline for any read op; 0 means no read deadline is set
	WriteWait            time.Duration // deadline for any write op; 0 means no write deadline is set
	PingPeriod           time.Duration // frequency with which pings are sent; 0 disables pings
}
// defaultWSOptions is the configuration every client built by NewWS starts
// with: a 10-attempt reconnect limit (backoff grows from roughly 1-2s before
// the first redial to about 17 minutes before the last), a 10 second write
// deadline, no read deadline and no keep-alive pings.
var defaultWSOptions = wsOptions{
	WriteWait:            10 * time.Second,
	ReadWait:             0,
	PingPeriod:           0,
	MaxReconnectAttempts: 10,
}
// WSClient is a JSON-RPC client, which uses WebSocket for communication with
// the remote server.
//
// Create one with NewWS, then call Start to dial and launch its goroutines.
// Responses are read from ResponsesCh; Stop shuts the client down.
//
// WSClient is safe for concurrent use by multiple goroutines.
type WSClient struct { // nolint: maligned
	*tmclient.RunState
	// Current websocket connection; replaced by dial on every (re)connect.
	conn *websocket.Conn

	Address  string // IP:PORT or /path/to/socket
	Endpoint string // /websocket/url/endpoint
	// Used as the NetDial function of the websocket dialer.
	Dialer func(string, string) (net.Conn, error)

	// Single user facing channel to read RPCResponses from. It is created by
	// Start (nil before that) and closed only when the client is being stopped.
	ResponsesCh chan rpctypes.RPCResponse

	// Callback, which will be run in a new goroutine each time after a
	// successful reconnect. Must be set (via OnReconnect) before Start.
	onReconnect func()

	// internal channels, all created by Start
	send            chan rpctypes.RPCRequest // user requests, consumed by writeRoutine
	backlog         chan rpctypes.RPCRequest // stores a single user request received during a conn failure
	reconnectAfter  chan error               // reconnect requests, consumed by reconnectRoutine
	readRoutineQuit chan struct{}            // a way for readRoutine to close writeRoutine

	// Reconnect gives up once its attempt counter exceeds this value
	// (0 or greater; default: 10, from defaultWSOptions).
	maxReconnectAttempts uint

	// Support both ws and wss protocols
	protocol string

	// Tracks the running readRoutine/writeRoutine pair.
	wg sync.WaitGroup

	// mtx guards sentLastPingAt, reconnecting and nextReqID.
	mtx            sync.RWMutex
	sentLastPingAt time.Time
	reconnecting   bool
	// Next request ID handed out by nextRequestID. Responses are not matched
	// against the IDs of requests in flight.
	nextReqID int

	// Time allowed to write a message to the server. 0 means block until operation succeeds.
	writeWait time.Duration

	// Time allowed to read the next message from the server. 0 means block until operation succeeds.
	readWait time.Duration

	// Send pings to server with this period. Must be less than readWait when a
	// read deadline is used (this is not checked). If 0, no pings will be sent.
	pingPeriod time.Duration

	// Time between sending a ping and receiving a pong. See
	// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
	PingPongLatencyTimer metrics.Timer
}
// NewWS builds a WSClient for remoteAddr configured with defaultWSOptions.
// The endpoint argument must begin with a `/`. No connection is opened here;
// dialing happens in Start.
//
// The client speaks plain ws unless remoteAddr explicitly selects wss. An
// error is returned if remoteAddr cannot be parsed or if makeHTTPDialer
// rejects it.
func NewWS(remoteAddr, endpoint string) (*WSClient, error) {
	u, err := newParsedURL(remoteAddr)
	if err != nil {
		return nil, err
	}
	if u.Scheme != protoWSS {
		u.Scheme = protoWS
	}

	dial, err := makeHTTPDialer(remoteAddr)
	if err != nil {
		return nil, err
	}

	opts := defaultWSOptions
	return &WSClient{
		RunState:             tmclient.NewRunState("WSClient", nil),
		Address:              u.GetTrimmedHostWithPath(),
		Dialer:               dial,
		Endpoint:             endpoint,
		maxReconnectAttempts: opts.MaxReconnectAttempts,
		readWait:             opts.ReadWait,
		writeWait:            opts.WriteWait,
		pingPeriod:           opts.PingPeriod,
		protocol:             u.Scheme,
		PingPongLatencyTimer: metrics.NewTimer(),
	}, nil
}
// OnReconnect registers cb, which is launched in its own goroutine after
// every successful reconnect. Call it before Start: the reconnect logic reads
// the callback without any locking.
func (c *WSClient) OnReconnect(cb func()) {
	c.onReconnect = cb
}
// String describes the client by its remote address followed by its
// endpoint in parentheses, e.g. "WSClient{127.0.0.1:26657 (/websocket)}".
func (c *WSClient) String() string {
	return "WSClient{" + c.Address + " (" + c.Endpoint + ")}"
}
// Start marks the client as running, dials the remote endpoint and launches
// the read, write and reconnect goroutines, all bound to ctx.
//
// If dialing fails, the run state is rolled back before the dial error is
// returned. IsRunning then reports false and Start may be called again,
// rather than leaving a half-started client whose channels were never created.
func (c *WSClient) Start(ctx context.Context) error {
	if err := c.RunState.Start(ctx); err != nil {
		return err
	}

	if err := c.dial(); err != nil {
		// Undo RunState.Start. Otherwise the client would look running while
		// ResponsesCh is still nil, and a later Stop would panic closing it.
		if stopErr := c.RunState.Stop(); stopErr != nil {
			c.Logger.Error("failed to reset run state after dial error", "err", stopErr)
		}
		return err
	}

	c.ResponsesCh = make(chan rpctypes.RPCResponse)
	c.send = make(chan rpctypes.RPCRequest)
	// 1 additional error may come from the read/write
	// goroutine depending on which failed first.
	c.reconnectAfter = make(chan error, 1)
	// capacity for 1 request. a user won't be able to send more because the send
	// channel is unbuffered.
	c.backlog = make(chan rpctypes.RPCRequest, 1)

	c.startReadWriteRoutines(ctx)
	go c.reconnectRoutine(ctx)

	return nil
}
// Stop marks the client as stopped, waits for the read and write goroutines
// to finish, stops the ping/pong latency timer and then closes ResponsesCh.
// The close comes last so that nothing can still be sending on the channel.
//
// NOTE(review): the read/write goroutines exit when the ctx given to Start is
// done or the connection fails; whether RunState.Stop by itself makes them
// exit is not visible here — confirm, otherwise Stop can block in wg.Wait.
func (c *WSClient) Stop() error {
	err := c.RunState.Stop()
	if err != nil {
		return err
	}

	c.wg.Wait()
	c.PingPongLatencyTimer.Stop()
	close(c.ResponsesCh)

	return nil
}
// IsReconnecting reports whether a reconnect attempt loop is in progress.
func (c *WSClient) IsReconnecting() bool {
	c.mtx.RLock()
	reconnecting := c.reconnecting
	c.mtx.RUnlock()
	return reconnecting
}
// IsActive reports whether the client has been started and is not currently
// in the middle of reconnecting.
func (c *WSClient) IsActive() bool {
	if !c.IsRunning() {
		return false
	}
	return !c.IsReconnecting()
}
// Send hands request to the write goroutine for transmission to the server.
// Responses are delivered on ResponsesCh; there is no separate error channel.
//
// Send blocks until the hand-off succeeds and then returns nil. A nil return
// therefore means the request was queued, not that it was written: write
// failures are handled internally by reconnecting and resending the request.
// If ctx is done first, ctx.Err() is returned.
//
// The client must have been started. Before Start the internal send channel
// is nil, so Send blocks until ctx is done.
func (c *WSClient) Send(ctx context.Context, request rpctypes.RPCRequest) error {
	select {
	case c.send <- request:
		c.Logger.Info("sent a request", "req", request)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Call builds a JSON-RPC request for method with the given named params,
// stamps it with the next request ID and queues it through Send. It returns
// the error from SetMethodAndParams if the method/params are rejected, and
// otherwise whatever Send returns.
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
	id := c.nextRequestID()
	req := rpctypes.NewRequest(id)

	err := req.SetMethodAndParams(method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, req)
}
// Private methods
// nextRequestID returns sequential request IDs starting from 0. Access to the
// counter is serialized by c.mtx, so concurrent callers get distinct IDs.
func (c *WSClient) nextRequestID() int {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.nextReqID++
	return c.nextReqID - 1
}
// dial opens a websocket connection to protocol://Address+Endpoint. It uses
// c.Dialer for the underlying network connection and honors proxy settings
// from the environment. On success the new connection replaces c.conn; on
// failure c.conn is left untouched and the dial error is returned.
func (c *WSClient) dial() error {
	d := &websocket.Dialer{
		NetDial: c.Dialer,
		Proxy:   http.ProxyFromEnvironment,
	}
	target := c.protocol + "://" + c.Address + c.Endpoint

	conn, _, err := d.Dial(target, http.Header{}) // nolint:bodyclose
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}
// reconnect redials the server with exponential backoff. It stops when a dial
// succeeds, when ctx is done, or when the attempt counter exceeds
// c.maxReconnectAttempts, which means at most maxReconnectAttempts+1 dials.
//
// Before attempt k (counting from 0) it waits 2^k seconds plus up to one
// second of random jitter. While it runs, IsReconnecting reports true. On
// success the OnReconnect callback, if set, is launched in a new goroutine.
//
// It returns nil on success or when ctx is done. Once the attempts are
// exhausted, it returns an error wrapping the last dial error.
func (c *WSClient) reconnect(ctx context.Context) error {
	attempt := uint(0)

	c.mtx.Lock()
	c.reconnecting = true
	c.mtx.Unlock()
	defer func() {
		c.mtx.Lock()
		c.reconnecting = false
		c.mtx.Unlock()
	}()

	for {
		// nolint:gosec // G404: Use of weak random number generator
		jitter := time.Duration(mrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
		backoffDuration := jitter + ((1 << attempt) * time.Second)

		c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)

		// Use a fresh timer for every attempt. Calling Reset on a timer made with
		// NewTimer(0) whose channel was never drained can leave its stale expiry
		// in the channel (before Go 1.23). The select would then return
		// immediately and the backoff would be skipped.
		timer := time.NewTimer(backoffDuration)
		select {
		case <-ctx.Done():
			timer.Stop()
			return nil
		case <-timer.C:
		}

		err := c.dial()
		if err == nil {
			c.Logger.Info("reconnected")
			if c.onReconnect != nil {
				go c.onReconnect()
			}
			return nil
		}
		c.Logger.Error("failed to redial", "err", err)

		attempt++
		if attempt > c.maxReconnectAttempts {
			return fmt.Errorf("reached maximum reconnect attempts: %w", err)
		}
	}
}
// startReadWriteRoutines launches one reader and one writer goroutine for the
// current connection. It first creates a fresh readRoutineQuit channel for
// them to share, and registers both goroutines with c.wg so reconnectRoutine
// and Stop can wait for them to exit.
func (c *WSClient) startReadWriteRoutines(ctx context.Context) {
	c.readRoutineQuit = make(chan struct{})
	c.wg.Add(2)

	go c.readRoutine(ctx)
	go c.writeRoutine(ctx)
}
// processBacklog resends the request, if any, that a failed write moved into
// the backlog before the connection was lost. With an empty backlog it does
// nothing and returns nil.
//
// It runs after a successful reconnect but before new read/write goroutines
// are started, so it is the only user of c.conn at that point. If the resend
// fails, it does four things: closes that connection (nothing else would),
// requests another reconnect, puts the request back into the backlog, and
// returns the write error.
func (c *WSClient) processBacklog() error {
	var request rpctypes.RPCRequest
	select {
	case request = <-c.backlog:
	default:
		return nil
	}

	if c.writeWait > 0 {
		if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
			c.Logger.Error("failed to set write deadline", "err", err)
		}
	}

	if err := c.conn.WriteJSON(request); err != nil {
		c.Logger.Error("failed to resend request", "err", err)
		// No read/write goroutine owns this connection yet. Release it here,
		// otherwise it leaks when the next reconnect overwrites c.conn.
		if cerr := c.conn.Close(); cerr != nil {
			c.Logger.Error("failed to close connection", "err", cerr)
		}
		c.reconnectAfter <- err
		// requeue request
		c.backlog <- request
		return err
	}

	c.Logger.Info("resend a request", "req", request)
	return nil
}
// reconnectRoutine runs until ctx is done or a reconnect finally fails. Each
// error received on reconnectAfter triggers one recovery cycle:
//
//  1. wait for the current readRoutine/writeRoutine pair to exit;
//  2. redial via reconnect; if that fails, log both errors, Stop the client
//     and exit;
//  3. discard any further reconnect requests the old goroutines queued;
//  4. resend the backlogged request, and start a new read/write pair only if
//     that succeeds. If the resend fails, processBacklog has already queued
//     another reconnect request, so the loop runs another cycle.
//
// NOTE(review): startReadWriteRoutines calls c.wg.Add here while a
// user-initiated Stop may concurrently be blocked in c.wg.Wait. sync.WaitGroup
// requires an Add from a zero counter to happen before Wait — confirm this
// interleaving cannot occur.
func (c *WSClient) reconnectRoutine(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case originalError := <-c.reconnectAfter:
			// wait until writeRoutine and readRoutine finish
			c.wg.Wait()
			// reconnect returns nil both on success and when ctx is done; in
			// the latter case the drain loop below observes ctx and exits.
			if err := c.reconnect(ctx); err != nil {
				c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
				// Giving up: Stop also closes ResponsesCh.
				if err = c.Stop(); err != nil {
					c.Logger.Error("failed to stop conn", "error", err)
				}
				return
			}
			// drain reconnectAfter: both the read and the write goroutine may
			// have reported the same connection failure.
		LOOP:
			for {
				select {
				case <-ctx.Done():
					return
				case <-c.reconnectAfter:
				default:
					break LOOP
				}
			}
			// Only resume normal operation once the backlogged request (if
			// any) has been resent on the new connection.
			err := c.processBacklog()
			if err == nil {
				c.startReadWriteRoutines(ctx)
			}
		}
	}
}
// writeRoutine is the only goroutine that writes to the connection, so the
// client never has more than one concurrent writer. It forwards requests from
// c.send and, when pingPeriod > 0, sends a ping every pingPeriod. It exits when:
//   - a write fails: a reconnect is requested and, if a request was being
//     written, the request is moved to the backlog so it can be resent;
//   - readRoutine signals a read failure by closing readRoutineQuit;
//   - ctx is done: a normal-closure close frame is sent first.
//
// On exit it stops the ping ticker, closes the connection and marks itself
// done on c.wg.
func (c *WSClient) writeRoutine(ctx context.Context) {
	// A nil channel never becomes ready in a select, so when pings are
	// disabled the ping case below never fires. This avoids hand-constructing
	// a time.Ticker (tickers are meant to be created with NewTicker) just to
	// obtain a channel that never delivers.
	var (
		ticker *time.Ticker
		pingC  <-chan time.Time
	)
	if c.pingPeriod > 0 {
		ticker = time.NewTicker(c.pingPeriod)
		pingC = ticker.C
	}

	defer func() {
		if ticker != nil {
			ticker.Stop()
		}
		// The close error is deliberately ignored: readRoutine or an earlier
		// failure may already have closed the connection.
		_ = c.conn.Close()
		c.wg.Done()
	}()

	for {
		select {
		case request := <-c.send:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteJSON(request); err != nil {
				c.Logger.Error("failed to send request", "err", err)
				c.reconnectAfter <- err
				// add request to the backlog, so we don't lose it
				c.backlog <- request
				return
			}
		case <-pingC:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				c.Logger.Error("failed to write ping", "err", err)
				c.reconnectAfter <- err
				return
			}
			// Recorded for the pong handler's latency measurement.
			c.mtx.Lock()
			c.sentLastPingAt = time.Now()
			c.mtx.Unlock()
		case <-c.readRoutineQuit:
			return
		case <-ctx.Done():
			if err := c.conn.WriteMessage(
				websocket.CloseMessage,
				websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
			); err != nil {
				c.Logger.Error("failed to write message", "err", err)
			}
			return
		}
	}
}
// readRoutine is the only goroutine that reads from the connection, so the
// client never has more than one concurrent reader. It decodes each message
// as an RPCResponse and forwards it to ResponsesCh until ctx is done or a read
// fails. On exit it closes the connection and marks itself done on c.wg.
//
// It also installs a pong handler that records ping/pong latency in
// PingPongLatencyTimer, measured from the last ping sent by writeRoutine.
func (c *WSClient) readRoutine(ctx context.Context) {
	defer func() {
		// Close error ignored; writeRoutine may already have closed the conn.
		c.conn.Close()
		c.wg.Done()
	}()

	c.conn.SetPongHandler(func(string) error {
		// gather latency stats
		c.mtx.RLock()
		t := c.sentLastPingAt
		c.mtx.RUnlock()
		c.PingPongLatencyTimer.UpdateSince(t)
		return nil
	})

	for {
		// reset deadline for every message type (control or data)
		if c.readWait > 0 {
			if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
				c.Logger.Error("failed to set read deadline", "err", err)
			}
		}
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			// Quiet exit, without requesting a reconnect, unless the error is a
			// close frame with a code other than normal closure.
			// NOTE(review): per gorilla/websocket docs, IsUnexpectedCloseError
			// is false for any error that is not a *CloseError (e.g. read
			// deadline expiry, connection reset, use of a closed conn). Such
			// failures end this goroutine silently; writeRoutine only notices
			// on its next failed write. Confirm this is intended.
			if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
				return
			}

			c.Logger.Error("failed to read response", "err", err)
			// Tell writeRoutine to exit, then ask reconnectRoutine to recover.
			close(c.readRoutineQuit)
			c.reconnectAfter <- err
			return
		}

		var response rpctypes.RPCResponse
		err = json.Unmarshal(data, &response)
		if err != nil {
			// Undecodable messages are logged and skipped, not fatal.
			c.Logger.Error("failed to parse response", "err", err, "data", string(data))
			continue
		}

		// TODO: responses are not matched against the IDs of requests in
		// flight, because events resulting from /subscribe are implemented as
		// responses carrying the subscribe request's ID. According to the
		// spec, they should be notifications (requests without IDs).
		// https://github.com/tendermint/tendermint/issues/2949

		// NOTE(review): if RPCResponse.ID is now a method (the ID field was
		// unexported in this change), `response.ID` below logs a method value
		// rather than the ID itself — confirm and call response.ID() if so.
		c.Logger.Info("got response", "id", response.ID, "result", response.Result)

		// Deliver the response, but give up if ctx is done so this goroutine
		// cannot stay blocked on ResponsesCh forever.
		select {
		case <-ctx.Done():
			return
		case c.ResponsesCh <- response:
		}
	}
}
// Predefined methods
// Subscribe asks the server to start delivering events that match query.
// The server must expose a "subscribe" route. Matching events arrive on
// ResponsesCh like ordinary responses.
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
	return c.Call(ctx, "subscribe", map[string]interface{}{"query": query})
}
// Unsubscribe cancels a subscription previously created for query. The
// server must expose an "unsubscribe" route.
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
	return c.Call(ctx, "unsubscribe", map[string]interface{}{"query": query})
}
// UnsubscribeAll cancels every subscription held by this client. The server
// must expose an "unsubscribe_all" route; the call carries an empty,
// non-nil params map.
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
	noParams := make(map[string]interface{})
	return c.Call(ctx, "unsubscribe_all", noParams)
}