From 342f0be09e4b12dfd2e186d5beef4f75c3483c2d Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 3 Jun 2022 13:15:10 -0400 Subject: [PATCH] updates --- mempool/clist_mempool.go | 294 +++++++++++++++++++++++++-------------- 1 file changed, 193 insertions(+), 101 deletions(-) diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index db4d4bada..f93796027 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "sync/atomic" + "time" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" @@ -13,7 +14,6 @@ import ( tmmath "github.com/tendermint/tendermint/libs/math" tmos "github.com/tendermint/tendermint/libs/os" tmsync "github.com/tendermint/tendermint/libs/sync" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -249,7 +249,6 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx defer mem.updateMtx.RUnlock() txSize := len(tx) - if err := mem.isFull(txSize); err != nil { return err } @@ -281,24 +280,36 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx return err } - if !mem.cache.Push(tx) { - // Record a new sender for a tx we've already seen. - // Note it's possible a tx is still in the cache but no longer in the mempool - // (eg. after committing a block, txs are removed from mempool but not cache), - // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(TxKey(tx)); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - memTx.senders.LoadOrStore(txInfo.SenderID, true) - // TODO: consider punishing peer for dups, - // its non-trivial since invalid txs can become valid, - // but they can spam the same tx with little cost to them atm. - } + txHash := tx.Key() + // We add the transaction to the mempool's cache and if the + // transaction is already present in the cache, i.e. false is returned, then we + // check if we've seen this transaction and error if we have. + if !mem.cache.Push(tx) { + mem.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) return ErrTxInCache } reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) + reqRes.SetCallback(func(res *abci.Response) { + if mem.recheckCursor != nil { + panic("recheck cursor is non-nil in CheckTx callback") + } + + wtx := &WrappedTx{ + tx: tx, + hash: txHash, + timestamp: time.Now().UTC(), + height: mem.height, + } + mem.initTxCallback(wtx, res, txInfo) + + mem.metrics.Size.Set(float64(mem.Size())) + + if cb != nil { + cb(res) + } + }) return nil } @@ -324,38 +335,38 @@ func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { mem.metrics.Size.Set(float64(mem.Size())) } -// Request specific callback that should be set on individual reqRes objects -// to incorporate local information when processing the response. -// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. -// NOTE: alternatively, we could include this information in the ABCI request itself. -// -// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called -// when all other response processing is complete. -// -// Used in CheckTx to record PeerID who sent us the tx. -func (mem *CListMempool) reqResCb( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - externalCb func(*abci.Response), -) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } +// // Request specific callback that should be set on individual reqRes objects +// // to incorporate local information when processing the response. +// // This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. +// // NOTE: alternatively, we could include this information in the ABCI request itself. +// // +// // External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called +// // when all other response processing is complete. +// // +// // Used in CheckTx to record PeerID who sent us the tx. +// func (mem *CListMempool) reqResCb( +// tx []byte, +// peerID uint16, +// peerP2PID p2p.ID, +// externalCb func(*abci.Response), +// ) func(res *abci.Response) { +// return func(res *abci.Response) { +// if mem.recheckCursor != nil { +// // this should never happen +// panic("recheck cursor is not nil in reqResCb") +// } - mem.resCbFirstTime(tx, peerID, peerP2PID, res) +// mem.resCbFirstTime(tx, peerID, peerP2PID, res) - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) +// // update metrics +// mem.metrics.Size.Set(float64(mem.Size())) - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res) - } - } -} +// // passed in by the caller of CheckTx, eg. the RPC +// if externalCb != nil { +// externalCb(res) +// } +// } +// } // Called from: // - resCbFirstTime (lock not held) if tx is valid @@ -390,75 +401,156 @@ func (mem *CListMempool) RemoveTxByKey(txKey [TxKeySize]byte, removeFromCache bo } } -func (mem *CListMempool) isFull(txSize int) error { +// canAddTx returns an error if we cannot insert the provided *WrappedTx into +// the mempool due to mempool configured constraints. Otherwise, nil is returned +// and the transaction can be inserted into the mempool. +func (mem *CListMempool) canAddTx(wtx *WrappedTx) error { var ( - memSize = mem.Size() - txsBytes = mem.TxsBytes() + numTxs = mem.Size() + sizeBytes = mem.SizeBytes() ) - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { + if numTxs >= mem.config.Size || int64(wtx.Size())+sizeBytes > mem.config.MaxTxsBytes { return ErrMempoolIsFull{ - memSize, mem.config.Size, - txsBytes, mem.config.MaxTxsBytes, + numTxs: numTxs, + maxTxs: mem.config.Size, + txsBytes: sizeBytes, + maxTxsBytes: mem.config.MaxTxsBytes, } } return nil } -// callback, which is called after the app checked the tx for the first time. +// initTxCallback performs the initial, i.e. the first, callback after CheckTx +// has been executed by the ABCI application. In other words, initTxCallback is +// called after executing CheckTx when we see a unique transaction for the first +// time. CheckTx can be called again for the same transaction at a later point +// in time when re-checking, however, this callback will not be called. // -// The case where the app checks the tx for the second and subsequent times is -// handled by the resCbRecheck callback. -func (mem *CListMempool) resCbFirstTime( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - res *abci.Response, -) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - - memTx := &mempoolTx{ - height: mem.height, - gasWanted: r.CheckTx.GasWanted, - tx: tx, - } - memTx.senders.Store(peerID, true) - mem.addTx(memTx) - mem.logger.Debug("added good transaction", - "tx", txID(tx), - "res", r, - "height", memTx.height, - "total", mem.Size(), - ) - mem.notifyTxsAvailable() - } else { - // ignore bad transaction - mem.logger.Debug("rejected bad transaction", - "tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr) - mem.metrics.FailedTxs.Add(1) - if !mem.config.KeepInvalidTxsInCache { - // remove from cache (it might be good later) - mem.cache.Remove(tx) - } - } - default: - // ignore other messages +// After the ABCI application executes CheckTx, initTxCallback is called with +// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool, +// we execute that first. If there is no error from postCheck (if defined) and +// the ABCI CheckTx response code is OK, we attempt to insert the transaction. +// +// When attempting to insert the transaction, we first check if there is +// sufficient capacity. If there is sufficient capacity, the transaction is +// inserted into the txStore and indexed across all indexes. Otherwise, if the +// mempool is full, we attempt to find a lower priority transaction to evict in +// place of the new incoming transaction. If no such transaction exists, the +// new incoming transaction is rejected. +// +// If the new incoming transaction fails CheckTx or postCheck fails, we reject +// the new incoming transaction. +// +// NOTE: +// - An explicit lock is NOT required. +func (mem *CListMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo TxInfo) { + checkTxRes, ok := res.Value.(*abci.Response_CheckTx) + if !ok { + return } + + var err error + if mem.postCheck != nil { + err = mem.postCheck(wtx.tx, checkTxRes.CheckTx) + } + + if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { + // ignore bad transactions + mem.logger.Info( + "rejected bad transaction", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "peer_id", txInfo.SenderP2PID, + "code", checkTxRes.CheckTx.Code, + "post_check_err", err, + ) + + mem.metrics.FailedTxs.Add(1) + + if !mem.config.KeepInvalidTxsInCache { + mem.cache.Remove(wtx.tx) + } + if err != nil { + checkTxRes.CheckTx.MempoolError = err.Error() + } + + return + } + + sender := checkTxRes.CheckTx.Sender + priority := checkTxRes.CheckTx.Priority + + if len(sender) > 0 { + if wtx := mem.txStore.GetTxBySender(sender); wtx != nil { + mem.logger.Error( + "rejected incoming good transaction; tx already exists for sender", + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "sender", sender, + ) + mem.metrics.FailedTxs.Add(1) + return + } + } + + if err := mem.canAddTx(wtx); err != nil { + evictTxs := mem.priorityIndex.GetEvictableTxs( + priority, + int64(wtx.Size()), + mem.SizeBytes(), + mem.config.MaxTxsBytes, + ) + if len(evictTxs) == 0 { + // No room for the new incoming transaction so we just remove it from + // the cache. + mem.cache.Remove(wtx.tx) + mem.logger.Error( + "rejected incoming good transaction; mempool full", + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "err", err.Error(), + ) + mem.metrics.FailedTxs.Add(1) + return + } + + // evict an existing transaction(s) + // + // NOTE: + // - The transaction, toEvict, can be removed while a concurrent + // reCheckTx callback is being executed for the same transaction. + for _, toEvict := range evictTxs { + mem.removeTx(toEvict, true) + mem.logger.Debug( + "evicted existing good transaction; mempool full", + "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), + "old_priority", toEvict.priority, + "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "new_priority", wtx.priority, + ) + } + } + + wtx.gasWanted = checkTxRes.CheckTx.GasWanted + wtx.priority = priority + wtx.sender = sender + wtx.peers = map[uint16]struct{}{ + txInfo.SenderID: {}, + } + + mem.metrics.TxSizeBytes.Observe(float64(wtx.Size())) + mem.metrics.Size.Set(float64(mem.Size())) + + mem.insertTx(wtx) + mem.logger.Debug( + "inserted good transaction", + "priority", wtx.priority, + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "height", mem.height, + "num_txs", mem.Size(), + ) + + mem.notifyTxsAvailable() } // callback, which is called after the app rechecked the tx.