diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 49598e814..289d947a9 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "sync" "time" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/types" ) @@ -82,3 +84,86 @@ func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) ( return nil, errors.New("timed out waiting for event") } } + +var ( + // ErrClientRunning is returned by Start when the client is already running. + ErrClientRunning = errors.New("client already running") + + // ErrClientNotRunning is returned by Stop when the client is not running. + ErrClientNotRunning = errors.New("client is not running") +) + +// RunState is a helper that a client implementation can embed to implement +// common plumbing for keeping track of run state and logging. +// +// TODO(creachadair): This type is a temporary measure, and will be removed. +// See the discussion on #6971. +type RunState struct { + Logger log.Logger + + mu sync.Mutex + name string + isRunning bool + quit chan struct{} +} + +// NewRunState returns a new unstarted run state tracker with the given logging +// label and log sink. If logger == nil, a no-op logger is provided by default. +func NewRunState(name string, logger log.Logger) *RunState { + if logger == nil { + logger = log.NewNopLogger() + } + return &RunState{ + name: name, + Logger: logger, + } +} + +// Start sets the state to running, or reports an error. +func (r *RunState) Start() error { + r.mu.Lock() + defer r.mu.Unlock() + if r.isRunning { + r.Logger.Error("not starting client, it is already started", "client", r.name) + return ErrClientRunning + } + r.Logger.Info("starting client", "client", r.name) + r.isRunning = true + r.quit = make(chan struct{}) + return nil +} + +// Stop sets the state to not running, or reports an error. +func (r *RunState) Stop() error { + r.mu.Lock() + defer r.mu.Unlock() + if !r.isRunning { + r.Logger.Error("not stopping client; it is already stopped", "client", r.name) + return ErrClientNotRunning + } + r.Logger.Info("stopping client", "client", r.name) + r.isRunning = false + close(r.quit) + return nil +} + +// SetLogger updates the log sink. +func (r *RunState) SetLogger(logger log.Logger) { + r.mu.Lock() + defer r.mu.Unlock() + r.Logger = logger +} + +// IsRunning reports whether the state is running. +func (r *RunState) IsRunning() bool { + r.mu.Lock() + defer r.mu.Unlock() + return r.isRunning +} + +// Quit returns a channel that is closed when a call to Stop succeeds. +func (r *RunState) Quit() <-chan struct{} { + r.mu.Lock() + defer r.mu.Unlock() + return r.quit +} diff --git a/rpc/client/http/http.go b/rpc/client/http/http.go index b66d9634f..c3a0f379b 100644 --- a/rpc/client/http/http.go +++ b/rpc/client/http/http.go @@ -6,7 +6,6 @@ import ( "time" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/log" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" @@ -158,11 +157,6 @@ func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*H var _ rpcclient.Client = (*HTTP)(nil) -// SetLogger sets a logger. -func (c *HTTP) SetLogger(l log.Logger) { - c.wsEvents.SetLogger(l) -} - // Remote returns the remote network address in a string form. func (c *HTTP) Remote() string { return c.remote diff --git a/rpc/client/http/ws.go b/rpc/client/http/ws.go index 2e08257d5..1735ca9c3 100644 --- a/rpc/client/http/ws.go +++ b/rpc/client/http/ws.go @@ -10,14 +10,11 @@ import ( tmsync "github.com/tendermint/tendermint/internal/libs/sync" tmjson "github.com/tendermint/tendermint/libs/json" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - "github.com/tendermint/tendermint/libs/service" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" jsonrpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" ) -var errNotRunning = errors.New("client is not running. Use .Start() method to start") - // WSOptions for the WS part of the HTTP client. type WSOptions struct { Path string // path (e.g. "/ws") @@ -48,7 +45,7 @@ func (wso WSOptions) Validate() error { // wsEvents is a wrapper around WSClient, which implements EventsClient. type wsEvents struct { - service.BaseService + *rpcclient.RunState ws *jsonrpcclient.WSClient mtx tmsync.RWMutex @@ -78,7 +75,7 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { w := &wsEvents{ subscriptions: make(map[string]*wsSubscription), } - w.BaseService = *service.NewBaseService(nil, "wsEvents", w) + w.RunState = rpcclient.NewRunState("wsEvents", nil) var err error w.ws, err = jsonrpcclient.NewWSWithOptions(remote, wso.Path, wso.WSOptions) @@ -94,23 +91,20 @@ func newWsEvents(remote string, wso WSOptions) (*wsEvents, error) { return w, nil } -// OnStart implements service.Service by starting WSClient and event loop. -func (w *wsEvents) OnStart() error { +// Start starts the websocket client and the event loop. +func (w *wsEvents) Start() error { if err := w.ws.Start(); err != nil { return err } - go w.eventListener() - return nil } -// OnStop implements service.Service by stopping WSClient. -func (w *wsEvents) OnStop() { - if err := w.ws.Stop(); err != nil { - w.Logger.Error("Can't stop ws client", "err", err) - } -} +// IsRunning reports whether the websocket client is running. +func (w *wsEvents) IsRunning() bool { return w.ws.IsRunning() } + +// Stop shuts down the websocket client. +func (w *wsEvents) Stop() error { return w.ws.Stop() } // Subscribe implements EventsClient by using WSClient to subscribe given // subscriber to query. By default, it returns a channel with cap=1. Error is @@ -128,7 +122,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { if !w.IsRunning() { - return nil, errNotRunning + return nil, rpcclient.ErrClientNotRunning } if err := w.ws.Subscribe(ctx, query); err != nil { @@ -156,7 +150,7 @@ func (w *wsEvents) Subscribe(ctx context.Context, subscriber, query string, // It returns an error if wsEvents is not running. func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { if !w.IsRunning() { - return errNotRunning + return rpcclient.ErrClientNotRunning } if err := w.ws.Unsubscribe(ctx, query); err != nil { @@ -182,7 +176,7 @@ func (w *wsEvents) Unsubscribe(ctx context.Context, subscriber, query string) er // It returns an error if wsEvents is not running. func (w *wsEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { if !w.IsRunning() { - return errNotRunning + return rpcclient.ErrClientNotRunning } if err := w.ws.UnsubscribeAll(ctx); err != nil { diff --git a/rpc/client/interface.go b/rpc/client/interface.go index f63161118..ae54928df 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -24,17 +24,26 @@ import ( "context" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/service" ctypes "github.com/tendermint/tendermint/rpc/coretypes" "github.com/tendermint/tendermint/types" ) //go:generate ../../scripts/mockery_generate.sh Client -// Client wraps most important rpc calls a client would make if you want to -// listen for events, test if it also implements events.EventSwitch. +// Client describes the interface of Tendermint RPC client implementations. type Client interface { - service.Service + // These methods define the operational structure of the client. + + // Start the client. Start must report an error if the client is running. + Start() error + + // Stop the client. Stop must report an error if the client is not running. + Stop() error + + // IsRunning reports whether the client is running. + IsRunning() bool + + // These embedded interfaces define the callable methods of the service. ABCIClient EventsClient HistoryClient diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index 70003569d..b0f27ae88 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -20,7 +20,6 @@ import ( "github.com/tendermint/tendermint/internal/rpc/core" "github.com/tendermint/tendermint/libs/bytes" - "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types" @@ -29,15 +28,7 @@ import ( // Client wraps arbitrary implementations of the various interfaces. type Client struct { - client.ABCIClient - client.SignClient - client.HistoryClient - client.StatusClient - client.EventsClient - client.EvidenceClient - client.MempoolClient - service.Service - + client.Client env *core.Environment } diff --git a/rpc/client/mocks/client.go b/rpc/client/mocks/client.go index 3036c48f0..898f67aa6 100644 --- a/rpc/client/mocks/client.go +++ b/rpc/client/mocks/client.go @@ -10,8 +10,6 @@ import ( coretypes "github.com/tendermint/tendermint/rpc/coretypes" - log "github.com/tendermint/tendermint/libs/log" - mock "github.com/stretchr/testify/mock" types "github.com/tendermint/tendermint/types" @@ -542,74 +540,6 @@ func (_m *Client) NumUnconfirmedTxs(_a0 context.Context) (*coretypes.ResultUncon return r0, r1 } -// OnReset provides a mock function with given fields: -func (_m *Client) OnReset() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// OnStart provides a mock function with given fields: -func (_m *Client) OnStart() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// OnStop provides a mock function with given fields: -func (_m *Client) OnStop() { - _m.Called() -} - -// Quit provides a mock function with given fields: -func (_m *Client) Quit() <-chan struct{} { - ret := _m.Called() - - var r0 <-chan struct{} - if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(<-chan struct{}) - } - } - - return r0 -} - -// Reset provides a mock function with given fields: -func (_m *Client) Reset() error { - ret := _m.Called() - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// SetLogger provides a mock function with given fields: _a0 -func (_m *Client) SetLogger(_a0 log.Logger) { - _m.Called(_a0) -} - // Start provides a mock function with given fields: func (_m *Client) Start() error { ret := _m.Called() @@ -661,20 +591,6 @@ func (_m *Client) Stop() error { return r0 } -// String provides a mock function with given fields: -func (_m *Client) String() string { - ret := _m.Called() - - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - // Subscribe provides a mock function with given fields: ctx, subscriber, query, outCapacity func (_m *Client) Subscribe(ctx context.Context, subscriber string, query string, outCapacity ...int) (<-chan coretypes.ResultEvent, error) { _va := make([]interface{}, len(outCapacity)) @@ -824,8 +740,3 @@ func (_m *Client) Validators(ctx context.Context, height *int64, page *int, perP return r0, r1 } - -// Wait provides a mock function with given fields: -func (_m *Client) Wait() { - _m.Called() -} diff --git a/rpc/jsonrpc/client/ws_client.go b/rpc/jsonrpc/client/ws_client.go index f47186429..8530f32d9 100644 --- a/rpc/jsonrpc/client/ws_client.go +++ b/rpc/jsonrpc/client/ws_client.go @@ -14,7 +14,7 @@ import ( metrics "github.com/rcrowley/go-metrics" tmsync "github.com/tendermint/tendermint/internal/libs/sync" - "github.com/tendermint/tendermint/libs/service" + tmclient "github.com/tendermint/tendermint/rpc/client" types "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) @@ -41,6 +41,7 @@ func DefaultWSOptions() WSOptions { // // WSClient is safe for concurrent use by multiple goroutines. type WSClient struct { // nolint: maligned + *tmclient.RunState conn *websocket.Conn Address string // IP:PORT or /path/to/socket @@ -83,8 +84,6 @@ type WSClient struct { // nolint: maligned // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent. pingPeriod time.Duration - service.BaseService - // Time between sending a ping and receiving a pong. See // https://godoc.org/github.com/rcrowley/go-metrics#Timer. PingPongLatencyTimer metrics.Timer @@ -114,6 +113,7 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e } c := &WSClient{ + RunState: tmclient.NewRunState("WSClient", nil), Address: parsedURL.GetTrimmedHostWithPath(), Dialer: dialFn, Endpoint: endpoint, @@ -127,7 +127,6 @@ func NewWSWithOptions(remoteAddr, endpoint string, opts WSOptions) (*WSClient, e // sentIDs: make(map[types.JSONRPCIntID]bool), } - c.BaseService = *service.NewBaseService(nil, "WSClient", c) return c, nil } @@ -143,9 +142,11 @@ func (c *WSClient) String() string { return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint) } -// OnStart implements service.Service by dialing a server and creating read and -// write routines. -func (c *WSClient) OnStart() error { +// Start dials the specified service address and starts the I/O routines. +func (c *WSClient) Start() error { + if err := c.RunState.Start(); err != nil { + return err + } err := c.dial() if err != nil { return err @@ -167,10 +168,9 @@ func (c *WSClient) OnStart() error { return nil } -// Stop overrides service.Service#Stop. There is no other way to wait until Quit -// channel is closed. +// Stop shuts down the client. func (c *WSClient) Stop() error { - if err := c.BaseService.Stop(); err != nil { + if err := c.RunState.Stop(); err != nil { return err } // only close user-facing channels when we can't write to them diff --git a/rpc/jsonrpc/server/ws_handler.go b/rpc/jsonrpc/server/ws_handler.go index c6a8a99b8..63731799b 100644 --- a/rpc/jsonrpc/server/ws_handler.go +++ b/rpc/jsonrpc/server/ws_handler.go @@ -13,7 +13,7 @@ import ( "github.com/gorilla/websocket" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/coretypes" types "github.com/tendermint/tendermint/rpc/jsonrpc/types" ) @@ -86,8 +86,8 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ }() // register connection - con := newWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...) - con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr())) + logger := wm.logger.With("remote", wsConn.RemoteAddr()) + con := newWSConnection(wsConn, wm.funcMap, logger, wm.wsConnOptions...) wm.logger.Info("New websocket connection", "remote", con.remoteAddr) err = con.Start() // BLOCKING if err != nil { @@ -106,7 +106,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ // // In case of an error, the connection is stopped. type wsConnection struct { - service.BaseService + *client.RunState remoteAddr string baseConn *websocket.Conn @@ -150,9 +150,11 @@ type wsConnection struct { func newWSConnection( baseConn *websocket.Conn, funcMap map[string]*RPCFunc, + logger log.Logger, options ...func(*wsConnection), ) *wsConnection { wsc := &wsConnection{ + RunState: client.NewRunState("wsConnection", logger), remoteAddr: baseConn.RemoteAddr().String(), baseConn: baseConn, funcMap: funcMap, @@ -166,7 +168,6 @@ func newWSConnection( option(wsc) } wsc.baseConn.SetReadLimit(wsc.readLimit) - wsc.BaseService = *service.NewBaseService(nil, "wsConnection", wsc) return wsc } @@ -218,9 +219,11 @@ func ReadLimit(readLimit int64) func(*wsConnection) { } } -// OnStart implements service.Service by starting the read and write routines. It -// blocks until there's some error. -func (wsc *wsConnection) OnStart() error { +// Start starts the client service routines and blocks until there is an error. +func (wsc *wsConnection) Start() error { + if err := wsc.RunState.Start(); err != nil { + return err + } wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) // Read subscriptions/unsubscriptions to events @@ -231,16 +234,18 @@ func (wsc *wsConnection) OnStart() error { return nil } -// OnStop implements service.Service by unsubscribing remoteAddr from all -// subscriptions. -func (wsc *wsConnection) OnStop() { +// Stop unsubscribes the remote from all subscriptions. +func (wsc *wsConnection) Stop() error { + if err := wsc.RunState.Stop(); err != nil { + return err + } if wsc.onDisconnect != nil { wsc.onDisconnect(wsc.remoteAddr) } - if wsc.ctx != nil { wsc.cancel() } + return nil } // GetRemoteAddr returns the remote address of the underlying connection.