package v1 import ( "bytes" "errors" "fmt" "math/rand" "os" "sort" "strconv" "strings" "sync" "testing" "time" "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/code" "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/internal/test" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) // application extends the KV store application by overriding CheckTx to provide // transaction priority based on the value in the key/value pair. type application struct { *kvstore.Application } type testTx struct { tx types.Tx priority int64 } func (app *application) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { var ( priority int64 sender string ) // infer the priority from the raw transaction value (sender=key=value) parts := bytes.Split(req.Tx, []byte("=")) if len(parts) == 3 { v, err := strconv.ParseInt(string(parts[2]), 10, 64) if err != nil { return abci.ResponseCheckTx{ Priority: priority, Code: 100, GasWanted: 1, } } priority = v sender = string(parts[0]) } else { return abci.ResponseCheckTx{ Priority: priority, Code: 101, GasWanted: 1, } } return abci.ResponseCheckTx{ Priority: priority, Sender: sender, Code: code.CodeTypeOK, GasWanted: 1, } } func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { t.Helper() app := &application{kvstore.NewApplication()} cc := proxy.NewLocalClientCreator(app) cfg := test.ResetTestRoot(strings.ReplaceAll(t.Name(), "/", "|")) cfg.Mempool.CacheSize = cacheSize appConnMem, err := cc.NewABCIClient() require.NoError(t, err) require.NoError(t, appConnMem.Start()) t.Cleanup(func() { os.RemoveAll(cfg.RootDir) require.NoError(t, appConnMem.Stop()) }) return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) } // mustCheckTx invokes txmp.CheckTx for the given transaction and waits until // its callback has finished executing. It fails t if CheckTx fails. func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) { done := make(chan struct{}) if err := txmp.CheckTx([]byte(spec), func(*abci.Response) { close(done) }, mempool.TxInfo{}); err != nil { t.Fatalf("CheckTx for %q failed: %v", spec, err) } <-done } func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := mempool.TxInfo{SenderID: peerID} rng := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < numTxs; i++ { prefix := make([]byte, 20) _, err := rng.Read(prefix) require.NoError(t, err) priority := int64(rng.Intn(9999-1000) + 1000) txs[i] = testTx{ tx: []byte(fmt.Sprintf("sender-%d-%d=%X=%d", i, peerID, prefix, priority)), priority: priority, } require.NoError(t, txmp.CheckTx(txs[i].tx, nil, txInfo)) } return txs } func TestTxMempool_TxsAvailable(t *testing.T) { txmp := setup(t, 0) txmp.EnableTxsAvailable() ensureNoTxFire := func() { timer := time.NewTimer(500 * time.Millisecond) select { case <-txmp.TxsAvailable(): require.Fail(t, "unexpected transactions event") case <-timer.C: } } ensureTxFire := func() { timer := time.NewTimer(500 * time.Millisecond) select { case <-txmp.TxsAvailable(): case <-timer.C: require.Fail(t, "expected transactions event") } } // ensure no event as we have not executed any transactions yet ensureNoTxFire() // Execute CheckTx for some transactions and ensure TxsAvailable only fires // once. txs := checkTxs(t, txmp, 100, 0) ensureTxFire() ensureNoTxFire() rawTxs := make([]types.Tx, len(txs)) for i, tx := range txs { rawTxs[i] = tx.tx } responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) for i := 0; i < len(responses); i++ { responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} } // commit half the transactions and ensure we fire an event txmp.Lock() require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() ensureTxFire() ensureNoTxFire() // Execute CheckTx for more transactions and ensure we do not fire another // event as we're still on the same height (1). _ = checkTxs(t, txmp, 100, 0) ensureNoTxFire() } func TestTxMempool_Size(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) rawTxs := make([]types.Tx, len(txs)) for i, tx := range txs { rawTxs[i] = tx.tx } responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) for i := 0; i < len(responses); i++ { responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} } txmp.Lock() require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() require.Equal(t, len(rawTxs)/2, txmp.Size()) require.Equal(t, int64(2850), txmp.SizeBytes()) } func TestTxMempool_Eviction(t *testing.T) { txmp := setup(t, 1000) txmp.config.Size = 5 txmp.config.MaxTxsBytes = 60 txExists := func(spec string) bool { txmp.Lock() defer txmp.Unlock() key := types.Tx(spec).Key() _, ok := txmp.txByKey[key] return ok } // A transaction bigger than the mempool should be rejected even when there // are slots available. mustCheckTx(t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1") require.Equal(t, 0, txmp.Size()) // Nearly-fill the mempool with a low-priority transaction, to show that it // is evicted even when slots are available for a higher-priority tx. const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" mustCheckTx(t, txmp, bigTx) require.Equal(t, 1, txmp.Size()) // bigTx is the only element require.True(t, txExists(bigTx)) require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) // The next transaction should evict bigTx, because it is higher priority // but does not fit on size. mustCheckTx(t, txmp, "key1=0000=25") require.True(t, txExists("key1=0000=25")) require.False(t, txExists(bigTx)) require.False(t, txmp.cache.Has([]byte(bigTx))) require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) // Now fill up the rest of the slots with other transactions. mustCheckTx(t, txmp, "key2=0001=5") mustCheckTx(t, txmp, "key3=0002=10") mustCheckTx(t, txmp, "key4=0003=3") mustCheckTx(t, txmp, "key5=0004=3") // A new transaction with low priority should be discarded. mustCheckTx(t, txmp, "key6=0005=1") require.False(t, txExists("key6=0005=1")) // A new transaction with higher priority should evict key5, which is the // newest of the two transactions with lowest priority. mustCheckTx(t, txmp, "key7=0006=7") require.True(t, txExists("key7=0006=7")) // new transaction added require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted require.True(t, txExists("key4=0003=3")) // older low-priority tx retained // Another new transaction evicts the other low-priority element. mustCheckTx(t, txmp, "key8=0007=20") require.True(t, txExists("key8=0007=20")) require.False(t, txExists("key4=0003=3")) // Now the lowest-priority tx is 5, so that should be the next to go. mustCheckTx(t, txmp, "key9=0008=9") require.True(t, txExists("key9=0008=9")) require.False(t, txExists("k3y2=0001=5")) // Add a transaction that requires eviction of multiple lower-priority // entries, in order to fit the size of the element. mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 require.True(t, txExists("key1=0000=25")) require.True(t, txExists("key8=0007=20")) require.True(t, txExists("key10=0123456789abcdef=11")) require.False(t, txExists("key3=0002=10")) require.False(t, txExists("key9=0008=9")) require.False(t, txExists("key7=0006=7")) } func TestTxMempool_Flush(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(txs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) rawTxs := make([]types.Tx, len(txs)) for i, tx := range txs { rawTxs[i] = tx.tx } responses := make([]*abci.ResponseDeliverTx, len(rawTxs[:50])) for i := 0; i < len(responses); i++ { responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} } txmp.Lock() require.NoError(t, txmp.Update(1, rawTxs[:50], responses, nil, nil)) txmp.Unlock() txmp.Flush() require.Zero(t, txmp.Size()) require.Equal(t, int64(0), txmp.SizeBytes()) } func TestTxMempool_ReapMaxBytesMaxGas(t *testing.T) { txmp := setup(t, 0) tTxs := checkTxs(t, txmp, 100, 0) // all txs request 1 gas unit require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } sort.Slice(priorities, func(i, j int) bool { // sort by priority, i.e. decreasing order return priorities[i] > priorities[j] }) ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) } // reap by gas capacity only reapedTxs := txmp.ReapMaxBytesMaxGas(-1, 50) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, 50) // reap by transaction bytes only reapedTxs = txmp.ReapMaxBytesMaxGas(1000, -1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.GreaterOrEqual(t, len(reapedTxs), 16) // Reap by both transaction bytes and gas, where the size yields 31 reaped // transactions and the gas limit reaps 25 transactions. reapedTxs = txmp.ReapMaxBytesMaxGas(1500, 30) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, 25) } func TestTxMempool_ReapMaxTxs(t *testing.T) { txmp := setup(t, 0) tTxs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) txMap := make(map[types.TxKey]testTx) priorities := make([]int64, len(tTxs)) for i, tTx := range tTxs { txMap[tTx.tx.Key()] = tTx priorities[i] = tTx.priority } sort.Slice(priorities, func(i, j int) bool { // sort by priority, i.e. decreasing order return priorities[i] > priorities[j] }) ensurePrioritized := func(reapedTxs types.Txs) { reapedPriorities := make([]int64, len(reapedTxs)) for i, rTx := range reapedTxs { reapedPriorities[i] = txMap[rTx.Key()].priority } require.Equal(t, priorities[:len(reapedPriorities)], reapedPriorities) } // reap all transactions reapedTxs := txmp.ReapMaxTxs(-1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, len(tTxs)) // reap a single transaction reapedTxs = txmp.ReapMaxTxs(1) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, 1) // reap half of the transactions reapedTxs = txmp.ReapMaxTxs(len(tTxs) / 2) ensurePrioritized(reapedTxs) require.Equal(t, len(tTxs), txmp.Size()) require.Equal(t, int64(5690), txmp.SizeBytes()) require.Len(t, reapedTxs, len(tTxs)/2) } func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { txmp := setup(t, 0) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes+1) _, err := rng.Read(tx) require.NoError(t, err) require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0})) tx = make([]byte, txmp.config.MaxTxBytes-1) _, err = rng.Read(tx) require.NoError(t, err) require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: 0})) } func TestTxMempool_CheckTxSamePeer(t *testing.T) { txmp := setup(t, 100) peerID := uint16(1) rng := rand.New(rand.NewSource(time.Now().UnixNano())) prefix := make([]byte, 20) _, err := rng.Read(prefix) require.NoError(t, err) tx := []byte(fmt.Sprintf("sender-0=%X=%d", prefix, 50)) require.NoError(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) require.Error(t, txmp.CheckTx(tx, nil, mempool.TxInfo{SenderID: peerID})) } func TestTxMempool_CheckTxSameSender(t *testing.T) { txmp := setup(t, 100) peerID := uint16(1) rng := rand.New(rand.NewSource(time.Now().UnixNano())) prefix1 := make([]byte, 20) _, err := rng.Read(prefix1) require.NoError(t, err) prefix2 := make([]byte, 20) _, err = rng.Read(prefix2) require.NoError(t, err) tx1 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix1, 50)) tx2 := []byte(fmt.Sprintf("sender-0=%X=%d", prefix2, 50)) require.NoError(t, txmp.CheckTx(tx1, nil, mempool.TxInfo{SenderID: peerID})) require.Equal(t, 1, txmp.Size()) require.NoError(t, txmp.CheckTx(tx2, nil, mempool.TxInfo{SenderID: peerID})) require.Equal(t, 1, txmp.Size()) } func TestTxMempool_ConcurrentTxs(t *testing.T) { txmp := setup(t, 100) rng := rand.New(rand.NewSource(time.Now().UnixNano())) checkTxDone := make(chan struct{}) var wg sync.WaitGroup wg.Add(1) go func() { for i := 0; i < 20; i++ { _ = checkTxs(t, txmp, 100, 0) dur := rng.Intn(1000-500) + 500 time.Sleep(time.Duration(dur) * time.Millisecond) } wg.Done() close(checkTxDone) }() wg.Add(1) go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() defer wg.Done() var height int64 = 1 for range ticker.C { reapedTxs := txmp.ReapMaxTxs(200) if len(reapedTxs) > 0 { responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) for i := 0; i < len(responses); i++ { var code uint32 if i%10 == 0 { code = 100 } else { code = abci.CodeTypeOK } responses[i] = &abci.ResponseDeliverTx{Code: code} } txmp.Lock() require.NoError(t, txmp.Update(height, reapedTxs, responses, nil, nil)) txmp.Unlock() height++ } else { // only return once we know we finished the CheckTx loop select { case <-checkTxDone: return default: } } } }() wg.Wait() require.Zero(t, txmp.Size()) require.Zero(t, txmp.SizeBytes()) } func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { txmp := setup(t, 5000) txmp.config.TTLDuration = 5 * time.Millisecond added1 := checkTxs(t, txmp, 10, 0) require.Equal(t, len(added1), txmp.Size()) // Wait a while, then add some more transactions that should not be expired // when the first batch TTLs out. // // ms: 0 1 2 3 4 5 6 // ^ ^ ^ ^ // | | | +-- Update (triggers pruning) // | | +------ first batch expires // | +-------------- second batch added // +-------------------------- first batch added // // The exact intervals are not important except that the delta should be // large relative to the cost of CheckTx (ms vs. ns is fine here). time.Sleep(3 * time.Millisecond) added2 := checkTxs(t, txmp, 10, 1) // Wait a while longer, so that the first batch will expire. time.Sleep(3 * time.Millisecond) // Trigger an update so that pruning will occur. txmp.Lock() defer txmp.Unlock() require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil)) // All the transactions in the original set should have been purged. for _, tx := range added1 { if _, ok := txmp.txByKey[tx.tx.Key()]; ok { t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) } if txmp.cache.Has(tx.tx) { t.Errorf("Transaction %X should have been removed from the cache", tx.tx.Key()) } } // All the transactions added later should still be around. for _, tx := range added2 { if _, ok := txmp.txByKey[tx.tx.Key()]; !ok { t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key()) } } } func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp := setup(t, 500) txmp.height = 100 txmp.config.TTLNumBlocks = 10 tTxs := checkTxs(t, txmp, 100, 0) require.Equal(t, len(tTxs), txmp.Size()) // reap 5 txs at the next height -- no txs should expire reapedTxs := txmp.ReapMaxTxs(5) responses := make([]*abci.ResponseDeliverTx, len(reapedTxs)) for i := 0; i < len(responses); i++ { responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} } txmp.Lock() require.NoError(t, txmp.Update(txmp.height+1, reapedTxs, responses, nil, nil)) txmp.Unlock() require.Equal(t, 95, txmp.Size()) // check more txs at height 101 _ = checkTxs(t, txmp, 50, 1) require.Equal(t, 145, txmp.Size()) // Reap 5 txs at a height that would expire all the transactions from before // the previous Update (height 100). // // NOTE: When we reap txs below, we do not know if we're picking txs from the // initial CheckTx calls or from the second round of CheckTx calls. Thus, we // cannot guarantee that all 95 txs are remaining that should be expired and // removed. However, we do know that that at most 95 txs can be expired and // removed. reapedTxs = txmp.ReapMaxTxs(5) responses = make([]*abci.ResponseDeliverTx, len(reapedTxs)) for i := 0; i < len(responses); i++ { responses[i] = &abci.ResponseDeliverTx{Code: abci.CodeTypeOK} } txmp.Lock() require.NoError(t, txmp.Update(txmp.height+10, reapedTxs, responses, nil, nil)) txmp.Unlock() require.GreaterOrEqual(t, txmp.Size(), 45) } func TestTxMempool_CheckTxPostCheckError(t *testing.T) { cases := []struct { name string err error }{ { name: "error", err: errors.New("test error"), }, { name: "no error", err: nil, }, } for _, tc := range cases { testCase := tc t.Run(testCase.name, func(t *testing.T) { postCheckFn := func(_ types.Tx, _ *abci.ResponseCheckTx) error { return testCase.err } txmp := setup(t, 0, WithPostCheck(postCheckFn)) rng := rand.New(rand.NewSource(time.Now().UnixNano())) tx := make([]byte, txmp.config.MaxTxBytes-1) _, err := rng.Read(tx) require.NoError(t, err) callback := func(res *abci.Response) { checkTxRes, ok := res.Value.(*abci.Response_CheckTx) require.True(t, ok) expectedErrString := "" if testCase.err != nil { expectedErrString = testCase.err.Error() } require.Equal(t, expectedErrString, checkTxRes.CheckTx.MempoolError) } require.NoError(t, txmp.CheckTx(tx, callback, mempool.TxInfo{SenderID: 0})) }) } }