From 8e9ae68459a715f9846207ed3339851d08d00fed Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Wed, 20 Jul 2022 15:53:55 -0700 Subject: [PATCH] Fix unbounded heap growth in the priority mempool. (#9058) This is a manual forward-port of #8944 and related fixes from v0.35.x. One difference of note is that the CheckTx response messages no longer have a field to record an error from the ABCI application. The code is set up so that these could be reported directly to the CheckTx caller, but it would be an API change, and right now a bunch of test plumbing depends on the existing semantics. Also fix up tests relying on implementation-specific mempool behavior. - Commit was setting the expected mempool size incorrectly. - Fix sequence test not to depend on the incorrect size. --- internal/consensus/mempool_test.go | 22 +- internal/libs/clist/bench_test.go | 2 +- internal/libs/clist/clist.go | 2 +- internal/mempool/cache.go | 13 + internal/mempool/mempool.go | 1013 +++++++++++------------ internal/mempool/mempool_test.go | 99 ++- internal/mempool/priority_queue.go | 158 ---- internal/mempool/priority_queue_test.go | 176 ---- internal/mempool/reactor.go | 6 +- internal/mempool/reactor_test.go | 6 +- internal/mempool/tx.go | 302 ++----- internal/mempool/tx_test.go | 231 ------ 12 files changed, 652 insertions(+), 1378 deletions(-) delete mode 100644 internal/mempool/priority_queue.go delete mode 100644 internal/mempool/priority_queue_test.go delete mode 100644 internal/mempool/tx_test.go diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index ed11f6840..59647f76b 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -214,12 +214,13 @@ func TestMempoolRmBadTx(t *testing.T) { emptyMempoolCh := make(chan struct{}) checkTxRespCh := make(chan struct{}) go func() { - // Try to send the tx through the mempool. + // Try to send an out-of-sequence transaction through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool + binary.BigEndian.PutUint64(txBytes, uint64(5)) err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) { if r.Code != code.CodeTypeBadNonce { - t.Errorf("expected checktx to return bad nonce, got %v", r) + t.Errorf("expected checktx to return bad nonce, got %#v", r) return } checkTxRespCh <- struct{}{} @@ -312,14 +313,15 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { app.mu.Lock() defer app.mu.Unlock() - - txValue := txAsUint64(req.Tx) - if txValue != uint64(app.mempoolTxCount) { - return &abci.ResponseCheckTx{ - Code: code.CodeTypeBadNonce, - }, nil + if req.Type == abci.CheckTxType_New { + txValue := txAsUint64(req.Tx) + if txValue != uint64(app.mempoolTxCount) { + return &abci.ResponseCheckTx{ + Code: code.CodeTypeBadNonce, + }, nil + } + app.mempoolTxCount++ } - app.mempoolTxCount++ return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil } @@ -332,8 +334,6 @@ func txAsUint64(tx []byte) uint64 { func (app *CounterApplication) Commit(context.Context) (*abci.ResponseCommit, error) { app.mu.Lock() defer app.mu.Unlock() - - app.mempoolTxCount = app.txCount return &abci.ResponseCommit{}, nil } diff --git a/internal/libs/clist/bench_test.go b/internal/libs/clist/bench_test.go index ee5d836a7..95973cc76 100644 --- a/internal/libs/clist/bench_test.go +++ b/internal/libs/clist/bench_test.go @@ -12,7 +12,7 @@ func BenchmarkDetaching(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { start.removed = true - start.detachNext() + start.DetachNext() start.DetachPrev() tmp := nxt nxt = nxt.Next() diff --git a/internal/libs/clist/clist.go b/internal/libs/clist/clist.go index 3969c94cc..9a0e5bcc8 100644 --- a/internal/libs/clist/clist.go +++ b/internal/libs/clist/clist.go @@ -103,7 +103,7 @@ func (e *CElement) Removed() bool { return isRemoved } -func (e *CElement) detachNext() { +func (e *CElement) DetachNext() { e.mtx.Lock() if !e.removed { e.mtx.Unlock() diff --git a/internal/mempool/cache.go b/internal/mempool/cache.go index c69fc80dd..3986cd585 100644 --- a/internal/mempool/cache.go +++ b/internal/mempool/cache.go @@ -22,6 +22,10 @@ type TxCache interface { // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) + + // Has reports whether tx is present in the cache. Checking for presence is + // not treated as an access of the value. + Has(tx types.Tx) bool } var _ TxCache = (*LRUTxCache)(nil) @@ -97,6 +101,14 @@ func (c *LRUTxCache) Remove(tx types.Tx) { } } +func (c *LRUTxCache) Has(tx types.Tx) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[tx.Key()] + return ok +} + // NopTxCache defines a no-op raw transaction cache. type NopTxCache struct{} @@ -105,3 +117,4 @@ var _ TxCache = (*NopTxCache)(nil) func (NopTxCache) Reset() {} func (NopTxCache) Push(types.Tx) bool { return true } func (NopTxCache) Remove(types.Tx) {} +func (NopTxCache) Has(types.Tx) bool { return false } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 2398180fc..0354eb28a 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -1,20 +1,20 @@ package mempool import ( - "bytes" "context" - "errors" "fmt" + "runtime" + "sort" "sync" "sync/atomic" "time" + "github.com/creachadair/taskgroup" abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" ) @@ -23,73 +23,41 @@ var _ Mempool = (*TxMempool)(nil) // TxMempoolOption sets an optional parameter on the TxMempool. type TxMempoolOption func(*TxMempool) -// TxMempool defines a prioritized mempool data structure used by the v1 mempool -// reactor. It keeps a thread-safe priority queue of transactions that is used -// when a block proposer constructs a block and a thread-safe linked-list that -// is used to gossip transactions to peers in a FIFO manner. +// TxMempool implemements the Mempool interface and allows the application to +// set priority values on transactions in the CheckTx response. When selecting +// transactions to include in a block, higher-priority transactions are chosen +// first. When evicting transactions from the mempool for size constraints, +// lower-priority transactions are evicted sooner. +// +// Within the mempool, transactions are ordered by time of arrival, and are +// gossiped to the rest of the network based on that order (gossip order does +// not take priority into account). type TxMempool struct { + // Immutable fields logger log.Logger - metrics *Metrics config *config.MempoolConfig proxyAppConn abciclient.Client + metrics *Metrics + cache TxCache // seen transactions - // txsAvailable fires once for each height when the mempool is not empty - txsAvailable chan struct{} + // Atomically-updated fields + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes + + // Synchronized fields, protected by mtx. + mtx *sync.RWMutex notifiedTxsAvailable bool + txsAvailable chan struct{} // one value sent per height when mempool is not empty + preCheck PreCheckFunc + postCheck PostCheckFunc + height int64 // the latest height passed to Update - // height defines the last block height process during Update() - height int64 - - // sizeBytes defines the total size of the mempool (sum of all tx bytes) - sizeBytes int64 - - // cache defines a fixed-size cache of already seen transactions as this - // reduces pressure on the proxyApp. - cache TxCache - - // txStore defines the main storage of valid transactions. Indexes are built - // on top of this store. - txStore *TxStore - - // gossipIndex defines the gossiping index of valid transactions via a - // thread-safe linked-list. We also use the gossip index as a cursor for - // rechecking transactions already in the mempool. - gossipIndex *clist.CList - - // recheckCursor and recheckEnd are used as cursors based on the gossip index - // to recheck transactions that are already in the mempool. Iteration is not - // thread-safe and transaction may be mutated in serial order. - // - // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for - // iterator and cursor management when rechecking transactions. If the gossip - // index changes or is removed in a future refactor, this will have to be - // refactored. Instead, we should consider just keeping a slice of a snapshot - // of the mempool's current transactions during Update and an integer cursor - // into that slice. This, however, requires additional O(n) space complexity. - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // priorityIndex defines the priority index of valid transactions via a - // thread-safe priority queue. - priorityIndex *TxPriorityQueue - - // heightIndex defines a height-based, in ascending order, transaction index. - // i.e. older transactions are first. - heightIndex *WrappedTxList - - // timestampIndex defines a timestamp-based, in ascending order, transaction - // index. i.e. older transactions are first. - timestampIndex *WrappedTxList - - // A read/write lock is used to safe guard updates, insertions and deletions - // from the mempool. A read-lock is implicitly acquired when executing CheckTx, - // however, a caller must explicitly grab a write-lock via Lock when updating - // the mempool via Update(). - mtx sync.RWMutex - preCheck PreCheckFunc - postCheck PostCheckFunc + txs *clist.CList // valid transactions (passed CheckTx) + txByKey map[types.TxKey]*clist.CElement + txBySender map[string]*clist.CElement // for sender != "" } +// NewTxMempool constructs a new, empty priority mempool at the specified +// initial height and using the given config and options. func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, @@ -98,23 +66,16 @@ func NewTxMempool( ) *TxMempool { txmp := &TxMempool{ - logger: logger, - config: cfg, - proxyAppConn: proxyAppConn, - height: -1, - cache: NopTxCache{}, - metrics: NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), - heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }), - timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) - }), + logger: logger, + config: cfg, + proxyAppConn: proxyAppConn, + metrics: NopMetrics(), + cache: NopTxCache{}, + txs: clist.New(), + mtx: new(sync.RWMutex), + txByKey: make(map[types.TxKey]*clist.CElement), + txBySender: make(map[string]*clist.CElement), } - if cfg.CacheSize > 0 { txmp.cache = NewLRUTxCache(cfg.CacheSize) } @@ -147,47 +108,36 @@ func WithMetrics(metrics *Metrics) TxMempoolOption { // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly // release the lock when finished. -func (txmp *TxMempool) Lock() { - txmp.mtx.Lock() -} +func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. -func (txmp *TxMempool) Unlock() { - txmp.mtx.Unlock() -} +func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } // Size returns the number of valid transactions in the mempool. It is // thread-safe. -func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() -} +func (txmp *TxMempool) Size() int { return txmp.txs.Len() } // SizeBytes return the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. -func (txmp *TxMempool) SizeBytes() int64 { - return atomic.LoadInt64(&txmp.sizeBytes) -} +func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.txsBytes) } -// FlushAppConn executes FlushSync on the mempool's proxyAppConn. +// FlushAppConn executes Flush on the mempool's proxyAppConn. // -// NOTE: The caller must obtain a write-lock prior to execution. +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling FlushAppConn. func (txmp *TxMempool) FlushAppConn(ctx context.Context) error { + // N.B.: We have to issue the call outside the lock so that its callback can + // fire. It's safe to do this, the flush will block until complete. + // + // We could just not require the caller to hold the lock at all, but the + // semantics of the Mempool interface require the caller to hold it, and we + // can't change that without disrupting existing use. + txmp.mtx.Unlock() + defer txmp.mtx.Lock() + return txmp.proxyAppConn.Flush(ctx) } -// WaitForNextTx returns a blocking channel that will be closed when the next -// valid transaction is available to gossip. It is thread-safe. -func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { - return txmp.gossipIndex.WaitChan() -} - -// NextGossipTx returns the next valid transaction to gossip. A caller must wait -// for WaitForNextTx to signal a transaction is available to gossip first. It is -// thread-safe. -func (txmp *TxMempool) NextGossipTx() *clist.CElement { - return txmp.gossipIndex.Front() -} - // EnableTxsAvailable enables the mempool to trigger events when transactions // are available on a block by block basis. func (txmp *TxMempool) EnableTxsAvailable() { @@ -199,232 +149,249 @@ func (txmp *TxMempool) EnableTxsAvailable() { // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. It is thread-safe. -func (txmp *TxMempool) TxsAvailable() <-chan struct{} { - return txmp.txsAvailable -} +func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } -// CheckTx executes the ABCI CheckTx method for a given transaction. -// It acquires a read-lock and attempts to execute the application's -// CheckTx ABCI method synchronously. We return an error if any of -// the following happen: +// CheckTx adds the given transaction to the mempool if it fits and passes the +// application's ABCI CheckTx method. // -// - The CheckTx execution fails. -// - The transaction already exists in the cache and we've already received the -// transaction from the peer. Otherwise, if it solely exists in the cache, we -// return nil. -// - The transaction size exceeds the maximum transaction size as defined by the -// configuration provided to the mempool. -// - The transaction fails Pre-Check (if it is defined). -// - The proxyAppConn fails, e.g. the buffer is full. +// CheckTx reports an error without adding tx if: // -// If the mempool is full, we still execute CheckTx and attempt to find a lower -// priority transaction to evict. If such a transaction exists, we remove the -// lower priority transaction and add the new one with higher priority. +// - The size of tx exceeds the configured maximum transaction size. +// - The pre-check hook is defined and reports an error for tx. +// - The transaction already exists in the cache. +// - The proxy connection to the application fails. // -// NOTE: -// - The applications' CheckTx implementation may panic. -// - The caller is not to explicitly require any locks for executing CheckTx. +// If tx passes all of the above conditions, it is passed (asynchronously) to +// the application's ABCI CheckTx method and this CheckTx method returns nil. +// If cb != nil, it is called when the ABCI request completes to report the +// application response. +// +// If the application accepts the transaction and the mempool is full, the +// mempool evicts one or more of the lowest-priority transaction whose priority +// is (strictly) lower than the priority of tx and whose size together exceeds +// the size of tx, and adds tx instead. If no such transactions exist, tx is +// discarded. func (txmp *TxMempool) CheckTx( ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + // During the initial phase of CheckTx, we do not need to modify any state. + // A transaction will not actually be added to the mempool until it survives + // a call to the ABCI CheckTx method and size constraint checks. + height, err := func() (int64, error) { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() - if txSize := len(tx); txSize > txmp.config.MaxTxBytes { - return types.ErrTxTooLarge{ - Max: txmp.config.MaxTxBytes, - Actual: txSize, + // Reject transactions in excess of the configured maximum transaction size. + if len(tx) > txmp.config.MaxTxBytes { + return 0, types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)} } - } - if txmp.preCheck != nil { - if err := txmp.preCheck(tx); err != nil { - return types.ErrPreCheck{Reason: err} + // If a precheck hook is defined, call it before invoking the application. + if txmp.preCheck != nil { + if err := txmp.preCheck(tx); err != nil { + return 0, types.ErrPreCheck{Reason: err} + } } - } - if err := txmp.proxyAppConn.Error(); err != nil { + // Early exit if the proxy connection has an error. + if err := txmp.proxyAppConn.Error(); err != nil { + return 0, err + } + + txKey := tx.Key() + + // Check for the transaction in the cache. + if !txmp.cache.Push(tx) { + // If the cached transaction is also in the pool, record its sender. + if elt, ok := txmp.txByKey[txKey]; ok { + w := elt.Value.(*WrappedTx) + w.SetPeer(txInfo.SenderID) + } + return 0, types.ErrTxInCache + } + return txmp.height, nil + }() + if err != nil { return err } - txHash := tx.Key() - - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. - if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) - return types.ErrTxInCache - } - - res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) if err != nil { txmp.cache.Remove(tx) return err } - - if txmp.recheckCursor != nil { - return errors.New("recheck cursor is non-nil") - } - wtx := &WrappedTx{ tx: tx, - hash: txHash, + hash: tx.Key(), timestamp: time.Now().UTC(), - height: txmp.height, - } - - txmp.defaultTxCallback(tx, res) - err = txmp.initTxCallback(wtx, res, txInfo) - - if err != nil { - return err + height: height, } + wtx.SetPeer(txInfo.SenderID) if cb != nil { - cb(res) + cb(rsp) } - - return nil + return txmp.addNewTransaction(wtx, rsp) } +// RemoveTxByKey removes the transaction with the specified key from the +// mempool. It reports an error if no such transaction exists. This operation +// does not remove the transaction from the cache. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { - txmp.Lock() - defer txmp.Unlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + return txmp.removeTxByKey(txKey) +} - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { - txmp.removeTx(wtx, false) +// removeTxByKey removes the specified transaction key from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { + if elt, ok := txmp.txByKey[key]; ok { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, key) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) return nil } - - return errors.New("transaction not found") + return fmt.Errorf("transaction %x not found", key) } -// Flush empties the mempool. It acquires a read-lock, fetches all the -// transactions currently in the transaction store and removes each transaction -// from the store and all indexes and finally resets the cache. -// -// NOTE: -// - Flushing the mempool may leave the mempool in an inconsistent state. +// removeTxByElement removes the specified transaction element from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, w.tx.Key()) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) +} + +// Flush purges the contents of the mempool and the cache, leaving both empty. +// The current height is not modified by this operation. func (txmp *TxMempool) Flush() { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() - txmp.heightIndex.Reset() - txmp.timestampIndex.Reset() - - for _, wtx := range txmp.txStore.GetAllTxs() { - txmp.removeTx(wtx, false) + // Remove all the transactions in the list explicitly, so that the sizes + // and indexes get updated properly. + cur := txmp.txs.Front() + for cur != nil { + next := cur.Next() + txmp.removeTxByElement(cur) + cur = next } - - atomic.SwapInt64(&txmp.sizeBytes, 0) txmp.cache.Reset() } -// ReapMaxBytesMaxGas returns a list of transactions within the provided size -// and gas constraints. Transaction are retrieved in priority order. +// allEntriesSorted returns a slice of all the transactions currently in the +// mempool, sorted in nonincreasing order by priority with ties broken by +// increasing order of arrival time. +func (txmp *TxMempool) allEntriesSorted() []*WrappedTx { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + + all := make([]*WrappedTx, 0, len(txmp.txByKey)) + for _, tx := range txmp.txByKey { + all = append(all, tx.Value.(*WrappedTx)) + } + sort.Slice(all, func(i, j int) bool { + if all[i].priority == all[j].priority { + return all[i].timestamp.Before(all[j].timestamp) + } + return all[i].priority > all[j].priority // N.B. higher priorities first + }) + return all +} + +// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the +// size and gas constraints. The results are ordered by nonincreasing priority, +// with ties broken by increasing order of arrival. Reaping transactions does +// not remove them from the mempool. // -// NOTE: -// - Transactions returned are not removed from the mempool transaction -// store or indexes. +// If maxBytes < 0, no limit is set on the total size in bytes. +// If maxGas < 0, no limit is set on the total gas cost. +// +// If the mempool is empty or has no transactions fitting within the given +// constraints, the result will also be empty. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var totalGas, totalBytes int64 - var ( - totalGas int64 - totalSize int64 - ) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + var keep []types.Tx //nolint:prealloc + for _, w := range txmp.allEntriesSorted() { + // N.B. When computing byte size, we need to include the overhead for + // encoding as protobuf to send to the application. + totalGas += w.gasWanted + totalBytes += types.ComputeProtoSizeForTxs([]types.Tx{w.tx}) + if (maxGas >= 0 && totalGas > maxGas) || (maxBytes >= 0 && totalBytes > maxBytes) { + break } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - - // Ensure we have capacity for the transaction with respect to the - // transaction size. - if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] - } - - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas - gas := totalGas + wtx.gasWanted - if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] - } - - totalGas = gas + keep = append(keep, w.tx) } - - return txs + return keep } -// ReapMaxTxs returns a list of transactions within the provided number of -// transactions bound. Transaction are retrieved in priority order. +// TxsWaitChan returns a channel that is closed when there is at least one +// transaction available to be gossiped. +func (txmp *TxMempool) TxsWaitChan() <-chan struct{} { return txmp.txs.WaitChan() } + +// TxsFront returns the frontmost element of the pending transaction list. +// It will be nil if the mempool is empty. +func (txmp *TxMempool) TxsFront() *clist.CElement { return txmp.txs.Front() } + +// ReapMaxTxs returns up to max transactions from the mempool. The results are +// ordered by nonincreasing priority with ties broken by increasing order of +// arrival. Reaping transactions does not remove them from the mempool. // -// NOTE: -// - Transactions returned are not removed from the mempool transaction -// store or indexes. +// If max < 0, all transactions in the mempool are reaped. +// +// The result may have fewer than max elements (possibly zero) if the mempool +// does not have that many transactions available. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var keep []types.Tx //nolint:prealloc - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs + for _, w := range txmp.allEntriesSorted() { + if max >= 0 && len(keep) >= max { + break + } + keep = append(keep, w.tx) } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - } - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - return txs + return keep } -// Update iterates over all the transactions provided by the block producer, -// removes them from the cache (if applicable), and removes -// the transactions from the main transaction store and associated indexes. -// If there are transactions remaining in the mempool, we initiate a -// re-CheckTx for them (if applicable), otherwise, we notify the caller more -// transactions are available. +// Update removes all the given transactions from the mempool and the cache, +// and updates the current block height. The blockTxs and deliverTxResponses +// must have the same length with each response corresponding to the tx at the +// same offset. // -// NOTE: -// - The caller must explicitly acquire a write-lock. +// If the configuration enables recheck, Update sends each remaining +// transaction after removing blockTxs to the ABCI CheckTx method. Any +// transactions marked as invalid during recheck are also removed. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling Update. func (txmp *TxMempool) Update( ctx context.Context, blockHeight int64, blockTxs types.Txs, - execTxResult []*abci.ExecTxResult, + deliverTxResponses []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, recheck bool, ) error { + // Safety check: Transactions and responses must match in number. + if len(blockTxs) != len(deliverTxResponses) { + panic(fmt.Sprintf("mempool: got %d transactions but %d ExecTx responses", + len(blockTxs), len(deliverTxResponses))) + } + txmp.height = blockHeight txmp.notifiedTxsAvailable = false @@ -436,18 +403,17 @@ func (txmp *TxMempool) Update( } for i, tx := range blockTxs { - if execTxResult[i].Code == abci.CodeTypeOK { - // add the valid committed transaction to the cache (if missing) + // Add successful committed transactions to the cache (if they are not + // already present). Transactions that failed to commit are removed from + // the cache unless the operator has explicitly requested we keep them. + if deliverTxResponses[i].Code == abci.CodeTypeOK { _ = txmp.cache.Push(tx) } else if !txmp.config.KeepInvalidTxsInCache { - // allow invalid transactions to be re-submitted txmp.cache.Remove(tx) } - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { - txmp.removeTx(wtx, false) - } + // Regardless of success, remove the transaction from the mempool. + _ = txmp.removeTxByKey(tx.Key()) } txmp.purgeExpiredTxs(blockHeight) @@ -455,134 +421,173 @@ func (txmp *TxMempool) Update( // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining // transactions are left. - if txmp.Size() > 0 { + size := txmp.Size() + txmp.metrics.Size.Set(float64(size)) + if size > 0 { if recheck { - txmp.logger.Debug( - "executing re-CheckTx for all remaining transactions", - "num_txs", txmp.Size(), - "height", blockHeight, - ) - txmp.updateReCheckTxs(ctx) + txmp.recheckTransactions(ctx) } else { txmp.notifyTxsAvailable() } } - - txmp.metrics.Size.Set(float64(txmp.Size())) return nil } -// initTxCallback is the callback invoked for a new unique transaction after CheckTx -// has been executed by the ABCI application for the first time on that transaction. -// CheckTx can be called again for the same transaction later when re-checking; -// however, this callback will not be called. +// addNewTransaction handles the ABCI CheckTx response for the first time a +// transaction is added to the mempool. A recheck after a block is committed +// goes to handleRecheckResult. // -// initTxCallback runs after the ABCI application executes CheckTx. -// It runs the postCheck hook if one is defined on the mempool. -// If the CheckTx response response code is not OK, or if the postCheck hook -// reports an error, the transaction is rejected. Otherwise, we attempt to insert -// the transaction into the mempool. +// If either the application rejected the transaction or a post-check hook is +// defined and rejects the transaction, it is discarded. // -// When inserting a transaction, we first check if there is sufficient capacity. -// If there is, the transaction is added to the txStore and all indexes. -// Otherwise, if the mempool is full, we attempt to find a lower priority transaction -// to evict in place of the new incoming transaction. If no such transaction exists, -// the new incoming transaction is rejected. +// Otherwise, if the mempool is full, check for lower-priority transactions +// that can be evicted to make room for the new one. If no such transactions +// exist, this transaction is logged and dropped; otherwise the selected +// transactions are evicted. // -// NOTE: -// - An explicit lock is NOT required. -func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) error { +// Finally, the new transaction is added and size stats updated. +func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + var err error if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, res) + err = txmp.postCheck(wtx.tx, checkTxRes) } - if err != nil || res.Code != abci.CodeTypeOK { - // ignore bad transactions + if err != nil || checkTxRes.Code != abci.CodeTypeOK { txmp.logger.Info( "rejected bad transaction", - "priority", wtx.priority, + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "peer_id", txInfo.SenderNodeID, - "code", res.Code, + "peer_id", wtx.peers, + "code", checkTxRes.Code, "post_check_err", err, ) txmp.metrics.FailedTxs.Add(1) + // Remove the invalid transaction from the cache, unless the operator has + // instructed us to keep invalid transactions. if !txmp.config.KeepInvalidTxsInCache { txmp.cache.Remove(wtx.tx) } - return err + + if err != nil { + return err + } + // TODO(creachadair): Report an error for an invalid transaction. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("invalid transaction: ABCI response code %d", checkTxRes.Code) + return nil } - sender := res.Sender - priority := res.Priority + priority := checkTxRes.Priority + sender := checkTxRes.Sender - if len(sender) > 0 { - if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { - txmp.logger.Error( - "rejected incoming good transaction; tx already exists for sender", - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + // Disallow multiple concurrent transactions from the same sender assigned + // by the ABCI application. As a special case, an empty sender is not + // restricted. + if sender != "" { + elt, ok := txmp.txBySender[sender] + if ok { + w := elt.Value.(*WrappedTx) + txmp.logger.Debug( + "rejected valid incoming transaction; tx already exists for sender", + "tx", fmt.Sprintf("%X", w.tx.Hash()), "sender", sender, ) txmp.metrics.RejectedTxs.Add(1) + // TODO(creachadair): Report an error for a duplicate sender. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("transaction rejected: tx already exists for sender %q (%X)", sender, w.tx.Hash()) return nil } } + // At this point the application has ruled the transaction valid, but the + // mempool might be full. If so, find the lowest-priority items with lower + // priority than the application assigned to this new one, and evict as many + // of them as necessary to make room for tx. If no such items exist, we + // discard tx. + if err := txmp.canAddTx(wtx); err != nil { - evictTxs := txmp.priorityIndex.GetEvictableTxs( - priority, - int64(wtx.Size()), - txmp.SizeBytes(), - txmp.config.MaxTxsBytes, - ) - if len(evictTxs) == 0 { - // No room for the new incoming transaction so we just remove it from - // the cache and return an error to the user. + var victims []*clist.CElement // eligible transactions for eviction + var victimBytes int64 // total size of victims + for cur := txmp.txs.Front(); cur != nil; cur = cur.Next() { + cw := cur.Value.(*WrappedTx) + if cw.priority < priority { + victims = append(victims, cur) + victimBytes += cw.Size() + } + } + + // If there are no suitable eviction candidates, or the total size of + // those candidates is not enough to make room for the new transaction, + // drop the new one. + if len(victims) == 0 || victimBytes < wtx.Size() { txmp.cache.Remove(wtx.tx) txmp.logger.Error( - "rejected incoming good transaction; mempool full", + "rejected valid incoming transaction; mempool is full", "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err.Error(), ) txmp.metrics.RejectedTxs.Add(1) - return err + // TODO(creachadair): Report an error for a full mempool. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("transaction rejected: mempool is full (%X)", wtx.tx.Hash()) + return nil } - // evict an existing transaction(s) - // - // NOTE: - // - The transaction, toEvict, can be removed while a concurrent - // reCheckTx callback is being executed for the same transaction. - for _, toEvict := range evictTxs { - txmp.removeTx(toEvict, true) + txmp.logger.Debug("evicting lower-priority transactions", + "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "new_priority", priority, + ) + + // Sort lowest priority items first so they will be evicted first. Break + // ties in favor of newer items (to maintain FIFO semantics in a group). + sort.Slice(victims, func(i, j int) bool { + iw := victims[i].Value.(*WrappedTx) + jw := victims[j].Value.(*WrappedTx) + if iw.Priority() == jw.Priority() { + return iw.timestamp.After(jw.timestamp) + } + return iw.Priority() < jw.Priority() + }) + + // Evict as many of the victims as necessary to make room. + var evictedBytes int64 + for _, vic := range victims { + w := vic.Value.(*WrappedTx) + txmp.logger.Debug( - "evicted existing good transaction; mempool full", - "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), - "old_priority", toEvict.priority, - "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "new_priority", wtx.priority, + "evicted valid existing transaction; mempool full", + "old_tx", fmt.Sprintf("%X", w.tx.Hash()), + "old_priority", w.priority, ) + txmp.removeTxByElement(vic) + txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.Add(1) + + // We may not need to evict all the eligible transactions. Bail out + // early if we have made enough room. + evictedBytes += w.Size() + if evictedBytes >= wtx.Size() { + break + } } } - wtx.gasWanted = res.GasWanted - wtx.priority = priority - wtx.sender = sender - wtx.peers = map[uint16]struct{}{ - txInfo.SenderID: {}, - } + wtx.SetGasWanted(checkTxRes.GasWanted) + wtx.SetPriority(priority) + wtx.SetSender(sender) + txmp.insertTx(wtx) txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) txmp.metrics.Size.Set(float64(txmp.Size())) - - txmp.insertTx(wtx) txmp.logger.Debug( - "inserted good transaction", - "priority", wtx.priority, + "inserted new valid transaction", + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "height", txmp.height, "num_txs", txmp.Size(), @@ -591,149 +596,129 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, return nil } -// defaultTxCallback is the CheckTx application callback used when a -// transaction is being re-checked (if re-checking is enabled). The -// caller must hold a mempool write-lock (via Lock()) and when -// executing Update(), if the mempool is non-empty and Recheck is -// enabled, then all remaining transactions will be rechecked via -// CheckTx. The order transactions are rechecked must be the same as -// the order in which this callback is called. -func (txmp *TxMempool) defaultTxCallback(tx types.Tx, res *abci.ResponseCheckTx) { - if txmp.recheckCursor == nil { +func (txmp *TxMempool) insertTx(wtx *WrappedTx) { + elt := txmp.txs.PushBack(wtx) + txmp.txByKey[wtx.tx.Key()] = elt + if s := wtx.Sender(); s != "" { + txmp.txBySender[s] = elt + } + + atomic.AddInt64(&txmp.txsBytes, wtx.Size()) +} + +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. +// +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { + txmp.metrics.RecheckTimes.Add(1) + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + // Find the transaction reported by the ABCI callback. It is possible the + // transaction was evicted during the recheck, in which case the transaction + // will be gone. + elt, ok := txmp.txByKey[tx.Key()] + if !ok { return } + wtx := elt.Value.(*WrappedTx) - txmp.metrics.RecheckTimes.Add(1) - - wtx := txmp.recheckCursor.Value.(*WrappedTx) - - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, wtx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - txmp.logger.Error( - "re-CheckTx transaction mismatch", - "got", wtx.tx.Hash(), - "expected", tx.Key(), - ) - - if txmp.recheckCursor == txmp.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - txmp.recheckCursor = nil - return - } - - txmp.recheckCursor = txmp.recheckCursor.Next() - wtx = txmp.recheckCursor.Value.(*WrappedTx) + // If a postcheck hook is defined, call it before checking the result. + var err error + if txmp.postCheck != nil { + err = txmp.postCheck(tx, checkTxRes) } - // Only evaluate transactions that have not been removed. This can happen - // if an existing transaction is evicted during CheckTx and while this - // callback is being executed for the same evicted transaction. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - var err error - if txmp.postCheck != nil { - err = txmp.postCheck(tx, res) - } - - if res.Code == abci.CodeTypeOK && err == nil { - wtx.priority = res.Priority - } else { - txmp.logger.Debug( - "existing transaction no longer valid; failed re-CheckTx callback", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "err", err, - "code", res.Code, - ) - - if wtx.gossipEl != txmp.recheckCursor { - panic("corrupted reCheckTx cursor") - } - - txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) - } + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + wtx.SetPriority(checkTxRes.Priority) + return // N.B. Size of mempool did not change } - // move reCheckTx cursor to next element - if txmp.recheckCursor == txmp.recheckEnd { - txmp.recheckCursor = nil - } else { - txmp.recheckCursor = txmp.recheckCursor.Next() + txmp.logger.Debug( + "existing transaction no longer valid; failed re-CheckTx callback", + "priority", wtx.Priority(), + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "err", err, + "code", checkTxRes.Code, + ) + txmp.removeTxByElement(elt) + txmp.metrics.FailedTxs.Add(1) + if !txmp.config.KeepInvalidTxsInCache { + txmp.cache.Remove(wtx.tx) } - - if txmp.recheckCursor == nil { - txmp.logger.Debug("finished rechecking transactions") - - if txmp.Size() > 0 { - txmp.notifyTxsAvailable() - } - } - txmp.metrics.Size.Set(float64(txmp.Size())) } -// updateReCheckTxs updates the recheck cursors using the gossipIndex. For -// each transaction, it executes CheckTx. The global callback defined on -// the proxyAppConn will be executed for each transaction after CheckTx is -// executed. +// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions +// currently in the mempool. It reports the number of recheck calls that were +// successfully initiated. // -// NOTE: -// - The caller must have a write-lock when executing updateReCheckTxs. -func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { +// Precondition: The mempool is not empty. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) recheckTransactions(ctx context.Context) { if txmp.Size() == 0 { - panic("attempted to update re-CheckTx txs when mempool is empty") + panic("mempool: cannot run recheck on an empty mempool") + } + txmp.logger.Debug( + "executing re-CheckTx for all remaining transactions", + "num_txs", txmp.Size(), + "height", txmp.height, + ) + + // Collect transactions currently in the mempool requiring recheck. + wtxs := make([]*WrappedTx, 0, txmp.txs.Len()) + for e := txmp.txs.Front(); e != nil; e = e.Next() { + wtxs = append(wtxs, e.Value.(*WrappedTx)) } - txmp.recheckCursor = txmp.gossipIndex.Front() - txmp.recheckEnd = txmp.gossipIndex.Back() + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) - for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { - wtx := e.Value.(*WrappedTx) - - // Only execute CheckTx if the transaction is not marked as removed which - // could happen if the transaction was evicted. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) + } + return nil }) - if err != nil { - // no need in retrying since the tx will be rechecked after the next block - txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err) - continue - } - txmp.defaultTxCallback(wtx.tx, res) } - } + if err := txmp.proxyAppConn.Flush(ctx); err != nil { + txmp.logger.Error("failed to flush transactions during recheck", "err", err) + } - if err := txmp.proxyAppConn.Flush(ctx); err != nil { - txmp.logger.Error("failed to flush transactions during rechecking", "err", err) - } + // When recheck is complete, trigger a notification for more transactions. + _ = g.Wait() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() + }() } // canAddTx returns an error if we cannot insert the provided *WrappedTx into -// the mempool due to mempool configured constraints. If it returns nil, -// the transaction can be inserted into the mempool. +// the mempool due to mempool configured constraints. Otherwise, nil is +// returned and the transaction can be inserted into the mempool. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { - var ( - numTxs = txmp.Size() - sizeBytes = txmp.SizeBytes() - ) + numTxs := txmp.Size() + txBytes := txmp.SizeBytes() - if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes { + if numTxs >= txmp.config.Size || wtx.Size()+txBytes > txmp.config.MaxTxsBytes { return types.ErrMempoolIsFull{ NumTxs: numTxs, MaxTxs: txmp.config.Size, - TxsBytes: sizeBytes, + TxsBytes: txBytes, MaxTxsBytes: txmp.config.MaxTxsBytes, } } @@ -741,96 +726,40 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } -func (txmp *TxMempool) insertTx(wtx *WrappedTx) { - txmp.txStore.SetTx(wtx) - txmp.priorityIndex.PushTx(wtx) - txmp.heightIndex.Insert(wtx) - txmp.timestampIndex.Insert(wtx) - - // Insert the transaction into the gossip index and mark the reference to the - // linked-list element, which will be needed at a later point when the - // transaction is removed. - gossipEl := txmp.gossipIndex.PushBack(wtx) - wtx.gossipEl = gossipEl - - atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) -} - -func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { - if txmp.txStore.IsTxRemoved(wtx.hash) { - return - } - - txmp.txStore.RemoveTx(wtx) - txmp.priorityIndex.RemoveTx(wtx) - txmp.heightIndex.Remove(wtx) - txmp.timestampIndex.Remove(wtx) - - // Remove the transaction from the gossip index and cleanup the linked-list - // element so it can be garbage collected. - txmp.gossipIndex.Remove(wtx.gossipEl) - wtx.gossipEl.DetachPrev() - - atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } -} - -// purgeExpiredTxs removes all transactions that have exceeded their respective -// height- and/or time-based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool, but preserved in the cache. +// purgeExpiredTxs removes all transactions from the mempool that have exceeded +// their respective height or time-based limits as of the given blockHeight. +// Transactions removed by this operation are not removed from the cache. // -// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which -// the caller has a write-lock on the mempool and so we can safely iterate over -// the height and time based indexes. +// The caller must hold txmp.mtx exclusively. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { + if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 { + return // nothing to do + } + now := time.Now() - expiredTxs := make(map[types.TxKey]*WrappedTx) + cur := txmp.txs.Front() + for cur != nil { + // N.B. Grab the next element first, since if we remove cur its successor + // will be invalidated. + next := cur.Next() - if txmp.config.TTLNumBlocks > 0 { - purgeIdx := -1 - for i, wtx := range txmp.heightIndex.txs { - if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } + w := cur.Value.(*WrappedTx) + if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks { + txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) + txmp.metrics.EvictedTxs.Add(1) + } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { + txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) + txmp.metrics.EvictedTxs.Add(1) } - - if purgeIdx >= 0 { - txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:] - } - } - - if txmp.config.TTLDuration > 0 { - purgeIdx := -1 - for i, wtx := range txmp.timestampIndex.txs { - if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } - } - - if purgeIdx >= 0 { - txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:] - } - } - - for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + cur = next } } func (txmp *TxMempool) notifyTxsAvailable() { if txmp.Size() == 0 { - panic("attempt to notify txs available but mempool is empty!") + return // nothing to do } if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 538cb3e1f..a6fea9a8a 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -86,9 +86,19 @@ func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMemp return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...) } -func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { - t.Helper() +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(ctx context.Context, t *testing.T, txmp *TxMempool, spec string) { + done := make(chan struct{}) + if err := txmp.CheckTx(ctx, []byte(spec), func(*abci.ResponseCheckTx) { + close(done) + }, TxInfo{}); err != nil { + t.Fatalf("CheckTx for %q failed: %v", spec, err) + } + <-done +} +func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := TxInfo{SenderID: peerID} @@ -217,6 +227,87 @@ func TestTxMempool_Size(t *testing.T) { require.Equal(t, int64(2850), txmp.SizeBytes()) } +func TestTxMempool_Eviction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(t, client, 1000) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + txmp.Lock() + defer txmp.Unlock() + key := types.Tx(spec).Key() + _, ok := txmp.txByKey[key] + return ok + } + t.Cleanup(client.Wait) + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + mustCheckTx(ctx, t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(ctx, t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(ctx, t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.False(t, txmp.cache.Has([]byte(bigTx))) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(ctx, t, txmp, "key2=0001=5") + mustCheckTx(ctx, t, txmp, "key3=0002=10") + mustCheckTx(ctx, t, txmp, "key4=0003=3") + mustCheckTx(ctx, t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + mustCheckTx(ctx, t, txmp, "key6=0005=1") + require.False(t, txExists("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. + mustCheckTx(ctx, t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(ctx, t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(ctx, t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(ctx, t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) +} + func TestTxMempool_Flush(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -537,7 +628,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { tTxs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, 100, txmp.heightIndex.Size()) // reap 5 txs at the next height -- no txs should expire reapedTxs := txmp.ReapMaxTxs(5) @@ -551,12 +641,10 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.Equal(t, 95, txmp.Size()) - require.Equal(t, 95, txmp.heightIndex.Size()) // check more txs at height 101 _ = checkTxs(ctx, t, txmp, 50, 1) require.Equal(t, 145, txmp.Size()) - require.Equal(t, 145, txmp.heightIndex.Size()) // Reap 5 txs at a height that would expire all the transactions from before // the previous Update (height 100). @@ -577,7 +665,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.GreaterOrEqual(t, txmp.Size(), 45) - require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45) } func TestTxMempool_CheckTxPostCheckError(t *testing.T) { diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go deleted file mode 100644 index e31997397..000000000 --- a/internal/mempool/priority_queue.go +++ /dev/null @@ -1,158 +0,0 @@ -package mempool - -import ( - "container/heap" - "sort" - "sync" -) - -var _ heap.Interface = (*TxPriorityQueue)(nil) - -// TxPriorityQueue defines a thread-safe priority queue for valid transactions. -type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx -} - -func NewTxPriorityQueue() *TxPriorityQueue { - pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), - } - - heap.Init(pq) - - return pq -} - -// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be -// evicted to make room for another *WrappedTx with higher priority. If no such -// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx -// indicate that these transactions can be removed due to them being of lower -// priority and that their total sum in size allows room for the incoming -// transaction according to the mempool's configured limits. -func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - txs := make([]*WrappedTx, len(pq.txs)) - copy(txs, pq.txs) - - sort.Slice(txs, func(i, j int) bool { - return txs[i].priority < txs[j].priority - }) - - var ( - toEvict []*WrappedTx - i int - ) - - currSize := totalSize - - // Loop over all transactions in ascending priority order evaluating those - // that are only of less priority than the provided argument. We continue - // evaluating transactions until there is sufficient capacity for the new - // transaction (size) as defined by txSize. - for i < len(txs) && txs[i].priority < priority { - toEvict = append(toEvict, txs[i]) - currSize -= int64(txs[i].Size()) - - if currSize+txSize <= cap { - return toEvict - } - - i++ - } - - return nil -} - -// NumTxs returns the number of transactions in the priority queue. It is -// thread safe. -func (pq *TxPriorityQueue) NumTxs() int { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - return len(pq.txs) -} - -// RemoveTx removes a specific transaction from the priority queue. -func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) - } -} - -// PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - heap.Push(pq, tx) -} - -// PopTx removes the top priority transaction from the queue. It is thread safe. -func (pq *TxPriorityQueue) PopTx() *WrappedTx { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil -} - -// Push implements the Heap interface. -// -// NOTE: A caller should never call Push. Use PushTx instead. -func (pq *TxPriorityQueue) Push(x interface{}) { - n := len(pq.txs) - item := x.(*WrappedTx) - item.heapIndex = n - pq.txs = append(pq.txs, item) -} - -// Pop implements the Heap interface. -// -// NOTE: A caller should never call Pop. Use PopTx instead. -func (pq *TxPriorityQueue) Pop() interface{} { - old := pq.txs - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.heapIndex = -1 // for safety - pq.txs = old[0 : n-1] - return item -} - -// Len implements the Heap interface. -// -// NOTE: A caller should never call Len. Use NumTxs instead. -func (pq *TxPriorityQueue) Len() int { - return len(pq.txs) -} - -// Less implements the Heap interface. It returns true if the transaction at -// position i in the queue is of less priority than the transaction at position j. -func (pq *TxPriorityQueue) Less(i, j int) bool { - // If there exists two transactions with the same priority, consider the one - // that we saw the earliest as the higher priority transaction. - if pq.txs[i].priority == pq.txs[j].priority { - return pq.txs[i].timestamp.Before(pq.txs[j].timestamp) - } - - // We want Pop to give us the highest, not lowest, priority so we use greater - // than here. - return pq.txs[i].priority > pq.txs[j].priority -} - -// Swap implements the Heap interface. It swaps two transactions in the queue. -func (pq *TxPriorityQueue) Swap(i, j int) { - pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] - pq.txs[i].heapIndex = i - pq.txs[j].heapIndex = j -} diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go deleted file mode 100644 index ddc84806d..000000000 --- a/internal/mempool/priority_queue_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package mempool - -import ( - "math/rand" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestTxPriorityQueue(t *testing.T) { - pq := NewTxPriorityQueue() - numTxs := 1000 - - priorities := make([]int, numTxs) - - var wg sync.WaitGroup - for i := 1; i <= numTxs; i++ { - priorities[i-1] = i - wg.Add(1) - - go func(i int) { - pq.PushTx(&WrappedTx{ - priority: int64(i), - timestamp: time.Now(), - }) - - wg.Done() - }(i) - } - - sort.Sort(sort.Reverse(sort.IntSlice(priorities))) - - wg.Wait() - require.Equal(t, numTxs, pq.NumTxs()) - - // Wait a second and push a tx with a duplicate priority - time.Sleep(time.Second) - now := time.Now() - pq.PushTx(&WrappedTx{ - priority: 1000, - timestamp: now, - }) - require.Equal(t, 1001, pq.NumTxs()) - - tx := pq.PopTx() - require.Equal(t, 1000, pq.NumTxs()) - require.Equal(t, int64(1000), tx.priority) - require.NotEqual(t, now, tx.timestamp) - - gotPriorities := make([]int, 0) - for pq.NumTxs() > 0 { - gotPriorities = append(gotPriorities, int(pq.PopTx().priority)) - } - - require.Equal(t, priorities, gotPriorities) -} - -func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - values := make([]int, 1000) - - for i := 0; i < 1000; i++ { - tx := make([]byte, 5) // each tx is 5 bytes - _, err := rng.Read(tx) - require.NoError(t, err) - - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - tx: tx, - priority: int64(x), - }) - - values[i] = x - } - - sort.Ints(values) - - max := values[len(values)-1] - min := values[0] - totalSize := int64(len(values) * 5) - - testCases := []struct { - name string - priority, txSize, totalSize, cap int64 - expectedLen int - }{ - { - name: "larest priority; single tx", - priority: int64(max + 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 1, - }, - { - name: "larest priority; multi tx", - priority: int64(max + 1), - txSize: 17, - totalSize: totalSize, - cap: totalSize, - expectedLen: 4, - }, - { - name: "larest priority; out of capacity", - priority: int64(max + 1), - txSize: totalSize + 1, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "smallest priority; no tx", - priority: int64(min - 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "small priority; no tx", - priority: int64(min), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap) - require.Len(t, evictTxs, tc.expectedLen) - }) - } -} - -func TestTxPriorityQueue_RemoveTx(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - numTxs := 1000 - - values := make([]int, numTxs) - - for i := 0; i < numTxs; i++ { - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - priority: int64(x), - }) - - values[i] = x - } - - require.Equal(t, numTxs, pq.NumTxs()) - - sort.Ints(values) - max := values[len(values)-1] - - wtx := pq.txs[pq.NumTxs()/2] - pq.RemoveTx(wtx) - require.Equal(t, numTxs-1, pq.NumTxs()) - require.Equal(t, int64(max), pq.PopTx().priority) - require.Equal(t, numTxs-2, pq.NumTxs()) - - require.NotPanics(t, func() { - pq.RemoveTx(&WrappedTx{heapIndex: numTxs}) - pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}) - }) - require.Equal(t, numTxs-2, pq.NumTxs()) -} diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 62cdf386c..1f4b3f78a 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -307,8 +307,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, m select { case <-ctx.Done(): return - case <-r.mempool.WaitForNextTx(): // wait until a tx is available - if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { + case <-r.mempool.TxsWaitChan(): // wait until a tx is available + if nextGossipTx = r.mempool.TxsFront(); nextGossipTx == nil { continue } } @@ -318,7 +318,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, m // NOTE: Transaction batching was disabled due to: // https://github.com/tendermint/tendermint/issues/5796 - if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok { + if !memTx.HasPeer(peerMempoolID) { // Send the mempool tx to the corresponding peer. Note, the peer may be // behind and thus would not be able to process the mempool tx correctly. if err := mempoolCh.Send(ctx, p2p.Envelope{ diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index ee7fe777f..bd6ccf8b2 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -68,7 +68,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode require.NoError(t, client.Start(ctx)) t.Cleanup(client.Wait) - mempool := setup(t, client, 0) + mempool := setup(t, client, 1<<20) rts.mempools[nodeID] = mempool rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf) @@ -170,7 +170,9 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { secondaryReactor.observePanic = observePanic firstTx := &WrappedTx{} + primaryMempool.Lock() primaryMempool.insertTx(firstTx) + primaryMempool.Unlock() // run the router rts.start(ctx, t) @@ -183,6 +185,8 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { wg.Add(1) go func() { defer wg.Done() + primaryMempool.Lock() + defer primaryMempool.Unlock() primaryMempool.insertTx(next) }() } diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c7113c951..1a221e2c3 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -1,11 +1,9 @@ package mempool import ( - "sort" "sync" "time" - "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/types" ) @@ -24,270 +22,78 @@ type TxInfo struct { // WrappedTx defines a wrapper around a raw transaction with additional metadata // that is used for indexing. type WrappedTx struct { - // tx represents the raw binary transaction data - tx types.Tx + tx types.Tx // the original transaction data + hash types.TxKey // the transaction hash + height int64 // height when this transaction was initially checked (for expiry) + timestamp time.Time // time when transaction was entered (for TTL) - // hash defines the transaction hash and the primary key used in the mempool - hash types.TxKey - - // height defines the height at which the transaction was validated at - height int64 - - // gasWanted defines the amount of gas the transaction sender requires - gasWanted int64 - - // priority defines the transaction's priority as specified by the application - // in the ResponseCheckTx response. - priority int64 - - // sender defines the transaction's sender as specified by the application in - // the ResponseCheckTx response. - sender string - - // timestamp is the time at which the node first received the transaction from - // a peer. It is used as a second dimension is prioritizing transactions when - // two transactions have the same priority. - timestamp time.Time - - // peers records a mapping of all peers that sent a given transaction - peers map[uint16]struct{} - - // heapIndex defines the index of the item in the heap - heapIndex int - - // gossipEl references the linked-list element in the gossip index - gossipEl *clist.CElement - - // removed marks the transaction as removed from the mempool. This is set - // during RemoveTx and is needed due to the fact that a given existing - // transaction in the mempool can be evicted when it is simultaneously having - // a reCheckTx callback executed. - removed bool + mtx sync.Mutex + gasWanted int64 // app: gas required to execute this transaction + priority int64 // app: priority value for this transaction + sender string // app: assigned sender label + peers map[uint16]bool // peer IDs who have sent us this transaction } -func (wtx *WrappedTx) Size() int { - return len(wtx.tx) -} +// Size reports the size of the raw transaction in bytes. +func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) } -// TxStore implements a thread-safe mapping of valid transaction(s). -// -// NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. -type TxStore struct { - mtx sync.RWMutex - hashTxs map[types.TxKey]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application -} - -func NewTxStore() *TxStore { - return &TxStore{ - senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[types.TxKey]*WrappedTx), +// SetPeer adds the specified peer ID as a sender of w. +func (w *WrappedTx) SetPeer(id uint16) { + w.mtx.Lock() + defer w.mtx.Unlock() + if w.peers == nil { + w.peers = map[uint16]bool{id: true} + } else { + w.peers[id] = true } } -// Size returns the total number of transactions in the store. -func (txs *TxStore) Size() int { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return len(txs.hashTxs) -} - -// GetAllTxs returns all the transactions currently in the store. -func (txs *TxStore) GetAllTxs() []*WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wTxs := make([]*WrappedTx, len(txs.hashTxs)) - i := 0 - for _, wtx := range txs.hashTxs { - wTxs[i] = wtx - i++ - } - - return wTxs -} - -// GetTxBySender returns a *WrappedTx by the transaction's sender property -// defined by the ABCI application. -func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.senderTxs[sender] -} - -// GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.hashTxs[hash] -} - -// IsTxRemoved returns true if a transaction by hash is marked as removed and -// false otherwise. -func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx, ok := txs.hashTxs[hash] - if ok { - return wtx.removed - } - - return false -} - -// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a -// non-empty sender, we additionally store the transaction by the sender as -// defined by the ABCI application. -func (txs *TxStore) SetTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - txs.senderTxs[wtx.sender] = wtx - } - - txs.hashTxs[wtx.tx.Key()] = wtx -} - -// RemoveTx removes a *WrappedTx from the transaction store. It deletes all -// indexes of the transaction. -func (txs *TxStore) RemoveTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - delete(txs.senderTxs, wtx.sender) - } - - delete(txs.hashTxs, wtx.tx.Key()) - wtx.removed = true -} - -// TxHasPeer returns true if a transaction by hash has a given peer ID and false -// otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return false - } - - _, ok := wtx.peers[peerID] +// HasPeer reports whether the specified peer ID is a sender of w. +func (w *WrappedTx) HasPeer(id uint16) bool { + w.mtx.Lock() + defer w.mtx.Unlock() + _, ok := w.peers[id] return ok } -// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the -// given peerID to the WrappedTx's set of peers that sent us this transaction. -// We return true if we've already recorded the given peer for this transaction -// and false otherwise. If the transaction does not exist by hash, we return -// (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return nil, false - } - - if wtx.peers == nil { - wtx.peers = make(map[uint16]struct{}) - } - - if _, ok := wtx.peers[peerID]; ok { - return wtx, true - } - - wtx.peers[peerID] = struct{}{} - return wtx, false +// SetGasWanted sets the application-assigned gas requirement of w. +func (w *WrappedTx) SetGasWanted(gas int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.gasWanted = gas } -// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be -// used to build generic transaction indexes in the mempool. It accepts a -// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx -// references which is used during Insert in order to determine sorted order. If -// less returns true, a <= b. -type WrappedTxList struct { - mtx sync.RWMutex - txs []*WrappedTx - less func(*WrappedTx, *WrappedTx) bool +// GasWanted reports the application-assigned gas requirement of w. +func (w *WrappedTx) GasWanted() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.gasWanted } -func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList { - return &WrappedTxList{ - txs: make([]*WrappedTx, 0), - less: less, - } +// SetSender sets the application-assigned sender of w. +func (w *WrappedTx) SetSender(sender string) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.sender = sender } -// Size returns the number of WrappedTx objects in the list. -func (wtl *WrappedTxList) Size() int { - wtl.mtx.RLock() - defer wtl.mtx.RUnlock() - - return len(wtl.txs) +// Sender reports the application-assigned sender of w. +func (w *WrappedTx) Sender() string { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.sender } -// Reset resets the list of transactions to an empty list. -func (wtl *WrappedTxList) Reset() { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - wtl.txs = make([]*WrappedTx, 0) +// SetPriority sets the application-assigned priority of w. +func (w *WrappedTx) SetPriority(p int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.priority = p } -// Insert inserts a WrappedTx reference into the sorted list based on the list's -// comparator function. -func (wtl *WrappedTxList) Insert(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - if i == len(wtl.txs) { - // insert at the end - wtl.txs = append(wtl.txs, wtx) - return - } - - // Make space for the inserted element by shifting values at the insertion - // index up one index. - // - // NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs). - wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...) - wtl.txs[i] = wtx -} - -// Remove attempts to remove a WrappedTx from the sorted list. -func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - // Since the list is sorted, we evaluate all elements starting at i. Note, if - // the element does not exist, we may potentially evaluate the entire remainder - // of the list. However, a caller should not be expected to call Remove with a - // non-existing element. - for i < len(wtl.txs) { - if wtl.txs[i] == wtx { - wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...) - return - } - - i++ - } +// Priority reports the application-assigned priority of w. +func (w *WrappedTx) Priority() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.priority } diff --git a/internal/mempool/tx_test.go b/internal/mempool/tx_test.go deleted file mode 100644 index c6d494b04..000000000 --- a/internal/mempool/tx_test.go +++ /dev/null @@ -1,231 +0,0 @@ -package mempool - -import ( - "fmt" - "math/rand" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/types" -) - -func TestTxStore_GetTxBySender(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - res := txs.GetTxBySender(wtx.sender) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxBySender(wtx.sender) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetTxByHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_SetTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res := txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) - - wtx.sender = "foo" - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15) - require.Nil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.True(t, ok) - - require.True(t, txs.TxHasPeer(key, 15)) - require.False(t, txs.TxHasPeer(key, 16)) -} - -func TestTxStore_RemoveTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - txs.SetTx(wtx) - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.NotNil(t, res) - - txs.RemoveTx(res) - - res = txs.GetTxByHash(key) - require.Nil(t, res) -} - -func TestTxStore_Size(t *testing.T) { - txStore := NewTxStore() - numTxs := 1000 - - for i := 0; i < numTxs; i++ { - txStore.SetTx(&WrappedTx{ - tx: []byte(fmt.Sprintf("test_tx_%d", i)), - priority: int64(i), - timestamp: time.Now(), - }) - } - - require.Equal(t, numTxs, txStore.Size()) -} - -func TestWrappedTxList_Reset(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - require.Zero(t, list.Size()) - - for i := 0; i < 100; i++ { - list.Insert(&WrappedTx{height: int64(i)}) - } - - require.Equal(t, 100, list.Size()) - - list.Reset() - require.Zero(t, list.Size()) -} - -func TestWrappedTxList_Insert(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var expected []int - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - expected = append(expected, int(height)) - list.Insert(&WrappedTx{height: height}) - - if i%10 == 0 { - list.Insert(&WrappedTx{height: height}) - expected = append(expected, int(height)) - } - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -} - -func TestWrappedTxList_Remove(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var txs []*WrappedTx - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - tx := &WrappedTx{height: height} - - txs = append(txs, tx) - list.Insert(tx) - - if i%10 == 0 { - tx = &WrappedTx{height: height} - list.Insert(tx) - txs = append(txs, tx) - } - } - - // remove a tx that does not exist - list.Remove(&WrappedTx{height: 20000}) - - // remove a tx that exists (by height) but not referenced - list.Remove(&WrappedTx{height: txs[0].height}) - - // remove a few existing txs - for i := 0; i < 25; i++ { - j := rng.Intn(len(txs)) - list.Remove(txs[j]) - txs = append(txs[:j], txs[j+1:]...) - } - - expected := make([]int, len(txs)) - for i, tx := range txs { - expected[i] = int(tx.height) - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -}