mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-03 02:22:04 +00:00
This is a manual forward-port of #8944 and related fixes from v0.35.x. One difference of note is that the CheckTx response messages no longer have a field to record an error from the ABCI application. The code is set up so that these could be reported directly to the CheckTx caller, but it would be an API change, and right now a bunch of test plumbing depends on the existing semantics. Also fix up tests relying on implementation-specific mempool behavior. - Commit was setting the expected mempool size incorrectly. - Fix sequence test not to depend on the incorrect size.
726 lines
20 KiB
Go
726 lines
20 KiB
Go
package mempool
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
abciclient "github.com/tendermint/tendermint/abci/client"
|
|
"github.com/tendermint/tendermint/abci/example/code"
|
|
"github.com/tendermint/tendermint/abci/example/kvstore"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// application extends the KV store application by overriding CheckTx to provide
|
|
// transaction priority based on the value in the key/value pair.
|
|
type application struct {
|
|
*kvstore.Application
|
|
}
|
|
|
|
type testTx struct {
|
|
tx types.Tx
|
|
priority int64
|
|
}
|
|
|
|
func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*abci.ResponseCheckTx, error) {
|
|
var (
|
|
priority int64
|
|
sender string
|
|
)
|
|
|
|
// infer the priority from the raw transaction value (sender=key=value)
|
|
parts := bytes.Split(req.Tx, []byte("="))
|
|
if len(parts) == 3 {
|
|
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
|
|
if err != nil {
|
|
return &abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Code: 100,
|
|
GasWanted: 1,
|
|
}, nil
|
|
}
|
|
|
|
priority = v
|
|
sender = string(parts[0])
|
|
} else {
|
|
return &abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Code: 101,
|
|
GasWanted: 1,
|
|
}, nil
|
|
}
|
|
|
|
return &abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Sender: sender,
|
|
Code: code.CodeTypeOK,
|
|
GasWanted: 1,
|
|
}, nil
|
|
}
|
|
|
|
func setup(t testing.TB, app abciclient.Client, cacheSize int, options ...TxMempoolOption) *TxMempool {
|
|
t.Helper()
|
|
|
|
logger := log.NewNopLogger()
|
|
|
|
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
|
|
require.NoError(t, err)
|
|
cfg.Mempool.CacheSize = cacheSize
|
|
|
|
t.Cleanup(func() { os.RemoveAll(cfg.RootDir) })
|
|
|
|
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, app, options...)
|
|
}
|
|
|
|
// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until
|
|
// its callback has finished executing. It fails t if CheckTx fails.
|
|
func mustCheckTx(ctx context.Context, t *testing.T, txmp *TxMempool, spec string) {
|
|
done := make(chan struct{})
|
|
if err := txmp.CheckTx(ctx, []byte(spec), func(*abci.ResponseCheckTx) {
|
|
close(done)
|
|
}, TxInfo{}); err != nil {
|
|
t.Fatalf("CheckTx for %q failed: %v", spec, err)
|
|
}
|
|
<-done
|
|
}
|
|
|
|
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
|
|
txs := make([]testTx, numTxs)
|
|
txInfo := TxInfo{SenderID: peerID}
|
|
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
for i := 0; i < numTxs; i++ {
|
|
prefix := make([]byte, 20)
|
|
_, err := rng.Read(prefix)
|
|
require.NoError(t, err)
|
|
|
|
priority := int64(rng.Intn(9999-1000) + 1000)
|
|
|
|
txs[i] = testTx{
|
|
tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
|
|
priority: priority,
|
|
}
|
|
require.NoError(t, txmp.CheckTx(ctx, txs[i].tx, nil, txInfo))
|
|
}
|
|
|
|
return txs
|
|
}
|
|
|
|
func convertTex(in []testTx) types.Txs {
|
|
out := make([]types.Tx, len(in))
|
|
|
|
for idx := range in {
|
|
out[idx] = in[idx].tx
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func TestTxMempool_TxsAvailable(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 0)
|
|
txmp.EnableTxsAvailable()
|
|
|
|
ensureNoTxFire := func() {
|
|
timer := time.NewTimer(500 * time.Millisecond)
|
|
select {
|
|
case <-txmp.TxsAvailable():
|
|
require.Fail(t, "unexpected transactions event")
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
|
|
ensureTxFire := func() {
|
|
timer := time.NewTimer(500 * time.Millisecond)
|
|
select {
|
|
case <-txmp.TxsAvailable():
|
|
case <-timer.C:
|
|
require.Fail(t, "expected transactions event")
|
|
}
|
|
}
|
|
|
|
// ensure no event as we have not executed any transactions yet
|
|
ensureNoTxFire()
|
|
|
|
// Execute CheckTx for some transactions and ensure TxsAvailable only fires
|
|
// once.
|
|
txs := checkTxs(ctx, t, txmp, 100, 0)
|
|
ensureTxFire()
|
|
ensureNoTxFire()
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ExecTxResult, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
// commit half the transactions and ensure we fire an event
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
ensureTxFire()
|
|
ensureNoTxFire()
|
|
|
|
// Execute CheckTx for more transactions and ensure we do not fire another
|
|
// event as we're still on the same height (1).
|
|
_ = checkTxs(ctx, t, txmp, 100, 0)
|
|
ensureNoTxFire()
|
|
}
|
|
|
|
func TestTxMempool_Size(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 0)
|
|
txs := checkTxs(ctx, t, txmp, 100, 0)
|
|
require.Equal(t, len(txs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ExecTxResult, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
|
|
require.Equal(t, len(rawTxs)/2, txmp.Size())
|
|
require.Equal(t, int64(2850), txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_Eviction(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 1000)
|
|
txmp.config.Size = 5
|
|
txmp.config.MaxTxsBytes = 60
|
|
txExists := func(spec string) bool {
|
|
txmp.Lock()
|
|
defer txmp.Unlock()
|
|
key := types.Tx(spec).Key()
|
|
_, ok := txmp.txByKey[key]
|
|
return ok
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
// A transaction bigger than the mempool should be rejected even when there
|
|
// are slots available.
|
|
mustCheckTx(ctx, t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1")
|
|
require.Equal(t, 0, txmp.Size())
|
|
|
|
// Nearly-fill the mempool with a low-priority transaction, to show that it
|
|
// is evicted even when slots are available for a higher-priority tx.
|
|
const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2"
|
|
mustCheckTx(ctx, t, txmp, bigTx)
|
|
require.Equal(t, 1, txmp.Size()) // bigTx is the only element
|
|
require.True(t, txExists(bigTx))
|
|
require.Equal(t, int64(len(bigTx)), txmp.SizeBytes())
|
|
|
|
// The next transaction should evict bigTx, because it is higher priority
|
|
// but does not fit on size.
|
|
mustCheckTx(ctx, t, txmp, "key1=0000=25")
|
|
require.True(t, txExists("key1=0000=25"))
|
|
require.False(t, txExists(bigTx))
|
|
require.False(t, txmp.cache.Has([]byte(bigTx)))
|
|
require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes())
|
|
|
|
// Now fill up the rest of the slots with other transactions.
|
|
mustCheckTx(ctx, t, txmp, "key2=0001=5")
|
|
mustCheckTx(ctx, t, txmp, "key3=0002=10")
|
|
mustCheckTx(ctx, t, txmp, "key4=0003=3")
|
|
mustCheckTx(ctx, t, txmp, "key5=0004=3")
|
|
|
|
// A new transaction with low priority should be discarded.
|
|
mustCheckTx(ctx, t, txmp, "key6=0005=1")
|
|
require.False(t, txExists("key6=0005=1"))
|
|
|
|
// A new transaction with higher priority should evict key5, which is the
|
|
// newest of the two transactions with lowest priority.
|
|
mustCheckTx(ctx, t, txmp, "key7=0006=7")
|
|
require.True(t, txExists("key7=0006=7")) // new transaction added
|
|
require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted
|
|
require.True(t, txExists("key4=0003=3")) // older low-priority tx retained
|
|
|
|
// Another new transaction evicts the other low-priority element.
|
|
mustCheckTx(ctx, t, txmp, "key8=0007=20")
|
|
require.True(t, txExists("key8=0007=20"))
|
|
require.False(t, txExists("key4=0003=3"))
|
|
|
|
// Now the lowest-priority tx is 5, so that should be the next to go.
|
|
mustCheckTx(ctx, t, txmp, "key9=0008=9")
|
|
require.True(t, txExists("key9=0008=9"))
|
|
require.False(t, txExists("k3y2=0001=5"))
|
|
|
|
// Add a transaction that requires eviction of multiple lower-priority
|
|
// entries, in order to fit the size of the element.
|
|
mustCheckTx(ctx, t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11
|
|
require.True(t, txExists("key1=0000=25"))
|
|
require.True(t, txExists("key8=0007=20"))
|
|
require.True(t, txExists("key10=0123456789abcdef=11"))
|
|
require.False(t, txExists("key3=0002=10"))
|
|
require.False(t, txExists("key9=0008=9"))
|
|
require.False(t, txExists("key7=0006=7"))
|
|
}
|
|
|
|
func TestTxMempool_Flush(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 0)
|
|
txs := checkTxs(ctx, t, txmp, 100, 0)
|
|
require.Equal(t, len(txs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ExecTxResult, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, 1, rawTxs[:50], responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
|
|
txmp.Flush()
|
|
require.Zero(t, txmp.Size())
|
|
require.Equal(t, int64(0), txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 0)
|
|
tTxs := checkTxs(ctx, t, txmp, 100, 0) // all txs request 1 gas unit
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
txMap := make(map[types.TxKey]testTx)
|
|
priorities := make([]int64, len(tTxs))
|
|
for i, tTx := range tTxs {
|
|
txMap[tTx.tx.Key()] = tTx
|
|
priorities[i] = tTx.priority
|
|
}
|
|
|
|
sort.Slice(priorities, func(i, j int) bool {
|
|
// sort by priority, i.e. decreasing order
|
|
return priorities[i] > priorities[j]
|
|
})
|
|
|
|
ensurePrioritized := func(reapedTxs types.Txs) {
|
|
reapedPriorities := make([]int64, len(reapedTxs))
|
|
for i, rTx := range reapedTxs {
|
|
reapedPriorities[i] = txMap[rTx.Key()].priority
|
|
}
|
|
|
|
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
|
|
}
|
|
|
|
// reap by gas capacity only
|
|
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 50)
|
|
|
|
// reap by transaction bytes only
|
|
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.GreaterOrEqual(t, len(reapedTxs), 16)
|
|
|
|
// Reap by both transaction bytes and gas, where the size yields 31 reaped
|
|
// transactions and the gas limit reaps 25 transactions.
|
|
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 25)
|
|
}
|
|
|
|
func TestTxMempool_ReapMaxTxs(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 0)
|
|
tTxs := checkTxs(ctx, t, txmp, 100, 0)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
txMap := make(map[types.TxKey]testTx)
|
|
priorities := make([]int64, len(tTxs))
|
|
for i, tTx := range tTxs {
|
|
txMap[tTx.tx.Key()] = tTx
|
|
priorities[i] = tTx.priority
|
|
}
|
|
|
|
sort.Slice(priorities, func(i, j int) bool {
|
|
// sort by priority, i.e. decreasing order
|
|
return priorities[i] > priorities[j]
|
|
})
|
|
|
|
ensurePrioritized := func(reapedTxs types.Txs) {
|
|
reapedPriorities := make([]int64, len(reapedTxs))
|
|
for i, rTx := range reapedTxs {
|
|
reapedPriorities[i] = txMap[rTx.Key()].priority
|
|
}
|
|
|
|
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
|
|
}
|
|
|
|
// reap all transactions
|
|
reapedTxs := txmp.ReapMaxTxs(-1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, len(tTxs))
|
|
|
|
// reap a single transaction
|
|
reapedTxs = txmp.ReapMaxTxs(1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 1)
|
|
|
|
// reap half of the transactions
|
|
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, len(tTxs)/2)
|
|
}
|
|
|
|
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
txmp := setup(t, client, 0)
|
|
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
tx := make([]byte, txmp.config.MaxTxBytes+1)
|
|
_, err := rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
require.Error(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0}))
|
|
|
|
tx = make([]byte, txmp.config.MaxTxBytes-1)
|
|
_, err = rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0}))
|
|
}
|
|
|
|
func TestTxMempool_CheckTxSamePeer(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 100)
|
|
peerID := uint16(1)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
prefix := make([]byte, 20)
|
|
_, err := rng.Read(prefix)
|
|
require.NoError(t, err)
|
|
|
|
tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
|
|
|
|
require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: peerID}))
|
|
require.Error(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: peerID}))
|
|
}
|
|
|
|
func TestTxMempool_CheckTxSameSender(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 100)
|
|
peerID := uint16(1)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
prefix1 := make([]byte, 20)
|
|
_, err := rng.Read(prefix1)
|
|
require.NoError(t, err)
|
|
|
|
prefix2 := make([]byte, 20)
|
|
_, err = rng.Read(prefix2)
|
|
require.NoError(t, err)
|
|
|
|
tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
|
|
tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
|
|
|
|
require.NoError(t, txmp.CheckTx(ctx, tx1, nil, TxInfo{SenderID: peerID}))
|
|
require.Equal(t, 1, txmp.Size())
|
|
require.NoError(t, txmp.CheckTx(ctx, tx2, nil, TxInfo{SenderID: peerID}))
|
|
require.Equal(t, 1, txmp.Size())
|
|
}
|
|
|
|
func TestTxMempool_ConcurrentTxs(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 100)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
checkTxDone := make(chan struct{})
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
for i := 0; i < 20; i++ {
|
|
_ = checkTxs(ctx, t, txmp, 100, 0)
|
|
dur := rng.Intn(1000-500) + 500
|
|
time.Sleep(time.Duration(dur) * time.Millisecond)
|
|
}
|
|
|
|
wg.Done()
|
|
close(checkTxDone)
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
defer wg.Done()
|
|
|
|
var height int64 = 1
|
|
|
|
for range ticker.C {
|
|
reapedTxs := txmp.ReapMaxTxs(200)
|
|
if len(reapedTxs) > 0 {
|
|
responses := make([]*abci.ExecTxResult, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
var code uint32
|
|
|
|
if i%10 == 0 {
|
|
code = 100
|
|
} else {
|
|
code = abci.CodeTypeOK
|
|
}
|
|
|
|
responses[i] = &abci.ExecTxResult{Code: code}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, height, reapedTxs, responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
|
|
height++
|
|
} else {
|
|
// only return once we know we finished the CheckTx loop
|
|
select {
|
|
case <-checkTxDone:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
require.Zero(t, txmp.Size())
|
|
require.Zero(t, txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
txmp := setup(t, client, 500)
|
|
txmp.height = 100
|
|
txmp.config.TTLNumBlocks = 10
|
|
|
|
tTxs := checkTxs(ctx, t, txmp, 100, 0)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
|
|
// reap 5 txs at the next height -- no txs should expire
|
|
reapedTxs := txmp.ReapMaxTxs(5)
|
|
responses := make([]*abci.ExecTxResult, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, txmp.height+1, reapedTxs, responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
|
|
require.Equal(t, 95, txmp.Size())
|
|
|
|
// check more txs at height 101
|
|
_ = checkTxs(ctx, t, txmp, 50, 1)
|
|
require.Equal(t, 145, txmp.Size())
|
|
|
|
// Reap 5 txs at a height that would expire all the transactions from before
|
|
// the previous Update (height 100).
|
|
//
|
|
// NOTE: When we reap txs below, we do not know if we're picking txs from the
|
|
// initial CheckTx calls or from the second round of CheckTx calls. Thus, we
|
|
// cannot guarantee that all 95 txs are remaining that should be expired and
|
|
// removed. However, we do know that that at most 95 txs can be expired and
|
|
// removed.
|
|
reapedTxs = txmp.ReapMaxTxs(5)
|
|
responses = make([]*abci.ExecTxResult, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ExecTxResult{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(ctx, txmp.height+10, reapedTxs, responses, nil, nil, true))
|
|
txmp.Unlock()
|
|
|
|
require.GreaterOrEqual(t, txmp.Size(), 45)
|
|
}
|
|
|
|
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
cases := []struct {
|
|
name string
|
|
err error
|
|
}{
|
|
{
|
|
name: "error",
|
|
err: errors.New("test error"),
|
|
},
|
|
{
|
|
name: "no error",
|
|
err: nil,
|
|
},
|
|
}
|
|
for _, tc := range cases {
|
|
testCase := tc
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
|
|
if err := client.Start(ctx); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
t.Cleanup(client.Wait)
|
|
|
|
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
|
|
return testCase.err
|
|
}
|
|
txmp := setup(t, client, 0, WithPostCheck(postCheckFn))
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
tx := make([]byte, txmp.config.MaxTxBytes-1)
|
|
_, err := rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
callback := func(res *abci.ResponseCheckTx) {
|
|
expectedErrString := ""
|
|
if testCase.err != nil {
|
|
expectedErrString = testCase.err.Error()
|
|
require.Equal(t, expectedErrString, txmp.postCheck(tx, res).Error())
|
|
} else {
|
|
require.Equal(t, nil, txmp.postCheck(tx, res))
|
|
}
|
|
}
|
|
if testCase.err == nil {
|
|
require.NoError(t, txmp.CheckTx(ctx, tx, callback, TxInfo{SenderID: 0}))
|
|
} else {
|
|
err = txmp.CheckTx(ctx, tx, callback, TxInfo{SenderID: 0})
|
|
require.EqualError(t, err, "test error")
|
|
}
|
|
})
|
|
}
|
|
}
|