mirror of
https://github.com/tendermint/tendermint.git
synced 2025-12-23 06:15:19 +00:00
mempool: rework lock discipline to mitigate callback deadlocks (#9030)
The priority mempool has a stricter synchronization requirement than the legacy mempool. Under sufficiently-heavy load, exclusive access can lead to deadlocks when processing a large batch of transaction rechecks through an out-of-process application using the socket client. By design, a socket client stalls when its send buffer fills, during which time it holds a lock shared with the receive thread. While blocked in this state, a response read by the receive thread waits for the shared lock so the callback can be invoked. If we're lucky, the server will then read the next request and make enough room in the buffer for the sender to proceed. If not however (e.g., if the next request is bigger than the one just consumed), the receive thread is blocked: It is waiting on the lock and cannot read a response. Once the server's output buffer fills, the system deadlocks. This can happen with any sufficiently-busy workload, but is more likely during a large recheck in the v1 mempool, where the callbacks need exclusive access to mempool state. As a workaround, process rechecks for the priority mempool in their own goroutines outside the mempool mutex. Responses still head-of-line block, but will no longer get pushback due to contention on the mempool itself.
This commit is contained in:
@@ -3,12 +3,13 @@ package v1
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/creachadair/taskgroup"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/libs/clist"
|
||||
@@ -41,8 +42,7 @@ type TxMempool struct {
|
||||
cache mempool.TxCache // seen transactions
|
||||
|
||||
// Atomically-updated fields
|
||||
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
|
||||
txRecheck int64 // atomic: the number of pending recheck calls
|
||||
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
|
||||
|
||||
// Synchronized fields, protected by mtx.
|
||||
mtx *sync.RWMutex
|
||||
@@ -83,8 +83,6 @@ func NewTxMempool(
|
||||
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
|
||||
}
|
||||
|
||||
proxyAppConn.SetResponseCallback(txmp.recheckTxCallback)
|
||||
|
||||
for _, opt := range options {
|
||||
opt(txmp)
|
||||
}
|
||||
@@ -182,7 +180,6 @@ func (txmp *TxMempool) CheckTx(
|
||||
cb func(*abci.Response),
|
||||
txInfo mempool.TxInfo,
|
||||
) error {
|
||||
|
||||
// During the initial phase of CheckTx, we do not need to modify any state.
|
||||
// A transaction will not actually be added to the mempool until it survives
|
||||
// a call to the ABCI CheckTx method and size constraint checks.
|
||||
@@ -224,31 +221,23 @@ func (txmp *TxMempool) CheckTx(
|
||||
return err
|
||||
}
|
||||
|
||||
// Initiate an ABCI CheckTx for this transaction. The callback is
|
||||
// responsible for adding the transaction to the pool if it survives.
|
||||
//
|
||||
// N.B.: We have to issue the call outside the lock. In a local client,
|
||||
// even an "async" call invokes its callback immediately which will make
|
||||
// the callback deadlock trying to acquire the same lock. This isn't a
|
||||
// problem with out-of-process calls, but this has to work for both.
|
||||
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
|
||||
// Invoke an ABCI CheckTx for this transaction.
|
||||
rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{Tx: tx})
|
||||
if err != nil {
|
||||
txmp.cache.Remove(tx)
|
||||
return err
|
||||
}
|
||||
reqRes.SetCallback(func(res *abci.Response) {
|
||||
wtx := &WrappedTx{
|
||||
tx: tx,
|
||||
hash: tx.Key(),
|
||||
timestamp: time.Now().UTC(),
|
||||
height: height,
|
||||
}
|
||||
wtx.SetPeer(txInfo.SenderID)
|
||||
txmp.initialTxCallback(wtx, res)
|
||||
if cb != nil {
|
||||
cb(res)
|
||||
}
|
||||
})
|
||||
wtx := &WrappedTx{
|
||||
tx: tx,
|
||||
hash: tx.Key(),
|
||||
timestamp: time.Now().UTC(),
|
||||
height: height,
|
||||
}
|
||||
wtx.SetPeer(txInfo.SenderID)
|
||||
txmp.addNewTransaction(wtx, rsp)
|
||||
if cb != nil {
|
||||
cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -304,10 +293,6 @@ func (txmp *TxMempool) Flush() {
|
||||
cur = next
|
||||
}
|
||||
txmp.cache.Reset()
|
||||
|
||||
// Discard any pending recheck calls that may be in flight. The calls will
|
||||
// still complete, but will have no effect on the mempool.
|
||||
atomic.StoreInt64(&txmp.txRecheck, 0)
|
||||
}
|
||||
|
||||
// allEntriesSorted returns a slice of all the transactions currently in the
|
||||
@@ -403,12 +388,6 @@ func (txmp *TxMempool) Update(
|
||||
newPreFn mempool.PreCheckFunc,
|
||||
newPostFn mempool.PostCheckFunc,
|
||||
) error {
|
||||
// TODO(creachadair): This would be a nice safety check but requires Go 1.18.
|
||||
// // Safety check: The caller is required to hold the lock.
|
||||
// if txmp.mtx.TryLock() {
|
||||
// txmp.mtx.Unlock()
|
||||
// panic("mempool: Update caller does not hold the lock")
|
||||
// }
|
||||
// Safety check: Transactions and responses must match in number.
|
||||
if len(blockTxs) != len(deliverTxResponses) {
|
||||
panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses",
|
||||
@@ -456,9 +435,9 @@ func (txmp *TxMempool) Update(
|
||||
return nil
|
||||
}
|
||||
|
||||
// initialTxCallback handles the ABCI CheckTx response for the first time a
|
||||
// addNewTransaction handles the ABCI CheckTx response for the first time a
|
||||
// transaction is added to the mempool. A recheck after a block is committed
|
||||
// goes to the default callback (see recheckTxCallback).
|
||||
// goes to handleRecheckResult.
|
||||
//
|
||||
// If either the application rejected the transaction or a post-check hook is
|
||||
// defined and rejects the transaction, it is discarded.
|
||||
@@ -469,31 +448,22 @@ func (txmp *TxMempool) Update(
|
||||
// transactions are evicted.
|
||||
//
|
||||
// Finally, the new transaction is added and size stats updated.
|
||||
func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
txmp.logger.Error("mempool: received incorrect result type in CheckTx callback",
|
||||
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
|
||||
"got", reflect.TypeOf(res.Value).Name(),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) {
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
|
||||
err = txmp.postCheck(wtx.tx, checkTxRes)
|
||||
}
|
||||
|
||||
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
|
||||
if err != nil || checkTxRes.Code != abci.CodeTypeOK {
|
||||
txmp.logger.Info(
|
||||
"rejected bad transaction",
|
||||
"priority", wtx.Priority(),
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"peer_id", wtx.peers,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
"code", checkTxRes.Code,
|
||||
"post_check_err", err,
|
||||
)
|
||||
|
||||
@@ -508,13 +478,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
// If there was a post-check error, record its text in the result for
|
||||
// debugging purposes.
|
||||
if err != nil {
|
||||
checkTxRes.CheckTx.MempoolError = err.Error()
|
||||
checkTxRes.MempoolError = err.Error()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
priority := checkTxRes.CheckTx.Priority
|
||||
sender := checkTxRes.CheckTx.Sender
|
||||
priority := checkTxRes.Priority
|
||||
sender := checkTxRes.Sender
|
||||
|
||||
// Disallow multiple concurrent transactions from the same sender assigned
|
||||
// by the ABCI application. As a special case, an empty sender is not
|
||||
@@ -528,7 +498,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
"tx", fmt.Sprintf("%X", w.tx.Hash()),
|
||||
"sender", sender,
|
||||
)
|
||||
checkTxRes.CheckTx.MempoolError =
|
||||
checkTxRes.MempoolError =
|
||||
fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)",
|
||||
sender, w.tx.Hash())
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
@@ -563,7 +533,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err.Error(),
|
||||
)
|
||||
checkTxRes.CheckTx.MempoolError =
|
||||
checkTxRes.MempoolError =
|
||||
fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
|
||||
wtx.tx.Hash())
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
@@ -609,7 +579,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
}
|
||||
}
|
||||
|
||||
wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted)
|
||||
wtx.SetGasWanted(checkTxRes.GasWanted)
|
||||
wtx.SetPriority(priority)
|
||||
wtx.SetSender(sender)
|
||||
txmp.insertTx(wtx)
|
||||
@@ -636,33 +606,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
|
||||
atomic.AddInt64(&txmp.txsBytes, wtx.Size())
|
||||
}
|
||||
|
||||
// recheckTxCallback handles the responses from ABCI CheckTx calls issued
|
||||
// during the recheck phase of a block Update. It updates the recheck counter
|
||||
// and removes any transactions invalidated by the application.
|
||||
// handleRecheckResult handles the responses from ABCI CheckTx calls issued
|
||||
// during the recheck phase of a block Update. It removes any transactions
|
||||
// invalidated by the application.
|
||||
//
|
||||
// This callback is NOT executed for the initial CheckTx on a new transaction;
|
||||
// that case is handled by initialTxCallback instead.
|
||||
func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) {
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
// Don't log this; this is the default callback and other response types
|
||||
// can safely be ignored.
|
||||
return
|
||||
}
|
||||
|
||||
// Check whether we are expecting recheck responses at this point.
|
||||
// If not, we will ignore the response, this usually means the mempool was Flushed.
|
||||
// If this is the "last" pending recheck, trigger a notification when it's been processed.
|
||||
numLeft := atomic.AddInt64(&txmp.txRecheck, -1)
|
||||
if numLeft == 0 {
|
||||
defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty
|
||||
} else if numLeft < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// This method is NOT executed for the initial CheckTx on a new transaction;
|
||||
// that case is handled by addNewTransaction instead.
|
||||
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
|
||||
txmp.metrics.RecheckTimes.Add(1)
|
||||
tx := types.Tx(req.GetCheckTx().Tx)
|
||||
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
|
||||
@@ -678,11 +629,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
|
||||
// If a postcheck hook is defined, call it before checking the result.
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
||||
err = txmp.postCheck(tx, checkTxRes)
|
||||
}
|
||||
|
||||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.SetPriority(checkTxRes.CheckTx.Priority)
|
||||
if checkTxRes.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.SetPriority(checkTxRes.Priority)
|
||||
return // N.B. Size of mempool did not change
|
||||
}
|
||||
|
||||
@@ -691,7 +642,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
|
||||
"priority", wtx.Priority(),
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
"code", checkTxRes.Code,
|
||||
)
|
||||
txmp.removeTxByElement(elt)
|
||||
txmp.metrics.FailedTxs.Add(1)
|
||||
@@ -716,33 +667,45 @@ func (txmp *TxMempool) recheckTransactions() {
|
||||
"num_txs", txmp.Size(),
|
||||
"height", txmp.height,
|
||||
)
|
||||
// N.B.: We have to issue the calls outside the lock. In a local client,
|
||||
// even an "async" call invokes its callback immediately which will make the
|
||||
// callback deadlock trying to acquire the same lock. This isn't a problem
|
||||
// with out-of-process calls, but this has to work for both.
|
||||
txmp.mtx.Unlock()
|
||||
defer txmp.mtx.Lock()
|
||||
|
||||
ctx := context.TODO()
|
||||
atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len()))
|
||||
// Collect transactions currently in the mempool requiring recheck.
|
||||
wtxs := make([]*WrappedTx, 0, txmp.txs.Len())
|
||||
for e := txmp.txs.Front(); e != nil; e = e.Next() {
|
||||
wtx := e.Value.(*WrappedTx)
|
||||
wtxs = append(wtxs, e.Value.(*WrappedTx))
|
||||
}
|
||||
|
||||
// The response for this CheckTx is handled by the default recheckTxCallback.
|
||||
_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
|
||||
Tx: wtx.tx,
|
||||
Type: abci.CheckTxType_Recheck,
|
||||
})
|
||||
if err != nil {
|
||||
txmp.logger.Error("failed to execute CheckTx during recheck",
|
||||
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
|
||||
atomic.AddInt64(&txmp.txRecheck, -1)
|
||||
// Issue CheckTx calls for each remaining transaction, and when all the
|
||||
// rechecks are complete signal watchers that transactions may be available.
|
||||
go func() {
|
||||
ctx := context.TODO()
|
||||
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
|
||||
|
||||
for _, wtx := range wtxs {
|
||||
wtx := wtx
|
||||
start(func() error {
|
||||
rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{
|
||||
Tx: wtx.tx,
|
||||
Type: abci.CheckTxType_Recheck,
|
||||
})
|
||||
if err != nil {
|
||||
txmp.logger.Error("failed to execute CheckTx during recheck",
|
||||
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
|
||||
} else {
|
||||
txmp.handleRecheckResult(wtx.tx, rsp)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
|
||||
txmp.logger.Error("failed to flush transactions during recheck", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
|
||||
txmp.logger.Error("failed to flush transactions during recheck", "err", err)
|
||||
}
|
||||
// When recheck is complete, trigger a notification for more transactions.
|
||||
_ = g.Wait()
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
txmp.notifyTxsAvailable()
|
||||
}()
|
||||
}
|
||||
|
||||
// canAddTx returns an error if we cannot insert the provided *WrappedTx into
|
||||
|
||||
Reference in New Issue
Block a user