Compare commits

...

5 Commits

Author SHA1 Message Date
William Banfield
78443687a6 uncomment envelope methods 2022-11-03 16:58:17 -04:00
William Banfield
f7218259c5 fix metric to be seconds 2022-11-03 16:03:08 -04:00
William Banfield
5fa49830e4 add send and receive timings 2022-11-03 15:53:57 -04:00
William Banfield
e91616c15f add allocate in Receive calls 2022-11-03 15:20:41 -04:00
William Banfield
fbad107210 comment out new functions 2022-11-03 15:09:28 -04:00
13 changed files with 50 additions and 11 deletions

View File

@@ -234,7 +234,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
msg := &bcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -312,7 +312,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
msg := &bcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -518,7 +518,7 @@ func (r *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *bcproto.Message
msg := &bcproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -393,7 +393,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmcons.Message
msg := &tmcons.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -93,7 +93,7 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmproto.EvidenceList
msg := &tmproto.EvidenceList{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -191,7 +191,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
msg := &protomem.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -190,7 +190,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *protomem.Message
msg := &protomem.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -39,8 +39,13 @@ type Metrics struct {
NumTxs metrics.Gauge
// Number of bytes of each message type received.
MessageReceiveBytesTotal metrics.Counter
// Number of bytes of each message type sent.
MessageSendBytesTotal metrics.Counter
// Histogram of message receive duration.
MessageReceiveTime metrics.Histogram
// Histogram of message send duration.
MessageSendTime metrics.Histogram
}
// PrometheusMetrics returns Metrics build using Prometheus client library.
@@ -94,6 +99,22 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "message_send_bytes_total",
Help: "Number of bytes of each message type sent.",
}, append(labels, "message_type")).With(labelsAndValues...),
MessageReceiveTime: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_receive_time",
Help: "Histogram of message receive duration.",
Buckets: stdprometheus.ExponentialBucketsRange(0.05, 50, 8),
}, labels).With(labelsAndValues...),
MessageSendTime: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "message_send_time",
Help: "Histogram of message send duration.",
Buckets: stdprometheus.ExponentialBucketsRange(0.05, 50, 8),
}, labels).With(labelsAndValues...),
}
}
@@ -106,6 +127,8 @@ func NopMetrics() *Metrics {
NumTxs: discard.NewGauge(),
MessageReceiveBytesTotal: discard.NewCounter(),
MessageSendBytesTotal: discard.NewCounter(),
MessageReceiveTime: discard.NewHistogram(),
MessageSendTime: discard.NewHistogram(),
}
}

View File

@@ -64,6 +64,12 @@ type EnvelopeSender interface {
//
// Deprecated: Will be removed in v0.37.
func SendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
before := time.Now()
defer func() {
if pp, ok := p.(*peer); ok {
pp.metrics.MessageSendTime.Observe(time.Since(before).Seconds())
}
}()
if es, ok := p.(EnvelopeSender); ok {
return es.SendEnvelope(e)
}
@@ -86,6 +92,12 @@ func SendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
//
// Deprecated: Will be removed in v0.37.
func TrySendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
before := time.Now()
defer func() {
if pp, ok := p.(*peer); ok {
pp.metrics.MessageSendTime.Observe(time.Since(before).Seconds())
}
}()
if es, ok := p.(EnvelopeSender); ok {
return es.SendEnvelope(e)
}
@@ -510,6 +522,10 @@ func createMConnection(
) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
before := time.Now()
defer func() {
p.metrics.MessageReceiveTime.Observe(time.Since(before).Seconds())
}()
reactor := reactorsByCh[chID]
if reactor == nil {
// Note that its ok to panic here as it's caught in the conn._recover,

View File

@@ -301,7 +301,7 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmp2p.Message
msg := &tmp2p.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -80,7 +80,7 @@ func (tr *TestReactor) ReceiveEnvelope(e Envelope) {
}
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
var msg *p2pproto.Message
msg := &p2pproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -226,7 +226,7 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *ssproto.Message
msg := &ssproto.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)

View File

@@ -396,7 +396,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
}
func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
var msg *tmcons.Message
msg := &tmcons.Message{}
err := proto.Unmarshal(msgBytes, msg)
if err != nil {
panic(err)