From 0c0cb9f200a5466446320060ab03abdae9032cd7 Mon Sep 17 00:00:00 2001 From: Tess Rinearson Date: Fri, 27 Mar 2020 15:08:53 +0100 Subject: [PATCH] mempool: reserve IDs in InitPeer instead of AddPeer --- consensus/reactor.go | 2 +- evidence/reactor.go | 2 +- mempool/reactor.go | 11 ++++++++--- mempool/reactor_test.go | 18 ++++++++++++++++++ p2p/pex/pex_reactor.go | 2 +- 5 files changed, 29 insertions(+), 6 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 0f2dad743..3711dd7cf 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -57,7 +57,7 @@ func NewReactor(consensusState *State, fastSync bool, options ...ReactorOption) metrics: NopMetrics(), } conR.updateFastSyncingMetric() - conR.BaseReactor = *p2p.NewBaseReactor("Reactor", conR) + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) for _, option := range options { option(conR) diff --git a/evidence/reactor.go b/evidence/reactor.go index 19ad0f135..e4dbd51ad 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -34,7 +34,7 @@ func NewReactor(evpool *Pool) *Reactor { evR := &Reactor{ evpool: evpool, } - evR.BaseReactor = *p2p.NewBaseReactor("Reactor", evR) + evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR) return evR } diff --git a/mempool/reactor.go b/mempool/reactor.go index 161fc7212..fda12c021 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -47,7 +47,7 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assignes it to the +// Reserve searches for the next unused ID and assigns it to the // peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() @@ -110,10 +110,16 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { mempool: mempool, ids: newMempoolIDs(), } - memR.BaseReactor = *p2p.NewBaseReactor("Reactor", memR) + memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) return memR } +// InitPeer implements Reactor by creating a state for the peer. +func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { + memR.ids.ReserveForPeer(peer) + return peer +} + // SetLogger sets the Logger on the reactor and the underlying mempool. func (memR *Reactor) SetLogger(l log.Logger) { memR.Logger = l @@ -142,7 +148,6 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(peer) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 27c82f2c4..87da0557d 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -223,3 +223,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { ids.ReserveForPeer(peer) }) } + +func TestDontExhaustMaxActiveIDs(t *testing.T) { + config := cfg.TestConfig() + const N = 1 + reactors := makeAndConnectReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + reactor := reactors[0] + + for i := 0; i < maxActiveIDs+1; i++ { + peer := mock.NewPeer(nil) + reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) + reactor.AddPeer(peer) + } +} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 3a3d2d7de..6dc38a921 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -137,7 +137,7 @@ func NewReactor(b AddrBook, config *ReactorConfig) *Reactor { lastReceivedRequests: cmap.NewCMap(), crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo), } - r.BaseReactor = *p2p.NewBaseReactor("Reactor", r) + r.BaseReactor = *p2p.NewBaseReactor("PEX", r) return r }