diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 064f58fff..cbcf57404 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -38,14 +38,6 @@ type Reactor interface { // or other reason). RemovePeer(peer Peer, reason interface{}) - // NewReceive is called by the switch when an envelope is received from any connected - // peer on any of the channels registered by the reactor - // - // The set of messages passed to Receive and NewReceive are identical. While both - // need to be implemented to satisfy the interface, only one message stream - // should be used per reactor. - NewReceive(Envelope) - // Receive is called by the switch when msgBytes is received from the peer. // // NOTE reactor can not keep msgBytes around after Receive completes without @@ -53,12 +45,22 @@ type Reactor interface { // // CONTRACT: msgBytes are not nil. // - // The messages passed to Receive and NewReceive are identical. While both - // need to be implemented to satisfy the interface, only one message stream - // should be used per reactor. + // Only one of Receive or NewReceive are called per message. If NewReceive + // is implemented, it will be used, otherwise the switch will fallback to + // using Receive. Receive will be replaced by NewReceive in a future version Receive(chID byte, peer Peer, msgBytes []byte) } +type NewReceiver interface { + // NewReceive is called by the switch when an envelope is received from any connected + // peer on any of the channels registered by the reactor. + // + // Only one of Receive or NewReceive are called per message. If NewReceive + // is implemented, it will be used, otherwise the switch will fallback to + // using Receive. Receive will be replaced by NewReceive in a future version + NewReceive(Envelope) +} + //-------------------------------------- type BaseReactor struct { diff --git a/p2p/peer.go b/p2p/peer.go index 9ddcdaaf7..3aa680d80 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -465,12 +465,15 @@ func createMConnection( } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) - reactor.NewReceive(Envelope{ - ChannelID: chID, - Src: p, - Message: msg, - }) - reactor.Receive(chID, p, msgBytes) + if nr, ok := reactor.(NewReceiver); ok { + nr.NewReceive(Envelope{ + ChannelID: chID, + Src: p, + Message: msg, + }) + } else { + reactor.Receive(chID, p, msgBytes) + } } onError := func(r interface{}) {