From 7c857be3b58fd539ca676a372ba96bf41d1516f0 Mon Sep 17 00:00:00 2001 From: Callum Waters Date: Mon, 21 Nov 2022 12:42:31 +0100 Subject: [PATCH] revert modifications to flush logic --- abci/client/grpc_client.go | 7 +- abci/client/socket_client.go | 129 ++++++++++++++++++------------ abci/client/socket_client_test.go | 47 ++++++++++- abci/server/socket_server.go | 12 +-- 4 files changed, 132 insertions(+), 63 deletions(-) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 05eaf4ac5..26fbf6337 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -125,6 +125,7 @@ func (cli *grpcClient) OnStop() { if cli.conn != nil { cli.conn.Close() } + close(cli.chReqRes) } func (cli *grpcClient) StopForError(err error) { @@ -147,7 +148,6 @@ func (cli *grpcClient) StopForError(err error) { func (cli *grpcClient) Error() error { cli.mtx.Lock() defer cli.mtx.Unlock() - return cli.err } @@ -181,7 +181,10 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) //---------------------------------------- -func (cli *grpcClient) Flush(ctx context.Context) error { return nil } +func (cli *grpcClient) Flush(ctx context.Context) error { + _, err := cli.client.Flush(ctx, types.ToRequestFlush().GetFlush(), grpc.WaitForReady(true)) + return err +} func (cli *grpcClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { return cli.client.Echo(ctx, types.ToRequestEcho(msg).GetEcho(), grpc.WaitForReady(true)) diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index de89c3ba9..aee9c64ac 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -14,10 +14,12 @@ import ( "github.com/tendermint/tendermint/abci/types" tmnet "github.com/tendermint/tendermint/libs/net" "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/libs/timer" ) const ( - reqQueueSize = 256 // TODO make configurable + reqQueueSize = 256 // TODO make configurable + flushThrottleMS = 20 // Don't wait longer than... ) // socketClient is the client side implementation of the Tendermint @@ -26,8 +28,6 @@ const ( // // This is goroutine-safe. All calls are serialized to the server through an unbuffered queue. The socketClient // tracks responses and expects them to respect the order of the requests sent. -// -// The buffer is flushed after every message sent. type socketClient struct { service.BaseService @@ -35,7 +35,8 @@ type socketClient struct { mustConnect bool conn net.Conn - reqQueue chan *ReqRes + reqQueue chan *ReqRes + flushTimer *timer.ThrottleTimer mtx sync.Mutex err error @@ -51,10 +52,12 @@ var _ Client = (*socketClient)(nil) func NewSocketClient(addr string, mustConnect bool) Client { cli := &socketClient{ reqQueue: make(chan *ReqRes, reqQueueSize), + flushTimer: timer.NewThrottleTimer("socketClient", flushThrottleMS), mustConnect: mustConnect, - addr: addr, - reqSent: list.New(), - resCb: nil, + + addr: addr, + reqSent: list.New(), + resCb: nil, } cli.BaseService = *service.NewBaseService(nil, "socketClient", cli) return cli @@ -95,6 +98,7 @@ func (cli *socketClient) OnStop() { } cli.flushQueue() + cli.flushTimer.Stop() } // Error returns an error if the client was stopped abruptly. @@ -123,26 +127,37 @@ func (cli *socketClient) CheckTxAsync(ctx context.Context, req *types.RequestChe //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn io.Writer) { - bw := bufio.NewWriter(conn) + w := bufio.NewWriter(conn) for { select { - case <-cli.Quit(): - return case reqres := <-cli.reqQueue: // N.B. We must enqueue before sending out the request, otherwise the // server may reply before we do it, and the receiver will fail for an // unsolicited reply. cli.trackRequest(reqres) - if err := types.WriteMessage(reqres.Request, bw); err != nil { + err := types.WriteMessage(reqres.Request, w) + if err != nil { cli.stopForError(fmt.Errorf("write to buffer: %w", err)) return } - if err := bw.Flush(); err != nil { - cli.stopForError(fmt.Errorf("flush buffer: %w", err)) - return + // If it's a flush request, flush the current buffer. + if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { + err = w.Flush() + if err != nil { + cli.stopForError(fmt.Errorf("flush buffer: %w", err)) + return + } } + case <-cli.flushTimer.Ch: // flush queue + select { + case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): + default: + // Probably will fill the buffer, or retry later. + } + case <-cli.Quit(): + return } } } @@ -155,8 +170,8 @@ func (cli *socketClient) recvResponseRoutine(conn io.Reader) { } var res = &types.Response{} - - if err := types.ReadMessage(r, res); err != nil { + err := types.ReadMessage(r, res) + if err != nil { cli.stopForError(fmt.Errorf("read message: %w", err)) return } @@ -221,43 +236,6 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { return nil } -func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { - reqres := NewReqRes(req) - - // TODO: set cli.err if reqQueue times out - select { - case cli.reqQueue <- reqres: - case <-ctx.Done(): - return nil, ctx.Err() - } - - return reqres, nil -} - -// flushQueue marks as complete and discards all remaining pending requests -// from the queue. -func (cli *socketClient) flushQueue() { - cli.mtx.Lock() - defer cli.mtx.Unlock() - - // mark all in-flight messages as resolved (they will get cli.Error()) - for req := cli.reqSent.Front(); req != nil; req = req.Next() { - reqres := req.Value.(*ReqRes) - reqres.Done() - } - - // mark all queued messages as resolved -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } - } -} - //---------------------------------------- func (cli *socketClient) Flush(ctx context.Context) error { @@ -385,6 +363,51 @@ func (cli *socketClient) FinalizeBlock(ctx context.Context, req *types.RequestFi return reqRes.Response.GetFinalizeBlock(), cli.Error() } +func (cli *socketClient) queueRequest(ctx context.Context, req *types.Request) (*ReqRes, error) { + reqres := NewReqRes(req) + + // TODO: set cli.err if reqQueue times out + select { + case cli.reqQueue <- reqres: + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Maybe auto-flush, or unset auto-flush + switch req.Value.(type) { + case *types.Request_Flush: + cli.flushTimer.Unset() + default: + cli.flushTimer.Set() + } + + return reqres, nil +} + +// flushQueue marks as complete and discards all remaining pending requests +// from the queue. +func (cli *socketClient) flushQueue() { + cli.mtx.Lock() + defer cli.mtx.Unlock() + + // mark all in-flight messages as resolved (they will get cli.Error()) + for req := cli.reqSent.Front(); req != nil; req = req.Next() { + reqres := req.Value.(*ReqRes) + reqres.Done() + } + + // mark all queued messages as resolved +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } +} + //---------------------------------------- func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { diff --git a/abci/client/socket_client_test.go b/abci/client/socket_client_test.go index cbc0c5dd6..b881e82d5 100644 --- a/abci/client/socket_client_test.go +++ b/abci/client/socket_client_test.go @@ -33,7 +33,7 @@ func TestCalls(t *testing.T) { }() select { - case <-time.After(1 * time.Second): + case <-time.After(time.Second): require.Fail(t, "No response arrived") case err, ok := <-resp: require.True(t, ok, "Must not close channel") @@ -41,6 +41,38 @@ func TestCalls(t *testing.T) { } } +func TestHangingAsyncCalls(t *testing.T) { + app := slowApp{} + + s, c := setupClientServer(t, app) + + resp := make(chan error, 1) + go func() { + // Start BeginBlock and flush it + reqres, err := c.CheckTxAsync(context.Background(), &types.RequestCheckTx{}) + require.NoError(t, err) + // wait 20 ms for all events to travel socket, but + // no response yet from server + time.Sleep(50 * time.Millisecond) + // kill the server, so the connections break + err = s.Stop() + require.NoError(t, err) + + // wait for the response from BeginBlock + reqres.Wait() + fmt.Print(reqres) + resp <- c.Error() + }() + + select { + case <-time.After(time.Second): + require.Fail(t, "No response arrived") + case err, ok := <-resp: + require.True(t, ok, "Must not close channel") + assert.Error(t, err, "We should get EOF error") + } +} + func setupClientServer(t *testing.T, app types.Application) ( service.Service, abcicli.Client) { t.Helper() @@ -55,7 +87,7 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := s.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) @@ -65,13 +97,22 @@ func setupClientServer(t *testing.T, app types.Application) ( t.Cleanup(func() { if err := c.Stop(); err != nil { - t.Error(err) + t.Log(err) } }) return s, c } +type slowApp struct { + types.BaseApplication +} + +func (slowApp) CheckTxAsync(_ context.Context, req types.RequestCheckTx) types.ResponseCheckTx { + time.Sleep(200 * time.Millisecond) + return types.ResponseCheckTx{} +} + // TestCallbackInvokedWhenSetLaet ensures that the callback is invoked when // set after the client completes the call into the app. Currently this // test relies on the callback being allowed to be invoked twice if set multiple diff --git a/abci/server/socket_server.go b/abci/server/socket_server.go index 6507ac661..1053e0d7f 100644 --- a/abci/server/socket_server.go +++ b/abci/server/socket_server.go @@ -183,6 +183,7 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn io.Reader, resp }() for { + var req = &types.Request{} err := types.ReadMessage(bufReader, req) if err != nil { @@ -303,11 +304,12 @@ func (s *SocketServer) handleResponses(closeConn chan error, conn io.Writer, res closeConn <- fmt.Errorf("error writing message: %w", err) return } - - err = bufWriter.Flush() - if err != nil { - closeConn <- fmt.Errorf("error flushing write buffer: %w", err) - return + if _, ok := res.Value.(*types.Response_Flush); ok { + err = bufWriter.Flush() + if err != nil { + closeConn <- fmt.Errorf("error flushing write buffer: %w", err) + return + } } // If the application has responded with an exception, the server returns the error