diff --git a/mempool/v0/bench_test.go b/mempool/v0/bench_test.go index 46dc64797..cb1be502a 100644 --- a/mempool/v0/bench_test.go +++ b/mempool/v0/bench_test.go @@ -5,7 +5,6 @@ import ( "sync/atomic" "testing" - "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" @@ -14,8 +13,7 @@ import ( func BenchmarkReap(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(b, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() mp.config.Size = 100000 @@ -37,8 +35,7 @@ func BenchmarkReap(b *testing.B) { func BenchmarkCheckTx(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(b, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() mp.config.Size = 1000000 @@ -60,8 +57,7 @@ func BenchmarkCheckTx(b *testing.B) { func BenchmarkParallelCheckTx(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(b, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() mp.config.Size = 100000000 @@ -86,8 +82,7 @@ func BenchmarkParallelCheckTx(b *testing.B) { func BenchmarkCheckDuplicateTx(b *testing.B) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(b, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() mp.config.Size = 1000000 diff --git a/mempool/v0/cache_test.go b/mempool/v0/cache_test.go index 4dc8e2720..c62e99891 100644 --- a/mempool/v0/cache_test.go +++ b/mempool/v0/cache_test.go @@ -16,8 +16,7 @@ import ( func TestCacheAfterUpdate(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() // reAddIndices & txsInCache can have elements > numTxsToCreate diff --git a/mempool/v0/clist_mempool_test.go b/mempool/v0/clist_mempool_test.go index 00292d52f..7501c0de2 100644 --- a/mempool/v0/clist_mempool_test.go +++ b/mempool/v0/clist_mempool_test.go @@ -54,11 +54,11 @@ func newMempoolWithAppAndConfigMock(cc proxy.ClientCreator, cfg *config.Config, return mp, func() { os.RemoveAll(cfg.RootDir) } } -func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc, error) { +func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) { conf := config.ResetTestRoot("mempool_test") mp, cu := newMempoolWithAppAndConfig(cc, conf) - return mp, cu, nil + return mp, cu } func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) { @@ -119,8 +119,7 @@ func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types. func TestReapMaxBytesMaxGas(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() // Ensure gas calculation behaves as expected @@ -169,8 +168,7 @@ func TestReapMaxBytesMaxGas(t *testing.T) { func TestMempoolFilters(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() emptyTxArr := []types.Tx{[]byte{}} @@ -209,8 +207,7 @@ func TestMempoolFilters(t *testing.T) { func TestMempoolUpdate(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() // 1. Adds valid txs to the cache @@ -350,8 +347,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) { func TestTxsAvailable(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() mp.EnableTxsAvailable() @@ -396,13 +392,12 @@ func TestSerialReap(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup := newMempoolWithApp(cc) defer cleanup() appConnCon, _ := cc.NewABCIClient() appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err = appConnCon.Start() + err := appConnCon.Start() require.Nil(t, err) cacheMap := make(map[string]struct{}) @@ -507,8 +502,7 @@ func TestMempool_CheckTxChecksTxSize(t *testing.T) { app := kvstore.NewApplication() cc := proxy.NewLocalClientCreator(app) - mempl, cleanup, err := newMempoolWithApp(cc) - require.NoError(t, err) + mempl, cleanup := newMempoolWithApp(cc) defer cleanup() maxTxSize := mempl.config.MaxTxBytes @@ -598,8 +592,7 @@ func TestMempoolTxsBytes(t *testing.T) { app2 := kvstore.NewApplication() cc = proxy.NewLocalClientCreator(app2) - mp, cleanup, err = newMempoolWithApp(cc) - require.NoError(t, err) + mp, cleanup = newMempoolWithApp(cc) defer cleanup() txBytes := make([]byte, 8) diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go index 0c7aab1b9..425083654 100644 --- a/mempool/v0/reactor_test.go +++ b/mempool/v0/reactor_test.go @@ -1,391 +1,389 @@ package v0 -// import ( -// "context" -// "sync" -// "testing" -// "time" - -// "github.com/stretchr/testify/require" - -// abciclient "github.com/tendermint/tendermint/abci/client" -// "github.com/tendermint/tendermint/abci/example/kvstore" -// abci "github.com/tendermint/tendermint/abci/types" -// "github.com/tendermint/tendermint/config" -// "github.com/tendermint/tendermint/internal/mempool" -// "github.com/tendermint/tendermint/internal/p2p" -// "github.com/tendermint/tendermint/internal/p2p/p2ptest" -// "github.com/tendermint/tendermint/libs/log" -// tmrand "github.com/tendermint/tendermint/libs/rand" -// protomem "github.com/tendermint/tendermint/proto/tendermint/mempool" -// "github.com/tendermint/tendermint/types" -// ) - -// type reactorTestSuite struct { -// network *p2ptest.Network -// logger log.Logger - -// reactors map[types.NodeID]*Reactor -// mempoolChnnels map[types.NodeID]*p2p.Channel -// mempools map[types.NodeID]*CListMempool -// kvstores map[types.NodeID]*kvstore.Application - -// peerChans map[types.NodeID]chan p2p.PeerUpdate -// peerUpdates map[types.NodeID]*p2p.PeerUpdates - -// nodes []types.NodeID -// } - -// func setup(t *testing.T, config *config.MempoolConfig, numNodes int, chBuf uint) *reactorTestSuite { -// t.Helper() - -// rts := &reactorTestSuite{ -// logger: log.TestingLogger().With("testCase", t.Name()), -// network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}), -// reactors: make(map[types.NodeID]*Reactor, numNodes), -// mempoolChnnels: make(map[types.NodeID]*p2p.Channel, numNodes), -// mempools: make(map[types.NodeID]*CListMempool, numNodes), -// kvstores: make(map[types.NodeID]*kvstore.Application, numNodes), -// peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), -// peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), -// } - -// chDesc := p2p.ChannelDescriptor{ID: byte(mempool.MempoolChannel)} -// rts.mempoolChnnels = rts.network.MakeChannelsNoCleanup(t, chDesc, new(protomem.Message), int(chBuf)) - -// for nodeID := range rts.network.Nodes { -// rts.kvstores[nodeID] = kvstore.NewApplication() -// cc := abciclient.NewLocalCreator(rts.kvstores[nodeID]) - -// mempool, memCleanup, err := newMempoolWithApp(cc) -// require.NoError(t, err) -// t.Cleanup(memCleanup) -// mempool.SetLogger(rts.logger) -// rts.mempools[nodeID] = mempool - -// rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) -// rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) -// rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID]) - -// rts.reactors[nodeID] = NewReactor( -// rts.logger.With("nodeID", nodeID), -// config, -// mempool, -// rts.mempoolChnnels[nodeID], -// rts.peerUpdates[nodeID], -// ) - -// rts.nodes = append(rts.nodes, nodeID) - -// require.NoError(t, rts.reactors[nodeID].Start()) -// require.True(t, rts.reactors[nodeID].IsRunning()) -// } - -// require.Len(t, rts.reactors, numNodes) - -// t.Cleanup(func() { -// for nodeID := range rts.reactors { -// if rts.reactors[nodeID].IsRunning() { -// require.NoError(t, rts.reactors[nodeID].Stop()) -// require.False(t, rts.reactors[nodeID].IsRunning()) -// } -// } -// }) - -// return rts -// } - -// func (rts *reactorTestSuite) start(t *testing.T) { -// t.Helper() -// rts.network.Start(t) -// require.Len(t, -// rts.network.RandomNode().PeerManager.Peers(), -// len(rts.nodes)-1, -// "network does not have expected number of nodes") -// } - -// func (rts *reactorTestSuite) assertMempoolChannelsDrained(t *testing.T) { -// t.Helper() - -// for id, r := range rts.reactors { -// require.NoError(t, r.Stop(), "stopping reactor %s", id) -// r.Wait() -// require.False(t, r.IsRunning(), "reactor %s did not stop", id) -// } - -// for _, mch := range rts.mempoolChnnels { -// require.Empty(t, mch.Out, "checking channel %q (len=%d)", mch.ID, len(mch.Out)) -// } -// } - -// func (rts *reactorTestSuite) waitForTxns(t *testing.T, txs types.Txs, ids ...types.NodeID) { -// t.Helper() - -// fn := func(pool *CListMempool) { -// for pool.Size() < len(txs) { -// time.Sleep(50 * time.Millisecond) -// } - -// reapedTxs := pool.ReapMaxTxs(len(txs)) -// require.Equal(t, len(txs), len(reapedTxs)) -// for i, tx := range txs { -// require.Equalf(t, -// tx, -// reapedTxs[i], -// "txs at index %d in reactor mempool mismatch; got: %v, expected: %v", i, tx, reapedTxs[i], -// ) -// } -// } - -// if len(ids) == 1 { -// fn(rts.reactors[ids[0]].mempool) -// return -// } - -// wg := &sync.WaitGroup{} -// for id := range rts.mempools { -// if len(ids) > 0 && !p2ptest.NodeInSlice(id, ids) { -// continue -// } - -// wg.Add(1) -// func(nid types.NodeID) { defer wg.Done(); fn(rts.reactors[nid].mempool) }(id) -// } - -// wg.Wait() -// } - -// func TestReactorBroadcastTxs(t *testing.T) { -// numTxs := 1000 -// numNodes := 10 -// cfg := config.TestConfig() - -// rts := setup(t, cfg.Mempool, numNodes, 0) - -// primary := rts.nodes[0] -// secondaries := rts.nodes[1:] - -// txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) - -// // run the router -// rts.start(t) - -// // Wait till all secondary suites (reactor) received all mempool txs from the -// // primary suite (node). -// rts.waitForTxns(t, txs, secondaries...) - -// for _, pool := range rts.mempools { -// require.Equal(t, len(txs), pool.Size()) -// } - -// rts.assertMempoolChannelsDrained(t) -// } - -// // regression test for https://github.com/tendermint/tendermint/issues/5408 -// func TestReactorConcurrency(t *testing.T) { -// numTxs := 5 -// numNodes := 2 -// cfg := config.TestConfig() - -// rts := setup(t, cfg.Mempool, numNodes, 0) - -// primary := rts.nodes[0] -// secondary := rts.nodes[1] - -// rts.start(t) - -// var wg sync.WaitGroup - -// for i := 0; i < 1000; i++ { -// wg.Add(2) - -// // 1. submit a bunch of txs -// // 2. update the whole mempool - -// txs := checkTxs(t, rts.reactors[primary].mempool, numTxs, mempool.UnknownPeerID) -// go func() { -// defer wg.Done() - -// mempool := rts.mempools[primary] - -// mempool.Lock() -// defer mempool.Unlock() - -// deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) -// for i := range txs { -// deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} -// } - -// require.NoError(t, mempool.Update(1, txs, deliverTxResponses, nil, nil)) -// }() - -// // 1. submit a bunch of txs -// // 2. update none -// _ = checkTxs(t, rts.reactors[secondary].mempool, numTxs, mempool.UnknownPeerID) -// go func() { -// defer wg.Done() - -// mempool := rts.mempools[secondary] - -// mempool.Lock() -// defer mempool.Unlock() - -// err := mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) -// require.NoError(t, err) -// }() - -// // flush the mempool -// rts.mempools[secondary].Flush() -// } - -// wg.Wait() -// } - -// func TestReactorNoBroadcastToSender(t *testing.T) { -// numTxs := 1000 -// numNodes := 2 -// cfg := config.TestConfig() - -// rts := setup(t, cfg.Mempool, numNodes, uint(numTxs)) - -// primary := rts.nodes[0] -// secondary := rts.nodes[1] - -// peerID := uint16(1) -// _ = checkTxs(t, rts.mempools[primary], numTxs, peerID) - -// rts.start(t) - -// time.Sleep(100 * time.Millisecond) - -// require.Eventually(t, func() bool { -// return rts.mempools[secondary].Size() == 0 -// }, time.Minute, 100*time.Millisecond) - -// rts.assertMempoolChannelsDrained(t) -// } - -// func TestReactor_MaxTxBytes(t *testing.T) { -// numNodes := 2 -// cfg := config.TestConfig() - -// rts := setup(t, cfg.Mempool, numNodes, 0) - -// primary := rts.nodes[0] -// secondary := rts.nodes[1] - -// // Broadcast a tx, which has the max size and ensure it's received by the -// // second reactor. -// tx1 := tmrand.Bytes(cfg.Mempool.MaxTxBytes) -// err := rts.reactors[primary].mempool.CheckTx( -// context.Background(), -// tx1, -// nil, -// mempool.TxInfo{ -// SenderID: mempool.UnknownPeerID, -// }, -// ) -// require.NoError(t, err) - -// rts.start(t) - -// // Wait till all secondary suites (reactor) received all mempool txs from the -// // primary suite (node). -// rts.waitForTxns(t, []types.Tx{tx1}, secondary) - -// rts.reactors[primary].mempool.Flush() -// rts.reactors[secondary].mempool.Flush() - -// // broadcast a tx, which is beyond the max size and ensure it's not sent -// tx2 := tmrand.Bytes(cfg.Mempool.MaxTxBytes + 1) -// err = rts.mempools[primary].CheckTx(context.Background(), tx2, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) -// require.Error(t, err) - -// rts.assertMempoolChannelsDrained(t) -// } - -// func TestDontExhaustMaxActiveIDs(t *testing.T) { -// cfg := config.TestConfig() - -// // we're creating a single node network, but not starting the -// // network. -// rts := setup(t, cfg.Mempool, 1, mempool.MaxActiveIDs+1) - -// nodeID := rts.nodes[0] - -// peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") -// require.NoError(t, err) - -// // ensure the reactor does not panic (i.e. exhaust active IDs) -// for i := 0; i < mempool.MaxActiveIDs+1; i++ { -// rts.peerChans[nodeID] <- p2p.PeerUpdate{ -// Status: p2p.PeerStatusUp, -// NodeID: peerID, -// } - -// rts.mempoolChnnels[nodeID].Out <- p2p.Envelope{ -// To: peerID, -// Message: &protomem.Txs{ -// Txs: [][]byte{}, -// }, -// } -// } - -// require.Eventually( -// t, -// func() bool { -// for _, mch := range rts.mempoolChnnels { -// if len(mch.Out) > 0 { -// return false -// } -// } - -// return true -// }, -// time.Minute, -// 10*time.Millisecond, -// ) - -// rts.assertMempoolChannelsDrained(t) -// } - -// func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { -// if testing.Short() { -// t.Skip("skipping test in short mode") -// } - -// // 0 is already reserved for UnknownPeerID -// ids := mempool.NewMempoolIDs() - -// peerID, err := types.NewNodeID("0011223344556677889900112233445566778899") -// require.NoError(t, err) - -// for i := 0; i < mempool.MaxActiveIDs-1; i++ { -// ids.ReserveForPeer(peerID) -// } - -// require.Panics(t, func() { -// ids.ReserveForPeer(peerID) -// }) -// } - -// func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { -// if testing.Short() { -// t.Skip("skipping test in short mode") -// } - -// cfg := config.TestConfig() - -// rts := setup(t, cfg.Mempool, 2, 0) - -// primary := rts.nodes[0] -// secondary := rts.nodes[1] - -// rts.start(t) - -// // disconnect peer -// rts.peerChans[primary] <- p2p.PeerUpdate{ -// Status: p2p.PeerStatusDown, -// NodeID: secondary, -// } -// } +import ( + "encoding/hex" + "errors" + "net" + "sync" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/go-kit/log/term" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + tmrand "github.com/tendermint/tendermint/libs/rand" + "github.com/tendermint/tendermint/mempool" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/mock" + memproto "github.com/tendermint/tendermint/proto/tendermint/mempool" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" +) + +const ( + numTxs = 1000 + timeout = 120 * time.Second // ridiculously high because CircleCI is slow +) + +type peerState struct { + height int64 +} + +func (ps peerState) GetHeight() int64 { + return ps.height +} + +// Send a bunch of txs to the first reactor's mempool and wait for them all to +// be received in the others. +func TestReactorBroadcastTxsMessage(t *testing.T) { + config := cfg.TestConfig() + // if there were more than two reactors, the order of transactions could not be + // asserted in waitForTxsOnReactors (due to transactions gossiping). If we + // replace Connect2Switches (full mesh) with a func, which connects first + // reactor to others and nothing else, this test should also pass with >2 reactors. + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + + txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) + waitForTxsOnReactors(t, txs, reactors) +} + +// regression test for https://github.com/tendermint/tendermint/issues/5408 +func TestReactorConcurrency(t *testing.T) { + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + var wg sync.WaitGroup + + const numTxs = 5 + + for i := 0; i < 1000; i++ { + wg.Add(2) + + // 1. submit a bunch of txs + // 2. update the whole mempool + txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) + go func() { + defer wg.Done() + + reactors[0].mempool.Lock() + defer reactors[0].mempool.Unlock() + + deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) + for i := range txs { + deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} + } + err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) + assert.NoError(t, err) + }() + + // 1. submit a bunch of txs + // 2. update none + _ = checkTxs(t, reactors[1].mempool, numTxs, mempool.UnknownPeerID) + go func() { + defer wg.Done() + + reactors[1].mempool.Lock() + defer reactors[1].mempool.Unlock() + err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) + assert.NoError(t, err) + }() + + // 1. flush the mempool + reactors[1].mempool.Flush() + } + + wg.Wait() +} + +// Send a bunch of txs to the first reactor's mempool, claiming it came from peer +// ensure peer gets no txs. +func TestReactorNoBroadcastToSender(t *testing.T) { + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + + const peerID = 1 + checkTxs(t, reactors[0].mempool, numTxs, peerID) + ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) +} + +func TestReactor_MaxTxBytes(t *testing.T) { + config := cfg.TestConfig() + + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } + + // Broadcast a tx, which has the max size + // => ensure it's received by the second reactor. + tx1 := tmrand.Bytes(config.Mempool.MaxTxBytes) + err := reactors[0].mempool.CheckTx(tx1, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) + require.NoError(t, err) + waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) + + reactors[0].mempool.Flush() + reactors[1].mempool.Flush() + + // Broadcast a tx, which is beyond the max size + // => ensure it's not sent + tx2 := tmrand.Bytes(config.Mempool.MaxTxBytes + 1) + err = reactors[0].mempool.CheckTx(tx2, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) + require.Error(t, err) +} + +func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + + // stop peer + sw := reactors[1].Switch + sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when peer is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +} + +func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + + config := cfg.TestConfig() + const N = 2 + reactors := makeAndConnectReactors(config, N) + + // stop reactors + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + + // check that we are not leaking any go-routines + // i.e. broadcastTxRoutine finishes when reactor is stopped + leaktest.CheckTimeout(t, 10*time.Second)() +} + +func TestMempoolIDsBasic(t *testing.T) { + ids := newMempoolIDs() + + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 1, ids.GetForPeer(peer)) + ids.Reclaim(peer) + + ids.ReserveForPeer(peer) + assert.EqualValues(t, 2, ids.GetForPeer(peer)) + ids.Reclaim(peer) +} + +func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { + if testing.Short() { + return + } + + // 0 is already reserved for UnknownPeerID + ids := newMempoolIDs() + + for i := 0; i < mempool.MaxActiveIDs-1; i++ { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + } + + assert.Panics(t, func() { + peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + ids.ReserveForPeer(peer) + }) +} + +func TestDontExhaustMaxActiveIDs(t *testing.T) { + config := cfg.TestConfig() + const N = 1 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + if err := r.Stop(); err != nil { + assert.NoError(t, err) + } + } + }() + reactor := reactors[0] + + for i := 0; i < mempool.MaxActiveIDs+1; i++ { + peer := mock.NewPeer(nil) + reactor.Receive(mempool.MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) + reactor.AddPeer(peer) + } +} + +// mempoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func mempoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +// connect N mempool reactors through N switches +func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { + reactors := make([]*Reactor, n) + logger := mempoolLogger() + for i := 0; i < n; i++ { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { + // wait for the txs in all mempools + wg := new(sync.WaitGroup) + for i, reactor := range reactors { + wg.Add(1) + go func(r *Reactor, reactorIndex int) { + defer wg.Done() + waitForTxsOnReactor(t, txs, r, reactorIndex) + }(reactor, i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(timeout) + select { + case <-timer: + t.Fatal("Timed out waiting for txs") + case <-done: + } +} + +func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { + mempool := reactor.mempool + for mempool.Size() < len(txs) { + time.Sleep(time.Millisecond * 100) + } + + reapedTxs := mempool.ReapMaxTxs(len(txs)) + for i, tx := range txs { + assert.Equalf(t, tx, reapedTxs[i], + "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) + } +} + +// ensure no txs on reactor after some timeout +func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { + time.Sleep(timeout) // wait for the txs in all mempools + assert.Zero(t, reactor.mempool.Size()) +} + +func TestMempoolVectors(t *testing.T) { + testCases := []struct { + testName string + tx []byte + expBytes string + }{ + {"tx 1", []byte{123}, "0a030a017b"}, + {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, + } + + for _, tc := range testCases { + tc := tc + + msg := memproto.Message{ + Sum: &memproto.Message_Txs{ + Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, + }, + } + bz, err := msg.Marshal() + require.NoError(t, err, tc.testName) + + require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) + } +}