Files
tendermint/abci/client/socket_client.go
William Banfield 7854842f07 abci: revert buffer limit change (#7990)
This change changes the ABCI socket client to allow goroutines to block writing to the internal queue. This has the effect ensuring that callers of the ABCI methods do not error on a full internal queue at the expense of allowing the number of goroutines waiting on this internal queue to grow in an unbounded fashion. This tradeoff seems preferable since it allows callers of the ABCI methods to be certain that a request that was made will reach the application if it is available.

This change was initially implemented here: e13b4386ff and never landed on v0.34, only v0.35+
2022-02-25 15:07:28 -08:00

583 lines
16 KiB
Go

package abciclient
import (
"bufio"
"container/list"
"context"
"errors"
"fmt"
"io"
"net"
"reflect"
"time"
"github.com/tendermint/tendermint/abci/types"
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
)
const (
// reqQueueSize is the max number of queued async requests.
// (memory: 256MB max assuming 1MB transactions)
reqQueueSize = 256
)
type reqResWithContext struct {
R *ReqRes
C context.Context // if context.Err is not nil, reqRes will be thrown away (ignored)
}
// This is goroutine-safe, but users should beware that the application in
// general is not meant to be interfaced with concurrent callers.
type socketClient struct {
service.BaseService
addr string
mustConnect bool
conn net.Conn
reqQueue chan *reqResWithContext
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
}
var _ Client = (*socketClient)(nil)
// NewSocketClient creates a new socket client, which connects to a given
// address. If mustConnect is true, the client will return an error upon start
// if it fails to connect.
func NewSocketClient(addr string, mustConnect bool) Client {
cli := &socketClient{
reqQueue: make(chan *reqResWithContext, reqQueueSize),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
return cli
}
// OnStart implements Service by connecting to the server and spawning reading
// and writing goroutines.
func (cli *socketClient) OnStart() error {
var (
err error
conn net.Conn
)
for {
conn, err = tmnet.Connect(cli.addr)
if err != nil {
if cli.mustConnect {
return err
}
cli.Logger.Error(fmt.Sprintf("abci.socketClient failed to connect to %v. Retrying after %vs...",
cli.addr, dialRetryIntervalSeconds), "err", err)
time.Sleep(time.Second * dialRetryIntervalSeconds)
continue
}
cli.conn = conn
go cli.sendRequestsRoutine(conn)
go cli.recvResponseRoutine(conn)
return nil
}
}
// OnStop implements Service by closing connection and flushing all queues.
func (cli *socketClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
cli.drainQueue()
}
// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
// SetResponseCallback sets a callback, which will be executed for each
// non-error & non-empty response from the server.
//
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
cli.resCb = resCb
cli.mtx.Unlock()
}
//----------------------------------------
func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
bw := bufio.NewWriter(conn)
for {
select {
case reqres := <-cli.reqQueue:
if reqres.C.Err() != nil {
cli.Logger.Debug("Request's context is done", "req", reqres.R, "err", reqres.C.Err())
continue
}
cli.willSendReq(reqres.R)
if err := types.WriteMessage(reqres.R.Request, bw); err != nil {
cli.stopForError(fmt.Errorf("write to buffer: %w", err))
return
}
if err := bw.Flush(); err != nil {
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
return
}
case <-cli.Quit():
return
}
}
}
func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
r := bufio.NewReader(conn)
for {
var res = &types.Response{}
err := types.ReadMessage(r, res)
if err != nil {
cli.stopForError(fmt.Errorf("read message: %w", err))
return
}
// cli.Logger.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
switch r := res.Value.(type) {
case *types.Response_Exception: // app responded with error
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.stopForError(errors.New(r.Exception.Error))
return
default:
err := cli.didRecvResponse(res)
if err != nil {
cli.stopForError(err)
return
}
}
}
}
func (cli *socketClient) willSendReq(reqres *ReqRes) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.reqSent.PushBack(reqres)
}
func (cli *socketClient) didRecvResponse(res *types.Response) error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// Get the first ReqRes.
next := cli.reqSent.Front()
if next == nil {
return fmt.Errorf("unexpected %v when nothing expected", reflect.TypeOf(res.Value))
}
reqres := next.Value.(*ReqRes)
if !resMatchesReq(reqres.Request, res) {
return fmt.Errorf("unexpected %v when response to %v expected",
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
}
reqres.Response = res
reqres.Done() // release waiters
cli.reqSent.Remove(next) // pop first item from linked list
// Notify client listener if set (global callback).
if cli.resCb != nil {
cli.resCb(reqres.Request, res)
}
// Notify reqRes listener if set (request specific callback).
//
// NOTE: It is possible this callback isn't set on the reqres object. At this
// point, in which case it will be called after, when it is set.
reqres.InvokeCallback()
return nil
}
//----------------------------------------
func (cli *socketClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestEcho(msg))
}
func (cli *socketClient) FlushAsync(ctx context.Context) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestFlush())
}
func (cli *socketClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestInfo(req))
}
func (cli *socketClient) DeliverTxAsync(ctx context.Context, req types.RequestDeliverTx) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestDeliverTx(req))
}
func (cli *socketClient) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestCheckTx(req))
}
func (cli *socketClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestQuery(req))
}
func (cli *socketClient) CommitAsync(ctx context.Context) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestCommit())
}
func (cli *socketClient) InitChainAsync(ctx context.Context, req types.RequestInitChain) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestInitChain(req))
}
func (cli *socketClient) BeginBlockAsync(ctx context.Context, req types.RequestBeginBlock) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestBeginBlock(req))
}
func (cli *socketClient) EndBlockAsync(ctx context.Context, req types.RequestEndBlock) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestEndBlock(req))
}
func (cli *socketClient) ListSnapshotsAsync(ctx context.Context, req types.RequestListSnapshots) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestListSnapshots(req))
}
func (cli *socketClient) OfferSnapshotAsync(ctx context.Context, req types.RequestOfferSnapshot) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestOfferSnapshot(req))
}
func (cli *socketClient) LoadSnapshotChunkAsync(
ctx context.Context,
req types.RequestLoadSnapshotChunk,
) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestLoadSnapshotChunk(req))
}
func (cli *socketClient) ApplySnapshotChunkAsync(
ctx context.Context,
req types.RequestApplySnapshotChunk,
) (*ReqRes, error) {
return cli.queueRequestAsync(ctx, types.ToRequestApplySnapshotChunk(req))
}
//----------------------------------------
func (cli *socketClient) FlushSync(ctx context.Context) error {
reqRes, err := cli.queueRequest(ctx, types.ToRequestFlush())
if err != nil {
return queueErr(err)
}
if err := cli.Error(); err != nil {
return err
}
gotResp := make(chan struct{})
go func() {
// NOTE: if we don't flush the queue, its possible to get stuck here
reqRes.Wait()
close(gotResp)
}()
select {
case <-gotResp:
return cli.Error()
case <-ctx.Done():
return ctx.Err()
}
}
func (cli *socketClient) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEcho(msg))
if err != nil {
return nil, err
}
return reqres.Response.GetEcho(), nil
}
func (cli *socketClient) InfoSync(
ctx context.Context,
req types.RequestInfo,
) (*types.ResponseInfo, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInfo(req))
if err != nil {
return nil, err
}
return reqres.Response.GetInfo(), nil
}
func (cli *socketClient) DeliverTxSync(
ctx context.Context,
req types.RequestDeliverTx,
) (*types.ResponseDeliverTx, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestDeliverTx(req))
if err != nil {
return nil, err
}
return reqres.Response.GetDeliverTx(), nil
}
func (cli *socketClient) CheckTxSync(
ctx context.Context,
req types.RequestCheckTx,
) (*types.ResponseCheckTx, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCheckTx(req))
if err != nil {
return nil, err
}
return reqres.Response.GetCheckTx(), nil
}
func (cli *socketClient) QuerySync(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestQuery(req))
if err != nil {
return nil, err
}
return reqres.Response.GetQuery(), nil
}
func (cli *socketClient) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestCommit())
if err != nil {
return nil, err
}
return reqres.Response.GetCommit(), nil
}
func (cli *socketClient) InitChainSync(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestInitChain(req))
if err != nil {
return nil, err
}
return reqres.Response.GetInitChain(), nil
}
func (cli *socketClient) BeginBlockSync(
ctx context.Context,
req types.RequestBeginBlock,
) (*types.ResponseBeginBlock, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestBeginBlock(req))
if err != nil {
return nil, err
}
return reqres.Response.GetBeginBlock(), nil
}
func (cli *socketClient) EndBlockSync(
ctx context.Context,
req types.RequestEndBlock,
) (*types.ResponseEndBlock, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestEndBlock(req))
if err != nil {
return nil, err
}
return reqres.Response.GetEndBlock(), nil
}
func (cli *socketClient) ListSnapshotsSync(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestListSnapshots(req))
if err != nil {
return nil, err
}
return reqres.Response.GetListSnapshots(), nil
}
func (cli *socketClient) OfferSnapshotSync(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestOfferSnapshot(req))
if err != nil {
return nil, err
}
return reqres.Response.GetOfferSnapshot(), nil
}
func (cli *socketClient) LoadSnapshotChunkSync(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestLoadSnapshotChunk(req))
if err != nil {
return nil, err
}
return reqres.Response.GetLoadSnapshotChunk(), nil
}
func (cli *socketClient) ApplySnapshotChunkSync(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
reqres, err := cli.queueRequestAndFlushSync(ctx, types.ToRequestApplySnapshotChunk(req))
if err != nil {
return nil, err
}
return reqres.Response.GetApplySnapshotChunk(), nil
}
//----------------------------------------
// queueRequest enqueues req onto the queue. The request can break early if the
// the context is canceled. If the queue is full, this method blocks to allow
// the request to be placed onto the queue. This has the effect of creating an
// unbounded queue of goroutines waiting to write to this queue which is a bit
// antithetical to the purposes of a queue, however, undoing this behavior has
// dangerous upstream implications as a result of the usage of this behavior upstream.
// Remove at your peril.
//
// The caller is responsible for checking cli.Error.
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
select {
case cli.reqQueue <- &reqResWithContext{R: reqres, C: ctx}:
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, nil
}
func (cli *socketClient) queueRequestAsync(
ctx context.Context,
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}
return reqres, cli.Error()
}
func (cli *socketClient) queueRequestAndFlushSync(
ctx context.Context,
req *types.Request,
) (*ReqRes, error) {
reqres, err := cli.queueRequest(ctx, req)
if err != nil {
return nil, queueErr(err)
}
if err := cli.FlushSync(ctx); err != nil {
return nil, err
}
return reqres, cli.Error()
}
func queueErr(e error) error {
return fmt.Errorf("can't queue req: %w", e)
}
// drainQueue marks as complete and discards all remaining pending requests
// from the queue.
func (cli *socketClient) drainQueue() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
reqres := req.Value.(*ReqRes)
reqres.Done()
}
// Mark all queued messages as resolved.
//
// TODO(creachadair): We can't simply range the channel, because it is never
// closed, and the writer continues to add work.
// See https://github.com/tendermint/tendermint/issues/6996.
for {
select {
case reqres := <-cli.reqQueue:
reqres.R.Done()
default:
return
}
}
}
//----------------------------------------
func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
switch req.Value.(type) {
case *types.Request_Echo:
_, ok = res.Value.(*types.Response_Echo)
case *types.Request_Flush:
_, ok = res.Value.(*types.Response_Flush)
case *types.Request_Info:
_, ok = res.Value.(*types.Response_Info)
case *types.Request_DeliverTx:
_, ok = res.Value.(*types.Response_DeliverTx)
case *types.Request_CheckTx:
_, ok = res.Value.(*types.Response_CheckTx)
case *types.Request_Commit:
_, ok = res.Value.(*types.Response_Commit)
case *types.Request_Query:
_, ok = res.Value.(*types.Response_Query)
case *types.Request_InitChain:
_, ok = res.Value.(*types.Response_InitChain)
case *types.Request_BeginBlock:
_, ok = res.Value.(*types.Response_BeginBlock)
case *types.Request_EndBlock:
_, ok = res.Value.(*types.Response_EndBlock)
case *types.Request_ApplySnapshotChunk:
_, ok = res.Value.(*types.Response_ApplySnapshotChunk)
case *types.Request_LoadSnapshotChunk:
_, ok = res.Value.(*types.Response_LoadSnapshotChunk)
case *types.Request_ListSnapshots:
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
}
return ok
}
func (cli *socketClient) stopForError(err error) {
if !cli.IsRunning() {
return
}
cli.mtx.Lock()
cli.err = err
cli.mtx.Unlock()
cli.Logger.Info("Stopping abci.socketClient", "reason", err)
if err := cli.Stop(); err != nil {
cli.Logger.Error("Error stopping abci.socketClient", "err", err)
}
}