mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-05 04:55:18 +00:00
(manual cherry-pick of commit 22ed610083)
This commit is contained in:
1
go.mod
1
go.mod
@@ -45,6 +45,7 @@ require (
|
||||
|
||||
require (
|
||||
github.com/bufbuild/buf v1.4.0
|
||||
github.com/creachadair/taskgroup v0.3.2
|
||||
github.com/golangci/golangci-lint v1.47.0
|
||||
github.com/prometheus/common v0.34.0 // indirect
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca
|
||||
|
||||
2
go.sum
2
go.sum
@@ -245,6 +245,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsr
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/creachadair/taskgroup v0.3.2 h1:zlfutDS+5XG40AOxcHDSThxKzns8Tnr9jnr6VqkYlkM=
|
||||
github.com/creachadair/taskgroup v0.3.2/go.mod h1:wieWwecHVzsidg2CsUnFinW1faVN4+kq+TDlRJQ0Wbk=
|
||||
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
|
||||
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
github.com/creack/pty v1.1.11/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
|
||||
|
||||
@@ -2,12 +2,13 @@ package v1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/creachadair/taskgroup"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/clist"
|
||||
@@ -40,8 +41,7 @@ type TxMempool struct {
|
||||
cache mempool.TxCache // seen transactions
|
||||
|
||||
// Atomically-updated fields
|
||||
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
|
||||
txRecheck int64 // atomic: the number of pending recheck calls
|
||||
txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
|
||||
|
||||
// Synchronized fields, protected by mtx.
|
||||
mtx *sync.RWMutex
|
||||
@@ -82,8 +82,6 @@ func NewTxMempool(
|
||||
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
|
||||
}
|
||||
|
||||
proxyAppConn.SetResponseCallback(txmp.recheckTxCallback)
|
||||
|
||||
for _, opt := range options {
|
||||
opt(txmp)
|
||||
}
|
||||
@@ -218,30 +216,23 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
|
||||
return err
|
||||
}
|
||||
|
||||
// Initiate an ABCI CheckTx for this transaction. The callback is
|
||||
// responsible for adding the transaction to the pool if it survives.
|
||||
//
|
||||
// N.B.: We have to issue the call outside the lock. In a local client,
|
||||
// even an "async" call invokes its callback immediately which will make
|
||||
// the callback deadlock trying to acquire the same lock. This isn't a
|
||||
// problem with out-of-process calls, but this has to work for both.
|
||||
reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
|
||||
if err := txmp.proxyAppConn.FlushSync(); err != nil {
|
||||
// Invoke an ABCI CheckTx for this transaction.
|
||||
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx})
|
||||
if err != nil {
|
||||
txmp.cache.Remove(tx)
|
||||
return err
|
||||
}
|
||||
reqRes.SetCallback(func(res *abci.Response) {
|
||||
wtx := &WrappedTx{
|
||||
tx: tx,
|
||||
hash: tx.Key(),
|
||||
timestamp: time.Now().UTC(),
|
||||
height: height,
|
||||
}
|
||||
wtx.SetPeer(txInfo.SenderID)
|
||||
txmp.initialTxCallback(wtx, res)
|
||||
if cb != nil {
|
||||
cb(res)
|
||||
}
|
||||
})
|
||||
wtx := &WrappedTx{
|
||||
tx: tx,
|
||||
hash: tx.Key(),
|
||||
timestamp: time.Now().UTC(),
|
||||
height: height,
|
||||
}
|
||||
wtx.SetPeer(txInfo.SenderID)
|
||||
txmp.addNewTransaction(wtx, rsp)
|
||||
if cb != nil {
|
||||
cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -297,10 +288,6 @@ func (txmp *TxMempool) Flush() {
|
||||
cur = next
|
||||
}
|
||||
txmp.cache.Reset()
|
||||
|
||||
// Discard any pending recheck calls that may be in flight. The calls will
|
||||
// still complete, but will have no effect on the mempool.
|
||||
atomic.StoreInt64(&txmp.txRecheck, 0)
|
||||
}
|
||||
|
||||
// allEntriesSorted returns a slice of all the transactions currently in the
|
||||
@@ -396,12 +383,6 @@ func (txmp *TxMempool) Update(
|
||||
newPreFn mempool.PreCheckFunc,
|
||||
newPostFn mempool.PostCheckFunc,
|
||||
) error {
|
||||
// TODO(creachadair): This would be a nice safety check but requires Go 1.18.
|
||||
// // Safety check: The caller is required to hold the lock.
|
||||
// if txmp.mtx.TryLock() {
|
||||
// txmp.mtx.Unlock()
|
||||
// panic("mempool: Update caller does not hold the lock")
|
||||
// }
|
||||
// Safety check: Transactions and responses must match in number.
|
||||
if len(blockTxs) != len(deliverTxResponses) {
|
||||
panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses",
|
||||
@@ -449,9 +430,9 @@ func (txmp *TxMempool) Update(
|
||||
return nil
|
||||
}
|
||||
|
||||
// initialTxCallback handles the ABCI CheckTx response for the first time a
|
||||
// addNewTransaction handles the ABCI CheckTx response for the first time a
|
||||
// transaction is added to the mempool. A recheck after a block is committed
|
||||
// goes to the default callback (see recheckTxCallback).
|
||||
// goes to handleRecheckResult.
|
||||
//
|
||||
// If either the application rejected the transaction or a post-check hook is
|
||||
// defined and rejects the transaction, it is discarded.
|
||||
@@ -462,31 +443,22 @@ func (txmp *TxMempool) Update(
|
||||
// transactions are evicted.
|
||||
//
|
||||
// Finally, the new transaction is added and size stats updated.
|
||||
func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
txmp.logger.Error("mempool: received incorrect result type in CheckTx callback",
|
||||
"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
|
||||
"got", reflect.TypeOf(res.Value).Name(),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) {
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
|
||||
err = txmp.postCheck(wtx.tx, checkTxRes)
|
||||
}
|
||||
|
||||
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
|
||||
if err != nil || checkTxRes.Code != abci.CodeTypeOK {
|
||||
txmp.logger.Info(
|
||||
"rejected bad transaction",
|
||||
"priority", wtx.Priority(),
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"peer_id", wtx.peers,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
"code", checkTxRes.Code,
|
||||
"post_check_err", err,
|
||||
)
|
||||
|
||||
@@ -501,13 +473,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
// If there was a post-check error, record its text in the result for
|
||||
// debugging purposes.
|
||||
if err != nil {
|
||||
checkTxRes.CheckTx.MempoolError = err.Error()
|
||||
checkTxRes.MempoolError = err.Error()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
priority := checkTxRes.CheckTx.Priority
|
||||
sender := checkTxRes.CheckTx.Sender
|
||||
priority := checkTxRes.Priority
|
||||
sender := checkTxRes.Sender
|
||||
|
||||
// Disallow multiple concurrent transactions from the same sender assigned
|
||||
// by the ABCI application. As a special case, an empty sender is not
|
||||
@@ -521,7 +493,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
"tx", fmt.Sprintf("%X", w.tx.Hash()),
|
||||
"sender", sender,
|
||||
)
|
||||
checkTxRes.CheckTx.MempoolError =
|
||||
checkTxRes.MempoolError =
|
||||
fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)",
|
||||
sender, w.tx.Hash())
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
@@ -556,7 +528,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err.Error(),
|
||||
)
|
||||
checkTxRes.CheckTx.MempoolError =
|
||||
checkTxRes.MempoolError =
|
||||
fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
|
||||
wtx.tx.Hash())
|
||||
txmp.metrics.RejectedTxs.Add(1)
|
||||
@@ -602,7 +574,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
|
||||
}
|
||||
}
|
||||
|
||||
wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted)
|
||||
wtx.SetGasWanted(checkTxRes.GasWanted)
|
||||
wtx.SetPriority(priority)
|
||||
wtx.SetSender(sender)
|
||||
txmp.insertTx(wtx)
|
||||
@@ -629,33 +601,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
|
||||
atomic.AddInt64(&txmp.txsBytes, wtx.Size())
|
||||
}
|
||||
|
||||
// recheckTxCallback handles the responses from ABCI CheckTx calls issued
|
||||
// during the recheck phase of a block Update. It updates the recheck counter
|
||||
// and removes any transactions invalidated by the application.
|
||||
// handleRecheckResult handles the responses from ABCI CheckTx calls issued
|
||||
// during the recheck phase of a block Update. It removes any transactions
|
||||
// invalidated by the application.
|
||||
//
|
||||
// This callback is NOT executed for the initial CheckTx on a new transaction;
|
||||
// that case is handled by initialTxCallback instead.
|
||||
func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) {
|
||||
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
||||
if !ok {
|
||||
// Don't log this; this is the default callback and other response types
|
||||
// can safely be ignored.
|
||||
return
|
||||
}
|
||||
|
||||
// Check whether we are expecting recheck responses at this point.
|
||||
// If not, we will ignore the response, this usually means the mempool was Flushed.
|
||||
// If this is the "last" pending recheck, trigger a notification when it's been processed.
|
||||
numLeft := atomic.AddInt64(&txmp.txRecheck, -1)
|
||||
if numLeft == 0 {
|
||||
defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty
|
||||
} else if numLeft < 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// This method is NOT executed for the initial CheckTx on a new transaction;
|
||||
// that case is handled by addNewTransaction instead.
|
||||
func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
|
||||
txmp.metrics.RecheckTimes.Add(1)
|
||||
tx := types.Tx(req.GetCheckTx().Tx)
|
||||
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
|
||||
@@ -671,11 +624,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
|
||||
// If a postcheck hook is defined, call it before checking the result.
|
||||
var err error
|
||||
if txmp.postCheck != nil {
|
||||
err = txmp.postCheck(tx, checkTxRes.CheckTx)
|
||||
err = txmp.postCheck(tx, checkTxRes)
|
||||
}
|
||||
|
||||
if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.SetPriority(checkTxRes.CheckTx.Priority)
|
||||
if checkTxRes.Code == abci.CodeTypeOK && err == nil {
|
||||
wtx.SetPriority(checkTxRes.Priority)
|
||||
return // N.B. Size of mempool did not change
|
||||
}
|
||||
|
||||
@@ -684,7 +637,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
|
||||
"priority", wtx.Priority(),
|
||||
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
|
||||
"err", err,
|
||||
"code", checkTxRes.CheckTx.Code,
|
||||
"code", checkTxRes.Code,
|
||||
)
|
||||
txmp.removeTxByElement(elt)
|
||||
txmp.metrics.FailedTxs.Add(1)
|
||||
@@ -709,29 +662,43 @@ func (txmp *TxMempool) recheckTransactions() {
|
||||
"num_txs", txmp.Size(),
|
||||
"height", txmp.height,
|
||||
)
|
||||
// N.B.: We have to issue the calls outside the lock. In a local client,
|
||||
// even an "async" call invokes its callback immediately which will make the
|
||||
// callback deadlock trying to acquire the same lock. This isn't a problem
|
||||
// with out-of-process calls, but this has to work for both.
|
||||
txmp.mtx.Unlock()
|
||||
defer txmp.mtx.Lock()
|
||||
|
||||
atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len()))
|
||||
// Collect transactions currently in the mempool requiring recheck.
|
||||
wtxs := make([]*WrappedTx, 0, txmp.txs.Len())
|
||||
for e := txmp.txs.Front(); e != nil; e = e.Next() {
|
||||
wtx := e.Value.(*WrappedTx)
|
||||
|
||||
// The response for this CheckTx is handled by the default recheckTxCallback.
|
||||
_ = txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{
|
||||
Tx: wtx.tx,
|
||||
Type: abci.CheckTxType_Recheck,
|
||||
})
|
||||
if err := txmp.proxyAppConn.FlushSync(); err != nil {
|
||||
atomic.AddInt64(&txmp.txRecheck, -1)
|
||||
txmp.logger.Error("mempool: error flushing re-CheckTx", "key", wtx.tx.Key(), "err", err)
|
||||
}
|
||||
wtxs = append(wtxs, e.Value.(*WrappedTx))
|
||||
}
|
||||
|
||||
txmp.proxyAppConn.FlushAsync()
|
||||
// Issue CheckTx calls for each remaining transaction, and when all the
|
||||
// rechecks are complete signal watchers that transactions may be available.
|
||||
go func() {
|
||||
g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
|
||||
|
||||
for _, wtx := range wtxs {
|
||||
wtx := wtx
|
||||
start(func() error {
|
||||
// The response for this CheckTx is handled by the default recheckTxCallback.
|
||||
rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{
|
||||
Tx: wtx.tx,
|
||||
Type: abci.CheckTxType_Recheck,
|
||||
})
|
||||
if err != nil {
|
||||
txmp.logger.Error("failed to execute CheckTx during recheck",
|
||||
"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
|
||||
} else {
|
||||
txmp.handleRecheckResult(wtx.tx, rsp)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
_ = txmp.proxyAppConn.FlushAsync()
|
||||
|
||||
// When recheck is complete, trigger a notification for more transactions.
|
||||
_ = g.Wait()
|
||||
txmp.mtx.Lock()
|
||||
defer txmp.mtx.Unlock()
|
||||
txmp.notifyTxsAvailable()
|
||||
}()
|
||||
}
|
||||
|
||||
// canAddTx returns an error if we cannot insert the provided *WrappedTx into
|
||||
|
||||
Reference in New Issue
Block a user