From 1995ef2572bbefaa3bede1cb49bdba4e53157c05 Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 22 Sep 2021 14:26:35 -0700 Subject: [PATCH] rpc: Strip down the base RPC client interface. (#6971) * rpc: Strip down the base RPC client interface. Prior to this change, the RPC client interface requires implementing the entire Service interface, but most of the methods of Service are not needed by the concrete clients. Dissociate the Client interface from the Service interface. - Extract only those methods of Service that are necessary to make the existing clients work. - Update the clients to combine Start/Onstart and Stop/OnStop. This does not change what the clients do to start or stop. Only the websocket clients make use of this functionality anyway. The websocket implementation uses some plumbing from the BaseService helper. We should be able to excising that entirely, but the current interface dependencies among the clients would require a much larger change, and one that leaks into other (non-RPC) packages. As a less-invasive intermediate step, preserve the existing client behaviour (and tests) by extracting the necessary subset of the BaseService functionality to an analogous RunState helper for clients. I plan to obsolete that type in a future PR, but for now this makes a useful waypoint. Related: - Clean up client implementations. - Update mocks. --- rpc/client/helpers.go | 85 ++++++++++++++++++++++++++++++ rpc/client/http/http.go | 6 --- rpc/client/http/ws.go | 30 +++++------ rpc/client/interface.go | 17 ++++-- rpc/client/mock/client.go | 11 +--- rpc/client/mocks/client.go | 89 -------------------------------- rpc/jsonrpc/client/ws_client.go | 20 +++---- rpc/jsonrpc/server/ws_handler.go | 29 ++++++----- 8 files changed, 138 insertions(+), 149 deletions(-) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 49598e814..289d947a9 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "sync" "time" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -82,3 +84,86 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) ( return nil, errors.New("timed out waiting for event") } } + +var ( + // ErrClientRunning is returned by Start when the client is already running. + ErrClientRunning = errors.New("client already running") + + // ErrClientNotRunning is returned by Stop when the client is not running. + ErrClientNotRunning = errors.New("client is not running") +) + +// RunState is a helper that a client implementation can embed to implement +// common plumbing for keeping track of run state and logging. +// +// TODO(creachadair): This type is a temporary measure, and will be removed. +// See the discussion on #6971. +type RunState struct { + Logger log.Logger + + mu sync.Mutex + name string + isRunning bool + quit chan struct{} +} + +// NewRunState returns a new unstarted run state tracker with the given logging +// label and log sink. If logger == nil, a no-op logger is provided by default. +func NewRunState(name string, logger log.Logger) *RunState { + if logger == nil { + logger = log.NewNopLogger() + } + return &RunState{ + name: name, + Logger: logger, + } +} + +// Start sets the state to running, or reports an error. +func (r *RunState) Start() error { + r.mu.Lock() + defer r.mu.Unlock() + if r.isRunning { + r.Logger.Error("not starting client, it is already started", "client", r.name) + return ErrClientRunning + } + r.Logger.Info("starting client", "client", r.name) + r.isRunning = true + r.quit = make(chan struct{}) + return nil +} + +// Stop sets the state to not running, or reports an error. +func (r *RunState) Stop() error { + r.mu.Lock() + defer r.mu.Unlock() + if !r.isRunning { + r.Logger.Error("not stopping client; it is already stopped", "client", r.name) + return ErrClientNotRunning + } + r.Logger.Info("stopping client", "client", r.name) + r.isRunning = false + close(r.quit) + return nil +} + +// SetLogger updates the log sink. +func (r *RunState) SetLogger(logger log.Logger) { + r.mu.Lock() + defer r.mu.Unlock() + r.Logger = logger +} + +// IsRunning reports whether the state is running. +func (r *RunState) IsRunning() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.isRunning +} + +// Quit returns a channel that is closed when a call to Stop succeeds. +func (r *RunState) Quit() <-chan struct{} { + r.mu.Lock() + defer r.mu.Unlock() + return r.quit +} diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index b66d9634f..c3a0f379b 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -6,7 +6,6 @@ import ( "time" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -158,11 +157,6 @@ func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*H var _ rpcclient.Client = (*HTTP)(nil) -// SetLogger sets a logger. -func (c *HTTP) SetLogger(l log.Logger) { - c.wsEvents.SetLogger(l) -} - // Remote returns the remote network address in a string form. func (c *HTTP) Remote() string { return c.remote diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index 2e08257d5..1735ca9c3 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -10,14 +10,11 @@ import ( tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmjson "github.com/tendermint/tendermint/libs/json" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - "github.com/tendermint/tendermint/libs/service" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" ) -var errNotRunning = errors.New("client is not running. Use .Start() method to start") - // WSOptions for the WS part of the HTTP client. type WSOptions struct { Path string // path (e.g. "/ws") @@ -48,7 +45,7 @@ func (wso WSOptions) Validate() error { // wsEvents is a wrapper around WSClient, which implements EventsClient. type wsEvents struct { - service.BaseService + *rpcclient.RunState ws *jsonrpcclient.WSClient mtx tmsync.RWMutex @@ -78,7 +75,7 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { w := &wsEvents{ subscriptions: make(map[string]*wsSubscription), } - w.BaseService = *service.NewBaseService(nil, "wsEvents", w) + w.RunState = rpcclient.NewRunState("wsEvents", nil) var err error w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions) @@ -94,23 +91,20 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { return w, nil } -// OnStart implements service.Service by starting WSClient and event loop. -func (w *wsEvents) OnStart() error { +// Start starts the websocket client and the event loop. +func (w *wsEvents) Start() error { if err := w.ws.Start(); err != nil { return err } - go w.eventListener() - return nil } -// OnStop implements service.Service by stopping WSClient. -func (w *wsEvents) OnStop() { - if err := w.ws.Stop(); err != nil { - w.Logger.Error("Can't stop ws client", "err", err) - } -} +// IsRunning reports whether the websocket client is running. +func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() } + +// Stop shuts down the websocket client. +func (w *wsEvents) Stop() error { return w.ws.Stop() } // Subscribe implements EventsClient by using WSClient to subscribe given // subscriber to query. By default, it returns a channel with cap=1. Error is @@ -128,7 +122,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { if !w.IsRunning() { - return nil, errNotRunning + return nil, rpcclient.ErrClientNotRunning } if err := w.ws.Subscribe(ctx, query); err != nil { @@ -156,7 +150,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, // It returns an error if wsEvents is not running. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { if !w.IsRunning() { - return errNotRunning + return rpcclient.ErrClientNotRunning } if err := w.ws.Unsubscribe(ctx, query); err != nil { @@ -182,7 +176,7 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er // It returns an error if wsEvents is not running. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { if !w.IsRunning() { - return errNotRunning + return rpcclient.ErrClientNotRunning } if err := w.ws.UnsubscribeAll(ctx); err != nil { diff --git a/rpc/client/interface.go b/rpc/client/interface.go index f63161118..ae54928df 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -24,17 +24,26 @@ import ( "context" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/service" ctypes "github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/types" ) //go:generate ../../scripts/mockery_generate.sh Client -// Client wraps most important rpc calls a client would make if you want to -// listen for events, test if it also implements events.EventSwitch. +// Client describes the interface of Tendermint RPC client implementations. type Client interface { - service.Service + // These methods define the operational structure of the client. + + // Start the client. Start must report an error if the client is running. + Start() error + + // Stop the client. Stop must report an error if the client is not running. + Stop() error + + // IsRunning reports whether the client is running. + IsRunning() bool + + // These embedded interfaces define the callable methods of the service. ABCIClient EventsClient HistoryClient diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index 70003569d..b0f27ae88 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -20,7 +20,6 @@ import ( "github.com/tendermint/tendermint/internal/rpc/core" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" @@ -29,15 +28,7 @@ import ( // Client wraps arbitrary implementations of the various interfaces. type Client struct { - client.ABCIClient - client.SignClient - client.HistoryClient - client.StatusClient - client.EventsClient - client.EvidenceClient - client.MempoolClient - service.Service - + client.Client env *core.Environment } diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 3036c48f0..898f67aa6 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -10,8 +10,6 @@ import ( coretypes "github.com/tendermint/tendermint/rpc/coretypes" - log "github.com/tendermint/tendermint/libs/log" - mock "github.com/stretchr/testify/mock" types "github.com/tendermint/tendermint/types" @@ -542,74 +540,6 @@ func (_m *Client) NumUnconfirmedTxs(_a0 context.Context) (*coretypes.ResultUncon return r0, r1 } -// OnReset provides a mock function with given fields: -func (_m *Client) OnReset() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// OnStart provides a mock function with given fields: -func (_m *Client) OnStart() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// OnStop provides a mock function with given fields: -func (_m *Client) OnStop() { - _m.Called() -} - -// Quit provides a mock function with given fields: -func (_m *Client) Quit() <-chan struct{} { - ret := _m.Called() - - var r0 <-chan struct{} - if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan struct{}) - } - } - - return r0 -} - -// Reset provides a mock function with given fields: -func (_m *Client) Reset() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// SetLogger provides a mock function with given fields: _a0 -func (_m *Client) SetLogger(_a0 log.Logger) { - _m.Called(_a0) -} - // Start provides a mock function with given fields: func (_m *Client) Start() error { ret := _m.Called() @@ -661,20 +591,6 @@ func (_m *Client) Stop() error { return r0 } -// String provides a mock function with given fields: -func (_m *Client) String() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - // Subscribe provides a mock function with given fields: ctx, subscriber, query, outCapacity func (_m *Client) Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (<-chan coretypes.ResultEvent, error) { _va := make([]interface{}, len(outCapacity)) @@ -824,8 +740,3 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP return r0, r1 } - -// Wait provides a mock function with given fields: -func (_m *Client) Wait() { - _m.Called() -} diff --git a/rpc/jsonrpc/client/ws_client.go b/rpc/jsonrpc/client/ws_client.go index f47186429..8530f32d9 100644 --- a/rpc/jsonrpc/client/ws_client.go +++ b/rpc/jsonrpc/client/ws_client.go @@ -14,7 +14,7 @@ import ( metrics "github.com/rcrowley/go-metrics" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/libs/service" + tmclient "github.com/tendermint/tendermint/rpc/client" types "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) @@ -41,6 +41,7 @@ func DefaultWSOptions() WSOptions { // // WSClient is safe for concurrent use by multiple goroutines. type WSClient struct { // nolint: maligned + *tmclient.RunState conn *websocket.Conn Address string // IP:PORT or /path/to/socket @@ -83,8 +84,6 @@ type WSClient struct { // nolint: maligned // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent. pingPeriod time.Duration - service.BaseService - // Time between sending a ping and receiving a pong. See // https://godoc.org/github.com/rcrowley/go-metrics#Timer. PingPongLatencyTimer metrics.Timer @@ -114,6 +113,7 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e } c := &WSClient{ + RunState: tmclient.NewRunState("WSClient", nil), Address: parsedURL.GetTrimmedHostWithPath(), Dialer: dialFn, Endpoint: endpoint, @@ -127,7 +127,6 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e // sentIDs: make(map[types.JSONRPCIntID]bool), } - c.BaseService = *service.NewBaseService(nil, "WSClient", c) return c, nil } @@ -143,9 +142,11 @@ func (c *WSClient) String() string { return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint) } -// OnStart implements service.Service by dialing a server and creating read and -// write routines. -func (c *WSClient) OnStart() error { +// Start dials the specified service address and starts the I/O routines. +func (c *WSClient) Start() error { + if err := c.RunState.Start(); err != nil { + return err + } err := c.dial() if err != nil { return err @@ -167,10 +168,9 @@ func (c *WSClient) OnStart() error { return nil } -// Stop overrides service.Service#Stop. There is no other way to wait until Quit -// channel is closed. +// Stop shuts down the client. func (c *WSClient) Stop() error { - if err := c.BaseService.Stop(); err != nil { + if err := c.RunState.Stop(); err != nil { return err } // only close user-facing channels when we can't write to them diff --git a/rpc/jsonrpc/server/ws_handler.go b/rpc/jsonrpc/server/ws_handler.go index c6a8a99b8..63731799b 100644 --- a/rpc/jsonrpc/server/ws_handler.go +++ b/rpc/jsonrpc/server/ws_handler.go @@ -13,7 +13,7 @@ import ( "github.com/gorilla/websocket" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" types "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) @@ -86,8 +86,8 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ }() // register connection - con := newWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...) - con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) + logger := wm.logger.With("remote", wsConn.RemoteAddr()) + con := newWSConnection(wsConn, wm.funcMap, logger, wm.wsConnOptions...) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) err = con.Start() // BLOCKING if err != nil { @@ -106,7 +106,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ // // In case of an error, the connection is stopped. type wsConnection struct { - service.BaseService + *client.RunState remoteAddr string baseConn *websocket.Conn @@ -150,9 +150,11 @@ type wsConnection struct { func newWSConnection( baseConn *websocket.Conn, funcMap map[string]*RPCFunc, + logger log.Logger, options ...func(*wsConnection), ) *wsConnection { wsc := &wsConnection{ + RunState: client.NewRunState("wsConnection", logger), remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, funcMap: funcMap, @@ -166,7 +168,6 @@ func newWSConnection( option(wsc) } wsc.baseConn.SetReadLimit(wsc.readLimit) - wsc.BaseService = *service.NewBaseService(nil, "wsConnection", wsc) return wsc } @@ -218,9 +219,11 @@ func ReadLimit(readLimit int64) func(*wsConnection) { } } -// OnStart implements service.Service by starting the read and write routines. It -// blocks until there's some error. -func (wsc *wsConnection) OnStart() error { +// Start starts the client service routines and blocks until there is an error. +func (wsc *wsConnection) Start() error { + if err := wsc.RunState.Start(); err != nil { + return err + } wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) // Read subscriptions/unsubscriptions to events @@ -231,16 +234,18 @@ func (wsc *wsConnection) OnStart() error { return nil } -// OnStop implements service.Service by unsubscribing remoteAddr from all -// subscriptions. -func (wsc *wsConnection) OnStop() { +// Stop unsubscribes the remote from all subscriptions. +func (wsc *wsConnection) Stop() error { + if err := wsc.RunState.Stop(); err != nil { + return err + } if wsc.onDisconnect != nil { wsc.onDisconnect(wsc.remoteAddr) } - if wsc.ctx != nil { wsc.cancel() } + return nil } // GetRemoteAddr returns the remote address of the underlying connection.