From 493dd69f315ae880a7786df2c82ee92797c0f7ef Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 8 Jul 2022 07:22:07 -0700 Subject: [PATCH] Backport priority mempool fixes from v0.35.x to v0.34.x. (#8962) This is a manual backport of the changes from these commits: - bc49f66 Add more unit tests for the priority mempool. (#8961) - 9b02094 Fix unbounded heap growth in the priority mempool. (#8944) Imports and type signatures have been updated to match the v0.34 usage. --- CHANGELOG_PENDING.md | 7 +- mempool/v1/mempool.go | 986 ++++++++++++++---------------- mempool/v1/mempool_test.go | 133 +++- mempool/v1/priority_queue.go | 159 ----- mempool/v1/priority_queue_test.go | 176 ------ mempool/v1/reactor.go | 6 +- mempool/v1/tx.go | 304 ++------- mempool/v1/tx_test.go | 230 ------- 8 files changed, 641 insertions(+), 1360 deletions(-) delete mode 100644 mempool/v1/priority_queue.go delete mode 100644 mempool/v1/priority_queue_test.go delete mode 100644 mempool/v1/tx_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 71fd1afee..67373a2c1 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -2,7 +2,7 @@ ## v0.34.20 -Special thanks to external contributors on this release: +Special thanks to external contributors on this release: @yihuang Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermint). @@ -24,5 +24,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES -- [blocksync] [\#8496](https://github.com/tendermint/tendermint/pull/8496) validate block against state before persisting it to disk (@cmwaters) -- (indexer) [#8625](https://github.com/tendermint/tendermint/pull/8625) Fix overriding tx index of duplicated txs. +- [blocksync] [#8496](https://github.com/tendermint/tendermint/pull/8496) validate block against state before persisting it to disk. (@cmwaters) +- [indexer] [#8625](https://github.com/tendermint/tendermint/pull/8625) Fix overriding tx index of duplicated txs. (@yihuang) +- [mempool] \#8962 Backport priority mempool fixes from v0.35.x to v0.34.x (@creachdair). diff --git a/mempool/v1/mempool.go b/mempool/v1/mempool.go index 9293bd16f..ff48356aa 100644 --- a/mempool/v1/mempool.go +++ b/mempool/v1/mempool.go @@ -1,10 +1,10 @@ package v1 import ( - "bytes" - "errors" "fmt" "reflect" + "sort" + "sync" "sync/atomic" "time" @@ -12,8 +12,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" - tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" @@ -24,73 +22,42 @@ var _ mempool.Mempool = (*TxMempool)(nil) // TxMempoolOption sets an optional parameter on the TxMempool. type TxMempoolOption func(*TxMempool) -// TxMempool defines a prioritized mempool data structure used by the v1 mempool -// reactor. It keeps a thread-safe priority queue of transactions that is used -// when a block proposer constructs a block and a thread-safe linked-list that -// is used to gossip transactions to peers in a FIFO manner. +// TxMempool implemements the Mempool interface and allows the application to +// set priority values on transactions in the CheckTx response. When selecting +// transactions to include in a block, higher-priority transactions are chosen +// first. When evicting transactions from the mempool for size constraints, +// lower-priority transactions are evicted sooner. +// +// Within the mempool, transactions are ordered by time of arrival, and are +// gossiped to the rest of the network based on that order (gossip order does +// not take priority into account). type TxMempool struct { + // Immutable fields logger log.Logger - metrics *mempool.Metrics config *config.MempoolConfig proxyAppConn proxy.AppConnMempool + metrics *mempool.Metrics + cache mempool.TxCache // seen transactions - // txsAvailable fires once for each height when the mempool is not empty - txsAvailable chan struct{} + // Atomically-updated fields + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes + txRecheck int64 // atomic: the number of pending recheck calls + + // Synchronized fields, protected by mtx. + mtx *sync.RWMutex notifiedTxsAvailable bool + txsAvailable chan struct{} // one value sent per height when mempool is not empty + preCheck mempool.PreCheckFunc + postCheck mempool.PostCheckFunc + height int64 // the latest height passed to Update - // height defines the last block height process during Update() - height int64 - - // sizeBytes defines the total size of the mempool (sum of all tx bytes) - sizeBytes int64 - - // cache defines a fixed-size cache of already seen transactions as this - // reduces pressure on the proxyApp. - cache mempool.TxCache - - // txStore defines the main storage of valid transactions. Indexes are built - // on top of this store. - txStore *TxStore - - // gossipIndex defines the gossiping index of valid transactions via a - // thread-safe linked-list. We also use the gossip index as a cursor for - // rechecking transactions already in the mempool. - gossipIndex *clist.CList - - // recheckCursor and recheckEnd are used as cursors based on the gossip index - // to recheck transactions that are already in the mempool. Iteration is not - // thread-safe and transaction may be mutated in serial order. - // - // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for - // iterator and cursor management when rechecking transactions. If the gossip - // index changes or is removed in a future refactor, this will have to be - // refactored. Instead, we should consider just keeping a slice of a snapshot - // of the mempool's current transactions during Update and an integer cursor - // into that slice. This, however, requires additional O(n) space complexity. - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // priorityIndex defines the priority index of valid transactions via a - // thread-safe priority queue. - priorityIndex *TxPriorityQueue - - // heightIndex defines a height-based, in ascending order, transaction index. - // i.e. older transactions are first. - heightIndex *WrappedTxList - - // timestampIndex defines a timestamp-based, in ascending order, transaction - // index. i.e. older transactions are first. - timestampIndex *WrappedTxList - - // A read/write lock is used to safe guard updates, insertions and deletions - // from the mempool. A read-lock is implicitly acquired when executing CheckTx, - // however, a caller must explicitly grab a write-lock via Lock when updating - // the mempool via Update(). - mtx tmsync.RWMutex - preCheck mempool.PreCheckFunc - postCheck mempool.PostCheckFunc + txs *clist.CList // valid transactions (passed CheckTx) + txByKey map[types.TxKey]*clist.CElement + txBySender map[string]*clist.CElement // for sender != "" } +// NewTxMempool constructs a new, empty priority mempool at the specified +// initial height and using the given config and options. func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, @@ -100,28 +67,22 @@ func NewTxMempool( ) *TxMempool { txmp := &TxMempool{ - logger: logger, - config: cfg, - proxyAppConn: proxyAppConn, - height: height, - cache: mempool.NopTxCache{}, - metrics: mempool.NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), - heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }), - timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) - }), + logger: logger, + config: cfg, + proxyAppConn: proxyAppConn, + metrics: mempool.NopMetrics(), + cache: mempool.NopTxCache{}, + txs: clist.New(), + mtx: new(sync.RWMutex), + height: height, + txByKey: make(map[types.TxKey]*clist.CElement), + txBySender: make(map[string]*clist.CElement), } - if cfg.CacheSize > 0 { txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize) } - proxyAppConn.SetResponseCallback(txmp.defaultTxCallback) + proxyAppConn.SetResponseCallback(txmp.recheckTxCallback) for _, opt := range options { opt(txmp) @@ -151,46 +112,24 @@ func WithMetrics(metrics *mempool.Metrics) TxMempoolOption { // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly // release the lock when finished. -func (txmp *TxMempool) Lock() { - txmp.mtx.Lock() -} +func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. -func (txmp *TxMempool) Unlock() { - txmp.mtx.Unlock() -} +func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } // Size returns the number of valid transactions in the mempool. It is // thread-safe. -func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() -} +func (txmp *TxMempool) Size() int { return txmp.txs.Len() } // SizeBytes return the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. -func (txmp *TxMempool) SizeBytes() int64 { - return atomic.LoadInt64(&txmp.sizeBytes) -} +func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.txsBytes) } // FlushAppConn executes FlushSync on the mempool's proxyAppConn. // -// NOTE: The caller must obtain a write-lock via Lock() prior to execution. -func (txmp *TxMempool) FlushAppConn() error { - return txmp.proxyAppConn.FlushSync() -} - -// WaitForNextTx returns a blocking channel that will be closed when the next -// valid transaction is available to gossip. It is thread-safe. -func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { - return txmp.gossipIndex.WaitChan() -} - -// NextGossipTx returns the next valid transaction to gossip. A caller must wait -// for WaitForNextTx to signal a transaction is available to gossip first. It is -// thread-safe. -func (txmp *TxMempool) NextGossipTx() *clist.CElement { - return txmp.gossipIndex.Front() -} +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling FlushAppConn. +func (txmp *TxMempool) FlushAppConn() error { return txmp.proxyAppConn.FlushSync() } // EnableTxsAvailable enables the mempool to trigger events when transactions // are available on a block by block basis. @@ -203,223 +142,238 @@ func (txmp *TxMempool) EnableTxsAvailable() { // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. It is thread-safe. -func (txmp *TxMempool) TxsAvailable() <-chan struct{} { - return txmp.txsAvailable -} +func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } -// CheckTx executes the ABCI CheckTx method for a given transaction. It acquires -// a read-lock attempts to execute the application's CheckTx ABCI method via -// CheckTxAsync. We return an error if any of the following happen: +// CheckTx adds the given transaction to the mempool if it fits and passes the +// application's ABCI CheckTx method. // -// - The CheckTxAsync execution fails. -// - The transaction already exists in the cache and we've already received the -// transaction from the peer. Otherwise, if it solely exists in the cache, we -// return nil. -// - The transaction size exceeds the maximum transaction size as defined by the -// configuration provided to the mempool. -// - The transaction fails Pre-Check (if it is defined). -// - The proxyAppConn fails, e.g. the buffer is full. +// CheckTx reports an error without adding tx if: // -// If the mempool is full, we still execute CheckTx and attempt to find a lower -// priority transaction to evict. If such a transaction exists, we remove the -// lower priority transaction and add the new one with higher priority. +// - The size of tx exceeds the configured maximum transaction size. +// - The pre-check hook is defined and reports an error for tx. +// - The transaction already exists in the cache. +// - The proxy connection to the application fails. // -// NOTE: -// - The applications' CheckTx implementation may panic. -// - The caller is not to explicitly require any locks for executing CheckTx. -func (txmp *TxMempool) CheckTx( - tx types.Tx, - cb func(*abci.Response), - txInfo mempool.TxInfo, -) error { +// If tx passes all of the above conditions, it is passed (asynchronously) to +// the application's ABCI CheckTx method and this CheckTx method returns nil. +// If cb != nil, it is called when the ABCI request completes to report the +// application response. +// +// If the application accepts the transaction and the mempool is full, the +// mempool evicts one or more of the lowest-priority transaction whose priority +// is (strictly) lower than the priority of tx and whose size together exceeds +// the size of tx, and adds tx instead. If no such transactions exist, tx is +// discarded. +func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo mempool.TxInfo) error { + // During the initial phase of CheckTx, we do not need to modify any state. + // A transaction will not actually be added to the mempool until it survives + // a call to the ABCI CheckTx method and size constraint checks. txmp.mtx.RLock() defer txmp.mtx.RUnlock() - txSize := len(tx) - if txSize > txmp.config.MaxTxBytes { - return mempool.ErrTxTooLarge{ - Max: txmp.config.MaxTxBytes, - Actual: txSize, - } + // Reject transactions in excess of the configured maximum transaction size. + if len(tx) > txmp.config.MaxTxBytes { + return mempool.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)} } + // If a precheck hook is defined, call it before invoking the application. if txmp.preCheck != nil { if err := txmp.preCheck(tx); err != nil { - return mempool.ErrPreCheck{ - Reason: err, - } + return mempool.ErrPreCheck{Reason: err} } } + // Early exit if the proxy connection has an error. if err := txmp.proxyAppConn.Error(); err != nil { return err } - txHash := tx.Key() + txKey := tx.Key() - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. + // Check for the transaction in the cache. if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) + // If the cached transaction is also in the pool, record its sender. + if elt, ok := txmp.txByKey[txKey]; ok { + w := elt.Value.(*WrappedTx) + w.SetPeer(txInfo.SenderID) + } return mempool.ErrTxInCache } - reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(func(res *abci.Response) { - if txmp.recheckCursor != nil { - panic("recheck cursor is non-nil in CheckTx callback") - } + // Initiate an ABCI CheckTx for this transaction. The callback is + // responsible for adding the transaction to the pool if it survives. + return func() error { + // N.B.: We have to issue the call outside the lock. In a local client, + // even an "async" call invokes its callback immediately which will make + // the callback deadlock trying to acquire the same lock. This isn't a + // problem with out-of-process calls, but this has to work for both. + height := txmp.height + txmp.mtx.RUnlock() + defer txmp.mtx.RLock() - wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, - } - txmp.initTxCallback(wtx, res, txInfo) - - if cb != nil { - cb(res) - } - }) - - return nil + reqRes := txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) + reqRes.SetCallback(func(res *abci.Response) { + wtx := &WrappedTx{ + tx: tx, + hash: txKey, + timestamp: time.Now().UTC(), + height: height, + } + wtx.SetPeer(txInfo.SenderID) + txmp.initialTxCallback(wtx, res) + if cb != nil { + cb(res) + } + }) + return nil + }() } +// RemoveTxByKey removes the transaction with the specified key from the +// mempool. It reports an error if no such transaction exists. This operation +// does not remove the transaction from the cache. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { - txmp.Lock() - defer txmp.Unlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + return txmp.removeTxByKey(txKey) +} - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { - txmp.removeTx(wtx, false) +// removeTxByKey removes the specified transaction key from the mempool. +// The caller must hold txmp.mtx excluxively. +func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { + if elt, ok := txmp.txByKey[key]; ok { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, key) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) return nil } - - return errors.New("transaction not found") + return fmt.Errorf("transaction %x not found", key) } -// Flush flushes out the mempool. It acquires a read-lock, fetches all the -// transactions currently in the transaction store and removes each transaction -// from the store and all indexes and finally resets the cache. -// -// NOTE: -// - Flushing the mempool may leave the mempool in an inconsistent state. +// removeTxByElement removes the specified transaction element from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, w.tx.Key()) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) +} + +// Flush purges the contents of the mempool and the cache, leaving both empty. +// The current height is not modified by this operation. func (txmp *TxMempool) Flush() { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() - txmp.heightIndex.Reset() - txmp.timestampIndex.Reset() - - for _, wtx := range txmp.txStore.GetAllTxs() { - txmp.removeTx(wtx, false) + // Remove all the transactions in the list explicitly, so that the sizes + // and indexes get updated properly. + cur := txmp.txs.Front() + for cur != nil { + next := cur.Next() + txmp.removeTxByElement(cur) + cur = next } - - atomic.SwapInt64(&txmp.sizeBytes, 0) txmp.cache.Reset() + + // Discard any pending recheck calls that may be in flight. The calls will + // still complete, but will have no effect on the mempool. + atomic.StoreInt64(&txmp.txRecheck, 0) } -// ReapMaxBytesMaxGas returns a list of transactions within the provided size -// and gas constraints. Transaction are retrieved in priority order. +// allEntriesSorted returns a slice of all the transactions currently in the +// mempool, sorted in nonincreasing order by priority with ties broken by +// increasing order of arrival time. +func (txmp *TxMempool) allEntriesSorted() []*WrappedTx { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + + all := make([]*WrappedTx, 0, len(txmp.txByKey)) + for _, tx := range txmp.txByKey { + all = append(all, tx.Value.(*WrappedTx)) + } + sort.Slice(all, func(i, j int) bool { + if all[i].priority == all[j].priority { + return all[i].timestamp.Before(all[j].timestamp) + } + return all[i].priority > all[j].priority // N.B. higher priorities first + }) + return all +} + +// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the +// size and gas constraints. The results are ordered by nonincreasing priority, +// with ties broken by increasing order of arrival. Reaping transactions does +// not remove them from the mempool. // -// NOTE: -// - A read-lock is acquired. -// - Transactions returned are not actually removed from the mempool transaction -// store or indexes. +// If maxBytes < 0, no limit is set on the total size in bytes. +// If maxGas < 0, no limit is set on the total gas cost. +// +// If the mempool is empty or has no transactions fitting within the given +// constraints, the result will also be empty. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var totalGas, totalBytes int64 - var ( - totalGas int64 - totalSize int64 - ) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + var keep []types.Tx //nolint:prealloc + for _, w := range txmp.allEntriesSorted() { + // N.B. When computing byte size, we need to include the overhead for + // encoding as protobuf to send to the application. + totalGas += w.gasWanted + totalBytes += types.ComputeProtoSizeForTxs([]types.Tx{w.tx}) + if (maxGas >= 0 && totalGas > maxGas) || (maxBytes >= 0 && totalBytes > maxBytes) { + break } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - - // Ensure we have capacity for the transaction with respect to the - // transaction size. - if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] - } - - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas - gas := totalGas + wtx.gasWanted - if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] - } - - totalGas = gas + keep = append(keep, w.tx) } - - return txs + return keep } -// ReapMaxTxs returns a list of transactions within the provided number of -// transactions bound. Transaction are retrieved in priority order. +// TxsWaitChan returns a channel that is closed when there is at least one +// transaction available to be gossiped. +func (txmp *TxMempool) TxsWaitChan() <-chan struct{} { return txmp.txs.WaitChan() } + +// TxsFront returns the frontmost element of the pending transaction list. +// It will be nil if the mempool is empty. +func (txmp *TxMempool) TxsFront() *clist.CElement { return txmp.txs.Front() } + +// ReapMaxTxs returns up to max transactions from the mempool. The results are +// ordered by nonincreasing priority with ties broken by increasing order of +// arrival. Reaping transactions does not remove them from the mempool. // -// NOTE: -// - A read-lock is acquired. -// - Transactions returned are not actually removed from the mempool transaction -// store or indexes. +// If max < 0, all transactions in the mempool are reaped. +// +// The result may have fewer than max elements (possibly zero) if the mempool +// does not have that many transactions available. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var keep []types.Tx //nolint:prealloc - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs - } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + for _, w := range txmp.allEntriesSorted() { + if max >= 0 && len(keep) >= max { + break } - }() - - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) + keep = append(keep, w.tx) } - - return txs + return keep } -// Update iterates over all the transactions provided by the caller, i.e. the -// block producer, and removes them from the cache (if applicable) and removes -// the transactions from the main transaction store and associated indexes. -// Finally, if there are trainsactions remaining in the mempool, we initiate a -// re-CheckTx for them (if applicable), otherwise, we notify the caller more -// transactions are available. +// Update removes all the given transactions from the mempool and the cache, +// and updates the current block height. The blockTxs and deliverTxResponses +// must have the same length with each response corresponding to the tx at the +// same offset. // -// NOTE: -// - The caller must explicitly acquire a write-lock via Lock(). +// If the configuration enables recheck, Update sends each remaining +// transaction after removing blockTxs to the ABCI CheckTx method. Any +// transactions marked as invalid during recheck are also removed. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling Update. func (txmp *TxMempool) Update( blockHeight int64, blockTxs types.Txs, @@ -427,6 +381,17 @@ func (txmp *TxMempool) Update( newPreFn mempool.PreCheckFunc, newPostFn mempool.PostCheckFunc, ) error { + // TODO(creachadair): This would be a nice safety check but requires Go 1.18. + // // Safety check: The caller is required to hold the lock. + // if txmp.mtx.TryLock() { + // txmp.mtx.Unlock() + // panic("mempool: Update caller does not hold the lock") + // } + // Safety check: Transactions and responses must match in number. + if len(blockTxs) != len(deliverTxResponses) { + panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses", + len(blockTxs), len(deliverTxResponses))) + } txmp.height = blockHeight txmp.notifiedTxsAvailable = false @@ -439,18 +404,17 @@ func (txmp *TxMempool) Update( } for i, tx := range blockTxs { + // Add successful committed transactions to the cache (if they are not + // already present). Transactions that failed to commit are removed from + // the cache unless the operator has explicitly requested we keep them. if deliverTxResponses[i].Code == abci.CodeTypeOK { - // add the valid committed transaction to the cache (if missing) _ = txmp.cache.Push(tx) } else if !txmp.config.KeepInvalidTxsInCache { - // allow invalid transactions to be re-submitted txmp.cache.Remove(tx) } - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { - txmp.removeTx(wtx, false) - } + // Regardless of success, remove the transaction from the mempool. + _ = txmp.removeTxByKey(tx.Key()) } txmp.purgeExpiredTxs(blockHeight) @@ -458,300 +422,310 @@ func (txmp *TxMempool) Update( // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining // transactions are left. - if txmp.Size() > 0 { + size := txmp.Size() + txmp.metrics.Size.Set(float64(size)) + if size > 0 { if txmp.config.Recheck { - txmp.logger.Debug( - "executing re-CheckTx for all remaining transactions", - "num_txs", txmp.Size(), - "height", blockHeight, - ) - txmp.updateReCheckTxs() + txmp.recheckTransactions() } else { txmp.notifyTxsAvailable() } } - - txmp.metrics.Size.Set(float64(txmp.Size())) return nil } -// initTxCallback performs the initial, i.e. the first, callback after CheckTx -// has been executed by the ABCI application. In other words, initTxCallback is -// called after executing CheckTx when we see a unique transaction for the first -// time. CheckTx can be called again for the same transaction at a later point -// in time when re-checking, however, this callback will not be called. +// initialTxCallback handles the ABCI CheckTx response for the first time a +// transaction is added to the mempool. A recheck after a block is committed +// goes to the default callback (see recheckTxCallback). // -// After the ABCI application executes CheckTx, initTxCallback is called with -// the ABCI *Response object and TxInfo. If postCheck is defined on the mempool, -// we execute that first. If there is no error from postCheck (if defined) and -// the ABCI CheckTx response code is OK, we attempt to insert the transaction. +// If either the application rejected the transaction or a post-check hook is +// defined and rejects the transaction, it is discarded. // -// When attempting to insert the transaction, we first check if there is -// sufficient capacity. If there is sufficient capacity, the transaction is -// inserted into the txStore and indexed across all indexes. Otherwise, if the -// mempool is full, we attempt to find a lower priority transaction to evict in -// place of the new incoming transaction. If no such transaction exists, the -// new incoming transaction is rejected. +// Otherwise, if the mempool is full, check for lower-priority transactions +// that can be evicted to make room for the new one. If no such transactions +// exist, this transaction is logged and dropped; otherwise the selected +// transactions are evicted. // -// If the new incoming transaction fails CheckTx or postCheck fails, we reject -// the new incoming transaction. -// -// NOTE: -// - An explicit lock is NOT required. -func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.Response, txInfo mempool.TxInfo) { +// Finally, the new transaction is added and size stats updated. +func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) { checkTxRes, ok := res.Value.(*abci.Response_CheckTx) if !ok { return } + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + var err error if txmp.postCheck != nil { err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx) } if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK { - // ignore bad transactions txmp.logger.Info( "rejected bad transaction", - "priority", wtx.priority, + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "peer_id", txInfo.SenderP2PID, + "peer_id", wtx.peers, "code", checkTxRes.CheckTx.Code, "post_check_err", err, ) txmp.metrics.FailedTxs.Add(1) + // Remove the invalid transaction from the cache, unless the operator has + // instructed us to keep invalid transactions. if !txmp.config.KeepInvalidTxsInCache { txmp.cache.Remove(wtx.tx) } + + // If there was a post-check error, record its text in the result for + // debugging purposes. if err != nil { checkTxRes.CheckTx.MempoolError = err.Error() } return } - sender := checkTxRes.CheckTx.Sender priority := checkTxRes.CheckTx.Priority + sender := checkTxRes.CheckTx.Sender - if len(sender) > 0 { - if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { - txmp.logger.Error( - "rejected incoming good transaction; tx already exists for sender", - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + // Disallow multiple concurrent transactions from the same sender assigned + // by the ABCI application. As a special case, an empty sender is not + // restricted. + if sender != "" { + elt, ok := txmp.txBySender[sender] + if ok { + w := elt.Value.(*WrappedTx) + txmp.logger.Debug( + "rejected valid incoming transaction; tx already exists for sender", + "tx", fmt.Sprintf("%X", w.tx.Hash()), "sender", sender, ) + checkTxRes.CheckTx.MempoolError = + fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)", + sender, w.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) return } } + // At this point the application has ruled the transaction valid, but the + // mempool might be full. If so, find the lowest-priority items with lower + // priority than the application assigned to this new one, and evict as many + // of them as necessary to make room for tx. If no such items exist, we + // discard tx. + if err := txmp.canAddTx(wtx); err != nil { - evictTxs := txmp.priorityIndex.GetEvictableTxs( - priority, - int64(wtx.Size()), - txmp.SizeBytes(), - txmp.config.MaxTxsBytes, - ) - if len(evictTxs) == 0 { - // No room for the new incoming transaction so we just remove it from - // the cache. + var victims []*clist.CElement // eligible transactions for eviction + var victimBytes int64 // total size of victims + for cur := txmp.txs.Front(); cur != nil; cur = cur.Next() { + cw := cur.Value.(*WrappedTx) + if cw.priority < priority { + victims = append(victims, cur) + victimBytes += cw.Size() + } + } + + // If there are no suitable eviction candidates, or the total size of + // those candidates is not enough to make room for the new transaction, + // drop the new one. + if len(victims) == 0 || victimBytes < wtx.Size() { txmp.cache.Remove(wtx.tx) txmp.logger.Error( - "rejected incoming good transaction; mempool full", + "rejected valid incoming transaction; mempool is full", "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err.Error(), ) + checkTxRes.CheckTx.MempoolError = + fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)", + wtx.tx.Hash()) txmp.metrics.RejectedTxs.Add(1) return } - // evict an existing transaction(s) - // - // NOTE: - // - The transaction, toEvict, can be removed while a concurrent - // reCheckTx callback is being executed for the same transaction. - for _, toEvict := range evictTxs { - txmp.removeTx(toEvict, true) + txmp.logger.Debug("evicting lower-priority transactions", + "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "new_priority", priority, + ) + + // Sort lowest priority items first so they will be evicted first. Break + // ties in favor of newer items (to maintain FIFO semantics in a group). + sort.Slice(victims, func(i, j int) bool { + iw := victims[i].Value.(*WrappedTx) + jw := victims[j].Value.(*WrappedTx) + if iw.Priority() == jw.Priority() { + return iw.timestamp.After(jw.timestamp) + } + return iw.Priority() < jw.Priority() + }) + + // Evict as many of the victims as necessary to make room. + var evictedBytes int64 + for _, vic := range victims { + w := vic.Value.(*WrappedTx) + txmp.logger.Debug( - "evicted existing good transaction; mempool full", - "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), - "old_priority", toEvict.priority, - "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "new_priority", wtx.priority, + "evicted valid existing transaction; mempool full", + "old_tx", fmt.Sprintf("%X", w.tx.Hash()), + "old_priority", w.priority, ) + txmp.removeTxByElement(vic) txmp.metrics.EvictedTxs.Add(1) + + // We may not need to evict all the eligible transactions. Bail out + // early if we have made enough room. + evictedBytes += w.Size() + if evictedBytes >= wtx.Size() { + break + } } } - wtx.gasWanted = checkTxRes.CheckTx.GasWanted - wtx.priority = priority - wtx.sender = sender - wtx.peers = map[uint16]struct{}{ - txInfo.SenderID: {}, - } + wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted) + wtx.SetPriority(priority) + wtx.SetSender(sender) + txmp.insertTx(wtx) txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) txmp.metrics.Size.Set(float64(txmp.Size())) - - txmp.insertTx(wtx) txmp.logger.Debug( - "inserted good transaction", - "priority", wtx.priority, + "inserted new valid transaction", + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "height", txmp.height, "num_txs", txmp.Size(), ) txmp.notifyTxsAvailable() - } -// defaultTxCallback performs the default CheckTx application callback. This is -// NOT executed when a transaction is first seen/received. Instead, this callback -// is executed during re-checking transactions (if enabled). A caller, i.e a -// block proposer, acquires a mempool write-lock via Lock() and when executing -// Update(), if the mempool is non-empty and Recheck is enabled, then all -// remaining transactions will be rechecked via CheckTxAsync. The order in which -// they are rechecked must be the same order in which this callback is called -// per transaction. -func (txmp *TxMempool) defaultTxCallback(req *abci.Request, res *abci.Response) { - if txmp.recheckCursor == nil { - return +func (txmp *TxMempool) insertTx(wtx *WrappedTx) { + elt := txmp.txs.PushBack(wtx) + txmp.txByKey[wtx.tx.Key()] = elt + if s := wtx.Sender(); s != "" { + txmp.txBySender[s] = elt } - txmp.metrics.RecheckTimes.Add(1) + atomic.AddInt64(&txmp.txsBytes, wtx.Size()) +} +// recheckTxCallback handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It updates the recheck counter +// and removes any transactions invalidated by the application. +// +// This callback is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by initialTxCallback instead. +func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) { checkTxRes, ok := res.Value.(*abci.Response_CheckTx) if !ok { - txmp.logger.Error("received incorrect type in mempool callback", + txmp.logger.Error("mempool: received incorrect result type in CheckTx callback", "expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(), "got", reflect.TypeOf(res.Value).Name(), ) return } - tx := req.GetCheckTx().Tx - wtx := txmp.recheckCursor.Value.(*WrappedTx) - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, wtx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - txmp.logger.Error( - "re-CheckTx transaction mismatch", - "got", wtx.tx.Hash(), - "expected", types.Tx(tx).Key(), - ) - - if txmp.recheckCursor == txmp.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - txmp.recheckCursor = nil - return - } - - txmp.recheckCursor = txmp.recheckCursor.Next() - wtx = txmp.recheckCursor.Value.(*WrappedTx) + // Check whether we are expecting recheck responses at this point. + // If not, we will ignore the response, this usually means the mempool was Flushed. + // If this is the "last" pending recheck, trigger a notification when it's been processed. + numLeft := atomic.AddInt64(&txmp.txRecheck, -1) + if numLeft == 0 { + defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty + } else if numLeft < 0 { + return } - // Only evaluate transactions that have not been removed. This can happen - // if an existing transaction is evicted during CheckTx and while this - // callback is being executed for the same evicted transaction. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - var err error - if txmp.postCheck != nil { - err = txmp.postCheck(tx, checkTxRes.CheckTx) - } + txmp.metrics.RecheckTimes.Add(1) + tx := types.Tx(req.GetCheckTx().Tx) - if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { - wtx.priority = checkTxRes.CheckTx.Priority - } else { - txmp.logger.Debug( - "existing transaction no longer valid; failed re-CheckTx callback", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "err", err, - "code", checkTxRes.CheckTx.Code, - ) + txmp.mtx.Lock() + defer txmp.mtx.Unlock() - if wtx.gossipEl != txmp.recheckCursor { - panic("corrupted reCheckTx cursor") - } + // Find the transaction reported by the ABCI callback. It is possible the + // transaction was evicted during the recheck, in which case the transaction + // will be gone. + elt, ok := txmp.txByKey[tx.Key()] + if !ok { + return + } + wtx := elt.Value.(*WrappedTx) - txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) - } + // If a postcheck hook is defined, call it before checking the result. + var err error + if txmp.postCheck != nil { + err = txmp.postCheck(tx, checkTxRes.CheckTx) } - // move reCheckTx cursor to next element - if txmp.recheckCursor == txmp.recheckEnd { - txmp.recheckCursor = nil - } else { - txmp.recheckCursor = txmp.recheckCursor.Next() + if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil { + wtx.SetPriority(checkTxRes.CheckTx.Priority) + return // N.B. Size of mempool did not change } - if txmp.recheckCursor == nil { - txmp.logger.Debug("finished rechecking transactions") - - if txmp.Size() > 0 { - txmp.notifyTxsAvailable() - } + txmp.logger.Debug( + "existing transaction no longer valid; failed re-CheckTx callback", + "priority", wtx.Priority(), + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "err", err, + "code", checkTxRes.CheckTx.Code, + ) + txmp.removeTxByElement(elt) + txmp.metrics.FailedTxs.Add(1) + if !txmp.config.KeepInvalidTxsInCache { + txmp.cache.Remove(wtx.tx) } - txmp.metrics.Size.Set(float64(txmp.Size())) } -// updateReCheckTxs updates the recheck cursors by using the gossipIndex. For -// each transaction, it executes CheckTxAsync. The global callback defined on -// the proxyAppConn will be executed for each transaction after CheckTx is -// executed. +// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions +// currently in the mempool. It reports the number of recheck calls that were +// successfully initiated. // -// NOTE: -// - The caller must have a write-lock when executing updateReCheckTxs. -func (txmp *TxMempool) updateReCheckTxs() { +// Precondition: The mempool is not empty. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) recheckTransactions() { if txmp.Size() == 0 { - panic("attempted to update re-CheckTx txs when mempool is empty") + panic("mempool: cannot run recheck on an empty mempool") } + txmp.logger.Debug( + "executing re-CheckTx for all remaining transactions", + "num_txs", txmp.Size(), + "height", txmp.height, + ) + // N.B.: We have to issue the calls outside the lock. In a local client, + // even an "async" call invokes its callback immediately which will make the + // callback deadlock trying to acquire the same lock. This isn't a problem + // with out-of-process calls, but this has to work for both. + txmp.mtx.Unlock() + defer txmp.mtx.Lock() - txmp.recheckCursor = txmp.gossipIndex.Front() - txmp.recheckEnd = txmp.gossipIndex.Back() - - for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { + atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len())) + for e := txmp.txs.Front(); e != nil; e = e.Next() { wtx := e.Value.(*WrappedTx) - // Only execute CheckTx if the transaction is not marked as removed which - // could happen if the transaction was evicted. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, - }) - - } + // The response for this CheckTx is handled by the default recheckTxCallback. + _ = txmp.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) } - txmp.proxyAppConn.FlushAsync() + if err := txmp.proxyAppConn.FlushAsync(); err != nil { + txmp.logger.Error("failed to flush transactions during recheck", "err", err) + } } // canAddTx returns an error if we cannot insert the provided *WrappedTx into -// the mempool due to mempool configured constraints. Otherwise, nil is returned -// and the transaction can be inserted into the mempool. +// the mempool due to mempool configured constraints. Otherwise, nil is +// returned and the transaction can be inserted into the mempool. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { - var ( - numTxs = txmp.Size() - sizeBytes = txmp.SizeBytes() - ) + numTxs := txmp.Size() + txBytes := txmp.SizeBytes() - if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes { + if numTxs >= txmp.config.Size || wtx.Size()+txBytes > txmp.config.MaxTxsBytes { return mempool.ErrMempoolIsFull{ NumTxs: numTxs, MaxTxs: txmp.config.Size, - TxsBytes: sizeBytes, + TxsBytes: txBytes, MaxTxsBytes: txmp.config.MaxTxsBytes, } } @@ -759,96 +733,38 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } -func (txmp *TxMempool) insertTx(wtx *WrappedTx) { - txmp.txStore.SetTx(wtx) - txmp.priorityIndex.PushTx(wtx) - txmp.heightIndex.Insert(wtx) - txmp.timestampIndex.Insert(wtx) - - // Insert the transaction into the gossip index and mark the reference to the - // linked-list element, which will be needed at a later point when the - // transaction is removed. - gossipEl := txmp.gossipIndex.PushBack(wtx) - wtx.gossipEl = gossipEl - - atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) -} - -func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { - if txmp.txStore.IsTxRemoved(wtx.hash) { - return - } - - txmp.txStore.RemoveTx(wtx) - txmp.priorityIndex.RemoveTx(wtx) - txmp.heightIndex.Remove(wtx) - txmp.timestampIndex.Remove(wtx) - - // Remove the transaction from the gossip index and cleanup the linked-list - // element so it can be garbage collected. - txmp.gossipIndex.Remove(wtx.gossipEl) - wtx.gossipEl.DetachPrev() - - atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } -} - -// purgeExpiredTxs removes all transactions that have exceeded their respective -// height and/or time based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool entirely, except for the cache. +// purgeExpiredTxs removes all transactions from the mempool that have exceeded +// their respective height or time-based limits as of the given blockHeight. +// Transactions removed by this operation are not removed from the cache. // -// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which -// the caller has a write-lock on the mempool and so we can safely iterate over -// the height and time based indexes. +// The caller must hold txmp.mtx exclusively. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { + if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 { + return // nothing to do + } + now := time.Now() - expiredTxs := make(map[types.TxKey]*WrappedTx) + cur := txmp.txs.Front() + for cur != nil { + // N.B. Grab the next element first, since if we remove cur its successor + // will be invalidated. + next := cur.Next() - if txmp.config.TTLNumBlocks > 0 { - purgeIdx := -1 - for i, wtx := range txmp.heightIndex.txs { - if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } + w := cur.Value.(*WrappedTx) + if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks { + txmp.removeTxByElement(cur) + txmp.metrics.EvictedTxs.Add(1) + } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { + txmp.removeTxByElement(cur) + txmp.metrics.EvictedTxs.Add(1) } - - if purgeIdx >= 0 { - txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:] - } - } - - if txmp.config.TTLDuration > 0 { - purgeIdx := -1 - for i, wtx := range txmp.timestampIndex.txs { - if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } - } - - if purgeIdx >= 0 { - txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:] - } - } - - for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + cur = next } } func (txmp *TxMempool) notifyTxsAvailable() { if txmp.Size() == 0 { - panic("attempt to notify txs available but mempool is empty!") + return // nothing to do } if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { diff --git a/mempool/v1/mempool_test.go b/mempool/v1/mempool_test.go index 385a782b1..5869eb48b 100644 --- a/mempool/v1/mempool_test.go +++ b/mempool/v1/mempool_test.go @@ -77,8 +77,8 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { app := &application{kvstore.NewApplication()} cc := proxy.NewLocalClientCreator(app) - cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) + cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) cfg.Mempool.CacheSize = cacheSize appConnMem, err := cc.NewABCIClient() @@ -93,6 +93,18 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) } +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) { + done := make(chan struct{}) + if err := txmp.CheckTx([]byte(spec), func(*abci.Response) { + close(done) + }, mempool.TxInfo{}); err != nil { + t.Fatalf("CheckTx for %q failed: %v", spec, err) + } + <-done +} + func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := mempool.TxInfo{SenderID: peerID} @@ -194,6 +206,76 @@ func TestTxMempool_Size(t *testing.T) { require.Equal(t, int64(2850), txmp.SizeBytes()) } +func TestTxMempool_Eviction(t *testing.T) { + txmp := setup(t, 0) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + txmp.Lock() + defer txmp.Unlock() + key := types.Tx(spec).Key() + _, ok := txmp.txByKey[key] + return ok + } + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + mustCheckTx(t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(t, txmp, "key2=0001=5") + mustCheckTx(t, txmp, "key3=0002=10") + mustCheckTx(t, txmp, "key4=0003=3") + mustCheckTx(t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + mustCheckTx(t, txmp, "key6=0005=1") + require.False(t, txExists("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. + mustCheckTx(t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) +} + func TestTxMempool_Flush(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) @@ -436,6 +518,51 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { require.Zero(t, txmp.SizeBytes()) } +func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { + txmp := setup(t, 50) + txmp.config.TTLDuration = 5 * time.Millisecond + + added1 := checkTxs(t, txmp, 25, 0) + require.Equal(t, len(added1), txmp.Size()) + + // Wait a while, then add some more transactions that should not be expired + // when the first batch TTLs out. + // + // ms: 0 1 2 3 4 5 6 + // ^ ^ ^ ^ + // | | | +-- Update (triggers pruning) + // | | +------ first batch expires + // | +-------------- second batch added + // +-------------------------- first batch added + // + // The exact intervals are not important except that the delta should be + // large relative to the cost of CheckTx (ms vs. ns is fine here). + time.Sleep(3 * time.Millisecond) + added2 := checkTxs(t, txmp, 25, 1) + + // Wait a while longer, so that the first batch will expire. + time.Sleep(3 * time.Millisecond) + + // Trigger an update so that pruning will occur. + txmp.Lock() + defer txmp.Unlock() + require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil)) + + // All the transactions in the original set should have been purged. + for _, tx := range added1 { + if _, ok := txmp.txByKey[tx.tx.Key()]; ok { + t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) + } + } + + // All the transactions added later should still be around. + for _, tx := range added2 { + if _, ok := txmp.txByKey[tx.tx.Key()]; !ok { + t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key()) + } + } +} + func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp := setup(t, 500) txmp.height = 100 @@ -443,7 +570,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { tTxs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, 100, txmp.heightIndex.Size()) // reap 5 txs at the next height -- no txs should expire reapedTxs := txmp.ReapMaxTxs(5) @@ -457,12 +583,10 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.Equal(t, 95, txmp.Size()) - require.Equal(t, 95, txmp.heightIndex.Size()) // check more txs at height 101 _ = checkTxs(t, txmp, 50, 1) require.Equal(t, 145, txmp.Size()) - require.Equal(t, 145, txmp.heightIndex.Size()) // Reap 5 txs at a height that would expire all the transactions from before // the previous Update (height 100). @@ -483,7 +607,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.GreaterOrEqual(t, txmp.Size(), 45) - require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45) } func TestTxMempool_CheckTxPostCheckError(t *testing.T) { diff --git a/mempool/v1/priority_queue.go b/mempool/v1/priority_queue.go deleted file mode 100644 index b2bf59f2b..000000000 --- a/mempool/v1/priority_queue.go +++ /dev/null @@ -1,159 +0,0 @@ -package v1 - -import ( - "container/heap" - "sort" - - tmsync "github.com/tendermint/tendermint/libs/sync" -) - -var _ heap.Interface = (*TxPriorityQueue)(nil) - -// TxPriorityQueue defines a thread-safe priority queue for valid transactions. -type TxPriorityQueue struct { - mtx tmsync.RWMutex - txs []*WrappedTx -} - -func NewTxPriorityQueue() *TxPriorityQueue { - pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), - } - - heap.Init(pq) - - return pq -} - -// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be -// evicted to make room for another *WrappedTx with higher priority. If no such -// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx -// indicate that these transactions can be removed due to them being of lower -// priority and that their total sum in size allows room for the incoming -// transaction according to the mempool's configured limits. -func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - txs := make([]*WrappedTx, len(pq.txs)) - copy(txs, pq.txs) - - sort.Slice(txs, func(i, j int) bool { - return txs[i].priority < txs[j].priority - }) - - var ( - toEvict []*WrappedTx - i int - ) - - currSize := totalSize - - // Loop over all transactions in ascending priority order evaluating those - // that are only of less priority than the provided argument. We continue - // evaluating transactions until there is sufficient capacity for the new - // transaction (size) as defined by txSize. - for i < len(txs) && txs[i].priority < priority { - toEvict = append(toEvict, txs[i]) - currSize -= int64(txs[i].Size()) - - if currSize+txSize <= cap { - return toEvict - } - - i++ - } - - return nil -} - -// NumTxs returns the number of transactions in the priority queue. It is -// thread safe. -func (pq *TxPriorityQueue) NumTxs() int { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - return len(pq.txs) -} - -// RemoveTx removes a specific transaction from the priority queue. -func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) - } -} - -// PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - heap.Push(pq, tx) -} - -// PopTx removes the top priority transaction from the queue. It is thread safe. -func (pq *TxPriorityQueue) PopTx() *WrappedTx { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil -} - -// Push implements the Heap interface. -// -// NOTE: A caller should never call Push. Use PushTx instead. -func (pq *TxPriorityQueue) Push(x interface{}) { - n := len(pq.txs) - item := x.(*WrappedTx) - item.heapIndex = n - pq.txs = append(pq.txs, item) -} - -// Pop implements the Heap interface. -// -// NOTE: A caller should never call Pop. Use PopTx instead. -func (pq *TxPriorityQueue) Pop() interface{} { - old := pq.txs - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.heapIndex = -1 // for safety - pq.txs = old[0 : n-1] - return item -} - -// Len implements the Heap interface. -// -// NOTE: A caller should never call Len. Use NumTxs instead. -func (pq *TxPriorityQueue) Len() int { - return len(pq.txs) -} - -// Less implements the Heap interface. It returns true if the transaction at -// position i in the queue is of less priority than the transaction at position j. -func (pq *TxPriorityQueue) Less(i, j int) bool { - // If there exists two transactions with the same priority, consider the one - // that we saw the earliest as the higher priority transaction. - if pq.txs[i].priority == pq.txs[j].priority { - return pq.txs[i].timestamp.Before(pq.txs[j].timestamp) - } - - // We want Pop to give us the highest, not lowest, priority so we use greater - // than here. - return pq.txs[i].priority > pq.txs[j].priority -} - -// Swap implements the Heap interface. It swaps two transactions in the queue. -func (pq *TxPriorityQueue) Swap(i, j int) { - pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] - pq.txs[i].heapIndex = i - pq.txs[j].heapIndex = j -} diff --git a/mempool/v1/priority_queue_test.go b/mempool/v1/priority_queue_test.go deleted file mode 100644 index c0048f388..000000000 --- a/mempool/v1/priority_queue_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package v1 - -import ( - "math/rand" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestTxPriorityQueue(t *testing.T) { - pq := NewTxPriorityQueue() - numTxs := 1000 - - priorities := make([]int, numTxs) - - var wg sync.WaitGroup - for i := 1; i <= numTxs; i++ { - priorities[i-1] = i - wg.Add(1) - - go func(i int) { - pq.PushTx(&WrappedTx{ - priority: int64(i), - timestamp: time.Now(), - }) - - wg.Done() - }(i) - } - - sort.Sort(sort.Reverse(sort.IntSlice(priorities))) - - wg.Wait() - require.Equal(t, numTxs, pq.NumTxs()) - - // Wait a second and push a tx with a duplicate priority - time.Sleep(time.Second) - now := time.Now() - pq.PushTx(&WrappedTx{ - priority: 1000, - timestamp: now, - }) - require.Equal(t, 1001, pq.NumTxs()) - - tx := pq.PopTx() - require.Equal(t, 1000, pq.NumTxs()) - require.Equal(t, int64(1000), tx.priority) - require.NotEqual(t, now, tx.timestamp) - - gotPriorities := make([]int, 0) - for pq.NumTxs() > 0 { - gotPriorities = append(gotPriorities, int(pq.PopTx().priority)) - } - - require.Equal(t, priorities, gotPriorities) -} - -func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - values := make([]int, 1000) - - for i := 0; i < 1000; i++ { - tx := make([]byte, 5) // each tx is 5 bytes - _, err := rng.Read(tx) - require.NoError(t, err) - - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - tx: tx, - priority: int64(x), - }) - - values[i] = x - } - - sort.Ints(values) - - max := values[len(values)-1] - min := values[0] - totalSize := int64(len(values) * 5) - - testCases := []struct { - name string - priority, txSize, totalSize, cap int64 - expectedLen int - }{ - { - name: "larest priority; single tx", - priority: int64(max + 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 1, - }, - { - name: "larest priority; multi tx", - priority: int64(max + 1), - txSize: 17, - totalSize: totalSize, - cap: totalSize, - expectedLen: 4, - }, - { - name: "larest priority; out of capacity", - priority: int64(max + 1), - txSize: totalSize + 1, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "smallest priority; no tx", - priority: int64(min - 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "small priority; no tx", - priority: int64(min), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap) - require.Len(t, evictTxs, tc.expectedLen) - }) - } -} - -func TestTxPriorityQueue_RemoveTx(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - numTxs := 1000 - - values := make([]int, numTxs) - - for i := 0; i < numTxs; i++ { - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - priority: int64(x), - }) - - values[i] = x - } - - require.Equal(t, numTxs, pq.NumTxs()) - - sort.Ints(values) - max := values[len(values)-1] - - wtx := pq.txs[pq.NumTxs()/2] - pq.RemoveTx(wtx) - require.Equal(t, numTxs-1, pq.NumTxs()) - require.Equal(t, int64(max), pq.PopTx().priority) - require.Equal(t, numTxs-2, pq.NumTxs()) - - require.NotPanics(t, func() { - pq.RemoveTx(&WrappedTx{heapIndex: numTxs}) - pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}) - }) - require.Equal(t, numTxs-2, pq.NumTxs()) -} diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index a233d6d32..4da51bab8 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -198,8 +198,8 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // start from the beginning. if next == nil { select { - case <-memR.mempool.WaitForNextTx(): // Wait until a tx is available - if next = memR.mempool.NextGossipTx(); next == nil { + case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available + if next = memR.mempool.TxsFront(); next == nil { continue } @@ -232,7 +232,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 - if ok := memR.mempool.txStore.TxHasPeer(memTx.hash, peerID); !ok { + if !memTx.HasPeer(peerID) { msg := protomem.Message{ Sum: &protomem.Message_Txs{ Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, diff --git a/mempool/v1/tx.go b/mempool/v1/tx.go index b3b920d2e..88522a8a7 100644 --- a/mempool/v1/tx.go +++ b/mempool/v1/tx.go @@ -1,281 +1,87 @@ package v1 import ( - "sort" + "sync" "time" - "github.com/tendermint/tendermint/libs/clist" - tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/types" ) // WrappedTx defines a wrapper around a raw transaction with additional metadata // that is used for indexing. type WrappedTx struct { - // tx represents the raw binary transaction data - tx types.Tx + tx types.Tx // the original transaction data + hash types.TxKey // the transaction hash + height int64 // height when this transaction was initially checked (for expiry) + timestamp time.Time // time when transaction was entered (for TTL) - // hash defines the transaction hash and the primary key used in the mempool - hash types.TxKey - - // height defines the height at which the transaction was validated at - height int64 - - // gasWanted defines the amount of gas the transaction sender requires - gasWanted int64 - - // priority defines the transaction's priority as specified by the application - // in the ResponseCheckTx response. - priority int64 - - // sender defines the transaction's sender as specified by the application in - // the ResponseCheckTx response. - sender string - - // timestamp is the time at which the node first received the transaction from - // a peer. It is used as a second dimension is prioritizing transactions when - // two transactions have the same priority. - timestamp time.Time - - // peers records a mapping of all peers that sent a given transaction - peers map[uint16]struct{} - - // heapIndex defines the index of the item in the heap - heapIndex int - - // gossipEl references the linked-list element in the gossip index - gossipEl *clist.CElement - - // removed marks the transaction as removed from the mempool. This is set - // during RemoveTx and is needed due to the fact that a given existing - // transaction in the mempool can be evicted when it is simultaneously having - // a reCheckTx callback executed. - removed bool + mtx sync.Mutex + gasWanted int64 // app: gas required to execute this transaction + priority int64 // app: priority value for this transaction + sender string // app: assigned sender label + peers map[uint16]bool // peer IDs who have sent us this transaction } -func (wtx *WrappedTx) Size() int { - return len(wtx.tx) -} +// Size reports the size of the raw transaction in bytes. +func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) } -// TxStore implements a thread-safe mapping of valid transaction(s). -// -// NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. -type TxStore struct { - mtx tmsync.RWMutex - hashTxs map[types.TxKey]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application -} - -func NewTxStore() *TxStore { - return &TxStore{ - senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[types.TxKey]*WrappedTx), +// SetPeer adds the specified peer ID as a sender of w. +func (w *WrappedTx) SetPeer(id uint16) { + w.mtx.Lock() + defer w.mtx.Unlock() + if w.peers == nil { + w.peers = map[uint16]bool{id: true} + } else { + w.peers[id] = true } } -// Size returns the total number of transactions in the store. -func (txs *TxStore) Size() int { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return len(txs.hashTxs) -} - -// GetAllTxs returns all the transactions currently in the store. -func (txs *TxStore) GetAllTxs() []*WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wTxs := make([]*WrappedTx, len(txs.hashTxs)) - i := 0 - for _, wtx := range txs.hashTxs { - wTxs[i] = wtx - i++ - } - - return wTxs -} - -// GetTxBySender returns a *WrappedTx by the transaction's sender property -// defined by the ABCI application. -func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.senderTxs[sender] -} - -// GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.hashTxs[hash] -} - -// IsTxRemoved returns true if a transaction by hash is marked as removed and -// false otherwise. -func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx, ok := txs.hashTxs[hash] - if ok { - return wtx.removed - } - - return false -} - -// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a -// non-empty sender, we additionally store the transaction by the sender as -// defined by the ABCI application. -func (txs *TxStore) SetTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - txs.senderTxs[wtx.sender] = wtx - } - - txs.hashTxs[wtx.tx.Key()] = wtx -} - -// RemoveTx removes a *WrappedTx from the transaction store. It deletes all -// indexes of the transaction. -func (txs *TxStore) RemoveTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - delete(txs.senderTxs, wtx.sender) - } - - delete(txs.hashTxs, wtx.tx.Key()) - wtx.removed = true -} - -// TxHasPeer returns true if a transaction by hash has a given peer ID and false -// otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return false - } - - _, ok := wtx.peers[peerID] +// HasPeer reports whether the specified peer ID is a sender of w. +func (w *WrappedTx) HasPeer(id uint16) bool { + w.mtx.Lock() + defer w.mtx.Unlock() + _, ok := w.peers[id] return ok } -// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the -// given peerID to the WrappedTx's set of peers that sent us this transaction. -// We return true if we've already recorded the given peer for this transaction -// and false otherwise. If the transaction does not exist by hash, we return -// (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return nil, false - } - - if wtx.peers == nil { - wtx.peers = make(map[uint16]struct{}) - } - - if _, ok := wtx.peers[peerID]; ok { - return wtx, true - } - - wtx.peers[peerID] = struct{}{} - return wtx, false +// SetGasWanted sets the application-assigned gas requirement of w. +func (w *WrappedTx) SetGasWanted(gas int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.gasWanted = gas } -// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be -// used to build generic transaction indexes in the mempool. It accepts a -// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx -// references which is used during Insert in order to determine sorted order. If -// less returns true, a <= b. -type WrappedTxList struct { - mtx tmsync.RWMutex - txs []*WrappedTx - less func(*WrappedTx, *WrappedTx) bool +// GasWanted reports the application-assigned gas requirement of w. +func (w *WrappedTx) GasWanted() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.gasWanted } -func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList { - return &WrappedTxList{ - txs: make([]*WrappedTx, 0), - less: less, - } +// SetSender sets the application-assigned sender of w. +func (w *WrappedTx) SetSender(sender string) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.sender = sender } -// Size returns the number of WrappedTx objects in the list. -func (wtl *WrappedTxList) Size() int { - wtl.mtx.RLock() - defer wtl.mtx.RUnlock() - - return len(wtl.txs) +// Sender reports the application-assigned sender of w. +func (w *WrappedTx) Sender() string { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.sender } -// Reset resets the list of transactions to an empty list. -func (wtl *WrappedTxList) Reset() { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - wtl.txs = make([]*WrappedTx, 0) +// SetPriority sets the application-assigned priority of w. +func (w *WrappedTx) SetPriority(p int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.priority = p } -// Insert inserts a WrappedTx reference into the sorted list based on the list's -// comparator function. -func (wtl *WrappedTxList) Insert(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - if i == len(wtl.txs) { - // insert at the end - wtl.txs = append(wtl.txs, wtx) - return - } - - // Make space for the inserted element by shifting values at the insertion - // index up one index. - // - // NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs). - wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...) - wtl.txs[i] = wtx -} - -// Remove attempts to remove a WrappedTx from the sorted list. -func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - // Since the list is sorted, we evaluate all elements starting at i. Note, if - // the element does not exist, we may potentially evaluate the entire remainder - // of the list. However, a caller should not be expected to call Remove with a - // non-existing element. - for i < len(wtl.txs) { - if wtl.txs[i] == wtx { - wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...) - return - } - - i++ - } +// Priority reports the application-assigned priority of w. +func (w *WrappedTx) Priority() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.priority } diff --git a/mempool/v1/tx_test.go b/mempool/v1/tx_test.go deleted file mode 100644 index fb4beafab..000000000 --- a/mempool/v1/tx_test.go +++ /dev/null @@ -1,230 +0,0 @@ -package v1 - -import ( - "fmt" - "math/rand" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/require" - "github.com/tendermint/tendermint/types" -) - -func TestTxStore_GetTxBySender(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - res := txs.GetTxBySender(wtx.sender) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxBySender(wtx.sender) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetTxByHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_SetTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res := txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) - - wtx.sender = "foo" - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15) - require.Nil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.True(t, ok) - - require.True(t, txs.TxHasPeer(key, 15)) - require.False(t, txs.TxHasPeer(key, 16)) -} - -func TestTxStore_RemoveTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - txs.SetTx(wtx) - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.NotNil(t, res) - - txs.RemoveTx(res) - - res = txs.GetTxByHash(key) - require.Nil(t, res) -} - -func TestTxStore_Size(t *testing.T) { - txStore := NewTxStore() - numTxs := 1000 - - for i := 0; i < numTxs; i++ { - txStore.SetTx(&WrappedTx{ - tx: []byte(fmt.Sprintf("test_tx_%d", i)), - priority: int64(i), - timestamp: time.Now(), - }) - } - - require.Equal(t, numTxs, txStore.Size()) -} - -func TestWrappedTxList_Reset(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - require.Zero(t, list.Size()) - - for i := 0; i < 100; i++ { - list.Insert(&WrappedTx{height: int64(i)}) - } - - require.Equal(t, 100, list.Size()) - - list.Reset() - require.Zero(t, list.Size()) -} - -func TestWrappedTxList_Insert(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var expected []int - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - expected = append(expected, int(height)) - list.Insert(&WrappedTx{height: height}) - - if i%10 == 0 { - list.Insert(&WrappedTx{height: height}) - expected = append(expected, int(height)) - } - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -} - -func TestWrappedTxList_Remove(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var txs []*WrappedTx - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - tx := &WrappedTx{height: height} - - txs = append(txs, tx) - list.Insert(tx) - - if i%10 == 0 { - tx = &WrappedTx{height: height} - list.Insert(tx) - txs = append(txs, tx) - } - } - - // remove a tx that does not exist - list.Remove(&WrappedTx{height: 20000}) - - // remove a tx that exists (by height) but not referenced - list.Remove(&WrappedTx{height: txs[0].height}) - - // remove a few existing txs - for i := 0; i < 25; i++ { - j := rng.Intn(len(txs)) - list.Remove(txs[j]) - txs = append(txs[:j], txs[j+1:]...) - } - - expected := make([]int, len(txs)) - for i, tx := range txs { - expected[i] = int(tx.height) - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -}