revert modifications to flush logic

This commit is contained in:
Callum Waters
2022-11-21 12:42:31 +01:00
parent ff74e1b840
commit 7c857be3b5
4 changed files with 132 additions and 63 deletions

View File

@@ -125,6 +125,7 @@ func (cli *grpcClient) OnStop() {
if cli.conn != nil {
cli.conn.Close()
}
close(cli.chReqRes)
}
func (cli *grpcClient) StopForError(err error) {
@@ -147,7 +148,6 @@ func (cli *grpcClient) StopForError(err error) {
func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
@@ -181,7 +181,10 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response)
//----------------------------------------
func (cli *grpcClient) Flush(ctx context.Context) error { return nil }
func (cli *grpcClient) Flush(ctx context.Context) error {
_, err := cli.client.Flush(ctx, types.ToRequestFlush().GetFlush(), grpc.WaitForReady(true))
return err
}
func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true))

View File

@@ -14,10 +14,12 @@ import (
"github.com/tendermint/tendermint/abci/types"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/libs/timer"
)
const (
reqQueueSize = 256 // TODO make configurable
reqQueueSize = 256 // TODO make configurable
flushThrottleMS = 20 // Don't wait longer than...
)
// socketClient is the client side implementation of the Tendermint
@@ -26,8 +28,6 @@ const (
//
// This is goroutine-safe. All calls are serialized to the server through an unbuffered queue. The socketClient
// tracks responses and expects them to respect the order of the requests sent.
//
// The buffer is flushed after every message sent.
type socketClient struct {
service.BaseService
@@ -35,7 +35,8 @@ type socketClient struct {
mustConnect bool
conn net.Conn
reqQueue chan *ReqRes
reqQueue chan *ReqRes
flushTimer *timer.ThrottleTimer
mtx sync.Mutex
err error
@@ -51,10 +52,12 @@ var _ Client = (*socketClient)(nil)
func NewSocketClient(addr string, mustConnect bool) Client {
cli := &socketClient{
reqQueue: make(chan *ReqRes, reqQueueSize),
flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS),
mustConnect: mustConnect,
addr: addr,
reqSent: list.New(),
resCb: nil,
addr: addr,
reqSent: list.New(),
resCb: nil,
}
cli.BaseService = *service.NewBaseService(nil, "socketClient", cli)
return cli
@@ -95,6 +98,7 @@ func (cli *socketClient) OnStop() {
}
cli.flushQueue()
cli.flushTimer.Stop()
}
// Error returns an error if the client was stopped abruptly.
@@ -123,26 +127,37 @@ func (cli *socketClient) CheckTxAsync(ctx context.Context, req *types.RequestChe
//----------------------------------------
func (cli *socketClient) sendRequestsRoutine(conn io.Writer) {
bw := bufio.NewWriter(conn)
w := bufio.NewWriter(conn)
for {
select {
case <-cli.Quit():
return
case reqres := <-cli.reqQueue:
// N.B. We must enqueue before sending out the request, otherwise the
// server may reply before we do it, and the receiver will fail for an
// unsolicited reply.
cli.trackRequest(reqres)
if err := types.WriteMessage(reqres.Request, bw); err != nil {
err := types.WriteMessage(reqres.Request, w)
if err != nil {
cli.stopForError(fmt.Errorf("write to buffer: %w", err))
return
}
if err := bw.Flush(); err != nil {
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
return
// If it's a flush request, flush the current buffer.
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
err = w.Flush()
if err != nil {
cli.stopForError(fmt.Errorf("flush buffer: %w", err))
return
}
}
case <-cli.flushTimer.Ch: // flush queue
select {
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
default:
// Probably will fill the buffer, or retry later.
}
case <-cli.Quit():
return
}
}
}
@@ -155,8 +170,8 @@ func (cli *socketClient) recvResponseRoutine(conn io.Reader) {
}
var res = &types.Response{}
if err := types.ReadMessage(r, res); err != nil {
err := types.ReadMessage(r, res)
if err != nil {
cli.stopForError(fmt.Errorf("read message: %w", err))
return
}
@@ -221,43 +236,6 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
return nil
}
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
// TODO: set cli.err if reqQueue times out
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
return reqres, nil
}
// flushQueue marks as complete and discards all remaining pending requests
// from the queue.
func (cli *socketClient) flushQueue() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
reqres := req.Value.(*ReqRes)
reqres.Done()
}
// mark all queued messages as resolved
LOOP:
for {
select {
case reqres := <-cli.reqQueue:
reqres.Done()
default:
break LOOP
}
}
}
//----------------------------------------
func (cli *socketClient) Flush(ctx context.Context) error {
@@ -385,6 +363,51 @@ func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFi
return reqRes.Response.GetFinalizeBlock(), cli.Error()
}
func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) {
reqres := NewReqRes(req)
// TODO: set cli.err if reqQueue times out
select {
case cli.reqQueue <- reqres:
case <-ctx.Done():
return nil, ctx.Err()
}
// Maybe auto-flush, or unset auto-flush
switch req.Value.(type) {
case *types.Request_Flush:
cli.flushTimer.Unset()
default:
cli.flushTimer.Set()
}
return reqres, nil
}
// flushQueue marks as complete and discards all remaining pending requests
// from the queue.
func (cli *socketClient) flushQueue() {
cli.mtx.Lock()
defer cli.mtx.Unlock()
// mark all in-flight messages as resolved (they will get cli.Error())
for req := cli.reqSent.Front(); req != nil; req = req.Next() {
reqres := req.Value.(*ReqRes)
reqres.Done()
}
// mark all queued messages as resolved
LOOP:
for {
select {
case reqres := <-cli.reqQueue:
reqres.Done()
default:
break LOOP
}
}
}
//----------------------------------------
func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {

View File

@@ -33,7 +33,7 @@ func TestCalls(t *testing.T) {
}()
select {
case <-time.After(1 * time.Second):
case <-time.After(time.Second):
require.Fail(t, "No response arrived")
case err, ok := <-resp:
require.True(t, ok, "Must not close channel")
@@ -41,6 +41,38 @@ func TestCalls(t *testing.T) {
}
}
func TestHangingAsyncCalls(t *testing.T) {
app := slowApp{}
s, c := setupClientServer(t, app)
resp := make(chan error, 1)
go func() {
// Start BeginBlock and flush it
reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{})
require.NoError(t, err)
// wait 20 ms for all events to travel socket, but
// no response yet from server
time.Sleep(50 * time.Millisecond)
// kill the server, so the connections break
err = s.Stop()
require.NoError(t, err)
// wait for the response from BeginBlock
reqres.Wait()
fmt.Print(reqres)
resp <- c.Error()
}()
select {
case <-time.After(time.Second):
require.Fail(t, "No response arrived")
case err, ok := <-resp:
require.True(t, ok, "Must not close channel")
assert.Error(t, err, "We should get EOF error")
}
}
func setupClientServer(t *testing.T, app types.Application) (
service.Service, abcicli.Client) {
t.Helper()
@@ -55,7 +87,7 @@ func setupClientServer(t *testing.T, app types.Application) (
t.Cleanup(func() {
if err := s.Stop(); err != nil {
t.Error(err)
t.Log(err)
}
})
@@ -65,13 +97,22 @@ func setupClientServer(t *testing.T, app types.Application) (
t.Cleanup(func() {
if err := c.Stop(); err != nil {
t.Error(err)
t.Log(err)
}
})
return s, c
}
type slowApp struct {
types.BaseApplication
}
func (slowApp) CheckTxAsync(_ context.Context, req types.RequestCheckTx) types.ResponseCheckTx {
time.Sleep(200 * time.Millisecond)
return types.ResponseCheckTx{}
}
// TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when
// set after the client completes the call into the app. Currently this
// test relies on the callback being allowed to be invoked twice if set multiple

View File

@@ -183,6 +183,7 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp
}()
for {
var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
if err != nil {
@@ -303,11 +304,12 @@ func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, res
closeConn <- fmt.Errorf("error writing message: %w", err)
return
}
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("error flushing write buffer: %w", err)
return
if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("error flushing write buffer: %w", err)
return
}
}
// If the application has responded with an exception, the server returns the error