Files
tendermint/rpc/client/helpers.go
M. J. Fromberger 1995ef2572 rpc: Strip down the base RPC client interface. (#6971)
* rpc: Strip down the base RPC client interface.

Prior to this change, the RPC client interface requires implementing the entire
Service interface, but most of the methods of Service are not needed by the
concrete clients. Dissociate the Client interface from the Service interface.

- Extract only those methods of Service that are necessary to make the existing
  clients work.

- Update the clients to combine Start/Onstart and Stop/OnStop. This does not
  change what the clients do to start or stop. Only the websocket clients make
  use of this functionality anyway.

  The websocket implementation uses some plumbing from the BaseService helper.
  We should be able to excising that entirely, but the current interface
  dependencies among the clients would require a much larger change, and one
  that leaks into other (non-RPC) packages.

  As a less-invasive intermediate step, preserve the existing client behaviour
  (and tests) by extracting the necessary subset of the BaseService
  functionality to an analogous RunState helper for clients. I plan to obsolete
  that type in a future PR, but for now this makes a useful waypoint.

Related:
- Clean up client implementations.
- Update mocks.
2021-09-22 14:26:35 -07:00

170 lines
4.4 KiB
Go

package client
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// Waiter is informed of current height, decided whether to quit early
type Waiter func(delta int64) (abort error)
// DefaultWaitStrategy is the standard backoff algorithm,
// but you can plug in another one
func DefaultWaitStrategy(delta int64) (abort error) {
if delta > 10 {
return fmt.Errorf("waiting for %d blocks... aborting", delta)
} else if delta > 0 {
// estimate of wait time....
// wait half a second for the next block (in progress)
// plus one second for every full block
delay := time.Duration(delta-1)*time.Second + 500*time.Millisecond
time.Sleep(delay)
}
return nil
}
// Wait for height will poll status at reasonable intervals until
// the block at the given height is available.
//
// If waiter is nil, we use DefaultWaitStrategy, but you can also
// provide your own implementation
func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
if waiter == nil {
waiter = DefaultWaitStrategy
}
delta := int64(1)
for delta > 0 {
s, err := c.Status(context.Background())
if err != nil {
return err
}
delta = h - s.SyncInfo.LatestBlockHeight
// wait for the time, or abort early
if err := waiter(delta); err != nil {
return err
}
}
return nil
}
// WaitForOneEvent subscribes to a websocket event for the given
// event time and returns upon receiving it one time, or
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, eventValue string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// register for the next event of this type
eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(eventValue).String())
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}
// make sure to un-register after the test is over
defer func() {
if deferErr := c.UnsubscribeAll(ctx, subscriber); deferErr != nil {
panic(err)
}
}()
select {
case event := <-eventCh:
return event.Data.(types.TMEventData), nil
case <-ctx.Done():
return nil, errors.New("timed out waiting for event")
}
}
var (
// ErrClientRunning is returned by Start when the client is already running.
ErrClientRunning = errors.New("client already running")
// ErrClientNotRunning is returned by Stop when the client is not running.
ErrClientNotRunning = errors.New("client is not running")
)
// RunState is a helper that a client implementation can embed to implement
// common plumbing for keeping track of run state and logging.
//
// TODO(creachadair): This type is a temporary measure, and will be removed.
// See the discussion on #6971.
type RunState struct {
Logger log.Logger
mu sync.Mutex
name string
isRunning bool
quit chan struct{}
}
// NewRunState returns a new unstarted run state tracker with the given logging
// label and log sink. If logger == nil, a no-op logger is provided by default.
func NewRunState(name string, logger log.Logger) *RunState {
if logger == nil {
logger = log.NewNopLogger()
}
return &RunState{
name: name,
Logger: logger,
}
}
// Start sets the state to running, or reports an error.
func (r *RunState) Start() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.isRunning {
r.Logger.Error("not starting client, it is already started", "client", r.name)
return ErrClientRunning
}
r.Logger.Info("starting client", "client", r.name)
r.isRunning = true
r.quit = make(chan struct{})
return nil
}
// Stop sets the state to not running, or reports an error.
func (r *RunState) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning {
r.Logger.Error("not stopping client; it is already stopped", "client", r.name)
return ErrClientNotRunning
}
r.Logger.Info("stopping client", "client", r.name)
r.isRunning = false
close(r.quit)
return nil
}
// SetLogger updates the log sink.
func (r *RunState) SetLogger(logger log.Logger) {
r.mu.Lock()
defer r.mu.Unlock()
r.Logger = logger
}
// IsRunning reports whether the state is running.
func (r *RunState) IsRunning() bool {
r.mu.Lock()
defer r.mu.Unlock()
return r.isRunning
}
// Quit returns a channel that is closed when a call to Stop succeeds.
func (r *RunState) Quit() <-chan struct{} {
r.mu.Lock()
defer r.mu.Unlock()
return r.quit
}