NewReceive called conditionally from switch

This commit is contained in:
William Banfield
2022-10-28 15:22:57 -04:00
parent e07e7c9e10
commit ed6ef40568
2 changed files with 22 additions and 17 deletions

View File

@@ -38,14 +38,6 @@ type Reactor interface {
// or other reason).
RemovePeer(peer Peer, reason interface{})
// NewReceive is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor
//
// The set of messages passed to Receive and NewReceive are identical. While both
// need to be implemented to satisfy the interface, only one message stream
// should be used per reactor.
NewReceive(Envelope)
// Receive is called by the switch when msgBytes is received from the peer.
//
// NOTE reactor can not keep msgBytes around after Receive completes without
@@ -53,12 +45,22 @@ type Reactor interface {
//
// CONTRACT: msgBytes are not nil.
//
// The messages passed to Receive and NewReceive are identical. While both
// need to be implemented to satisfy the interface, only one message stream
// should be used per reactor.
// Only one of Receive or NewReceive are called per message. If NewReceive
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive. Receive will be replaced by NewReceive in a future version
Receive(chID byte, peer Peer, msgBytes []byte)
}
type NewReceiver interface {
// NewReceive is called by the switch when an envelope is received from any connected
// peer on any of the channels registered by the reactor.
//
// Only one of Receive or NewReceive are called per message. If NewReceive
// is implemented, it will be used, otherwise the switch will fallback to
// using Receive. Receive will be replaced by NewReceive in a future version
NewReceive(Envelope)
}
//--------------------------------------
type BaseReactor struct {

View File

@@ -465,12 +465,15 @@ func createMConnection(
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
reactor.NewReceive(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
reactor.Receive(chID, p, msgBytes)
if nr, ok := reactor.(NewReceiver); ok {
nr.NewReceive(Envelope{
ChannelID: chID,
Src: p,
Message: msg,
})
} else {
reactor.Receive(chID, p, msgBytes)
}
}
onError := func(r interface{}) {