mempool: reduce lock contention during CheckTx (cleanup) (#8983)

The way this was originally structured, we reacquired the lock after issuing
the initial ABCI CheckTx call, only to immediately release it. Restructure the
code so that this redundant acquire is no longer necessary.
This commit is contained in:
M. J. Fromberger
2022-07-12 08:00:29 -07:00
committed by GitHub
parent cb93d3b587
commit 9e64c95e56

View File

@@ -177,69 +177,70 @@ func (txmp *TxMempool) CheckTx(
// During the initial phase of CheckTx, we do not need to modify any state.
// A transaction will not actually be added to the mempool until it survives
// a call to the ABCI CheckTx method and size constraint checks.
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
height, err := func() (int64, error) {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
// Reject transactions in excess of the configured maximum transaction size.
if len(tx) > txmp.config.MaxTxBytes {
return types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
}
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return types.ErrPreCheck{Reason: err}
// Reject transactions in excess of the configured maximum transaction size.
if len(tx) > txmp.config.MaxTxBytes {
return 0, types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
}
}
// Early exit if the proxy connection has an error.
if err := txmp.proxyAppConn.Error(); err != nil {
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return 0, types.ErrPreCheck{Reason: err}
}
}
// Early exit if the proxy connection has an error.
if err := txmp.proxyAppConn.Error(); err != nil {
return 0, err
}
txKey := tx.Key()
// Check for the transaction in the cache.
if !txmp.cache.Push(tx) {
// If the cached transaction is also in the pool, record its sender.
if elt, ok := txmp.txByKey[txKey]; ok {
w := elt.Value.(*WrappedTx)
w.SetPeer(txInfo.SenderID)
}
return 0, types.ErrTxInCache
}
return txmp.height, nil
}()
if err != nil {
return err
}
txKey := tx.Key()
// Check for the transaction in the cache.
if !txmp.cache.Push(tx) {
// If the cached transaction is also in the pool, record its sender.
if elt, ok := txmp.txByKey[txKey]; ok {
w := elt.Value.(*WrappedTx)
w.SetPeer(txInfo.SenderID)
}
return types.ErrTxInCache
}
// Initiate an ABCI CheckTx for this transaction. The callback is
// responsible for adding the transaction to the pool if it survives.
return func() error {
// N.B.: We have to issue the call outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
height := txmp.height
txmp.mtx.RUnlock()
defer txmp.mtx.RLock()
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
//
// N.B.: We have to issue the call outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
hash: tx.Key(),
timestamp: time.Now().UTC(),
height: height,
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
hash: txKey,
timestamp: time.Now().UTC(),
height: height,
}
wtx.SetPeer(txInfo.SenderID)
txmp.initialTxCallback(wtx, res)
if cb != nil {
cb(res)
}
})
return nil
}()
wtx.SetPeer(txInfo.SenderID)
txmp.initialTxCallback(wtx, res)
if cb != nil {
cb(res)
}
})
return nil
}
// RemoveTxByKey removes the transaction with the specified key from the