From e20105e65816de62767e1ab3d362dcef4d33177b Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Wed, 13 May 2020 12:39:28 +0400 Subject: [PATCH] mempool: do not launch broadcastTxRoutine if Broadcast is off Refs #3479 --- mempool/reactor.go | 8 +- mempool/reactor_test.go | 183 ++++++++++++++++++++-------------------- 2 files changed, 95 insertions(+), 96 deletions(-) diff --git a/mempool/reactor.go b/mempool/reactor.go index d64bb7868..bf34b2615 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -150,7 +150,9 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - go memR.broadcastTxRoutine(peer) + if memR.config.Broadcast { + go memR.broadcastTxRoutine(peer) + } } // RemovePeer implements Reactor. @@ -193,10 +195,6 @@ type PeerState interface { // Send new mempool txs to peer. func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - if !memR.config.Broadcast { - return - } - peerID := memR.ids.GetForPeer(peer) var next *clist.CElement for { diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index d71c20939..75b946a0e 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -20,6 +20,11 @@ import ( "github.com/tendermint/tendermint/types" ) +const ( + numTxs = 1000 + timeout = 120 * time.Second // ridiculously high because CircleCI is slow +) + type peerState struct { height int64 } @@ -28,90 +33,8 @@ func (ps peerState) GetHeight() int64 { return ps.height } -// mempoolLogger is a TestingLogger which uses a different -// color for each validator ("validator" key must exist). -func mempoolLogger() log.Logger { - return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { - for i := 0; i < len(keyvals)-1; i += 2 { - if keyvals[i] == "validator" { - return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} - } - } - return term.FgBgColor{} - }) -} - -// connect N mempool reactors through N switches -func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { - reactors := make([]*Reactor, n) - logger := mempoolLogger() - for i := 0; i < n; i++ { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mempool, cleanup := newMempoolWithApp(cc) - defer cleanup() - - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) - } - - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("MEMPOOL", reactors[i]) - return s - - }, p2p.Connect2Switches) - return reactors -} - -func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { - // wait for the txs in all mempools - wg := new(sync.WaitGroup) - for i, reactor := range reactors { - wg.Add(1) - go func(r *Reactor, reactorIndex int) { - defer wg.Done() - waitForTxsOnReactor(t, txs, r, reactorIndex) - }(reactor, i) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - timer := time.After(Timeout) - select { - case <-timer: - t.Fatal("Timed out waiting for txs") - case <-done: - } -} - -func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { - mempool := reactor.mempool - for mempool.Size() < len(txs) { - time.Sleep(time.Millisecond * 100) - } - - reapedTxs := mempool.ReapMaxTxs(len(txs)) - for i, tx := range txs { - assert.Equalf(t, tx, reapedTxs[i], - "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) - } -} - -// ensure no txs on reactor after some timeout -func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { - time.Sleep(timeout) // wait for the txs in all mempools - assert.Zero(t, reactor.mempool.Size()) -} - -const ( - NumTxs = 1000 - Timeout = 120 * time.Second // ridiculously high because CircleCI is slow -) - +// Send a bunch of txs to the first reactor's mempool and wait for them all to +// be received in the others. func TestReactorBroadcastTxMessage(t *testing.T) { config := cfg.TestConfig() const N = 4 @@ -127,12 +50,12 @@ func TestReactorBroadcastTxMessage(t *testing.T) { } } - // send a bunch of txs to the first reactor's mempool - // and wait for them all to be received in the others - txs := checkTxs(t, reactors[0].mempool, NumTxs, UnknownPeerID) + txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID) waitForTxsOnReactors(t, txs, reactors) } +// Send a bunch of txs to the first reactor's mempool, claiming it came from peer +// ensure peer gets no txs. func TestReactorNoBroadcastToSender(t *testing.T) { config := cfg.TestConfig() const N = 2 @@ -143,10 +66,9 @@ func TestReactorNoBroadcastToSender(t *testing.T) { } }() - // send a bunch of txs to the first reactor's mempool, claiming it came from peer - // ensure peer gets no txs - checkTxs(t, reactors[0].mempool, NumTxs, 1) - ensureNoTxs(t, reactors[1], 100*time.Millisecond) + const peerID = 1 + checkTxs(t, reactors[0].mempool, numTxs, peerID) + ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) } func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { @@ -241,3 +163,82 @@ func TestDontExhaustMaxActiveIDs(t *testing.T) { reactor.AddPeer(peer) } } + +// mempoolLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func mempoolLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +// connect N mempool reactors through N switches +func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { + reactors := make([]*Reactor, n) + logger := mempoolLogger() + for i := 0; i < n; i++ { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + mempool, cleanup := newMempoolWithApp(cc) + defer cleanup() + + reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("MEMPOOL", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { + // wait for the txs in all mempools + wg := new(sync.WaitGroup) + for i, reactor := range reactors { + wg.Add(1) + go func(r *Reactor, reactorIndex int) { + defer wg.Done() + waitForTxsOnReactor(t, txs, r, reactorIndex) + }(reactor, i) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(timeout) + select { + case <-timer: + t.Fatal("Timed out waiting for txs") + case <-done: + } +} + +func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { + mempool := reactor.mempool + for mempool.Size() < len(txs) { + time.Sleep(time.Millisecond * 100) + } + + reapedTxs := mempool.ReapMaxTxs(len(txs)) + for i, tx := range txs { + assert.Equalf(t, tx, reapedTxs[i], + "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) + } +} + +// ensure no txs on reactor after some timeout +func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { + time.Sleep(timeout) // wait for the txs in all mempools + assert.Zero(t, reactor.mempool.Size()) +}