This commit is contained in:
Aleksandr Bezobchuk
2022-06-03 13:15:10 -04:00
parent a0f3b9e983
commit 342f0be09e

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"sync/atomic"
"time"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
@@ -13,7 +14,6 @@ import (
tmmath "github.com/tendermint/tendermint/libs/math"
tmos "github.com/tendermint/tendermint/libs/os"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
)
@@ -249,7 +249,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
defer mem.updateMtx.RUnlock()
txSize := len(tx)
if err := mem.isFull(txSize); err != nil {
return err
}
@@ -281,24 +280,36 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
return err
}
if !mem.cache.Push(tx) {
// Record a new sender for a tx we've already seen.
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap.Load(TxKey(tx)); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
memTx.senders.LoadOrStore(txInfo.SenderID, true)
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
// but they can spam the same tx with little cost to them atm.
}
txHash := tx.Key()
// We add the transaction to the mempool's cache and if the
// transaction is already present in the cache, i.e. false is returned, then we
// check if we've seen this transaction and error if we have.
if !mem.cache.Push(tx) {
mem.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID)
return ErrTxInCache
}
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))
reqRes.SetCallback(func(res *abci.Response) {
if mem.recheckCursor != nil {
panic("recheck cursor is non-nil in CheckTx callback")
}
wtx := &WrappedTx{
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: mem.height,
}
mem.initTxCallback(wtx, res, txInfo)
mem.metrics.Size.Set(float64(mem.Size()))
if cb != nil {
cb(res)
}
})
return nil
}
@@ -324,38 +335,38 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {
mem.metrics.Size.Set(float64(mem.Size()))
}
// Request specific callback that should be set on individual reqRes objects
// to incorporate local information when processing the response.
// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// NOTE: alternatively, we could include this information in the ABCI request itself.
//
// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// when all other response processing is complete.
//
// Used in CheckTx to record PeerID who sent us the tx.
func (mem *CListMempool) reqResCb(
tx []byte,
peerID uint16,
peerP2PID p2p.ID,
externalCb func(*abci.Response),
) func(res *abci.Response) {
return func(res *abci.Response) {
if mem.recheckCursor != nil {
// this should never happen
panic("recheck cursor is not nil in reqResCb")
}
// // Request specific callback that should be set on individual reqRes objects
// // to incorporate local information when processing the response.
// // This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
// // NOTE: alternatively, we could include this information in the ABCI request itself.
// //
// // External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
// // when all other response processing is complete.
// //
// // Used in CheckTx to record PeerID who sent us the tx.
// func (mem *CListMempool) reqResCb(
// tx []byte,
// peerID uint16,
// peerP2PID p2p.ID,
// externalCb func(*abci.Response),
// ) func(res *abci.Response) {
// return func(res *abci.Response) {
// if mem.recheckCursor != nil {
// // this should never happen
// panic("recheck cursor is not nil in reqResCb")
// }
mem.resCbFirstTime(tx, peerID, peerP2PID, res)
// mem.resCbFirstTime(tx, peerID, peerP2PID, res)
// update metrics
mem.metrics.Size.Set(float64(mem.Size()))
// // update metrics
// mem.metrics.Size.Set(float64(mem.Size()))
// passed in by the caller of CheckTx, eg. the RPC
if externalCb != nil {
externalCb(res)
}
}
}
// // passed in by the caller of CheckTx, eg. the RPC
// if externalCb != nil {
// externalCb(res)
// }
// }
// }
// Called from:
// - resCbFirstTime (lock not held) if tx is valid
@@ -390,75 +401,156 @@ func (mem *CListMempool) RemoveTxByKey(txKey [TxKeySize]byte, removeFromCache bo
}
}
func (mem *CListMempool) isFull(txSize int) error {
// canAddTx returns an error if we cannot insert the provided *WrappedTx into
// the mempool due to mempool configured constraints. Otherwise, nil is returned
// and the transaction can be inserted into the mempool.
func (mem *CListMempool) canAddTx(wtx *WrappedTx) error {
var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
numTxs = mem.Size()
sizeBytes = mem.SizeBytes()
)
if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
if numTxs >= mem.config.Size || int64(wtx.Size())+sizeBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize, mem.config.Size,
txsBytes, mem.config.MaxTxsBytes,
numTxs: numTxs,
maxTxs: mem.config.Size,
txsBytes: sizeBytes,
maxTxsBytes: mem.config.MaxTxsBytes,
}
}
return nil
}
// callback, which is called after the app checked the tx for the first time.
// initTxCallback performs the initial, i.e. the first, callback after CheckTx
// has been executed by the ABCI application. In other words, initTxCallback is
// called after executing CheckTx when we see a unique transaction for the first
// time. CheckTx can be called again for the same transaction at a later point
// in time when re-checking, however, this callback will not be called.
//
// The case where the app checks the tx for the second and subsequent times is
// handled by the resCbRecheck callback.
func (mem *CListMempool) resCbFirstTime(
tx []byte,
peerID uint16,
peerP2PID p2p.ID,
res *abci.Response,
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
tx: tx,
}
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
mem.logger.Debug("added good transaction",
"tx", txID(tx),
"res", r,
"height", memTx.height,
"total", mem.Size(),
)
mem.notifyTxsAvailable()
} else {
// ignore bad transaction
mem.logger.Debug("rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
}
default:
// ignore other messages
// After the ABCI application executes CheckTx, initTxCallback is called with
// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool,
// we execute that first. If there is no error from postCheck (if defined) and
// the ABCI CheckTx response code is OK, we attempt to insert the transaction.
//
// When attempting to insert the transaction, we first check if there is
// sufficient capacity. If there is sufficient capacity, the transaction is
// inserted into the txStore and indexed across all indexes. Otherwise, if the
// mempool is full, we attempt to find a lower priority transaction to evict in
// place of the new incoming transaction. If no such transaction exists, the
// new incoming transaction is rejected.
//
// If the new incoming transaction fails CheckTx or postCheck fails, we reject
// the new incoming transaction.
//
// NOTE:
// - An explicit lock is NOT required.
func (mem *CListMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) {
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
if !ok {
return
}
var err error
if mem.postCheck != nil {
err = mem.postCheck(wtx.tx, checkTxRes.CheckTx)
}
if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
// ignore bad transactions
mem.logger.Info(
"rejected bad transaction",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"peer_id", txInfo.SenderP2PID,
"code", checkTxRes.CheckTx.Code,
"post_check_err", err,
)
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
mem.cache.Remove(wtx.tx)
}
if err != nil {
checkTxRes.CheckTx.MempoolError = err.Error()
}
return
}
sender := checkTxRes.CheckTx.Sender
priority := checkTxRes.CheckTx.Priority
if len(sender) > 0 {
if wtx := mem.txStore.GetTxBySender(sender); wtx != nil {
mem.logger.Error(
"rejected incoming good transaction; tx already exists for sender",
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"sender", sender,
)
mem.metrics.FailedTxs.Add(1)
return
}
}
if err := mem.canAddTx(wtx); err != nil {
evictTxs := mem.priorityIndex.GetEvictableTxs(
priority,
int64(wtx.Size()),
mem.SizeBytes(),
mem.config.MaxTxsBytes,
)
if len(evictTxs) == 0 {
// No room for the new incoming transaction so we just remove it from
// the cache.
mem.cache.Remove(wtx.tx)
mem.logger.Error(
"rejected incoming good transaction; mempool full",
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err.Error(),
)
mem.metrics.FailedTxs.Add(1)
return
}
// evict an existing transaction(s)
//
// NOTE:
// - The transaction, toEvict, can be removed while a concurrent
// reCheckTx callback is being executed for the same transaction.
for _, toEvict := range evictTxs {
mem.removeTx(toEvict, true)
mem.logger.Debug(
"evicted existing good transaction; mempool full",
"old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()),
"old_priority", toEvict.priority,
"new_tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"new_priority", wtx.priority,
)
}
}
wtx.gasWanted = checkTxRes.CheckTx.GasWanted
wtx.priority = priority
wtx.sender = sender
wtx.peers = map[uint16]struct{}{
txInfo.SenderID: {},
}
mem.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
mem.metrics.Size.Set(float64(mem.Size()))
mem.insertTx(wtx)
mem.logger.Debug(
"inserted good transaction",
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"height", mem.height,
"num_txs", mem.Size(),
)
mem.notifyTxsAvailable()
}
// callback, which is called after the app rechecked the tx.