mempool: ensure async requests are flushed to the server (#9010)

In the v0.34 line, the socket and gRPC clients require explicit flushes to
ensure that the client and server have received an async request.  Add these
calls explicitly where required in the backport of the priority mempool.

In addition, the gRPC client's flush plumbing was not fully hooked up in the
v0.34 line, so this change includes that update as well.
This commit is contained in:
M. J. Fromberger
2022-07-14 15:45:58 -07:00
committed by GitHub
parent ba1711e706
commit 223ece93c8
2 changed files with 10 additions and 1 deletions

View File

@@ -340,7 +340,9 @@ func (cli *grpcClient) finishSyncCall(reqres *ReqRes) *types.Response {
//---------------------------------------- //----------------------------------------
func (cli *grpcClient) FlushSync() error { func (cli *grpcClient) FlushSync() error {
return nil reqres := cli.FlushAsync()
cli.finishSyncCall(reqres).GetFlush()
return cli.Error()
} }
func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) { func (cli *grpcClient) EchoSync(msg string) (*types.ResponseEcho, error) {

View File

@@ -226,6 +226,9 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
// the callback deadlock trying to acquire the same lock. This isn't a // the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both. // problem with out-of-process calls, but this has to work for both.
reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
if err := txmp.proxyAppConn.FlushSync(); err != nil {
return err
}
reqRes.SetCallback(func(res *abci.Response) { reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{ wtx := &WrappedTx{
tx: tx, tx: tx,
@@ -722,6 +725,10 @@ func (txmp *TxMempool) recheckTransactions() {
Tx: wtx.tx, Tx: wtx.tx,
Type: abci.CheckTxType_Recheck, Type: abci.CheckTxType_Recheck,
}) })
if err := txmp.proxyAppConn.FlushSync(); err != nil {
atomic.AddInt64(&txmp.txRecheck, -1)
txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err)
}
} }
txmp.proxyAppConn.FlushAsync() txmp.proxyAppConn.FlushAsync()