mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-19 11:12:50 +00:00
Compare commits
5 Commits
main
...
wb/msg-byt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78443687a6 | ||
|
|
f7218259c5 | ||
|
|
5fa49830e4 | ||
|
|
e91616c15f | ||
|
|
fbad107210 |
@@ -234,7 +234,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *bcproto.Message
|
||||
msg := &bcproto.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -312,7 +312,7 @@ func (bcR *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *bcproto.Message
|
||||
msg := &bcproto.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -518,7 +518,7 @@ func (r *BlockchainReactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *bcproto.Message
|
||||
msg := &bcproto.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -393,7 +393,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *tmcons.Message
|
||||
msg := &tmcons.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -93,7 +93,7 @@ func (evR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *tmproto.EvidenceList
|
||||
msg := &tmproto.EvidenceList{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -191,7 +191,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *protomem.Message
|
||||
msg := &protomem.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -190,7 +190,7 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *protomem.Message
|
||||
msg := &protomem.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -39,8 +39,13 @@ type Metrics struct {
|
||||
NumTxs metrics.Gauge
|
||||
// Number of bytes of each message type received.
|
||||
MessageReceiveBytesTotal metrics.Counter
|
||||
|
||||
// Number of bytes of each message type sent.
|
||||
MessageSendBytesTotal metrics.Counter
|
||||
// Histogram of message receive duration.
|
||||
MessageReceiveTime metrics.Histogram
|
||||
// Histogram of message send duration.
|
||||
MessageSendTime metrics.Histogram
|
||||
}
|
||||
|
||||
// PrometheusMetrics returns Metrics build using Prometheus client library.
|
||||
@@ -94,6 +99,22 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
Name: "message_send_bytes_total",
|
||||
Help: "Number of bytes of each message type sent.",
|
||||
}, append(labels, "message_type")).With(labelsAndValues...),
|
||||
MessageReceiveTime: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "message_receive_time",
|
||||
Help: "Histogram of message receive duration.",
|
||||
|
||||
Buckets: stdprometheus.ExponentialBucketsRange(0.05, 50, 8),
|
||||
}, labels).With(labelsAndValues...),
|
||||
MessageSendTime: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "message_send_time",
|
||||
Help: "Histogram of message send duration.",
|
||||
|
||||
Buckets: stdprometheus.ExponentialBucketsRange(0.05, 50, 8),
|
||||
}, labels).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -106,6 +127,8 @@ func NopMetrics() *Metrics {
|
||||
NumTxs: discard.NewGauge(),
|
||||
MessageReceiveBytesTotal: discard.NewCounter(),
|
||||
MessageSendBytesTotal: discard.NewCounter(),
|
||||
MessageReceiveTime: discard.NewHistogram(),
|
||||
MessageSendTime: discard.NewHistogram(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
16
p2p/peer.go
16
p2p/peer.go
@@ -64,6 +64,12 @@ type EnvelopeSender interface {
|
||||
//
|
||||
// Deprecated: Will be removed in v0.37.
|
||||
func SendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
|
||||
before := time.Now()
|
||||
defer func() {
|
||||
if pp, ok := p.(*peer); ok {
|
||||
pp.metrics.MessageSendTime.Observe(time.Since(before).Seconds())
|
||||
}
|
||||
}()
|
||||
if es, ok := p.(EnvelopeSender); ok {
|
||||
return es.SendEnvelope(e)
|
||||
}
|
||||
@@ -86,6 +92,12 @@ func SendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
|
||||
//
|
||||
// Deprecated: Will be removed in v0.37.
|
||||
func TrySendEnvelopeShim(p Peer, e Envelope, lg log.Logger) bool {
|
||||
before := time.Now()
|
||||
defer func() {
|
||||
if pp, ok := p.(*peer); ok {
|
||||
pp.metrics.MessageSendTime.Observe(time.Since(before).Seconds())
|
||||
}
|
||||
}()
|
||||
if es, ok := p.(EnvelopeSender); ok {
|
||||
return es.SendEnvelope(e)
|
||||
}
|
||||
@@ -510,6 +522,10 @@ func createMConnection(
|
||||
) *tmconn.MConnection {
|
||||
|
||||
onReceive := func(chID byte, msgBytes []byte) {
|
||||
before := time.Now()
|
||||
defer func() {
|
||||
p.metrics.MessageReceiveTime.Observe(time.Since(before).Seconds())
|
||||
}()
|
||||
reactor := reactorsByCh[chID]
|
||||
if reactor == nil {
|
||||
// Note that its ok to panic here as it's caught in the conn._recover,
|
||||
|
||||
@@ -301,7 +301,7 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *tmp2p.Message
|
||||
msg := &tmp2p.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -80,7 +80,7 @@ func (tr *TestReactor) ReceiveEnvelope(e Envelope) {
|
||||
}
|
||||
|
||||
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
|
||||
var msg *p2pproto.Message
|
||||
msg := &p2pproto.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -226,7 +226,7 @@ func (r *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *ssproto.Message
|
||||
msg := &ssproto.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
@@ -396,7 +396,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
|
||||
}
|
||||
|
||||
func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
|
||||
var msg *tmcons.Message
|
||||
msg := &tmcons.Message{}
|
||||
err := proto.Unmarshal(msgBytes, msg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
||||
Reference in New Issue
Block a user