From 63a6058e97276dfe836593001936854ac4cfc3f0 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 21 Oct 2022 13:19:58 -0400 Subject: [PATCH] p2p/metrics: add the metricsLabelCache to prevent name recalculation --- p2p/metrics.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ p2p/peer.go | 9 ++++++++- p2p/switch.go | 9 +++++++++ p2p/test_util.go | 1 + p2p/transport.go | 2 ++ 5 files changed, 67 insertions(+), 1 deletion(-) diff --git a/p2p/metrics.go b/p2p/metrics.go index fe33a2f41..808142e9a 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -1,6 +1,11 @@ package p2p import ( + "fmt" + "reflect" + "regexp" + "sync" + "github.com/go-kit/kit/metrics" ) @@ -10,6 +15,13 @@ const ( MetricsSubsystem = "p2p" ) +var ( + // valueToLabelRegexp is used to find the golang package name and type name + // so that the name can be turned into a prometheus label where the characters + // in the label do not include prometheus special characters such as '*' and '.'. + valueToLabelRegexp = regexp.MustCompile(`\*?(\w+)\.(.*)`) +) + //go:generate go run ../scripts/metricsgen -struct=Metrics // Metrics contains metrics exposed by this package. @@ -29,3 +41,38 @@ type Metrics struct { // Number of bytes of each message type sent. MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"` } + +type metricsLabelCache struct { + mtx *sync.RWMutex + messageLabelNames map[reflect.Type]string +} + +// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang +// type that is passed in. +// This method uses a map on the Metrics struct so that each label name only needs +// to be produced once to prevent expensive string operations. +func (m *metricsLabelCache) ValueToMetricLabel(i interface{}) string { + t := reflect.TypeOf(i) + m.mtx.RLock() + + if s, ok := m.messageLabelNames[t]; ok { + m.mtx.RUnlock() + return s + } + m.mtx.RUnlock() + + s := t.String() + ss := valueToLabelRegexp.FindStringSubmatch(s) + l := fmt.Sprintf("%s_%s", ss[1], ss[2]) + m.mtx.Lock() + defer m.mtx.Unlock() + m.messageLabelNames[t] = l + return l +} + +func newMetricsLabelCache() *metricsLabelCache { + return &metricsLabelCache{ + mtx: &sync.RWMutex{}, + messageLabelNames: map[reflect.Type]string{}, + } +} diff --git a/p2p/peer.go b/p2p/peer.go index 9668d34be..b7c6c1562 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -122,6 +122,7 @@ type peer struct { metrics *Metrics metricsTicker *time.Ticker + mlc *metricsLabelCache // When removal of a peer fails, we set this flag removalAttemptFailed bool @@ -137,6 +138,7 @@ func newPeer( msgTypeByChID map[byte]proto.Message, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), + mlc *metricsLabelCache, options ...PeerOption, ) *peer { p := &peer{ @@ -146,6 +148,7 @@ func newPeer( Data: cmap.NewCMap(), metricsTicker: time.NewTicker(metricsTickerDuration), metrics: NopMetrics(), + mlc: mlc, } p.mconn = createMConnection( @@ -262,6 +265,7 @@ func (p *peer) Send(e Envelope) bool { return false } msg := e.Message + metricLabelValue := p.mlc.ValueToMetricLabel(msg) if w, ok := msg.(Wrapper); ok { var err error msg, err = w.Wrap() @@ -280,6 +284,7 @@ func (p *peer) Send(e Envelope) bool { "chID", fmt.Sprintf("%#x", e.ChannelID), } p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) + p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res } @@ -293,6 +298,7 @@ func (p *peer) TrySend(e Envelope) bool { return false } msg := e.Message + metricLabelValue := p.mlc.ValueToMetricLabel(msg) if w, ok := msg.(Wrapper); ok { var err error msg, err = w.Wrap() @@ -311,6 +317,7 @@ func (p *peer) TrySend(e Envelope) bool { "chID", fmt.Sprintf("%#x", e.ChannelID), } p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes))) + p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes))) } return res } @@ -444,7 +451,7 @@ func createMConnection( } } p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes))) - p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes))) + p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes))) reactor.Receive(Envelope{ ChannelID: chID, Src: p, diff --git a/p2p/switch.go b/p2p/switch.go index ce7bbbe37..b0ba71fcc 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -3,6 +3,7 @@ package p2p import ( "fmt" "math" + "reflect" "sync" "time" @@ -92,6 +93,7 @@ type Switch struct { rng *rand.Rand // seed for randomizing dial times and orders metrics *Metrics + mlc *metricsLabelCache } // NetAddress returns the address the switch is listening on. @@ -109,6 +111,10 @@ func NewSwitch( transport Transport, options ...SwitchOption, ) *Switch { + mlc := &metricsLabelCache{ + mtx: &sync.RWMutex{}, + messageLabelNames: make(map[reflect.Type]string), + } sw := &Switch{ config: cfg, reactors: make(map[string]Reactor), @@ -123,6 +129,7 @@ func NewSwitch( filterTimeout: defaultFilterTimeout, persistentPeersAddrs: make([]*NetAddress, 0), unconditionalPeerIDs: make(map[ID]struct{}), + mlc: mlc, } // Ensure we have a completely undeterministic PRNG. @@ -632,6 +639,7 @@ func (sw *Switch) acceptRoutine() { reactorsByCh: sw.reactorsByCh, msgTypeByChID: sw.msgTypeByChID, metrics: sw.metrics, + mlc: sw.mlc, isPersistent: sw.IsPeerPersistent, }) if err != nil { @@ -737,6 +745,7 @@ func (sw *Switch) addOutboundPeerWithConfig( reactorsByCh: sw.reactorsByCh, msgTypeByChID: sw.msgTypeByChID, metrics: sw.metrics, + mlc: sw.mlc, }) if err != nil { if e, ok := err.(ErrRejected); ok { diff --git a/p2p/test_util.go b/p2p/test_util.go index 14af8c520..1d9a4883c 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -152,6 +152,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { sw.msgTypeByChID, sw.chDescs, sw.StopPeerForError, + sw.mlc, ) if err = sw.addPeer(p); err != nil { diff --git a/p2p/transport.go b/p2p/transport.go index c16376fe8..b5538ff18 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -52,6 +52,7 @@ type peerConfig struct { reactorsByCh map[byte]Reactor msgTypeByChID map[byte]proto.Message metrics *Metrics + mlc *metricsLabelCache } // Transport emits and connects to Peers. The implementation of Peer is left to @@ -524,6 +525,7 @@ func (mt *MultiplexTransport) wrapPeer( cfg.msgTypeByChID, cfg.chDescs, cfg.onPeerError, + cfg.mlc, PeerMetrics(cfg.metrics), )