From 6eefce83efda10054cd05eec786ff44c4b5d1fd6 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 26 Oct 2022 18:37:21 -0400 Subject: [PATCH] Receive called from OnReceive --- p2p/base_reactor.go | 29 +++++++++++++++++++++++------ p2p/peer.go | 1 + 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index a24011df9..064f58fff 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -38,9 +38,25 @@ type Reactor interface { // or other reason). RemovePeer(peer Peer, reason interface{}) - // Receive is called by the switch when an envelope is received from any connected + // NewReceive is called by the switch when an envelope is received from any connected // peer on any of the channels registered by the reactor + // + // The set of messages passed to Receive and NewReceive are identical. While both + // need to be implemented to satisfy the interface, only one message stream + // should be used per reactor. NewReceive(Envelope) + + // Receive is called by the switch when msgBytes is received from the peer. + // + // NOTE reactor can not keep msgBytes around after Receive completes without + // copying. + // + // CONTRACT: msgBytes are not nil. + // + // The messages passed to Receive and NewReceive are identical. While both + // need to be implemented to satisfy the interface, only one message stream + // should be used per reactor. + Receive(chID byte, peer Peer, msgBytes []byte) } //-------------------------------------- @@ -60,8 +76,9 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor { func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw } -func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } -func (*BaseReactor) AddPeer(peer Peer) {} -func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} -func (*BaseReactor) NewReceive(e Envelope) {} -func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } +func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil } +func (*BaseReactor) AddPeer(peer Peer) {} +func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} +func (*BaseReactor) NewReceive(e Envelope) {} +func (*BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} +func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } diff --git a/p2p/peer.go b/p2p/peer.go index 861f9d610..c18034dda 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -482,6 +482,7 @@ func createMConnection( Src: p, Message: msg, }) + reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) {