From 39b2e4b8785c7b515ceb31246a98267379b41a1b Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 28 Oct 2022 15:44:48 -0400 Subject: [PATCH] reactors have channel and peer in new receive --- blockchain/v1/reactor.go | 5 ++++- blockchain/v2/reactor.go | 5 ++++- blocksync/reactor.go | 5 ++++- consensus/byzantine_test.go | 4 +--- consensus/reactor.go | 5 ++++- evidence/reactor.go | 5 ++++- mempool/v1/reactor.go | 5 ++++- p2p/mock/reactor.go | 5 ++++- p2p/pex/pex_reactor.go | 5 ++++- p2p/switch_test.go | 5 ++++- statesync/reactor.go | 5 ++++- test/maverick/consensus/reactor.go | 5 ++++- 12 files changed, 45 insertions(+), 14 deletions(-) diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index 8c0cc0ddd..cd03e96de 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -307,7 +307,10 @@ func (bcR *BlockchainReactor) NewReceive(e p2p.Envelope) { } func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - bcR.NewReceive(p2p.Envelope{}) + bcR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 0bab59cf7..c102fe236 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -514,7 +514,10 @@ func (r *BlockchainReactor) NewReceive(e p2p.Envelope) { } func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - r.NewReceive(p2p.Envelope{}) + r.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // AddPeer implements Reactor interface diff --git a/blocksync/reactor.go b/blocksync/reactor.go index c5161ee32..6e1e29003 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -234,7 +234,10 @@ func (bcR *Reactor) NewReceive(e p2p.Envelope) { } func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - bcR.NewReceive(p2p.Envelope{}) + bcR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // Handle messages from the poolReactor telling the reactor what to do. diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index ba68aea73..9a4ff01d4 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -602,9 +602,7 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (br *ByzantineReactor) NewReceive(e p2p.Envelope) { br.reactor.NewReceive(e) } -func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) -br.NewReceive(p2p.Envelope{}) -{ +func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) { br.reactor.Receive(chID, p, m) } func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } diff --git a/consensus/reactor.go b/consensus/reactor.go index d33ea5846..ef04e0365 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -388,7 +388,10 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - conR.NewReceive(p2p.Envelope{}) + conR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // SetEventBus sets event bus. diff --git a/evidence/reactor.go b/evidence/reactor.go index a5d773359..4773e4e09 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -93,7 +93,10 @@ func (evR *Reactor) NewReceive(e p2p.Envelope) { } func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - evR.NewReceive(p2p.Envelope{}) + evR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // SetEventBus implements events.Eventable. diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 05dbe315c..0cf17f0f1 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -188,7 +188,10 @@ func (memR *Reactor) NewReceive(e p2p.Envelope) { } func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - memR.NewReceive(p2p.Envelope{}) + memR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // PeerState describes the state of a peer. diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 893ed2032..878a5755b 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -22,5 +22,8 @@ func NewReactor() *Reactor { func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} -func (r *Reactor) NewReceive(e p2p.Envelope) {} +func (r *Reactor) NewReceive(e p2p.Envelope) +ChannelID: chID, +Src: peer, +{} func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 72b1a4858..ca46b8579 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -299,7 +299,10 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - r.NewReceive(p2p.Envelope{}) + r.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // enforces a minimum amount of time between requests diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 5de846767..cfe4f2252 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -83,7 +83,10 @@ func (tr *TestReactor) NewReceive(e Envelope) { } func (tr *TestReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - tr.NewReceive(p2p.Envelope{}) + tr.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { diff --git a/statesync/reactor.go b/statesync/reactor.go index cd81766fa..2f08c1d91 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -224,7 +224,10 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - r.NewReceive(p2p.Envelope{}) + r.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // recentSnapshots fetches the n most recent snapshots from the app diff --git a/test/maverick/consensus/reactor.go b/test/maverick/consensus/reactor.go index f1bd8c797..08e7a009f 100644 --- a/test/maverick/consensus/reactor.go +++ b/test/maverick/consensus/reactor.go @@ -388,7 +388,10 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - conR.NewReceive(p2p.Envelope{}) + conR.NewReceive(p2p.Envelope{ + ChannelID: chID, + Src: peer, + }) } // SetEventBus sets event bus.