diff --git a/p2p/peer.go b/p2p/peer.go index 7280b08ad..58c7eaabd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -40,6 +40,9 @@ type Peer interface { NewSend(Envelope) bool NewTrySend(Envelope) bool + Send(byte, []byte) bool + TrySend(byte, []byte) bool + Set(string, interface{}) Get(string) interface{} } @@ -249,12 +252,11 @@ func (p *peer) Status() tmconn.ConnectionStatus { return p.mconn.Status() } -// Send msg bytes to the channel identified by chID byte. Returns false if the -// send queue is full after timeout, specified by MConnection. +// NewSend sends the message in the envelope on the channel specified by the +// envelope. Returns false if the connection times out trying to place the message +// onto its internal queue. func (p *peer) NewSend(e Envelope) bool { if !p.IsRunning() { - // see Switch#Broadcast, where we fetch the list of peers and loop over - // them - while we're looping, one peer may be removed and stopped. return false } else if !p.hasChannel(e.ChannelID) { return false @@ -269,20 +271,36 @@ func (p *peer) NewSend(e Envelope) bool { p.Logger.Error("marshaling message to send", "error", err) return false } - res := p.mconn.Send(e.ChannelID, msgBytes) + res := p.Send(e.ChannelID, msgBytes) if res { - labels := []string{ - "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", e.ChannelID), - } - p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res } -// NewTrySend msg bytes to the channel identified by chID byte. Immediately returns -// false if the send queue is full. +// Send msg bytes to the channel identified by chID byte. Returns false if the +// send queue is full after timeout, specified by MConnection. +// NewSend replaces TrySend which will be deprecated in a future release. +func (p *peer) Send(chID byte, msgBytes []byte) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } + res := p.mconn.Send(chID, msgBytes) + if res { + labels := []string{ + "peer_id", string(p.ID()), + "chID", fmt.Sprintf("%#x", chID), + } + p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) + } + return res +} + +// NewTrySend attempts to sends the message in the envelope on the channel specified by the +// envelope. Returns false immediately if the connection's internal queue is full +// NewTrySend replaces TrySend which will be deprecated in a future release. func (p *peer) NewTrySend(e Envelope) bool { if !p.IsRunning() { // see Switch#Broadcast, where we fetch the list of peers and loop over @@ -301,14 +319,28 @@ func (p *peer) NewTrySend(e Envelope) bool { p.Logger.Error("marshaling message to send", "error", err) return false } - res := p.mconn.TrySend(e.ChannelID, msgBytes) + res := p.TrySend(e.ChannelID, msgBytes) + if res { + p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) + } + return res +} + +// TrySend msg bytes to the channel identified by chID byte. Immediately returns +// false if the send queue is full. +func (p *peer) TrySend(chID byte, msgBytes []byte) bool { + if !p.IsRunning() { + return false + } else if !p.hasChannel(chID) { + return false + } + res := p.mconn.TrySend(chID, msgBytes) if res { labels := []string{ "peer_id", string(p.ID()), - "chID", fmt.Sprintf("%#x", e.ChannelID), + "chID", fmt.Sprintf("%#x", chID), } p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) - p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res }