mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-11 06:11:07 +00:00
This changes adds an `MempoolError` field to the `ResponseCheckTx`. This will allow clients to understand that their transaction was rejected from the mempool despite passing the ABCI check. This change also updates the code to make use of early returns to prevent highly nested code blocks. Namely, it returns when the type assertion fails at the beginning of the method, instead of wrapping the entire method in a large if statement. This has a somewhat large effect on the diff as rendered by github. addresses: #3546
528 lines
14 KiB
Go
528 lines
14 KiB
Go
package v1
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"os"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tendermint/tendermint/abci/example/code"
|
|
"github.com/tendermint/tendermint/abci/example/kvstore"
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/internal/mempool"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
"github.com/tendermint/tendermint/proxy"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// application extends the KV store application by overriding CheckTx to provide
|
|
// transaction priority based on the value in the key/value pair.
|
|
type application struct {
|
|
*kvstore.Application
|
|
}
|
|
|
|
type testTx struct {
|
|
tx types.Tx
|
|
priority int64
|
|
}
|
|
|
|
func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
|
|
var (
|
|
priority int64
|
|
sender string
|
|
)
|
|
|
|
// infer the priority from the raw transaction value (sender=key=value)
|
|
parts := bytes.Split(req.Tx, []byte("="))
|
|
if len(parts) == 3 {
|
|
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
|
|
if err != nil {
|
|
return abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Code: 100,
|
|
GasWanted: 1,
|
|
}
|
|
}
|
|
|
|
priority = v
|
|
sender = string(parts[0])
|
|
} else {
|
|
return abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Code: 101,
|
|
GasWanted: 1,
|
|
}
|
|
}
|
|
|
|
return abci.ResponseCheckTx{
|
|
Priority: priority,
|
|
Sender: sender,
|
|
Code: code.CodeTypeOK,
|
|
GasWanted: 1,
|
|
}
|
|
}
|
|
|
|
func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool {
|
|
t.Helper()
|
|
|
|
app := &application{kvstore.NewApplication()}
|
|
cc := proxy.NewLocalClientCreator(app)
|
|
|
|
cfg := config.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|"))
|
|
cfg.Mempool.CacheSize = cacheSize
|
|
|
|
appConnMem, err := cc.NewABCIClient()
|
|
require.NoError(t, err)
|
|
require.NoError(t, appConnMem.Start())
|
|
|
|
t.Cleanup(func() {
|
|
os.RemoveAll(cfg.RootDir)
|
|
require.NoError(t, appConnMem.Stop())
|
|
})
|
|
|
|
return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...)
|
|
}
|
|
|
|
func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
|
|
txs := make([]testTx, numTxs)
|
|
txInfo := mempool.TxInfo{SenderID: peerID}
|
|
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
for i := 0; i < numTxs; i++ {
|
|
prefix := make([]byte, 20)
|
|
_, err := rng.Read(prefix)
|
|
require.NoError(t, err)
|
|
|
|
priority := int64(rng.Intn(9999-1000) + 1000)
|
|
|
|
txs[i] = testTx{
|
|
tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)),
|
|
priority: priority,
|
|
}
|
|
require.NoError(t, txmp.CheckTx(context.Background(), txs[i].tx, nil, txInfo))
|
|
}
|
|
|
|
return txs
|
|
}
|
|
|
|
func TestTxMempool_TxsAvailable(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
txmp.EnableTxsAvailable()
|
|
|
|
ensureNoTxFire := func() {
|
|
timer := time.NewTimer(500 * time.Millisecond)
|
|
select {
|
|
case <-txmp.TxsAvailable():
|
|
require.Fail(t, "unexpected transactions event")
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
|
|
ensureTxFire := func() {
|
|
timer := time.NewTimer(500 * time.Millisecond)
|
|
select {
|
|
case <-txmp.TxsAvailable():
|
|
case <-timer.C:
|
|
require.Fail(t, "expected transactions event")
|
|
}
|
|
}
|
|
|
|
// ensure no event as we have not executed any transactions yet
|
|
ensureNoTxFire()
|
|
|
|
// Execute CheckTx for some transactions and ensure TxsAvailable only fires
|
|
// once.
|
|
txs := checkTxs(t, txmp, 100, 0)
|
|
ensureTxFire()
|
|
ensureNoTxFire()
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
// commit half the transactions and ensure we fire an event
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
|
|
txmp.Unlock()
|
|
ensureTxFire()
|
|
ensureNoTxFire()
|
|
|
|
// Execute CheckTx for more transactions and ensure we do not fire another
|
|
// event as we're still on the same height (1).
|
|
_ = checkTxs(t, txmp, 100, 0)
|
|
ensureNoTxFire()
|
|
}
|
|
|
|
func TestTxMempool_Size(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
txs := checkTxs(t, txmp, 100, 0)
|
|
require.Equal(t, len(txs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
|
|
txmp.Unlock()
|
|
|
|
require.Equal(t, len(rawTxs)/2, txmp.Size())
|
|
require.Equal(t, int64(2850), txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_Flush(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
txs := checkTxs(t, txmp, 100, 0)
|
|
require.Equal(t, len(txs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
rawTxs := make([]types.Tx, len(txs))
|
|
for i, tx := range txs {
|
|
rawTxs[i] = tx.tx
|
|
}
|
|
|
|
responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50]))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil))
|
|
txmp.Unlock()
|
|
|
|
txmp.Flush()
|
|
require.Zero(t, txmp.Size())
|
|
require.Equal(t, int64(0), txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
txMap := make(map[[mempool.TxKeySize]byte]testTx)
|
|
priorities := make([]int64, len(tTxs))
|
|
for i, tTx := range tTxs {
|
|
txMap[mempool.TxKey(tTx.tx)] = tTx
|
|
priorities[i] = tTx.priority
|
|
}
|
|
|
|
sort.Slice(priorities, func(i, j int) bool {
|
|
// sort by priority, i.e. decreasing order
|
|
return priorities[i] > priorities[j]
|
|
})
|
|
|
|
ensurePrioritized := func(reapedTxs types.Txs) {
|
|
reapedPriorities := make([]int64, len(reapedTxs))
|
|
for i, rTx := range reapedTxs {
|
|
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
|
|
}
|
|
|
|
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
|
|
}
|
|
|
|
// reap by gas capacity only
|
|
reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 50)
|
|
|
|
// reap by transaction bytes only
|
|
reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.GreaterOrEqual(t, len(reapedTxs), 16)
|
|
|
|
// Reap by both transaction bytes and gas, where the size yields 31 reaped
|
|
// transactions and the gas limit reaps 25 transactions.
|
|
reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 25)
|
|
}
|
|
|
|
func TestTxMempool_ReapMaxTxs(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
tTxs := checkTxs(t, txmp, 100, 0)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
|
|
txMap := make(map[[mempool.TxKeySize]byte]testTx)
|
|
priorities := make([]int64, len(tTxs))
|
|
for i, tTx := range tTxs {
|
|
txMap[mempool.TxKey(tTx.tx)] = tTx
|
|
priorities[i] = tTx.priority
|
|
}
|
|
|
|
sort.Slice(priorities, func(i, j int) bool {
|
|
// sort by priority, i.e. decreasing order
|
|
return priorities[i] > priorities[j]
|
|
})
|
|
|
|
ensurePrioritized := func(reapedTxs types.Txs) {
|
|
reapedPriorities := make([]int64, len(reapedTxs))
|
|
for i, rTx := range reapedTxs {
|
|
reapedPriorities[i] = txMap[mempool.TxKey(rTx)].priority
|
|
}
|
|
|
|
require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities)
|
|
}
|
|
|
|
// reap all transactions
|
|
reapedTxs := txmp.ReapMaxTxs(-1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, len(tTxs))
|
|
|
|
// reap a single transaction
|
|
reapedTxs = txmp.ReapMaxTxs(1)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, 1)
|
|
|
|
// reap half of the transactions
|
|
reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2)
|
|
ensurePrioritized(reapedTxs)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, int64(5690), txmp.SizeBytes())
|
|
require.Len(t, reapedTxs, len(tTxs)/2)
|
|
}
|
|
|
|
func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
|
|
txmp := setup(t, 0)
|
|
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
tx := make([]byte, txmp.config.MaxTxBytes+1)
|
|
_, err := rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
|
|
|
|
tx = make([]byte, txmp.config.MaxTxBytes-1)
|
|
_, err = rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: 0}))
|
|
}
|
|
|
|
func TestTxMempool_CheckTxSamePeer(t *testing.T) {
|
|
txmp := setup(t, 100)
|
|
peerID := uint16(1)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
prefix := make([]byte, 20)
|
|
_, err := rng.Read(prefix)
|
|
require.NoError(t, err)
|
|
|
|
tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50))
|
|
|
|
require.NoError(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
|
|
require.Error(t, txmp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{SenderID: peerID}))
|
|
}
|
|
|
|
func TestTxMempool_CheckTxSameSender(t *testing.T) {
|
|
txmp := setup(t, 100)
|
|
peerID := uint16(1)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
|
|
prefix1 := make([]byte, 20)
|
|
_, err := rng.Read(prefix1)
|
|
require.NoError(t, err)
|
|
|
|
prefix2 := make([]byte, 20)
|
|
_, err = rng.Read(prefix2)
|
|
require.NoError(t, err)
|
|
|
|
tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50))
|
|
tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50))
|
|
|
|
require.NoError(t, txmp.CheckTx(context.Background(), tx1, nil, mempool.TxInfo{SenderID: peerID}))
|
|
require.Equal(t, 1, txmp.Size())
|
|
require.NoError(t, txmp.CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: peerID}))
|
|
require.Equal(t, 1, txmp.Size())
|
|
}
|
|
|
|
func TestTxMempool_ConcurrentTxs(t *testing.T) {
|
|
txmp := setup(t, 100)
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
checkTxDone := make(chan struct{})
|
|
|
|
var wg sync.WaitGroup
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
for i := 0; i < 20; i++ {
|
|
_ = checkTxs(t, txmp, 100, 0)
|
|
dur := rng.Intn(1000-500) + 500
|
|
time.Sleep(time.Duration(dur) * time.Millisecond)
|
|
}
|
|
|
|
wg.Done()
|
|
close(checkTxDone)
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
defer wg.Done()
|
|
|
|
var height int64 = 1
|
|
|
|
for range ticker.C {
|
|
reapedTxs := txmp.ReapMaxTxs(200)
|
|
if len(reapedTxs) > 0 {
|
|
responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
var code uint32
|
|
|
|
if i%10 == 0 {
|
|
code = 100
|
|
} else {
|
|
code = abci.CodeTypeOK
|
|
}
|
|
|
|
responses[i] = &abci.ResponseDeliverTx{Code: code}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil))
|
|
txmp.Unlock()
|
|
|
|
height++
|
|
} else {
|
|
// only return once we know we finished the CheckTx loop
|
|
select {
|
|
case <-checkTxDone:
|
|
return
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
wg.Wait()
|
|
require.Zero(t, txmp.Size())
|
|
require.Zero(t, txmp.SizeBytes())
|
|
}
|
|
|
|
func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) {
|
|
txmp := setup(t, 500)
|
|
txmp.height = 100
|
|
txmp.config.TTLNumBlocks = 10
|
|
|
|
tTxs := checkTxs(t, txmp, 100, 0)
|
|
require.Equal(t, len(tTxs), txmp.Size())
|
|
require.Equal(t, 100, txmp.heightIndex.Size())
|
|
|
|
// reap 5 txs at the next height -- no txs should expire
|
|
reapedTxs := txmp.ReapMaxTxs(5)
|
|
responses := make([]*abci.ResponseDeliverTx, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil))
|
|
txmp.Unlock()
|
|
|
|
require.Equal(t, 95, txmp.Size())
|
|
require.Equal(t, 95, txmp.heightIndex.Size())
|
|
|
|
// check more txs at height 101
|
|
_ = checkTxs(t, txmp, 50, 1)
|
|
require.Equal(t, 145, txmp.Size())
|
|
require.Equal(t, 145, txmp.heightIndex.Size())
|
|
|
|
// Reap 5 txs at a height that would expire all the transactions from before
|
|
// the previous Update (height 100).
|
|
//
|
|
// NOTE: When we reap txs below, we do not know if we're picking txs from the
|
|
// initial CheckTx calls or from the second round of CheckTx calls. Thus, we
|
|
// cannot guarantee that all 95 txs are remaining that should be expired and
|
|
// removed. However, we do know that that at most 95 txs can be expired and
|
|
// removed.
|
|
reapedTxs = txmp.ReapMaxTxs(5)
|
|
responses = make([]*abci.ResponseDeliverTx, len(reapedTxs))
|
|
for i := 0; i < len(responses); i++ {
|
|
responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
|
|
}
|
|
|
|
txmp.Lock()
|
|
require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil))
|
|
txmp.Unlock()
|
|
|
|
require.GreaterOrEqual(t, txmp.Size(), 45)
|
|
require.GreaterOrEqual(t, txmp.heightIndex.Size(), 45)
|
|
}
|
|
|
|
func TestTxMempool_CheckTxPostCheckError(t *testing.T) {
|
|
cases := []struct {
|
|
name string
|
|
err error
|
|
}{
|
|
{
|
|
name: "error",
|
|
err: errors.New("test error"),
|
|
},
|
|
{
|
|
name: "no error",
|
|
err: nil,
|
|
},
|
|
}
|
|
for _, tc := range cases {
|
|
testCase := tc
|
|
t.Run(testCase.name, func(t *testing.T) {
|
|
postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error {
|
|
return testCase.err
|
|
}
|
|
txmp := setup(t, 0, WithPostCheck(postCheckFn))
|
|
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
|
|
tx := make([]byte, txmp.config.MaxTxBytes-1)
|
|
_, err := rng.Read(tx)
|
|
require.NoError(t, err)
|
|
|
|
callback := func(res *abci.Response) {
|
|
checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
|
|
require.True(t, ok)
|
|
expectedErrString := ""
|
|
if testCase.err != nil {
|
|
expectedErrString = testCase.err.Error()
|
|
}
|
|
require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError)
|
|
}
|
|
require.NoError(t, txmp.CheckTx(context.Background(), tx, callback, mempool.TxInfo{SenderID: 0}))
|
|
})
|
|
}
|
|
}
|