diff --git a/internal/consensus/mempool_test.go b/internal/consensus/mempool_test.go index ed11f6840..59647f76b 100644 --- a/internal/consensus/mempool_test.go +++ b/internal/consensus/mempool_test.go @@ -214,12 +214,13 @@ func TestMempoolRmBadTx(t *testing.T) { emptyMempoolCh := make(chan struct{}) checkTxRespCh := make(chan struct{}) go func() { - // Try to send the tx through the mempool. + // Try to send an out-of-sequence transaction through the mempool. // CheckTx should not err, but the app should return a bad abci code // and the tx should get removed from the pool + binary.BigEndian.PutUint64(txBytes, uint64(5)) err := assertMempool(t, cs.txNotifier).CheckTx(ctx, txBytes, func(r *abci.ResponseCheckTx) { if r.Code != code.CodeTypeBadNonce { - t.Errorf("expected checktx to return bad nonce, got %v", r) + t.Errorf("expected checktx to return bad nonce, got %#v", r) return } checkTxRespCh <- struct{}{} @@ -312,14 +313,15 @@ func (app *CounterApplication) FinalizeBlock(_ context.Context, req *abci.Reques func (app *CounterApplication) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) { app.mu.Lock() defer app.mu.Unlock() - - txValue := txAsUint64(req.Tx) - if txValue != uint64(app.mempoolTxCount) { - return &abci.ResponseCheckTx{ - Code: code.CodeTypeBadNonce, - }, nil + if req.Type == abci.CheckTxType_New { + txValue := txAsUint64(req.Tx) + if txValue != uint64(app.mempoolTxCount) { + return &abci.ResponseCheckTx{ + Code: code.CodeTypeBadNonce, + }, nil + } + app.mempoolTxCount++ } - app.mempoolTxCount++ return &abci.ResponseCheckTx{Code: code.CodeTypeOK}, nil } @@ -332,8 +334,6 @@ func txAsUint64(tx []byte) uint64 { func (app *CounterApplication) Commit(context.Context) (*abci.ResponseCommit, error) { app.mu.Lock() defer app.mu.Unlock() - - app.mempoolTxCount = app.txCount return &abci.ResponseCommit{}, nil } diff --git a/internal/libs/clist/bench_test.go b/internal/libs/clist/bench_test.go index ee5d836a7..95973cc76 100644 --- a/internal/libs/clist/bench_test.go +++ b/internal/libs/clist/bench_test.go @@ -12,7 +12,7 @@ func BenchmarkDetaching(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { start.removed = true - start.detachNext() + start.DetachNext() start.DetachPrev() tmp := nxt nxt = nxt.Next() diff --git a/internal/libs/clist/clist.go b/internal/libs/clist/clist.go index 3969c94cc..9a0e5bcc8 100644 --- a/internal/libs/clist/clist.go +++ b/internal/libs/clist/clist.go @@ -103,7 +103,7 @@ func (e *CElement) Removed() bool { return isRemoved } -func (e *CElement) detachNext() { +func (e *CElement) DetachNext() { e.mtx.Lock() if !e.removed { e.mtx.Unlock() diff --git a/internal/mempool/cache.go b/internal/mempool/cache.go index c69fc80dd..3986cd585 100644 --- a/internal/mempool/cache.go +++ b/internal/mempool/cache.go @@ -22,6 +22,10 @@ type TxCache interface { // Remove removes the given raw transaction from the cache. Remove(tx types.Tx) + + // Has reports whether tx is present in the cache. Checking for presence is + // not treated as an access of the value. + Has(tx types.Tx) bool } var _ TxCache = (*LRUTxCache)(nil) @@ -97,6 +101,14 @@ func (c *LRUTxCache) Remove(tx types.Tx) { } } +func (c *LRUTxCache) Has(tx types.Tx) bool { + c.mtx.Lock() + defer c.mtx.Unlock() + + _, ok := c.cacheMap[tx.Key()] + return ok +} + // NopTxCache defines a no-op raw transaction cache. type NopTxCache struct{} @@ -105,3 +117,4 @@ var _ TxCache = (*NopTxCache)(nil) func (NopTxCache) Reset() {} func (NopTxCache) Push(types.Tx) bool { return true } func (NopTxCache) Remove(types.Tx) {} +func (NopTxCache) Has(types.Tx) bool { return false } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 2398180fc..0354eb28a 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -1,20 +1,20 @@ package mempool import ( - "bytes" "context" - "errors" "fmt" + "runtime" + "sort" "sync" "sync/atomic" "time" + "github.com/creachadair/taskgroup" abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/libs/log" - tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" ) @@ -23,73 +23,41 @@ var _ Mempool = (*TxMempool)(nil) // TxMempoolOption sets an optional parameter on the TxMempool. type TxMempoolOption func(*TxMempool) -// TxMempool defines a prioritized mempool data structure used by the v1 mempool -// reactor. It keeps a thread-safe priority queue of transactions that is used -// when a block proposer constructs a block and a thread-safe linked-list that -// is used to gossip transactions to peers in a FIFO manner. +// TxMempool implemements the Mempool interface and allows the application to +// set priority values on transactions in the CheckTx response. When selecting +// transactions to include in a block, higher-priority transactions are chosen +// first. When evicting transactions from the mempool for size constraints, +// lower-priority transactions are evicted sooner. +// +// Within the mempool, transactions are ordered by time of arrival, and are +// gossiped to the rest of the network based on that order (gossip order does +// not take priority into account). type TxMempool struct { + // Immutable fields logger log.Logger - metrics *Metrics config *config.MempoolConfig proxyAppConn abciclient.Client + metrics *Metrics + cache TxCache // seen transactions - // txsAvailable fires once for each height when the mempool is not empty - txsAvailable chan struct{} + // Atomically-updated fields + txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes + + // Synchronized fields, protected by mtx. + mtx *sync.RWMutex notifiedTxsAvailable bool + txsAvailable chan struct{} // one value sent per height when mempool is not empty + preCheck PreCheckFunc + postCheck PostCheckFunc + height int64 // the latest height passed to Update - // height defines the last block height process during Update() - height int64 - - // sizeBytes defines the total size of the mempool (sum of all tx bytes) - sizeBytes int64 - - // cache defines a fixed-size cache of already seen transactions as this - // reduces pressure on the proxyApp. - cache TxCache - - // txStore defines the main storage of valid transactions. Indexes are built - // on top of this store. - txStore *TxStore - - // gossipIndex defines the gossiping index of valid transactions via a - // thread-safe linked-list. We also use the gossip index as a cursor for - // rechecking transactions already in the mempool. - gossipIndex *clist.CList - - // recheckCursor and recheckEnd are used as cursors based on the gossip index - // to recheck transactions that are already in the mempool. Iteration is not - // thread-safe and transaction may be mutated in serial order. - // - // XXX/TODO: It might be somewhat of a codesmell to use the gossip index for - // iterator and cursor management when rechecking transactions. If the gossip - // index changes or is removed in a future refactor, this will have to be - // refactored. Instead, we should consider just keeping a slice of a snapshot - // of the mempool's current transactions during Update and an integer cursor - // into that slice. This, however, requires additional O(n) space complexity. - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // priorityIndex defines the priority index of valid transactions via a - // thread-safe priority queue. - priorityIndex *TxPriorityQueue - - // heightIndex defines a height-based, in ascending order, transaction index. - // i.e. older transactions are first. - heightIndex *WrappedTxList - - // timestampIndex defines a timestamp-based, in ascending order, transaction - // index. i.e. older transactions are first. - timestampIndex *WrappedTxList - - // A read/write lock is used to safe guard updates, insertions and deletions - // from the mempool. A read-lock is implicitly acquired when executing CheckTx, - // however, a caller must explicitly grab a write-lock via Lock when updating - // the mempool via Update(). - mtx sync.RWMutex - preCheck PreCheckFunc - postCheck PostCheckFunc + txs *clist.CList // valid transactions (passed CheckTx) + txByKey map[types.TxKey]*clist.CElement + txBySender map[string]*clist.CElement // for sender != "" } +// NewTxMempool constructs a new, empty priority mempool at the specified +// initial height and using the given config and options. func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, @@ -98,23 +66,16 @@ func NewTxMempool( ) *TxMempool { txmp := &TxMempool{ - logger: logger, - config: cfg, - proxyAppConn: proxyAppConn, - height: -1, - cache: NopTxCache{}, - metrics: NopMetrics(), - txStore: NewTxStore(), - gossipIndex: clist.New(), - priorityIndex: NewTxPriorityQueue(), - heightIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }), - timestampIndex: NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.timestamp.After(wtx2.timestamp) || wtx1.timestamp.Equal(wtx2.timestamp) - }), + logger: logger, + config: cfg, + proxyAppConn: proxyAppConn, + metrics: NopMetrics(), + cache: NopTxCache{}, + txs: clist.New(), + mtx: new(sync.RWMutex), + txByKey: make(map[types.TxKey]*clist.CElement), + txBySender: make(map[string]*clist.CElement), } - if cfg.CacheSize > 0 { txmp.cache = NewLRUTxCache(cfg.CacheSize) } @@ -147,47 +108,36 @@ func WithMetrics(metrics *Metrics) TxMempoolOption { // Lock obtains a write-lock on the mempool. A caller must be sure to explicitly // release the lock when finished. -func (txmp *TxMempool) Lock() { - txmp.mtx.Lock() -} +func (txmp *TxMempool) Lock() { txmp.mtx.Lock() } // Unlock releases a write-lock on the mempool. -func (txmp *TxMempool) Unlock() { - txmp.mtx.Unlock() -} +func (txmp *TxMempool) Unlock() { txmp.mtx.Unlock() } // Size returns the number of valid transactions in the mempool. It is // thread-safe. -func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() -} +func (txmp *TxMempool) Size() int { return txmp.txs.Len() } // SizeBytes return the total sum in bytes of all the valid transactions in the // mempool. It is thread-safe. -func (txmp *TxMempool) SizeBytes() int64 { - return atomic.LoadInt64(&txmp.sizeBytes) -} +func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.txsBytes) } -// FlushAppConn executes FlushSync on the mempool's proxyAppConn. +// FlushAppConn executes Flush on the mempool's proxyAppConn. // -// NOTE: The caller must obtain a write-lock prior to execution. +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling FlushAppConn. func (txmp *TxMempool) FlushAppConn(ctx context.Context) error { + // N.B.: We have to issue the call outside the lock so that its callback can + // fire. It's safe to do this, the flush will block until complete. + // + // We could just not require the caller to hold the lock at all, but the + // semantics of the Mempool interface require the caller to hold it, and we + // can't change that without disrupting existing use. + txmp.mtx.Unlock() + defer txmp.mtx.Lock() + return txmp.proxyAppConn.Flush(ctx) } -// WaitForNextTx returns a blocking channel that will be closed when the next -// valid transaction is available to gossip. It is thread-safe. -func (txmp *TxMempool) WaitForNextTx() <-chan struct{} { - return txmp.gossipIndex.WaitChan() -} - -// NextGossipTx returns the next valid transaction to gossip. A caller must wait -// for WaitForNextTx to signal a transaction is available to gossip first. It is -// thread-safe. -func (txmp *TxMempool) NextGossipTx() *clist.CElement { - return txmp.gossipIndex.Front() -} - // EnableTxsAvailable enables the mempool to trigger events when transactions // are available on a block by block basis. func (txmp *TxMempool) EnableTxsAvailable() { @@ -199,232 +149,249 @@ func (txmp *TxMempool) EnableTxsAvailable() { // TxsAvailable returns a channel which fires once for every height, and only // when transactions are available in the mempool. It is thread-safe. -func (txmp *TxMempool) TxsAvailable() <-chan struct{} { - return txmp.txsAvailable -} +func (txmp *TxMempool) TxsAvailable() <-chan struct{} { return txmp.txsAvailable } -// CheckTx executes the ABCI CheckTx method for a given transaction. -// It acquires a read-lock and attempts to execute the application's -// CheckTx ABCI method synchronously. We return an error if any of -// the following happen: +// CheckTx adds the given transaction to the mempool if it fits and passes the +// application's ABCI CheckTx method. // -// - The CheckTx execution fails. -// - The transaction already exists in the cache and we've already received the -// transaction from the peer. Otherwise, if it solely exists in the cache, we -// return nil. -// - The transaction size exceeds the maximum transaction size as defined by the -// configuration provided to the mempool. -// - The transaction fails Pre-Check (if it is defined). -// - The proxyAppConn fails, e.g. the buffer is full. +// CheckTx reports an error without adding tx if: // -// If the mempool is full, we still execute CheckTx and attempt to find a lower -// priority transaction to evict. If such a transaction exists, we remove the -// lower priority transaction and add the new one with higher priority. +// - The size of tx exceeds the configured maximum transaction size. +// - The pre-check hook is defined and reports an error for tx. +// - The transaction already exists in the cache. +// - The proxy connection to the application fails. // -// NOTE: -// - The applications' CheckTx implementation may panic. -// - The caller is not to explicitly require any locks for executing CheckTx. +// If tx passes all of the above conditions, it is passed (asynchronously) to +// the application's ABCI CheckTx method and this CheckTx method returns nil. +// If cb != nil, it is called when the ABCI request completes to report the +// application response. +// +// If the application accepts the transaction and the mempool is full, the +// mempool evicts one or more of the lowest-priority transaction whose priority +// is (strictly) lower than the priority of tx and whose size together exceeds +// the size of tx, and adds tx instead. If no such transactions exist, tx is +// discarded. func (txmp *TxMempool) CheckTx( ctx context.Context, tx types.Tx, cb func(*abci.ResponseCheckTx), txInfo TxInfo, ) error { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + // During the initial phase of CheckTx, we do not need to modify any state. + // A transaction will not actually be added to the mempool until it survives + // a call to the ABCI CheckTx method and size constraint checks. + height, err := func() (int64, error) { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() - if txSize := len(tx); txSize > txmp.config.MaxTxBytes { - return types.ErrTxTooLarge{ - Max: txmp.config.MaxTxBytes, - Actual: txSize, + // Reject transactions in excess of the configured maximum transaction size. + if len(tx) > txmp.config.MaxTxBytes { + return 0, types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)} } - } - if txmp.preCheck != nil { - if err := txmp.preCheck(tx); err != nil { - return types.ErrPreCheck{Reason: err} + // If a precheck hook is defined, call it before invoking the application. + if txmp.preCheck != nil { + if err := txmp.preCheck(tx); err != nil { + return 0, types.ErrPreCheck{Reason: err} + } } - } - if err := txmp.proxyAppConn.Error(); err != nil { + // Early exit if the proxy connection has an error. + if err := txmp.proxyAppConn.Error(); err != nil { + return 0, err + } + + txKey := tx.Key() + + // Check for the transaction in the cache. + if !txmp.cache.Push(tx) { + // If the cached transaction is also in the pool, record its sender. + if elt, ok := txmp.txByKey[txKey]; ok { + w := elt.Value.(*WrappedTx) + w.SetPeer(txInfo.SenderID) + } + return 0, types.ErrTxInCache + } + return txmp.height, nil + }() + if err != nil { return err } - txHash := tx.Key() - - // We add the transaction to the mempool's cache and if the - // transaction is already present in the cache, i.e. false is returned, then we - // check if we've seen this transaction and error if we have. - if !txmp.cache.Push(tx) { - txmp.txStore.GetOrSetPeerByTxHash(txHash, txInfo.SenderID) - return types.ErrTxInCache - } - - res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) + // Invoke an ABCI CheckTx for this transaction. + rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{Tx: tx}) if err != nil { txmp.cache.Remove(tx) return err } - - if txmp.recheckCursor != nil { - return errors.New("recheck cursor is non-nil") - } - wtx := &WrappedTx{ tx: tx, - hash: txHash, + hash: tx.Key(), timestamp: time.Now().UTC(), - height: txmp.height, - } - - txmp.defaultTxCallback(tx, res) - err = txmp.initTxCallback(wtx, res, txInfo) - - if err != nil { - return err + height: height, } + wtx.SetPeer(txInfo.SenderID) if cb != nil { - cb(res) + cb(rsp) } - - return nil + return txmp.addNewTransaction(wtx, rsp) } +// RemoveTxByKey removes the transaction with the specified key from the +// mempool. It reports an error if no such transaction exists. This operation +// does not remove the transaction from the cache. func (txmp *TxMempool) RemoveTxByKey(txKey types.TxKey) error { - txmp.Lock() - defer txmp.Unlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + return txmp.removeTxByKey(txKey) +} - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(txKey); wtx != nil { - txmp.removeTx(wtx, false) +// removeTxByKey removes the specified transaction key from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByKey(key types.TxKey) error { + if elt, ok := txmp.txByKey[key]; ok { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, key) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) return nil } - - return errors.New("transaction not found") + return fmt.Errorf("transaction %x not found", key) } -// Flush empties the mempool. It acquires a read-lock, fetches all the -// transactions currently in the transaction store and removes each transaction -// from the store and all indexes and finally resets the cache. -// -// NOTE: -// - Flushing the mempool may leave the mempool in an inconsistent state. +// removeTxByElement removes the specified transaction element from the mempool. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) removeTxByElement(elt *clist.CElement) { + w := elt.Value.(*WrappedTx) + delete(txmp.txByKey, w.tx.Key()) + delete(txmp.txBySender, w.sender) + txmp.txs.Remove(elt) + elt.DetachPrev() + elt.DetachNext() + atomic.AddInt64(&txmp.txsBytes, -w.Size()) +} + +// Flush purges the contents of the mempool and the cache, leaving both empty. +// The current height is not modified by this operation. func (txmp *TxMempool) Flush() { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() - txmp.heightIndex.Reset() - txmp.timestampIndex.Reset() - - for _, wtx := range txmp.txStore.GetAllTxs() { - txmp.removeTx(wtx, false) + // Remove all the transactions in the list explicitly, so that the sizes + // and indexes get updated properly. + cur := txmp.txs.Front() + for cur != nil { + next := cur.Next() + txmp.removeTxByElement(cur) + cur = next } - - atomic.SwapInt64(&txmp.sizeBytes, 0) txmp.cache.Reset() } -// ReapMaxBytesMaxGas returns a list of transactions within the provided size -// and gas constraints. Transaction are retrieved in priority order. +// allEntriesSorted returns a slice of all the transactions currently in the +// mempool, sorted in nonincreasing order by priority with ties broken by +// increasing order of arrival time. +func (txmp *TxMempool) allEntriesSorted() []*WrappedTx { + txmp.mtx.RLock() + defer txmp.mtx.RUnlock() + + all := make([]*WrappedTx, 0, len(txmp.txByKey)) + for _, tx := range txmp.txByKey { + all = append(all, tx.Value.(*WrappedTx)) + } + sort.Slice(all, func(i, j int) bool { + if all[i].priority == all[j].priority { + return all[i].timestamp.Before(all[j].timestamp) + } + return all[i].priority > all[j].priority // N.B. higher priorities first + }) + return all +} + +// ReapMaxBytesMaxGas returns a slice of valid transactions that fit within the +// size and gas constraints. The results are ordered by nonincreasing priority, +// with ties broken by increasing order of arrival. Reaping transactions does +// not remove them from the mempool. // -// NOTE: -// - Transactions returned are not removed from the mempool transaction -// store or indexes. +// If maxBytes < 0, no limit is set on the total size in bytes. +// If maxGas < 0, no limit is set on the total gas cost. +// +// If the mempool is empty or has no transactions fitting within the given +// constraints, the result will also be empty. func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var totalGas, totalBytes int64 - var ( - totalGas int64 - totalSize int64 - ) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, txmp.priorityIndex.NumTxs()) - defer func() { - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) + var keep []types.Tx //nolint:prealloc + for _, w := range txmp.allEntriesSorted() { + // N.B. When computing byte size, we need to include the overhead for + // encoding as protobuf to send to the application. + totalGas += w.gasWanted + totalBytes += types.ComputeProtoSizeForTxs([]types.Tx{w.tx}) + if (maxGas >= 0 && totalGas > maxGas) || (maxBytes >= 0 && totalBytes > maxBytes) { + break } - }() - - txs := make([]types.Tx, 0, txmp.priorityIndex.NumTxs()) - for txmp.priorityIndex.NumTxs() > 0 { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - size := types.ComputeProtoSizeForTxs([]types.Tx{wtx.tx}) - - // Ensure we have capacity for the transaction with respect to the - // transaction size. - if maxBytes > -1 && totalSize+size > maxBytes { - return txs[:len(txs)-1] - } - - totalSize += size - - // ensure we have capacity for the transaction with respect to total gas - gas := totalGas + wtx.gasWanted - if maxGas > -1 && gas > maxGas { - return txs[:len(txs)-1] - } - - totalGas = gas + keep = append(keep, w.tx) } - - return txs + return keep } -// ReapMaxTxs returns a list of transactions within the provided number of -// transactions bound. Transaction are retrieved in priority order. +// TxsWaitChan returns a channel that is closed when there is at least one +// transaction available to be gossiped. +func (txmp *TxMempool) TxsWaitChan() <-chan struct{} { return txmp.txs.WaitChan() } + +// TxsFront returns the frontmost element of the pending transaction list. +// It will be nil if the mempool is empty. +func (txmp *TxMempool) TxsFront() *clist.CElement { return txmp.txs.Front() } + +// ReapMaxTxs returns up to max transactions from the mempool. The results are +// ordered by nonincreasing priority with ties broken by increasing order of +// arrival. Reaping transactions does not remove them from the mempool. // -// NOTE: -// - Transactions returned are not removed from the mempool transaction -// store or indexes. +// If max < 0, all transactions in the mempool are reaped. +// +// The result may have fewer than max elements (possibly zero) if the mempool +// does not have that many transactions available. func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { - txmp.mtx.RLock() - defer txmp.mtx.RUnlock() + var keep []types.Tx //nolint:prealloc - numTxs := txmp.priorityIndex.NumTxs() - if max < 0 { - max = numTxs + for _, w := range txmp.allEntriesSorted() { + if max >= 0 && len(keep) >= max { + break + } + keep = append(keep, w.tx) } - - cap := tmmath.MinInt(numTxs, max) - - // wTxs contains a list of *WrappedTx retrieved from the priority queue that - // need to be re-enqueued prior to returning. - wTxs := make([]*WrappedTx, 0, cap) - txs := make([]types.Tx, 0, cap) - for txmp.priorityIndex.NumTxs() > 0 && len(txs) < max { - wtx := txmp.priorityIndex.PopTx() - txs = append(txs, wtx.tx) - wTxs = append(wTxs, wtx) - } - for _, wtx := range wTxs { - txmp.priorityIndex.PushTx(wtx) - } - return txs + return keep } -// Update iterates over all the transactions provided by the block producer, -// removes them from the cache (if applicable), and removes -// the transactions from the main transaction store and associated indexes. -// If there are transactions remaining in the mempool, we initiate a -// re-CheckTx for them (if applicable), otherwise, we notify the caller more -// transactions are available. +// Update removes all the given transactions from the mempool and the cache, +// and updates the current block height. The blockTxs and deliverTxResponses +// must have the same length with each response corresponding to the tx at the +// same offset. // -// NOTE: -// - The caller must explicitly acquire a write-lock. +// If the configuration enables recheck, Update sends each remaining +// transaction after removing blockTxs to the ABCI CheckTx method. Any +// transactions marked as invalid during recheck are also removed. +// +// The caller must hold an exclusive mempool lock (by calling txmp.Lock) before +// calling Update. func (txmp *TxMempool) Update( ctx context.Context, blockHeight int64, blockTxs types.Txs, - execTxResult []*abci.ExecTxResult, + deliverTxResponses []*abci.ExecTxResult, newPreFn PreCheckFunc, newPostFn PostCheckFunc, recheck bool, ) error { + // Safety check: Transactions and responses must match in number. + if len(blockTxs) != len(deliverTxResponses) { + panic(fmt.Sprintf("mempool: got %d transactions but %d ExecTx responses", + len(blockTxs), len(deliverTxResponses))) + } + txmp.height = blockHeight txmp.notifiedTxsAvailable = false @@ -436,18 +403,17 @@ func (txmp *TxMempool) Update( } for i, tx := range blockTxs { - if execTxResult[i].Code == abci.CodeTypeOK { - // add the valid committed transaction to the cache (if missing) + // Add successful committed transactions to the cache (if they are not + // already present). Transactions that failed to commit are removed from + // the cache unless the operator has explicitly requested we keep them. + if deliverTxResponses[i].Code == abci.CodeTypeOK { _ = txmp.cache.Push(tx) } else if !txmp.config.KeepInvalidTxsInCache { - // allow invalid transactions to be re-submitted txmp.cache.Remove(tx) } - // remove the committed transaction from the transaction store and indexes - if wtx := txmp.txStore.GetTxByHash(tx.Key()); wtx != nil { - txmp.removeTx(wtx, false) - } + // Regardless of success, remove the transaction from the mempool. + _ = txmp.removeTxByKey(tx.Key()) } txmp.purgeExpiredTxs(blockHeight) @@ -455,134 +421,173 @@ func (txmp *TxMempool) Update( // If there any uncommitted transactions left in the mempool, we either // initiate re-CheckTx per remaining transaction or notify that remaining // transactions are left. - if txmp.Size() > 0 { + size := txmp.Size() + txmp.metrics.Size.Set(float64(size)) + if size > 0 { if recheck { - txmp.logger.Debug( - "executing re-CheckTx for all remaining transactions", - "num_txs", txmp.Size(), - "height", blockHeight, - ) - txmp.updateReCheckTxs(ctx) + txmp.recheckTransactions(ctx) } else { txmp.notifyTxsAvailable() } } - - txmp.metrics.Size.Set(float64(txmp.Size())) return nil } -// initTxCallback is the callback invoked for a new unique transaction after CheckTx -// has been executed by the ABCI application for the first time on that transaction. -// CheckTx can be called again for the same transaction later when re-checking; -// however, this callback will not be called. +// addNewTransaction handles the ABCI CheckTx response for the first time a +// transaction is added to the mempool. A recheck after a block is committed +// goes to handleRecheckResult. // -// initTxCallback runs after the ABCI application executes CheckTx. -// It runs the postCheck hook if one is defined on the mempool. -// If the CheckTx response response code is not OK, or if the postCheck hook -// reports an error, the transaction is rejected. Otherwise, we attempt to insert -// the transaction into the mempool. +// If either the application rejected the transaction or a post-check hook is +// defined and rejects the transaction, it is discarded. // -// When inserting a transaction, we first check if there is sufficient capacity. -// If there is, the transaction is added to the txStore and all indexes. -// Otherwise, if the mempool is full, we attempt to find a lower priority transaction -// to evict in place of the new incoming transaction. If no such transaction exists, -// the new incoming transaction is rejected. +// Otherwise, if the mempool is full, check for lower-priority transactions +// that can be evicted to make room for the new one. If no such transactions +// exist, this transaction is logged and dropped; otherwise the selected +// transactions are evicted. // -// NOTE: -// - An explicit lock is NOT required. -func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) error { +// Finally, the new transaction is added and size stats updated. +func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error { + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + var err error if txmp.postCheck != nil { - err = txmp.postCheck(wtx.tx, res) + err = txmp.postCheck(wtx.tx, checkTxRes) } - if err != nil || res.Code != abci.CodeTypeOK { - // ignore bad transactions + if err != nil || checkTxRes.Code != abci.CodeTypeOK { txmp.logger.Info( "rejected bad transaction", - "priority", wtx.priority, + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "peer_id", txInfo.SenderNodeID, - "code", res.Code, + "peer_id", wtx.peers, + "code", checkTxRes.Code, "post_check_err", err, ) txmp.metrics.FailedTxs.Add(1) + // Remove the invalid transaction from the cache, unless the operator has + // instructed us to keep invalid transactions. if !txmp.config.KeepInvalidTxsInCache { txmp.cache.Remove(wtx.tx) } - return err + + if err != nil { + return err + } + // TODO(creachadair): Report an error for an invalid transaction. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("invalid transaction: ABCI response code %d", checkTxRes.Code) + return nil } - sender := res.Sender - priority := res.Priority + priority := checkTxRes.Priority + sender := checkTxRes.Sender - if len(sender) > 0 { - if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil { - txmp.logger.Error( - "rejected incoming good transaction; tx already exists for sender", - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + // Disallow multiple concurrent transactions from the same sender assigned + // by the ABCI application. As a special case, an empty sender is not + // restricted. + if sender != "" { + elt, ok := txmp.txBySender[sender] + if ok { + w := elt.Value.(*WrappedTx) + txmp.logger.Debug( + "rejected valid incoming transaction; tx already exists for sender", + "tx", fmt.Sprintf("%X", w.tx.Hash()), "sender", sender, ) txmp.metrics.RejectedTxs.Add(1) + // TODO(creachadair): Report an error for a duplicate sender. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("transaction rejected: tx already exists for sender %q (%X)", sender, w.tx.Hash()) return nil } } + // At this point the application has ruled the transaction valid, but the + // mempool might be full. If so, find the lowest-priority items with lower + // priority than the application assigned to this new one, and evict as many + // of them as necessary to make room for tx. If no such items exist, we + // discard tx. + if err := txmp.canAddTx(wtx); err != nil { - evictTxs := txmp.priorityIndex.GetEvictableTxs( - priority, - int64(wtx.Size()), - txmp.SizeBytes(), - txmp.config.MaxTxsBytes, - ) - if len(evictTxs) == 0 { - // No room for the new incoming transaction so we just remove it from - // the cache and return an error to the user. + var victims []*clist.CElement // eligible transactions for eviction + var victimBytes int64 // total size of victims + for cur := txmp.txs.Front(); cur != nil; cur = cur.Next() { + cw := cur.Value.(*WrappedTx) + if cw.priority < priority { + victims = append(victims, cur) + victimBytes += cw.Size() + } + } + + // If there are no suitable eviction candidates, or the total size of + // those candidates is not enough to make room for the new transaction, + // drop the new one. + if len(victims) == 0 || victimBytes < wtx.Size() { txmp.cache.Remove(wtx.tx) txmp.logger.Error( - "rejected incoming good transaction; mempool full", + "rejected valid incoming transaction; mempool is full", "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "err", err.Error(), ) txmp.metrics.RejectedTxs.Add(1) - return err + // TODO(creachadair): Report an error for a full mempool. + // This is an API change, unfortunately, but should be made safe if it isn't. + // fmt.Errorf("transaction rejected: mempool is full (%X)", wtx.tx.Hash()) + return nil } - // evict an existing transaction(s) - // - // NOTE: - // - The transaction, toEvict, can be removed while a concurrent - // reCheckTx callback is being executed for the same transaction. - for _, toEvict := range evictTxs { - txmp.removeTx(toEvict, true) + txmp.logger.Debug("evicting lower-priority transactions", + "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "new_priority", priority, + ) + + // Sort lowest priority items first so they will be evicted first. Break + // ties in favor of newer items (to maintain FIFO semantics in a group). + sort.Slice(victims, func(i, j int) bool { + iw := victims[i].Value.(*WrappedTx) + jw := victims[j].Value.(*WrappedTx) + if iw.Priority() == jw.Priority() { + return iw.timestamp.After(jw.timestamp) + } + return iw.Priority() < jw.Priority() + }) + + // Evict as many of the victims as necessary to make room. + var evictedBytes int64 + for _, vic := range victims { + w := vic.Value.(*WrappedTx) + txmp.logger.Debug( - "evicted existing good transaction; mempool full", - "old_tx", fmt.Sprintf("%X", toEvict.tx.Hash()), - "old_priority", toEvict.priority, - "new_tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "new_priority", wtx.priority, + "evicted valid existing transaction; mempool full", + "old_tx", fmt.Sprintf("%X", w.tx.Hash()), + "old_priority", w.priority, ) + txmp.removeTxByElement(vic) + txmp.cache.Remove(w.tx) txmp.metrics.EvictedTxs.Add(1) + + // We may not need to evict all the eligible transactions. Bail out + // early if we have made enough room. + evictedBytes += w.Size() + if evictedBytes >= wtx.Size() { + break + } } } - wtx.gasWanted = res.GasWanted - wtx.priority = priority - wtx.sender = sender - wtx.peers = map[uint16]struct{}{ - txInfo.SenderID: {}, - } + wtx.SetGasWanted(checkTxRes.GasWanted) + wtx.SetPriority(priority) + wtx.SetSender(sender) + txmp.insertTx(wtx) txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) txmp.metrics.Size.Set(float64(txmp.Size())) - - txmp.insertTx(wtx) txmp.logger.Debug( - "inserted good transaction", - "priority", wtx.priority, + "inserted new valid transaction", + "priority", wtx.Priority(), "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "height", txmp.height, "num_txs", txmp.Size(), @@ -591,149 +596,129 @@ func (txmp *TxMempool) initTxCallback(wtx *WrappedTx, res *abci.ResponseCheckTx, return nil } -// defaultTxCallback is the CheckTx application callback used when a -// transaction is being re-checked (if re-checking is enabled). The -// caller must hold a mempool write-lock (via Lock()) and when -// executing Update(), if the mempool is non-empty and Recheck is -// enabled, then all remaining transactions will be rechecked via -// CheckTx. The order transactions are rechecked must be the same as -// the order in which this callback is called. -func (txmp *TxMempool) defaultTxCallback(tx types.Tx, res *abci.ResponseCheckTx) { - if txmp.recheckCursor == nil { +func (txmp *TxMempool) insertTx(wtx *WrappedTx) { + elt := txmp.txs.PushBack(wtx) + txmp.txByKey[wtx.tx.Key()] = elt + if s := wtx.Sender(); s != "" { + txmp.txBySender[s] = elt + } + + atomic.AddInt64(&txmp.txsBytes, wtx.Size()) +} + +// handleRecheckResult handles the responses from ABCI CheckTx calls issued +// during the recheck phase of a block Update. It removes any transactions +// invalidated by the application. +// +// This method is NOT executed for the initial CheckTx on a new transaction; +// that case is handled by addNewTransaction instead. +func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) { + txmp.metrics.RecheckTimes.Add(1) + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + + // Find the transaction reported by the ABCI callback. It is possible the + // transaction was evicted during the recheck, in which case the transaction + // will be gone. + elt, ok := txmp.txByKey[tx.Key()] + if !ok { return } + wtx := elt.Value.(*WrappedTx) - txmp.metrics.RecheckTimes.Add(1) - - wtx := txmp.recheckCursor.Value.(*WrappedTx) - - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, wtx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - txmp.logger.Error( - "re-CheckTx transaction mismatch", - "got", wtx.tx.Hash(), - "expected", tx.Key(), - ) - - if txmp.recheckCursor == txmp.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - txmp.recheckCursor = nil - return - } - - txmp.recheckCursor = txmp.recheckCursor.Next() - wtx = txmp.recheckCursor.Value.(*WrappedTx) + // If a postcheck hook is defined, call it before checking the result. + var err error + if txmp.postCheck != nil { + err = txmp.postCheck(tx, checkTxRes) } - // Only evaluate transactions that have not been removed. This can happen - // if an existing transaction is evicted during CheckTx and while this - // callback is being executed for the same evicted transaction. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - var err error - if txmp.postCheck != nil { - err = txmp.postCheck(tx, res) - } - - if res.Code == abci.CodeTypeOK && err == nil { - wtx.priority = res.Priority - } else { - txmp.logger.Debug( - "existing transaction no longer valid; failed re-CheckTx callback", - "priority", wtx.priority, - "tx", fmt.Sprintf("%X", wtx.tx.Hash()), - "err", err, - "code", res.Code, - ) - - if wtx.gossipEl != txmp.recheckCursor { - panic("corrupted reCheckTx cursor") - } - - txmp.removeTx(wtx, !txmp.config.KeepInvalidTxsInCache) - } + if checkTxRes.Code == abci.CodeTypeOK && err == nil { + wtx.SetPriority(checkTxRes.Priority) + return // N.B. Size of mempool did not change } - // move reCheckTx cursor to next element - if txmp.recheckCursor == txmp.recheckEnd { - txmp.recheckCursor = nil - } else { - txmp.recheckCursor = txmp.recheckCursor.Next() + txmp.logger.Debug( + "existing transaction no longer valid; failed re-CheckTx callback", + "priority", wtx.Priority(), + "tx", fmt.Sprintf("%X", wtx.tx.Hash()), + "err", err, + "code", checkTxRes.Code, + ) + txmp.removeTxByElement(elt) + txmp.metrics.FailedTxs.Add(1) + if !txmp.config.KeepInvalidTxsInCache { + txmp.cache.Remove(wtx.tx) } - - if txmp.recheckCursor == nil { - txmp.logger.Debug("finished rechecking transactions") - - if txmp.Size() > 0 { - txmp.notifyTxsAvailable() - } - } - txmp.metrics.Size.Set(float64(txmp.Size())) } -// updateReCheckTxs updates the recheck cursors using the gossipIndex. For -// each transaction, it executes CheckTx. The global callback defined on -// the proxyAppConn will be executed for each transaction after CheckTx is -// executed. +// recheckTransactions initiates re-CheckTx ABCI calls for all the transactions +// currently in the mempool. It reports the number of recheck calls that were +// successfully initiated. // -// NOTE: -// - The caller must have a write-lock when executing updateReCheckTxs. -func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { +// Precondition: The mempool is not empty. +// The caller must hold txmp.mtx exclusively. +func (txmp *TxMempool) recheckTransactions(ctx context.Context) { if txmp.Size() == 0 { - panic("attempted to update re-CheckTx txs when mempool is empty") + panic("mempool: cannot run recheck on an empty mempool") + } + txmp.logger.Debug( + "executing re-CheckTx for all remaining transactions", + "num_txs", txmp.Size(), + "height", txmp.height, + ) + + // Collect transactions currently in the mempool requiring recheck. + wtxs := make([]*WrappedTx, 0, txmp.txs.Len()) + for e := txmp.txs.Front(); e != nil; e = e.Next() { + wtxs = append(wtxs, e.Value.(*WrappedTx)) } - txmp.recheckCursor = txmp.gossipIndex.Front() - txmp.recheckEnd = txmp.gossipIndex.Back() + // Issue CheckTx calls for each remaining transaction, and when all the + // rechecks are complete signal watchers that transactions may be available. + go func() { + g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU()) - for e := txmp.gossipIndex.Front(); e != nil; e = e.Next() { - wtx := e.Value.(*WrappedTx) - - // Only execute CheckTx if the transaction is not marked as removed which - // could happen if the transaction was evicted. - if !txmp.txStore.IsTxRemoved(wtx.hash) { - res, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{ - Tx: wtx.tx, - Type: abci.CheckTxType_Recheck, + for _, wtx := range wtxs { + wtx := wtx + start(func() error { + rsp, err := txmp.proxyAppConn.CheckTx(ctx, &abci.RequestCheckTx{ + Tx: wtx.tx, + Type: abci.CheckTxType_Recheck, + }) + if err != nil { + txmp.logger.Error("failed to execute CheckTx during recheck", + "err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash())) + } else { + txmp.handleRecheckResult(wtx.tx, rsp) + } + return nil }) - if err != nil { - // no need in retrying since the tx will be rechecked after the next block - txmp.logger.Error("failed to execute CheckTx during rechecking", "err", err) - continue - } - txmp.defaultTxCallback(wtx.tx, res) } - } + if err := txmp.proxyAppConn.Flush(ctx); err != nil { + txmp.logger.Error("failed to flush transactions during recheck", "err", err) + } - if err := txmp.proxyAppConn.Flush(ctx); err != nil { - txmp.logger.Error("failed to flush transactions during rechecking", "err", err) - } + // When recheck is complete, trigger a notification for more transactions. + _ = g.Wait() + txmp.mtx.Lock() + defer txmp.mtx.Unlock() + txmp.notifyTxsAvailable() + }() } // canAddTx returns an error if we cannot insert the provided *WrappedTx into -// the mempool due to mempool configured constraints. If it returns nil, -// the transaction can be inserted into the mempool. +// the mempool due to mempool configured constraints. Otherwise, nil is +// returned and the transaction can be inserted into the mempool. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { - var ( - numTxs = txmp.Size() - sizeBytes = txmp.SizeBytes() - ) + numTxs := txmp.Size() + txBytes := txmp.SizeBytes() - if numTxs >= txmp.config.Size || int64(wtx.Size())+sizeBytes > txmp.config.MaxTxsBytes { + if numTxs >= txmp.config.Size || wtx.Size()+txBytes > txmp.config.MaxTxsBytes { return types.ErrMempoolIsFull{ NumTxs: numTxs, MaxTxs: txmp.config.Size, - TxsBytes: sizeBytes, + TxsBytes: txBytes, MaxTxsBytes: txmp.config.MaxTxsBytes, } } @@ -741,96 +726,40 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } -func (txmp *TxMempool) insertTx(wtx *WrappedTx) { - txmp.txStore.SetTx(wtx) - txmp.priorityIndex.PushTx(wtx) - txmp.heightIndex.Insert(wtx) - txmp.timestampIndex.Insert(wtx) - - // Insert the transaction into the gossip index and mark the reference to the - // linked-list element, which will be needed at a later point when the - // transaction is removed. - gossipEl := txmp.gossipIndex.PushBack(wtx) - wtx.gossipEl = gossipEl - - atomic.AddInt64(&txmp.sizeBytes, int64(wtx.Size())) -} - -func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) { - if txmp.txStore.IsTxRemoved(wtx.hash) { - return - } - - txmp.txStore.RemoveTx(wtx) - txmp.priorityIndex.RemoveTx(wtx) - txmp.heightIndex.Remove(wtx) - txmp.timestampIndex.Remove(wtx) - - // Remove the transaction from the gossip index and cleanup the linked-list - // element so it can be garbage collected. - txmp.gossipIndex.Remove(wtx.gossipEl) - wtx.gossipEl.DetachPrev() - - atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size())) - - if removeFromCache { - txmp.cache.Remove(wtx.tx) - } -} - -// purgeExpiredTxs removes all transactions that have exceeded their respective -// height- and/or time-based TTLs from their respective indexes. Every expired -// transaction will be removed from the mempool, but preserved in the cache. +// purgeExpiredTxs removes all transactions from the mempool that have exceeded +// their respective height or time-based limits as of the given blockHeight. +// Transactions removed by this operation are not removed from the cache. // -// NOTE: purgeExpiredTxs must only be called during TxMempool#Update in which -// the caller has a write-lock on the mempool and so we can safely iterate over -// the height and time based indexes. +// The caller must hold txmp.mtx exclusively. func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { + if txmp.config.TTLNumBlocks == 0 && txmp.config.TTLDuration == 0 { + return // nothing to do + } + now := time.Now() - expiredTxs := make(map[types.TxKey]*WrappedTx) + cur := txmp.txs.Front() + for cur != nil { + // N.B. Grab the next element first, since if we remove cur its successor + // will be invalidated. + next := cur.Next() - if txmp.config.TTLNumBlocks > 0 { - purgeIdx := -1 - for i, wtx := range txmp.heightIndex.txs { - if (blockHeight - wtx.height) > txmp.config.TTLNumBlocks { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } + w := cur.Value.(*WrappedTx) + if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks { + txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) + txmp.metrics.EvictedTxs.Add(1) + } else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration { + txmp.removeTxByElement(cur) + txmp.cache.Remove(w.tx) + txmp.metrics.EvictedTxs.Add(1) } - - if purgeIdx >= 0 { - txmp.heightIndex.txs = txmp.heightIndex.txs[purgeIdx+1:] - } - } - - if txmp.config.TTLDuration > 0 { - purgeIdx := -1 - for i, wtx := range txmp.timestampIndex.txs { - if now.Sub(wtx.timestamp) > txmp.config.TTLDuration { - expiredTxs[wtx.tx.Key()] = wtx - purgeIdx = i - } else { - // since the index is sorted, we know no other txs can be be purged - break - } - } - - if purgeIdx >= 0 { - txmp.timestampIndex.txs = txmp.timestampIndex.txs[purgeIdx+1:] - } - } - - for _, wtx := range expiredTxs { - txmp.removeTx(wtx, false) + cur = next } } func (txmp *TxMempool) notifyTxsAvailable() { if txmp.Size() == 0 { - panic("attempt to notify txs available but mempool is empty!") + return // nothing to do } if txmp.txsAvailable != nil && !txmp.notifiedTxsAvailable { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index 42fb13bdc..2071d1f05 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -86,9 +86,19 @@ func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMemp return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...) } -func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { - t.Helper() +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(ctx context.Context, t *testing.T, txmp *TxMempool, spec string) { + done := make(chan struct{}) + if err := txmp.CheckTx(ctx, []byte(spec), func(*abci.ResponseCheckTx) { + close(done) + }, TxInfo{}); err != nil { + t.Fatalf("CheckTx for %q failed: %v", spec, err) + } + <-done +} +func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := TxInfo{SenderID: peerID} @@ -217,6 +227,87 @@ func TestTxMempool_Size(t *testing.T) { require.Equal(t, int64(2850), txmp.SizeBytes()) } +func TestTxMempool_Eviction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(t, client, 1000) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + txmp.Lock() + defer txmp.Unlock() + key := types.Tx(spec).Key() + _, ok := txmp.txByKey[key] + return ok + } + t.Cleanup(client.Wait) + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + mustCheckTx(ctx, t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(ctx, t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(ctx, t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.False(t, txmp.cache.Has([]byte(bigTx))) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(ctx, t, txmp, "key2=0001=5") + mustCheckTx(ctx, t, txmp, "key3=0002=10") + mustCheckTx(ctx, t, txmp, "key4=0003=3") + mustCheckTx(ctx, t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + mustCheckTx(ctx, t, txmp, "key6=0005=1") + require.False(t, txExists("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. + mustCheckTx(ctx, t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(ctx, t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(ctx, t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(ctx, t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) +} + func TestTxMempool_Flush(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -537,7 +628,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { tTxs := checkTxs(ctx, t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) - require.Equal(t, 100, txmp.heightIndex.Size()) // reap 5 txs at the next height -- no txs should expire reapedTxs := txmp.ReapMaxTxs(5) @@ -551,12 +641,10 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.Equal(t, 95, txmp.Size()) - require.Equal(t, 95, txmp.heightIndex.Size()) // check more txs at height 101 _ = checkTxs(ctx, t, txmp, 50, 1) require.Equal(t, 145, txmp.Size()) - require.Equal(t, 145, txmp.heightIndex.Size()) // Reap 5 txs at a height that would expire all the transactions from before // the previous Update (height 100). @@ -577,7 +665,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp.Unlock() require.GreaterOrEqual(t, txmp.Size(), 45) - require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45) } func TestTxMempool_CheckTxPostCheckError(t *testing.T) { diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go deleted file mode 100644 index e31997397..000000000 --- a/internal/mempool/priority_queue.go +++ /dev/null @@ -1,158 +0,0 @@ -package mempool - -import ( - "container/heap" - "sort" - "sync" -) - -var _ heap.Interface = (*TxPriorityQueue)(nil) - -// TxPriorityQueue defines a thread-safe priority queue for valid transactions. -type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx -} - -func NewTxPriorityQueue() *TxPriorityQueue { - pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), - } - - heap.Init(pq) - - return pq -} - -// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be -// evicted to make room for another *WrappedTx with higher priority. If no such -// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx -// indicate that these transactions can be removed due to them being of lower -// priority and that their total sum in size allows room for the incoming -// transaction according to the mempool's configured limits. -func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - txs := make([]*WrappedTx, len(pq.txs)) - copy(txs, pq.txs) - - sort.Slice(txs, func(i, j int) bool { - return txs[i].priority < txs[j].priority - }) - - var ( - toEvict []*WrappedTx - i int - ) - - currSize := totalSize - - // Loop over all transactions in ascending priority order evaluating those - // that are only of less priority than the provided argument. We continue - // evaluating transactions until there is sufficient capacity for the new - // transaction (size) as defined by txSize. - for i < len(txs) && txs[i].priority < priority { - toEvict = append(toEvict, txs[i]) - currSize -= int64(txs[i].Size()) - - if currSize+txSize <= cap { - return toEvict - } - - i++ - } - - return nil -} - -// NumTxs returns the number of transactions in the priority queue. It is -// thread safe. -func (pq *TxPriorityQueue) NumTxs() int { - pq.mtx.RLock() - defer pq.mtx.RUnlock() - - return len(pq.txs) -} - -// RemoveTx removes a specific transaction from the priority queue. -func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) - } -} - -// PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - heap.Push(pq, tx) -} - -// PopTx removes the top priority transaction from the queue. It is thread safe. -func (pq *TxPriorityQueue) PopTx() *WrappedTx { - pq.mtx.Lock() - defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil -} - -// Push implements the Heap interface. -// -// NOTE: A caller should never call Push. Use PushTx instead. -func (pq *TxPriorityQueue) Push(x interface{}) { - n := len(pq.txs) - item := x.(*WrappedTx) - item.heapIndex = n - pq.txs = append(pq.txs, item) -} - -// Pop implements the Heap interface. -// -// NOTE: A caller should never call Pop. Use PopTx instead. -func (pq *TxPriorityQueue) Pop() interface{} { - old := pq.txs - n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.heapIndex = -1 // for safety - pq.txs = old[0 : n-1] - return item -} - -// Len implements the Heap interface. -// -// NOTE: A caller should never call Len. Use NumTxs instead. -func (pq *TxPriorityQueue) Len() int { - return len(pq.txs) -} - -// Less implements the Heap interface. It returns true if the transaction at -// position i in the queue is of less priority than the transaction at position j. -func (pq *TxPriorityQueue) Less(i, j int) bool { - // If there exists two transactions with the same priority, consider the one - // that we saw the earliest as the higher priority transaction. - if pq.txs[i].priority == pq.txs[j].priority { - return pq.txs[i].timestamp.Before(pq.txs[j].timestamp) - } - - // We want Pop to give us the highest, not lowest, priority so we use greater - // than here. - return pq.txs[i].priority > pq.txs[j].priority -} - -// Swap implements the Heap interface. It swaps two transactions in the queue. -func (pq *TxPriorityQueue) Swap(i, j int) { - pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i] - pq.txs[i].heapIndex = i - pq.txs[j].heapIndex = j -} diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go deleted file mode 100644 index 90f611162..000000000 --- a/internal/mempool/priority_queue_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package mempool - -import ( - "math/rand" - "sort" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestTxPriorityQueue(t *testing.T) { - pq := NewTxPriorityQueue() - numTxs := 1000 - - priorities := make([]int, numTxs) - - var wg sync.WaitGroup - for i := 1; i <= numTxs; i++ { - priorities[i-1] = i - wg.Add(1) - - go func(i int) { - pq.PushTx(&WrappedTx{ - priority: int64(i), - timestamp: time.Now(), - }) - - wg.Done() - }(i) - } - - sort.Sort(sort.Reverse(sort.IntSlice(priorities))) - - wg.Wait() - require.Equal(t, numTxs, pq.NumTxs()) - - // Wait a second and push a tx with a duplicate priority - time.Sleep(time.Second) - now := time.Now() - pq.PushTx(&WrappedTx{ - priority: 1000, - timestamp: now, - }) - require.Equal(t, 1001, pq.NumTxs()) - - tx := pq.PopTx() - require.Equal(t, 1000, pq.NumTxs()) - require.Equal(t, int64(1000), tx.priority) - require.NotEqual(t, now, tx.timestamp) - - gotPriorities := make([]int, 0) - for pq.NumTxs() > 0 { - gotPriorities = append(gotPriorities, int(pq.PopTx().priority)) - } - - require.Equal(t, priorities, gotPriorities) -} - -func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - values := make([]int, 1000) - - for i := 0; i < 1000; i++ { - tx := make([]byte, 5) // each tx is 5 bytes - _, err := rng.Read(tx) - require.NoError(t, err) - - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - tx: tx, - priority: int64(x), - }) - - values[i] = x - } - - sort.Ints(values) - - max := values[len(values)-1] - min := values[0] - totalSize := int64(len(values) * 5) - - testCases := []struct { - name string - priority, txSize, totalSize, cap int64 - expectedLen int - }{ - { - name: "largest priority; single tx", - priority: int64(max + 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 1, - }, - { - name: "largest priority; multi tx", - priority: int64(max + 1), - txSize: 17, - totalSize: totalSize, - cap: totalSize, - expectedLen: 4, - }, - { - name: "largest priority; out of capacity", - priority: int64(max + 1), - txSize: totalSize + 1, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "smallest priority; no tx", - priority: int64(min - 1), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - { - name: "small priority; no tx", - priority: int64(min), - txSize: 5, - totalSize: totalSize, - cap: totalSize, - expectedLen: 0, - }, - } - - for _, tc := range testCases { - tc := tc - - t.Run(tc.name, func(t *testing.T) { - evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap) - require.Len(t, evictTxs, tc.expectedLen) - }) - } -} - -func TestTxPriorityQueue_RemoveTx(t *testing.T) { - pq := NewTxPriorityQueue() - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - numTxs := 1000 - - values := make([]int, numTxs) - - for i := 0; i < numTxs; i++ { - x := rng.Intn(100000) - pq.PushTx(&WrappedTx{ - priority: int64(x), - }) - - values[i] = x - } - - require.Equal(t, numTxs, pq.NumTxs()) - - sort.Ints(values) - max := values[len(values)-1] - - wtx := pq.txs[pq.NumTxs()/2] - pq.RemoveTx(wtx) - require.Equal(t, numTxs-1, pq.NumTxs()) - require.Equal(t, int64(max), pq.PopTx().priority) - require.Equal(t, numTxs-2, pq.NumTxs()) - - require.NotPanics(t, func() { - pq.RemoveTx(&WrappedTx{heapIndex: numTxs}) - pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1}) - }) - require.Equal(t, numTxs-2, pq.NumTxs()) -} diff --git a/internal/mempool/reactor.go b/internal/mempool/reactor.go index 62cdf386c..1f4b3f78a 100644 --- a/internal/mempool/reactor.go +++ b/internal/mempool/reactor.go @@ -307,8 +307,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, m select { case <-ctx.Done(): return - case <-r.mempool.WaitForNextTx(): // wait until a tx is available - if nextGossipTx = r.mempool.NextGossipTx(); nextGossipTx == nil { + case <-r.mempool.TxsWaitChan(): // wait until a tx is available + if nextGossipTx = r.mempool.TxsFront(); nextGossipTx == nil { continue } } @@ -318,7 +318,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, m // NOTE: Transaction batching was disabled due to: // https://github.com/tendermint/tendermint/issues/5796 - if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok { + if !memTx.HasPeer(peerMempoolID) { // Send the mempool tx to the corresponding peer. Note, the peer may be // behind and thus would not be able to process the mempool tx correctly. if err := mempoolCh.Send(ctx, p2p.Envelope{ diff --git a/internal/mempool/reactor_test.go b/internal/mempool/reactor_test.go index ee7fe777f..bd6ccf8b2 100644 --- a/internal/mempool/reactor_test.go +++ b/internal/mempool/reactor_test.go @@ -68,7 +68,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode require.NoError(t, client.Start(ctx)) t.Cleanup(client.Wait) - mempool := setup(t, client, 0) + mempool := setup(t, client, 1<<20) rts.mempools[nodeID] = mempool rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf) @@ -170,7 +170,9 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { secondaryReactor.observePanic = observePanic firstTx := &WrappedTx{} + primaryMempool.Lock() primaryMempool.insertTx(firstTx) + primaryMempool.Unlock() // run the router rts.start(ctx, t) @@ -183,6 +185,8 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) { wg.Add(1) go func() { defer wg.Done() + primaryMempool.Lock() + defer primaryMempool.Unlock() primaryMempool.insertTx(next) }() } diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index c7113c951..1a221e2c3 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -1,11 +1,9 @@ package mempool import ( - "sort" "sync" "time" - "github.com/tendermint/tendermint/internal/libs/clist" "github.com/tendermint/tendermint/types" ) @@ -24,270 +22,78 @@ type TxInfo struct { // WrappedTx defines a wrapper around a raw transaction with additional metadata // that is used for indexing. type WrappedTx struct { - // tx represents the raw binary transaction data - tx types.Tx + tx types.Tx // the original transaction data + hash types.TxKey // the transaction hash + height int64 // height when this transaction was initially checked (for expiry) + timestamp time.Time // time when transaction was entered (for TTL) - // hash defines the transaction hash and the primary key used in the mempool - hash types.TxKey - - // height defines the height at which the transaction was validated at - height int64 - - // gasWanted defines the amount of gas the transaction sender requires - gasWanted int64 - - // priority defines the transaction's priority as specified by the application - // in the ResponseCheckTx response. - priority int64 - - // sender defines the transaction's sender as specified by the application in - // the ResponseCheckTx response. - sender string - - // timestamp is the time at which the node first received the transaction from - // a peer. It is used as a second dimension is prioritizing transactions when - // two transactions have the same priority. - timestamp time.Time - - // peers records a mapping of all peers that sent a given transaction - peers map[uint16]struct{} - - // heapIndex defines the index of the item in the heap - heapIndex int - - // gossipEl references the linked-list element in the gossip index - gossipEl *clist.CElement - - // removed marks the transaction as removed from the mempool. This is set - // during RemoveTx and is needed due to the fact that a given existing - // transaction in the mempool can be evicted when it is simultaneously having - // a reCheckTx callback executed. - removed bool + mtx sync.Mutex + gasWanted int64 // app: gas required to execute this transaction + priority int64 // app: priority value for this transaction + sender string // app: assigned sender label + peers map[uint16]bool // peer IDs who have sent us this transaction } -func (wtx *WrappedTx) Size() int { - return len(wtx.tx) -} +// Size reports the size of the raw transaction in bytes. +func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) } -// TxStore implements a thread-safe mapping of valid transaction(s). -// -// NOTE: -// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative -// access is not allowed. Regardless, it is not expected for the mempool to -// need mutative access. -type TxStore struct { - mtx sync.RWMutex - hashTxs map[types.TxKey]*WrappedTx // primary index - senderTxs map[string]*WrappedTx // sender is defined by the ABCI application -} - -func NewTxStore() *TxStore { - return &TxStore{ - senderTxs: make(map[string]*WrappedTx), - hashTxs: make(map[types.TxKey]*WrappedTx), +// SetPeer adds the specified peer ID as a sender of w. +func (w *WrappedTx) SetPeer(id uint16) { + w.mtx.Lock() + defer w.mtx.Unlock() + if w.peers == nil { + w.peers = map[uint16]bool{id: true} + } else { + w.peers[id] = true } } -// Size returns the total number of transactions in the store. -func (txs *TxStore) Size() int { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return len(txs.hashTxs) -} - -// GetAllTxs returns all the transactions currently in the store. -func (txs *TxStore) GetAllTxs() []*WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wTxs := make([]*WrappedTx, len(txs.hashTxs)) - i := 0 - for _, wtx := range txs.hashTxs { - wTxs[i] = wtx - i++ - } - - return wTxs -} - -// GetTxBySender returns a *WrappedTx by the transaction's sender property -// defined by the ABCI application. -func (txs *TxStore) GetTxBySender(sender string) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.senderTxs[sender] -} - -// GetTxByHash returns a *WrappedTx by the transaction's hash. -func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - return txs.hashTxs[hash] -} - -// IsTxRemoved returns true if a transaction by hash is marked as removed and -// false otherwise. -func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx, ok := txs.hashTxs[hash] - if ok { - return wtx.removed - } - - return false -} - -// SetTx stores a *WrappedTx by it's hash. If the transaction also contains a -// non-empty sender, we additionally store the transaction by the sender as -// defined by the ABCI application. -func (txs *TxStore) SetTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - txs.senderTxs[wtx.sender] = wtx - } - - txs.hashTxs[wtx.tx.Key()] = wtx -} - -// RemoveTx removes a *WrappedTx from the transaction store. It deletes all -// indexes of the transaction. -func (txs *TxStore) RemoveTx(wtx *WrappedTx) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - if len(wtx.sender) > 0 { - delete(txs.senderTxs, wtx.sender) - } - - delete(txs.hashTxs, wtx.tx.Key()) - wtx.removed = true -} - -// TxHasPeer returns true if a transaction by hash has a given peer ID and false -// otherwise. If the transaction does not exist, false is returned. -func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool { - txs.mtx.RLock() - defer txs.mtx.RUnlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return false - } - - _, ok := wtx.peers[peerID] +// HasPeer reports whether the specified peer ID is a sender of w. +func (w *WrappedTx) HasPeer(id uint16) bool { + w.mtx.Lock() + defer w.mtx.Unlock() + _, ok := w.peers[id] return ok } -// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the -// given peerID to the WrappedTx's set of peers that sent us this transaction. -// We return true if we've already recorded the given peer for this transaction -// and false otherwise. If the transaction does not exist by hash, we return -// (nil, false). -func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) { - txs.mtx.Lock() - defer txs.mtx.Unlock() - - wtx := txs.hashTxs[hash] - if wtx == nil { - return nil, false - } - - if wtx.peers == nil { - wtx.peers = make(map[uint16]struct{}) - } - - if _, ok := wtx.peers[peerID]; ok { - return wtx, true - } - - wtx.peers[peerID] = struct{}{} - return wtx, false +// SetGasWanted sets the application-assigned gas requirement of w. +func (w *WrappedTx) SetGasWanted(gas int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.gasWanted = gas } -// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be -// used to build generic transaction indexes in the mempool. It accepts a -// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx -// references which is used during Insert in order to determine sorted order. If -// less returns true, a <= b. -type WrappedTxList struct { - mtx sync.RWMutex - txs []*WrappedTx - less func(*WrappedTx, *WrappedTx) bool +// GasWanted reports the application-assigned gas requirement of w. +func (w *WrappedTx) GasWanted() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.gasWanted } -func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList { - return &WrappedTxList{ - txs: make([]*WrappedTx, 0), - less: less, - } +// SetSender sets the application-assigned sender of w. +func (w *WrappedTx) SetSender(sender string) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.sender = sender } -// Size returns the number of WrappedTx objects in the list. -func (wtl *WrappedTxList) Size() int { - wtl.mtx.RLock() - defer wtl.mtx.RUnlock() - - return len(wtl.txs) +// Sender reports the application-assigned sender of w. +func (w *WrappedTx) Sender() string { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.sender } -// Reset resets the list of transactions to an empty list. -func (wtl *WrappedTxList) Reset() { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - wtl.txs = make([]*WrappedTx, 0) +// SetPriority sets the application-assigned priority of w. +func (w *WrappedTx) SetPriority(p int64) { + w.mtx.Lock() + defer w.mtx.Unlock() + w.priority = p } -// Insert inserts a WrappedTx reference into the sorted list based on the list's -// comparator function. -func (wtl *WrappedTxList) Insert(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - if i == len(wtl.txs) { - // insert at the end - wtl.txs = append(wtl.txs, wtx) - return - } - - // Make space for the inserted element by shifting values at the insertion - // index up one index. - // - // NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs). - wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...) - wtl.txs[i] = wtx -} - -// Remove attempts to remove a WrappedTx from the sorted list. -func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { - wtl.mtx.Lock() - defer wtl.mtx.Unlock() - - i := sort.Search(len(wtl.txs), func(i int) bool { - return wtl.less(wtl.txs[i], wtx) - }) - - // Since the list is sorted, we evaluate all elements starting at i. Note, if - // the element does not exist, we may potentially evaluate the entire remainder - // of the list. However, a caller should not be expected to call Remove with a - // non-existing element. - for i < len(wtl.txs) { - if wtl.txs[i] == wtx { - wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...) - return - } - - i++ - } +// Priority reports the application-assigned priority of w. +func (w *WrappedTx) Priority() int64 { + w.mtx.Lock() + defer w.mtx.Unlock() + return w.priority } diff --git a/internal/mempool/tx_test.go b/internal/mempool/tx_test.go deleted file mode 100644 index c6d494b04..000000000 --- a/internal/mempool/tx_test.go +++ /dev/null @@ -1,231 +0,0 @@ -package mempool - -import ( - "fmt" - "math/rand" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/tendermint/tendermint/types" -) - -func TestTxStore_GetTxBySender(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - res := txs.GetTxBySender(wtx.sender) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxBySender(wtx.sender) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetTxByHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - sender: "foo", - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.Nil(t, res) - - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_SetTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res := txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) - - wtx.sender = "foo" - txs.SetTx(wtx) - - res = txs.GetTxByHash(key) - require.NotNil(t, res) - require.Equal(t, wtx, res) -} - -func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - key := wtx.tx.Key() - txs.SetTx(wtx) - - res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15) - require.Nil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.False(t, ok) - - res, ok = txs.GetOrSetPeerByTxHash(key, 15) - require.NotNil(t, res) - require.True(t, ok) - - require.True(t, txs.TxHasPeer(key, 15)) - require.False(t, txs.TxHasPeer(key, 16)) -} - -func TestTxStore_RemoveTx(t *testing.T) { - txs := NewTxStore() - wtx := &WrappedTx{ - tx: []byte("test_tx"), - priority: 1, - timestamp: time.Now(), - } - - txs.SetTx(wtx) - - key := wtx.tx.Key() - res := txs.GetTxByHash(key) - require.NotNil(t, res) - - txs.RemoveTx(res) - - res = txs.GetTxByHash(key) - require.Nil(t, res) -} - -func TestTxStore_Size(t *testing.T) { - txStore := NewTxStore() - numTxs := 1000 - - for i := 0; i < numTxs; i++ { - txStore.SetTx(&WrappedTx{ - tx: []byte(fmt.Sprintf("test_tx_%d", i)), - priority: int64(i), - timestamp: time.Now(), - }) - } - - require.Equal(t, numTxs, txStore.Size()) -} - -func TestWrappedTxList_Reset(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - require.Zero(t, list.Size()) - - for i := 0; i < 100; i++ { - list.Insert(&WrappedTx{height: int64(i)}) - } - - require.Equal(t, 100, list.Size()) - - list.Reset() - require.Zero(t, list.Size()) -} - -func TestWrappedTxList_Insert(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var expected []int - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - expected = append(expected, int(height)) - list.Insert(&WrappedTx{height: height}) - - if i%10 == 0 { - list.Insert(&WrappedTx{height: height}) - expected = append(expected, int(height)) - } - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -} - -func TestWrappedTxList_Remove(t *testing.T) { - list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool { - return wtx1.height >= wtx2.height - }) - - rng := rand.New(rand.NewSource(time.Now().UnixNano())) - - var txs []*WrappedTx - for i := 0; i < 100; i++ { - height := rng.Int63n(10000) - tx := &WrappedTx{height: height} - - txs = append(txs, tx) - list.Insert(tx) - - if i%10 == 0 { - tx = &WrappedTx{height: height} - list.Insert(tx) - txs = append(txs, tx) - } - } - - // remove a tx that does not exist - list.Remove(&WrappedTx{height: 20000}) - - // remove a tx that exists (by height) but not referenced - list.Remove(&WrappedTx{height: txs[0].height}) - - // remove a few existing txs - for i := 0; i < 25; i++ { - j := rng.Intn(len(txs)) - list.Remove(txs[j]) - txs = append(txs[:j], txs[j+1:]...) - } - - expected := make([]int, len(txs)) - for i, tx := range txs { - expected[i] = int(tx.height) - } - - got := make([]int, list.Size()) - for i, wtx := range list.txs { - got[i] = int(wtx.height) - } - - sort.Ints(expected) - require.Equal(t, expected, got) -}