From 223ece93c812f120ac9401646a2b43a2703ec0be Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Thu, 14 Jul 2022 15:45:58 -0700 Subject: [PATCH] mempool: ensure async requests are flushed to the server (#9010) In the v0.34 line, the socket and gRPC clients require explicit flushes to ensure that the client and server have received an async request. Add these calls explicitly where required in the backport of the priority mempool. In addition, the gRPC client's flush plumbing was not fully hooked up in the v0.34 line, so this change includes that update as well. --- abci/client/grpc_client.go | 4 +++- mempool/v1/mempool.go | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 6d01a7503..f271345ab 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -340,7 +340,9 @@ func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response { //---------------------------------------- func (cli *grpcClient) FlushSync() error { - return nil + reqres := cli.FlushAsync() + cli.finishSyncCall(reqres).GetFlush() + return cli.Error() } func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) { diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 49ac909a2..08482db91 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -226,6 +226,9 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp // the callback deadlock trying to acquire the same lock. This isn't a // problem with out-of-process calls, but this has to work for both. reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) + if err := txmp.proxyAppConn.FlushSync(); err != nil { + return err + } reqRes.SetCallback(func(res *abci.Response) { wtx := &WrappedTx{ tx: tx, @@ -722,6 +725,10 @@ func (txmp *TxMempool) recheckTransactions() { Tx: wtx.tx, Type: abci.CheckTxType_Recheck, }) + if err := txmp.proxyAppConn.FlushSync(); err != nil { + atomic.AddInt64(&txmp.txRecheck, -1) + txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err) + } } txmp.proxyAppConn.FlushAsync()