diff --git a/internal/mempool/v1/mempool_test.go b/internal/mempool/v1/mempool_test.go index 09f431853..b338301b8 100644 --- a/internal/mempool/v1/mempool_test.go +++ b/internal/mempool/v1/mempool_test.go @@ -95,6 +95,18 @@ func setup(t testing.TB, cacheSize int, options ...TxMempoolOption) *TxMempool { return NewTxMempool(log.TestingLogger().With("test", t.Name()), cfg.Mempool, appConnMem, 0, options...) } +// mustCheckTx invokes txmp.CheckTx for the given transaction and waits until +// its callback has finished executing. It fails t if CheckTx fails. +func mustCheckTx(t *testing.T, txmp *TxMempool, spec string) { + done := make(chan struct{}) + if err := txmp.CheckTx(context.Background(), []byte(spec), func(*abci.Response) { + close(done) + }, mempool.TxInfo{}); err != nil { + t.Fatalf("CheckTx for %q failed: %v", spec, err) + } + <-done +} + func checkTxs(t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx { txs := make([]testTx, numTxs) txInfo := mempool.TxInfo{SenderID: peerID} @@ -196,6 +208,76 @@ func TestTxMempool_Size(t *testing.T) { require.Equal(t, int64(2850), txmp.SizeBytes()) } +func TestTxMempool_Eviction(t *testing.T) { + txmp := setup(t, 0) + txmp.config.Size = 5 + txmp.config.MaxTxsBytes = 60 + txExists := func(spec string) bool { + txmp.Lock() + defer txmp.Unlock() + key := types.Tx(spec).Key() + _, ok := txmp.txByKey[key] + return ok + } + + // A transaction bigger than the mempool should be rejected even when there + // are slots available. + mustCheckTx(t, txmp, "big=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef=1") + require.Equal(t, 0, txmp.Size()) + + // Nearly-fill the mempool with a low-priority transaction, to show that it + // is evicted even when slots are available for a higher-priority tx. + const bigTx = "big=0123456789abcdef0123456789abcdef0123456789abcdef01234=2" + mustCheckTx(t, txmp, bigTx) + require.Equal(t, 1, txmp.Size()) // bigTx is the only element + require.True(t, txExists(bigTx)) + require.Equal(t, int64(len(bigTx)), txmp.SizeBytes()) + + // The next transaction should evict bigTx, because it is higher priority + // but does not fit on size. + mustCheckTx(t, txmp, "key1=0000=25") + require.True(t, txExists("key1=0000=25")) + require.False(t, txExists(bigTx)) + require.Equal(t, int64(len("key1=0000=25")), txmp.SizeBytes()) + + // Now fill up the rest of the slots with other transactions. + mustCheckTx(t, txmp, "key2=0001=5") + mustCheckTx(t, txmp, "key3=0002=10") + mustCheckTx(t, txmp, "key4=0003=3") + mustCheckTx(t, txmp, "key5=0004=3") + + // A new transaction with low priority should be discarded. + mustCheckTx(t, txmp, "key6=0005=1") + require.False(t, txExists("key6=0005=1")) + + // A new transaction with higher priority should evict key5, which is the + // newest of the two transactions with lowest priority. + mustCheckTx(t, txmp, "key7=0006=7") + require.True(t, txExists("key7=0006=7")) // new transaction added + require.False(t, txExists("key5=0004=3")) // newest low-priority tx evicted + require.True(t, txExists("key4=0003=3")) // older low-priority tx retained + + // Another new transaction evicts the other low-priority element. + mustCheckTx(t, txmp, "key8=0007=20") + require.True(t, txExists("key8=0007=20")) + require.False(t, txExists("key4=0003=3")) + + // Now the lowest-priority tx is 5, so that should be the next to go. + mustCheckTx(t, txmp, "key9=0008=9") + require.True(t, txExists("key9=0008=9")) + require.False(t, txExists("k3y2=0001=5")) + + // Add a transaction that requires eviction of multiple lower-priority + // entries, in order to fit the size of the element. + mustCheckTx(t, txmp, "key10=0123456789abcdef=11") // evict 10, 9, 7; keep 25, 20, 11 + require.True(t, txExists("key1=0000=25")) + require.True(t, txExists("key8=0007=20")) + require.True(t, txExists("key10=0123456789abcdef=11")) + require.False(t, txExists("key3=0002=10")) + require.False(t, txExists("key9=0008=9")) + require.False(t, txExists("key7=0006=7")) +} + func TestTxMempool_Flush(t *testing.T) { txmp := setup(t, 0) txs := checkTxs(t, txmp, 100, 0) @@ -438,6 +520,51 @@ func TestTxMempool_ConcurrentTxs(t *testing.T) { require.Zero(t, txmp.SizeBytes()) } +func TestTxMempool_ExpiredTxs_Timestamp(t *testing.T) { + txmp := setup(t, 50) + txmp.config.TTLDuration = 5 * time.Millisecond + + added1 := checkTxs(t, txmp, 25, 0) + require.Equal(t, len(added1), txmp.Size()) + + // Wait a while, then add some more transactions that should not be expired + // when the first batch TTLs out. + // + // ms: 0 1 2 3 4 5 6 + // ^ ^ ^ ^ + // | | | +-- Update (triggers pruning) + // | | +------ first batch expires + // | +-------------- second batch added + // +-------------------------- first batch added + // + // The exact intervals are not important except that the delta should be + // large relative to the cost of CheckTx (ms vs. ns is fine here). + time.Sleep(3 * time.Millisecond) + added2 := checkTxs(t, txmp, 25, 1) + + // Wait a while longer, so that the first batch will expire. + time.Sleep(3 * time.Millisecond) + + // Trigger an update so that pruning will occur. + txmp.Lock() + defer txmp.Unlock() + require.NoError(t, txmp.Update(txmp.height+1, nil, nil, nil, nil)) + + // All the transactions in the original set should have been purged. + for _, tx := range added1 { + if _, ok := txmp.txByKey[tx.tx.Key()]; ok { + t.Errorf("Transaction %X should have been purged for TTL", tx.tx.Key()) + } + } + + // All the transactions added later should still be around. + for _, tx := range added2 { + if _, ok := txmp.txByKey[tx.tx.Key()]; !ok { + t.Errorf("Transaction %X should still be in the mempool, but is not", tx.tx.Key()) + } + } +} + func TestTxMempool_ExpiredTxs_NumBlocks(t *testing.T) { txmp := setup(t, 500) txmp.height = 100