Backport priority mempool fixes from v0.35.x to v0.34.x. (#8962)

This is a manual backport of the changes from these commits:

- bc49f66 Add more unit tests for the priority mempool. (#8961)
- 9b02094 Fix unbounded heap growth in the priority mempool. (#8944)

Imports and type signatures have been updated to match the v0.34 usage.
Author:    M. J. Fromberger
Date:      2022-07-08 07:22:07 -07:00
Committer: GitHub
Parent:    1d9d947d88
Commit:    493dd69f31
8 changed files with 641 additions and 1360 deletions

(File diff suppressed because it is too large.)


@@ -77,8 +77,8 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
app := &application{kvstore.NewApplication()}
cc := proxy.NewLocalClientCreator(app)
cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
cfg.Mempool.CacheSize = cacheSize
appConnMem, err := cc.NewABCIClient()
@@ -93,6 +93,18 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
}
// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until
// its callback has finished executing. It fails t if CheckTx fails.
func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) {
done := make(chan struct{})
if err := txmp.CheckTx([]byte(spec), func(*abci.Response) {
close(done)
}, mempool.TxInfo{}); err != nil {
t.Fatalf("CheckTx for %q failed: %v", spec, err)
}
<-done
}
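For reference, the tx spec strings used throughout these tests encode a sender, a value, and an application-assigned priority; this is a hedged reading based on how the eviction assertions below key on the trailing integer:

// Illustrative only: "key1=0000=25" is treated by the test application as
// sender "key1", value "0000", priority 25. The priority field drives the
// eviction behavior exercised in TestTxMempool_Eviction.
mustCheckTx(t, txmp, "key1=0000=25")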
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
txs := make([]testTx, numTxs)
txInfo := mempool.TxInfo{SenderID: peerID}
@@ -194,6 +206,76 @@ func TestTxMempool_Size(t *testing.T) {
require.Equal(t, int64(2850), txmp.SizeBytes())
}
func TestTxMempool_Eviction(t *testing.T) {
txmp := setup(t, 0)
txmp.config.Size = 5
txmp.config.MaxTxsBytes = 60
txExists := func(spec string) bool {
txmp.Lock()
defer txmp.Unlock()
key := types.Tx(spec).Key()
_, ok := txmp.txByKey[key]
return ok
}
// A transaction bigger than the mempool should be rejected even when there
// are slots available.
mustCheckTx(t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1")
require.Equal(t, 0, txmp.Size())
// Nearly fill the mempool with a low-priority transaction, to show that it
// is evicted even when slots are still available for a higher-priority tx.
const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2"
mustCheckTx(t, txmp, bigTx)
require.Equal(t, 1, txmp.Size()) // bigTx is the only element
require.True(t, txExists(bigTx))
require.Equal(t, int64(len(bigTx)), txmp.SizeBytes())
// The next transaction should evict bigTx, because it has higher priority
// and the two do not both fit within the size limit.
mustCheckTx(t, txmp, "key1=0000=25")
require.True(t, txExists("key1=0000=25"))
require.False(t, txExists(bigTx))
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())
// Now fill up the rest of the slots with other transactions.
mustCheckTx(t, txmp, "key2=0001=5")
mustCheckTx(t, txmp, "key3=0002=10")
mustCheckTx(t, txmp, "key4=0003=3")
mustCheckTx(t, txmp, "key5=0004=3")
// A new transaction with low priority should be discarded.
mustCheckTx(t, txmp, "key6=0005=1")
require.False(t, txExists("key6=0005=1"))
// A new transaction with higher priority should evict key5, which is the
// newer of the two transactions with the lowest priority.
mustCheckTx(t, txmp, "key7=0006=7")
require.True(t, txExists("key7=0006=7")) // new transaction added
require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
// Another new transaction evicts the other low-priority element.
mustCheckTx(t, txmp, "key8=0007=20")
require.True(t, txExists("key8=0007=20"))
require.False(t, txExists("key4=0003=3"))
// Now the lowest-priority tx has priority 5, so that should be the next to go.
mustCheckTx(t, txmp, "key9=0008=9")
require.True(t, txExists("key9=0008=9"))
require.False(t, txExists("key2=0001=5"))
// Add a transaction that requires eviction of multiple lower-priority
// entries in order to make room for its size.
mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11
require.True(t, txExists("key1=0000=25"))
require.True(t, txExists("key8=0007=20"))
require.True(t, txExists("key10=0123456789abcdef=11"))
require.False(t, txExists("key3=0002=10"))
require.False(t, txExists("key9=0008=9"))
require.False(t, txExists("key7=0006=7"))
}
func TestTxMempool_Flush(t *testing.T) {
txmp := setup(t, 0)
txs := checkTxs(t, txmp, 100, 0)
@@ -436,6 +518,51 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) {
require.Zero(t, txmp.SizeBytes())
}
func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) {
txmp := setup(t, 50)
txmp.config.TTLDuration = 5 * time.Millisecond
added1 := checkTxs(t, txmp, 25, 0)
require.Equal(t, len(added1), txmp.Size())
// Wait a while, then add some more transactions that should not be expired
// when the first batch TTLs out.
//
// ms: 0    1    2    3    4    5    6
//     ^              ^         ^    ^
//     |              |         |    +-- Update (triggers pruning)
//     |              |         +------- first batch expires
//     |              +----------------- second batch added
//     +-------------------------------- first batch added
//
// The exact intervals are not important except that the delta should be
// large relative to the cost of CheckTx (ms vs. ns is fine here).
time.Sleep(3 * time.Millisecond)
added2 := checkTxs(t, txmp, 25, 1)
// Wait a while longer, so that the first batch will expire.
time.Sleep(3 * time.Millisecond)
// Trigger an update so that pruning will occur.
txmp.Lock()
defer txmp.Unlock()
require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil))
// All the transactions in the original set should have been purged.
for _, tx := range added1 {
if _, ok := txmp.txByKey[tx.tx.Key()]; ok {
t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key())
}
}
// All the transactions added later should still be around.
for _, tx := range added2 {
if _, ok := txmp.txByKey[tx.tx.Key()]; !ok {
t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key())
}
}
}
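Taken together, the two expiry tests exercise both TTL dimensions. A minimal sketch of the knobs involved; TTLDuration appears in the test above, while TTLNumBlocks is assumed to be the height-based counterpart in the v0.34 mempool config:

txmp.config.TTLDuration = 5 * time.Millisecond // expire by wall-clock age
txmp.config.TTLNumBlocks = 10                  // expire by blocks since admission (assumed field name)
// In both cases, expired txs are pruned on the next Update (i.e., new block).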
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp := setup(t, 500)
txmp.height = 100
@@ -443,7 +570,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
tTxs := checkTxs(t, txmp, 100, 0)
require.Equal(t, len(tTxs), txmp.Size())
require.Equal(t, 100, txmp.heightIndex.Size())
// reap 5 txs at the next height -- no txs should expire
reapedTxs := txmp.ReapMaxTxs(5)
@@ -457,12 +583,10 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp.Unlock()
require.Equal(t, 95, txmp.Size())
require.Equal(t, 95, txmp.heightIndex.Size())
// check more txs at height 101
_ = checkTxs(t, txmp, 50, 1)
require.Equal(t, 145, txmp.Size())
require.Equal(t, 145, txmp.heightIndex.Size())
// Reap 5 txs at a height that would expire all the transactions from before
// the previous Update (height 100).
@@ -483,7 +607,6 @@ func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
txmp.Unlock()
require.GreaterOrEqual(t, txmp.Size(), 45)
require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
}
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {


@@ -1,159 +0,0 @@
package v1
import (
"container/heap"
"sort"
tmsync "github.com/tendermint/tendermint/libs/sync"
)
var _ heap.Interface = (*TxPriorityQueue)(nil)
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx tmsync.RWMutex
txs []*WrappedTx
}
func NewTxPriorityQueue() *TxPriorityQueue {
pq := &TxPriorityQueue{
txs: make([]*WrappedTx, 0),
}
heap.Init(pq)
return pq
}
// GetEvictableTxs attempts to find and return a list of *WrappedTx that can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list exists, nil is returned. The returned transactions are all of lower
// priority than the incoming one, and removing them frees enough size, under
// the mempool's configured limits, for the incoming transaction.
func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int64) []*WrappedTx {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
txs := make([]*WrappedTx, len(pq.txs))
copy(txs, pq.txs)
sort.Slice(txs, func(i, j int) bool {
return txs[i].priority < txs[j].priority
})
var (
toEvict []*WrappedTx
i int
)
currSize := totalSize
// Loop over all transactions in ascending priority order, evaluating only
// those with lower priority than the provided argument. We continue
// evaluating transactions until there is sufficient capacity for the new
// transaction (whose size is given by txSize).
for i < len(txs) && txs[i].priority < priority {
toEvict = append(toEvict, txs[i])
currSize -= int64(txs[i].Size())
if currSize+txSize <= cap {
return toEvict
}
i++
}
return nil
}
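A hedged sketch of how a caller might combine GetEvictableTxs with RemoveTx and PushTx when the mempool is at capacity. Here sizeBytes and maxBytes are assumed stand-ins for the mempool's current and maximum byte totals, ErrMempoolIsFull is an assumed sentinel error, and the bookkeeping for the mempool's other indexes is elided:

victims := pq.GetEvictableTxs(wtx.priority, int64(wtx.Size()), sizeBytes, maxBytes)
if victims == nil {
	return ErrMempoolIsFull // nothing of lower priority frees enough room
}
for _, v := range victims {
	pq.RemoveTx(v) // the real mempool also removes v from its other indexes
}
pq.PushTx(wtx)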
// NumTxs returns the number of transactions in the priority queue. It is
// thread safe.
func (pq *TxPriorityQueue) NumTxs() int {
pq.mtx.RLock()
defer pq.mtx.RUnlock()
return len(pq.txs)
}
// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
if tx.heapIndex < len(pq.txs) {
heap.Remove(pq, tx.heapIndex)
}
}
// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
heap.Push(pq, tx)
}
// PopTx removes the top priority transaction from the queue. It is thread safe.
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()
x := heap.Pop(pq)
if x != nil {
return x.(*WrappedTx)
}
return nil
}
// Push implements the Heap interface.
//
// NOTE: A caller should never call Push. Use PushTx instead.
func (pq *TxPriorityQueue) Push(x interface{}) {
n := len(pq.txs)
item := x.(*WrappedTx)
item.heapIndex = n
pq.txs = append(pq.txs, item)
}
// Pop implements the Heap interface.
//
// NOTE: A caller should never call Pop. Use PopTx instead.
func (pq *TxPriorityQueue) Pop() interface{} {
old := pq.txs
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.heapIndex = -1 // for safety
pq.txs = old[0 : n-1]
return item
}
// Len implements the Heap interface.
//
// NOTE: A caller should never call Len. Use NumTxs instead.
func (pq *TxPriorityQueue) Len() int {
return len(pq.txs)
}
// Less implements the Heap interface. It returns true if the transaction at
// position i should sort before the transaction at position j; since this is
// a max-priority heap, that means i has the higher priority.
func (pq *TxPriorityQueue) Less(i, j int) bool {
// If two transactions have the same priority, consider the one we saw
// earlier to be the higher-priority transaction.
if pq.txs[i].priority == pq.txs[j].priority {
return pq.txs[i].timestamp.Before(pq.txs[j].timestamp)
}
// We want Pop to give us the highest, not lowest, priority so we use greater
// than here.
return pq.txs[i].priority > pq.txs[j].priority
}
// Swap implements the Heap interface. It swaps two transactions in the queue.
func (pq *TxPriorityQueue) Swap(i, j int) {
pq.txs[i], pq.txs[j] = pq.txs[j], pq.txs[i]
pq.txs[i].heapIndex = i
pq.txs[j].heapIndex = j
}
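A small usage sketch of the thread-safe wrappers above (assuming a time import alongside the ones shown). Per Less, PopTx yields transactions in descending priority order, breaking ties in favor of the earlier timestamp:

pq := NewTxPriorityQueue()
pq.PushTx(&WrappedTx{priority: 1, timestamp: time.Now()})
pq.PushTx(&WrappedTx{priority: 9, timestamp: time.Now()})
pq.PushTx(&WrappedTx{priority: 9, timestamp: time.Now()}) // same priority, later timestamp
first := pq.PopTx()  // the earlier of the two priority-9 txs
second := pq.PopTx() // the later priority-9 tx
_, _ = first, second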


@@ -1,176 +0,0 @@
package v1
import (
"math/rand"
"sort"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestTxPriorityQueue(t *testing.T) {
pq := NewTxPriorityQueue()
numTxs := 1000
priorities := make([]int, numTxs)
var wg sync.WaitGroup
for i := 1; i <= numTxs; i++ {
priorities[i-1] = i
wg.Add(1)
go func(i int) {
pq.PushTx(&WrappedTx{
priority: int64(i),
timestamp: time.Now(),
})
wg.Done()
}(i)
}
sort.Sort(sort.Reverse(sort.IntSlice(priorities)))
wg.Wait()
require.Equal(t, numTxs, pq.NumTxs())
// Wait a second and push a tx with a duplicate priority
time.Sleep(time.Second)
now := time.Now()
pq.PushTx(&WrappedTx{
priority: 1000,
timestamp: now,
})
require.Equal(t, 1001, pq.NumTxs())
tx := pq.PopTx()
require.Equal(t, 1000, pq.NumTxs())
require.Equal(t, int64(1000), tx.priority)
require.NotEqual(t, now, tx.timestamp)
gotPriorities := make([]int, 0)
for pq.NumTxs() > 0 {
gotPriorities = append(gotPriorities, int(pq.PopTx().priority))
}
require.Equal(t, priorities, gotPriorities)
}
func TestTxPriorityQueue_GetEvictableTxs(t *testing.T) {
pq := NewTxPriorityQueue()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
values := make([]int, 1000)
for i := 0; i < 1000; i++ {
tx := make([]byte, 5) // each tx is 5 bytes
_, err := rng.Read(tx)
require.NoError(t, err)
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
tx: tx,
priority: int64(x),
})
values[i] = x
}
sort.Ints(values)
max := values[len(values)-1]
min := values[0]
totalSize := int64(len(values) * 5)
testCases := []struct {
name string
priority, txSize, totalSize, cap int64
expectedLen int
}{
{
name: "larest priority; single tx",
priority: int64(max + 1),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 1,
},
{
name: "larest priority; multi tx",
priority: int64(max + 1),
txSize: 17,
totalSize: totalSize,
cap: totalSize,
expectedLen: 4,
},
{
name: "larest priority; out of capacity",
priority: int64(max + 1),
txSize: totalSize + 1,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
{
name: "smallest priority; no tx",
priority: int64(min - 1),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
{
name: "small priority; no tx",
priority: int64(min),
txSize: 5,
totalSize: totalSize,
cap: totalSize,
expectedLen: 0,
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
evictTxs := pq.GetEvictableTxs(tc.priority, tc.txSize, tc.totalSize, tc.cap)
require.Len(t, evictTxs, tc.expectedLen)
})
}
}
func TestTxPriorityQueue_RemoveTx(t *testing.T) {
pq := NewTxPriorityQueue()
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
numTxs := 1000
values := make([]int, numTxs)
for i := 0; i < numTxs; i++ {
x := rng.Intn(100000)
pq.PushTx(&WrappedTx{
priority: int64(x),
})
values[i] = x
}
require.Equal(t, numTxs, pq.NumTxs())
sort.Ints(values)
max := values[len(values)-1]
wtx := pq.txs[pq.NumTxs()/2]
pq.RemoveTx(wtx)
require.Equal(t, numTxs-1, pq.NumTxs())
require.Equal(t, int64(max), pq.PopTx().priority)
require.Equal(t, numTxs-2, pq.NumTxs())
require.NotPanics(t, func() {
pq.RemoveTx(&WrappedTx{heapIndex: numTxs})
pq.RemoveTx(&WrappedTx{heapIndex: numTxs + 1})
})
require.Equal(t, numTxs-2, pq.NumTxs())
}


@@ -198,8 +198,8 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// start from the beginning.
if next == nil {
select {
case <-memR.mempool.WaitForNextTx(): // Wait until a tx is available
if next = memR.mempool.NextGossipTx(); next == nil {
case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available
if next = memR.mempool.TxsFront(); next == nil {
continue
}
@@ -232,7 +232,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
// NOTE: Transaction batching was disabled due to
// https://github.com/tendermint/tendermint/issues/5796
if ok := memR.mempool.txStore.TxHasPeer(memTx.hash, peerID); !ok {
if !memTx.HasPeer(peerID) {
msg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}},


@@ -1,281 +1,87 @@
package v1
import (
"sort"
"sync"
"time"
"github.com/tendermint/tendermint/libs/clist"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/types"
)
// WrappedTx defines a wrapper around a raw transaction with additional metadata
// that is used for indexing.
type WrappedTx struct {
// tx represents the raw binary transaction data
tx types.Tx
tx types.Tx // the original transaction data
hash types.TxKey // the transaction hash
height int64 // height when this transaction was initially checked (for expiry)
timestamp time.Time // time when transaction was entered (for TTL)
// hash defines the transaction hash and the primary key used in the mempool
hash types.TxKey
// height defines the height at which the transaction was validated at
height int64
// gasWanted defines the amount of gas the transaction sender requires
gasWanted int64
// priority defines the transaction's priority as specified by the application
// in the ResponseCheckTx response.
priority int64
// sender defines the transaction's sender as specified by the application in
// the ResponseCheckTx response.
sender string
// timestamp is the time at which the node first received the transaction from
// a peer. It is used as a second dimension in prioritizing transactions when
// two transactions have the same priority.
timestamp time.Time
// peers records a mapping of all peers that sent a given transaction
peers map[uint16]struct{}
// heapIndex defines the index of the item in the heap
heapIndex int
// gossipEl references the linked-list element in the gossip index
gossipEl *clist.CElement
// removed marks the transaction as removed from the mempool. This is set
// during RemoveTx and is needed because an existing transaction in the
// mempool can be evicted while a recheck (reCheckTx) callback for it is
// executing concurrently.
removed bool
mtx sync.Mutex
gasWanted int64 // app: gas required to execute this transaction
priority int64 // app: priority value for this transaction
sender string // app: assigned sender label
peers map[uint16]bool // peer IDs who have sent us this transaction
}
func (wtx *WrappedTx) Size() int {
return len(wtx.tx)
}
// Size reports the size of the raw transaction in bytes.
func (w *WrappedTx) Size() int64 { return int64(len(w.tx)) }
// TxStore implements a thread-safe mapping of valid transaction(s).
//
// NOTE:
// - Concurrent read-only access to a *WrappedTx object is OK. However, mutative
// access is not allowed. Regardless, it is not expected for the mempool to
// need mutative access.
type TxStore struct {
mtx tmsync.RWMutex
hashTxs map[types.TxKey]*WrappedTx // primary index
senderTxs map[string]*WrappedTx // sender is defined by the ABCI application
}
func NewTxStore() *TxStore {
return &TxStore{
senderTxs: make(map[string]*WrappedTx),
hashTxs: make(map[types.TxKey]*WrappedTx),
// SetPeer adds the specified peer ID as a sender of w.
func (w *WrappedTx) SetPeer(id uint16) {
w.mtx.Lock()
defer w.mtx.Unlock()
if w.peers == nil {
w.peers = map[uint16]bool{id: true}
} else {
w.peers[id] = true
}
}
// Size returns the total number of transactions in the store.
func (txs *TxStore) Size() int {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return len(txs.hashTxs)
}
// GetAllTxs returns all the transactions currently in the store.
func (txs *TxStore) GetAllTxs() []*WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wTxs := make([]*WrappedTx, len(txs.hashTxs))
i := 0
for _, wtx := range txs.hashTxs {
wTxs[i] = wtx
i++
}
return wTxs
}
// GetTxBySender returns a *WrappedTx by the transaction's sender property
// defined by the ABCI application.
func (txs *TxStore) GetTxBySender(sender string) *WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return txs.senderTxs[sender]
}
// GetTxByHash returns a *WrappedTx by the transaction's hash.
func (txs *TxStore) GetTxByHash(hash types.TxKey) *WrappedTx {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
return txs.hashTxs[hash]
}
// IsTxRemoved returns true if a transaction by hash is marked as removed and
// false otherwise.
func (txs *TxStore) IsTxRemoved(hash types.TxKey) bool {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wtx, ok := txs.hashTxs[hash]
if ok {
return wtx.removed
}
return false
}
// SetTx stores a *WrappedTx by its hash. If the transaction also contains a
// non-empty sender, we additionally store the transaction by the sender as
// defined by the ABCI application.
func (txs *TxStore) SetTx(wtx *WrappedTx) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
if len(wtx.sender) > 0 {
txs.senderTxs[wtx.sender] = wtx
}
txs.hashTxs[wtx.tx.Key()] = wtx
}
// RemoveTx removes a *WrappedTx from the transaction store, deleting it from
// all of the store's indexes.
func (txs *TxStore) RemoveTx(wtx *WrappedTx) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
if len(wtx.sender) > 0 {
delete(txs.senderTxs, wtx.sender)
}
delete(txs.hashTxs, wtx.tx.Key())
wtx.removed = true
}
// TxHasPeer returns true if a transaction by hash has a given peer ID and false
// otherwise. If the transaction does not exist, false is returned.
func (txs *TxStore) TxHasPeer(hash types.TxKey, peerID uint16) bool {
txs.mtx.RLock()
defer txs.mtx.RUnlock()
wtx := txs.hashTxs[hash]
if wtx == nil {
return false
}
_, ok := wtx.peers[peerID]
// HasPeer reports whether the specified peer ID is a sender of w.
func (w *WrappedTx) HasPeer(id uint16) bool {
w.mtx.Lock()
defer w.mtx.Unlock()
_, ok := w.peers[id]
return ok
}
// GetOrSetPeerByTxHash looks up a WrappedTx by transaction hash and adds the
// given peerID to the WrappedTx's set of peers that sent us this transaction.
// We return true if we've already recorded the given peer for this transaction
// and false otherwise. If the transaction does not exist by hash, we return
// (nil, false).
func (txs *TxStore) GetOrSetPeerByTxHash(hash types.TxKey, peerID uint16) (*WrappedTx, bool) {
txs.mtx.Lock()
defer txs.mtx.Unlock()
wtx := txs.hashTxs[hash]
if wtx == nil {
return nil, false
}
if wtx.peers == nil {
wtx.peers = make(map[uint16]struct{})
}
if _, ok := wtx.peers[peerID]; ok {
return wtx, true
}
wtx.peers[peerID] = struct{}{}
return wtx, false
// SetGasWanted sets the application-assigned gas requirement of w.
func (w *WrappedTx) SetGasWanted(gas int64) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.gasWanted = gas
}
// WrappedTxList implements a thread-safe list of *WrappedTx objects that can be
// used to build generic transaction indexes in the mempool. It accepts a
// comparator function, less(a, b *WrappedTx) bool, that compares two WrappedTx
// references; less is used during Insert to determine sorted order. If less
// returns true, a <= b.
type WrappedTxList struct {
mtx tmsync.RWMutex
txs []*WrappedTx
less func(*WrappedTx, *WrappedTx) bool
// GasWanted reports the application-assigned gas requirement of w.
func (w *WrappedTx) GasWanted() int64 {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.gasWanted
}
func NewWrappedTxList(less func(*WrappedTx, *WrappedTx) bool) *WrappedTxList {
return &WrappedTxList{
txs: make([]*WrappedTx, 0),
less: less,
}
// SetSender sets the application-assigned sender of w.
func (w *WrappedTx) SetSender(sender string) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.sender = sender
}
// Size returns the number of WrappedTx objects in the list.
func (wtl *WrappedTxList) Size() int {
wtl.mtx.RLock()
defer wtl.mtx.RUnlock()
return len(wtl.txs)
// Sender reports the application-assigned sender of w.
func (w *WrappedTx) Sender() string {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.sender
}
// Reset resets the list of transactions to an empty list.
func (wtl *WrappedTxList) Reset() {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
wtl.txs = make([]*WrappedTx, 0)
// SetPriority sets the application-assigned priority of w.
func (w *WrappedTx) SetPriority(p int64) {
w.mtx.Lock()
defer w.mtx.Unlock()
w.priority = p
}
// Insert inserts a WrappedTx reference into the sorted list based on the list's
// comparator function.
func (wtl *WrappedTxList) Insert(wtx *WrappedTx) {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
i := sort.Search(len(wtl.txs), func(i int) bool {
return wtl.less(wtl.txs[i], wtx)
})
if i == len(wtl.txs) {
// insert at the end
wtl.txs = append(wtl.txs, wtx)
return
}
// Make space for the inserted element by shifting values at the insertion
// index up one index.
//
// NOTE: The call to append does not allocate memory when cap(wtl.txs) > len(wtl.txs).
wtl.txs = append(wtl.txs[:i+1], wtl.txs[i:]...)
wtl.txs[i] = wtx
}
// Remove attempts to remove a WrappedTx from the sorted list.
func (wtl *WrappedTxList) Remove(wtx *WrappedTx) {
wtl.mtx.Lock()
defer wtl.mtx.Unlock()
i := sort.Search(len(wtl.txs), func(i int) bool {
return wtl.less(wtl.txs[i], wtx)
})
// Since the list is sorted, we evaluate all elements starting at i. Note, if
// the element does not exist, we may potentially evaluate the entire remainder
// of the list. However, a caller is not expected to call Remove with a
// non-existent element.
for i < len(wtl.txs) {
if wtl.txs[i] == wtx {
wtl.txs = append(wtl.txs[:i], wtl.txs[i+1:]...)
return
}
i++
}
// Priority reports the application-assigned priority of w.
func (w *WrappedTx) Priority() int64 {
w.mtx.Lock()
defer w.mtx.Unlock()
return w.priority
}
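A short sketch of the accessor-based API this file settles on: per-transaction mutable state is read and written only through methods that hold the WrappedTx mutex, rather than by direct field access. All identifiers here appear in the code above:

w := &WrappedTx{tx: types.Tx("payload"), timestamp: time.Now()}
w.SetPriority(10) // app-assigned, from ResponseCheckTx
w.SetSender("alice")
w.SetPeer(7) // record peer 7 as a sender of this tx
if w.HasPeer(7) && w.Priority() > 0 {
	// safe concurrent reads; each accessor locks w.mtx internally
}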


@@ -1,230 +0,0 @@
package v1
import (
"fmt"
"math/rand"
"sort"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/types"
)
func TestTxStore_GetTxBySender(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
sender: "foo",
priority: 1,
timestamp: time.Now(),
}
res := txs.GetTxBySender(wtx.sender)
require.Nil(t, res)
txs.SetTx(wtx)
res = txs.GetTxBySender(wtx.sender)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_GetTxByHash(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
sender: "foo",
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
res := txs.GetTxByHash(key)
require.Nil(t, res)
txs.SetTx(wtx)
res = txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_SetTx(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
txs.SetTx(wtx)
res := txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
wtx.sender = "foo"
txs.SetTx(wtx)
res = txs.GetTxByHash(key)
require.NotNil(t, res)
require.Equal(t, wtx, res)
}
func TestTxStore_GetOrSetPeerByTxHash(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
key := wtx.tx.Key()
txs.SetTx(wtx)
res, ok := txs.GetOrSetPeerByTxHash(types.Tx([]byte("test_tx_2")).Key(), 15)
require.Nil(t, res)
require.False(t, ok)
res, ok = txs.GetOrSetPeerByTxHash(key, 15)
require.NotNil(t, res)
require.False(t, ok)
res, ok = txs.GetOrSetPeerByTxHash(key, 15)
require.NotNil(t, res)
require.True(t, ok)
require.True(t, txs.TxHasPeer(key, 15))
require.False(t, txs.TxHasPeer(key, 16))
}
func TestTxStore_RemoveTx(t *testing.T) {
txs := NewTxStore()
wtx := &WrappedTx{
tx: []byte("test_tx"),
priority: 1,
timestamp: time.Now(),
}
txs.SetTx(wtx)
key := wtx.tx.Key()
res := txs.GetTxByHash(key)
require.NotNil(t, res)
txs.RemoveTx(res)
res = txs.GetTxByHash(key)
require.Nil(t, res)
}
func TestTxStore_Size(t *testing.T) {
txStore := NewTxStore()
numTxs := 1000
for i := 0; i < numTxs; i++ {
txStore.SetTx(&WrappedTx{
tx: []byte(fmt.Sprintf("test_tx_%d", i)),
priority: int64(i),
timestamp: time.Now(),
})
}
require.Equal(t, numTxs, txStore.Size())
}
func TestWrappedTxList_Reset(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
require.Zero(t, list.Size())
for i := 0; i < 100; i++ {
list.Insert(&WrappedTx{height: int64(i)})
}
require.Equal(t, 100, list.Size())
list.Reset()
require.Zero(t, list.Size())
}
func TestWrappedTxList_Insert(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
var expected []int
for i := 0; i < 100; i++ {
height := rng.Int63n(10000)
expected = append(expected, int(height))
list.Insert(&WrappedTx{height: height})
if i%10 == 0 {
list.Insert(&WrappedTx{height: height})
expected = append(expected, int(height))
}
}
got := make([]int, list.Size())
for i, wtx := range list.txs {
got[i] = int(wtx.height)
}
sort.Ints(expected)
require.Equal(t, expected, got)
}
func TestWrappedTxList_Remove(t *testing.T) {
list := NewWrappedTxList(func(wtx1, wtx2 *WrappedTx) bool {
return wtx1.height >= wtx2.height
})
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
var txs []*WrappedTx
for i := 0; i < 100; i++ {
height := rng.Int63n(10000)
tx := &WrappedTx{height: height}
txs = append(txs, tx)
list.Insert(tx)
if i%10 == 0 {
tx = &WrappedTx{height: height}
list.Insert(tx)
txs = append(txs, tx)
}
}
// remove a tx that does not exist
list.Remove(&WrappedTx{height: 20000})
// remove a tx that matches an existing height but is not the same reference
list.Remove(&WrappedTx{height: txs[0].height})
// remove a few existing txs
for i := 0; i < 25; i++ {
j := rng.Intn(len(txs))
list.Remove(txs[j])
txs = append(txs[:j], txs[j+1:]...)
}
expected := make([]int, len(txs))
for i, tx := range txs {
expected[i] = int(tx.height)
}
got := make([]int, list.Size())
for i, wtx := range list.txs {
got[i] = int(wtx.height)
}
sort.Ints(expected)
require.Equal(t, expected, got)
}