diff --git a/internal/mempool/v1/mempool.go b/internal/mempool/v1/mempool.go index 622ee2c86..9bb5f2b0a 100644 --- a/internal/mempool/v1/mempool.go +++ b/internal/mempool/v1/mempool.go @@ -3,12 +3,13 @@ package v1 import ( "context" "fmt" - "reflect" + "runtime" "sort" "sync" "sync/atomic" "time" + "github.com/creachadair/taskgroup" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" @@ -41,8 +42,7 @@ type TxMempool struct { cache mempool.TxCache // seen transactions // Atomically-updated fields - txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes - txRecheck int64 // atomic: the number of pending recheck calls + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes // Synchronized fields, protected by mtx. mtx *sync.RWMutex @@ -83,8 +83,6 @@ func NewTxMempool( txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize) } - proxyAppConn.SetResponseCallback(txmp.recheckTxCallback) - for _, opt := range options { opt(txmp) } @@ -182,7 +180,6 @@ func (txmp *TxMempool) CheckTx( cb func(*abci.Response), txInfo mempool.TxInfo, ) error { - // During the initial phase of CheckTx, we do not need to modify any state. // A transaction will not actually be added to the mempool until it survives // a call to the ABCI CheckTx method and size constraint checks. @@ -224,31 +221,23 @@ func (txmp *TxMempool) CheckTx( return err } - // Initiate an ABCI CheckTx for this transaction. The callback is - // responsible for adding the transaction to the pool if it survives. - // - // N.B.: We have to issue the call outside the lock. In a local client, - // even an "async" call invokes its callback immediately which will make - // the callback deadlock trying to acquire the same lock. This isn't a - // problem with out-of-process calls, but this has to work for both. - reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx}) + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{Tx: tx}) if err != nil { txmp.cache.Remove(tx) return err } - reqRes.SetCallback(func(res *abci.Response) { - wtx := &WrappedTx{ - tx: tx, - hash: tx.Key(), - timestamp: time.Now().UTC(), - height: height, - } - wtx.SetPeer(txInfo.SenderID) - txmp.initialTxCallback(wtx, res) - if cb != nil { - cb(res) - } - }) + wtx := &WrappedTx{ + tx: tx, + hash: tx.Key(), + timestamp: time.Now().UTC(), + height: height, + } + wtx.SetPeer(txInfo.SenderID) + txmp.addNewTransaction(wtx, rsp) + if cb != nil { + cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}}) + } return nil } @@ -304,10 +293,6 @@ func (txmp *TxMempool) Flush() { cur = next } txmp.cache.Reset() - - // Discard any pending recheck calls that may be in flight. The calls will - // still complete, but will have no effect on the mempool. - atomic.StoreInt64(&txmp.txRecheck, 0) } // allEntriesSorted returns a slice of all the transactions currently in the @@ -403,12 +388,6 @@ func (txmp *TxMempool) Update( newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc, ) error { - // TODO(creachadair): This would be a nice safety check but requires Go 1.18. - // // Safety check: The caller is required to hold the lock. - // if txmp.mtx.TryLock() { - // txmp.mtx.Unlock() - // panic("mempool: Update caller does not hold the lock") - // } // Safety check: Transactions and responses must match in number. if len(blockTxs) != len(deliverTxResponses) { panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", @@ -456,9 +435,9 @@ func (txmp *TxMempool) Update( return nil } -// initialTxCallback handles the ABCI CheckTx response for the first time a +// addNewTransaction handles the ABCI CheckTx response for the first time a // transaction is added to the mempool. A recheck after a block is committed -// goes to the default callback (see recheckTxCallback). +// goes to handleRecheckResult. // // If either the application rejected the transaction or a post-check hook is // defined and rejects the transaction, it is discarded. @@ -469,31 +448,22 @@ func (txmp *TxMempool) Update( // transactions are evicted. // // Finally, the new transaction is added and size stats updated. -func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - txmp.logger.Error("mempool: received incorrect result type in CheckTx callback", - "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(), - "got", reflect.TypeOf(res.Value).Name(), - ) - return - } - +func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) { txmp.mtx.Lock() defer txmp.mtx.Unlock() var err error if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) + err = txmp.postCheck(wtx.tx, checkTxRes) } - if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { + if err != nil || checkTxRes.Code != abci.CodeTypeOK { txmp.logger.Info( "rejected bad transaction", "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "peer_id", wtx.peers, - "code", checkTxRes.CheckTx.Code, + "code", checkTxRes.Code, "post_check_err", err, ) @@ -508,13 +478,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { // If there was a post-check error, record its text in the result for // debugging purposes. if err != nil { - checkTxRes.CheckTx.MempoolError = err.Error() + checkTxRes.MempoolError = err.Error() } return } - priority := checkTxRes.CheckTx.Priority - sender := checkTxRes.CheckTx.Sender + priority := checkTxRes.Priority + sender := checkTxRes.Sender // Disallow multiple concurrent transactions from the same sender assigned // by the ABCI application. As a special case, an empty sender is not @@ -528,7 +498,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { "tx", fmt.Sprintf("%X", w.tx.Hash()), "sender", sender, ) - checkTxRes.CheckTx.MempoolError = + checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)", sender, w.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) @@ -563,7 +533,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err.Error(), ) - checkTxRes.CheckTx.MempoolError = + checkTxRes.MempoolError = fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", wtx.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) @@ -609,7 +579,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { } } - wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted) + wtx.SetGasWanted(checkTxRes.GasWanted) wtx.SetPriority(priority) wtx.SetSender(sender) txmp.insertTx(wtx) @@ -636,33 +606,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) { atomic.AddInt64(&txmp.txsBytes, wtx.Size()) } -// recheckTxCallback handles the responses from ABCI CheckTx calls issued -// during the recheck phase of a block Update. It updates the recheck counter -// and removes any transactions invalidated by the application. +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. // -// This callback is NOT executed for the initial CheckTx on a new transaction; -// that case is handled by initialTxCallback instead. -func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) { - checkTxRes, ok := res.Value.(*abci.Response_CheckTx) - if !ok { - // Don't log this; this is the default callback and other response types - // can safely be ignored. - return - } - - // Check whether we are expecting recheck responses at this point. - // If not, we will ignore the response, this usually means the mempool was Flushed. - // If this is the "last" pending recheck, trigger a notification when it's been processed. - numLeft := atomic.AddInt64(&txmp.txRecheck, -1) - if numLeft == 0 { - defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty - } else if numLeft < 0 { - return - } - +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { txmp.metrics.RecheckTimes.Add(1) - tx := types.Tx(req.GetCheckTx().Tx) - txmp.mtx.Lock() defer txmp.mtx.Unlock() @@ -678,11 +629,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) // If a postcheck hook is defined, call it before checking the result. var err error if txmp.postCheck != nil { - err = txmp.postCheck(tx, checkTxRes.CheckTx) + err = txmp.postCheck(tx, checkTxRes) } - if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { - wtx.SetPriority(checkTxRes.CheckTx.Priority) + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + wtx.SetPriority(checkTxRes.Priority) return // N.B. Size of mempool did not change } @@ -691,7 +642,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err, - "code", checkTxRes.CheckTx.Code, + "code", checkTxRes.Code, ) txmp.removeTxByElement(elt) txmp.metrics.FailedTxs.Add(1) @@ -716,33 +667,45 @@ func (txmp *TxMempool) recheckTransactions() { "num_txs", txmp.Size(), "height", txmp.height, ) - // N.B.: We have to issue the calls outside the lock. In a local client, - // even an "async" call invokes its callback immediately which will make the - // callback deadlock trying to acquire the same lock. This isn't a problem - // with out-of-process calls, but this has to work for both. - txmp.mtx.Unlock() - defer txmp.mtx.Lock() - ctx := context.TODO() - atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len())) + // Collect transactions currently in the mempool requiring recheck. + wtxs := make([]*WrappedTx, 0, txmp.txs.Len()) for e := txmp.txs.Front(); e != nil; e = e.Next() { - wtx := e.Value.(*WrappedTx) + wtxs = append(wtxs, e.Value.(*WrappedTx)) + } - // The response for this CheckTx is handled by the default recheckTxCallback. - _, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - if err != nil { - txmp.logger.Error("failed to execute CheckTx during recheck", - "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) - atomic.AddInt64(&txmp.txRecheck, -1) + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + ctx := context.TODO() + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) + + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) + } + return nil + }) + } + if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil { + txmp.logger.Error("failed to flush transactions during recheck", "err", err) } - } - if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil { - txmp.logger.Error("failed to flush transactions during recheck", "err", err) - } + // When recheck is complete, trigger a notification for more transactions. + _ = g.Wait() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() + }() } // canAddTx returns an error if we cannot insert the provided *WrappedTx into