p2p/metrics: add the metricsLabelCache to prevent name recalculation

This commit is contained in:
William Banfield
2022-10-21 13:19:58 -04:00
parent 0ea99d089b
commit 63a6058e97
5 changed files with 67 additions and 1 deletions

View File

@@ -1,6 +1,11 @@
package p2p
import (
"fmt"
"reflect"
"regexp"
"sync"
"github.com/go-kit/kit/metrics"
)
@@ -10,6 +15,13 @@ const (
MetricsSubsystem = "p2p"
)
var (
// valueToLabelRegexp is used to find the golang package name and type name
// so that the name can be turned into a prometheus label where the characters
// in the label do not include prometheus special characters such as '*' and '.'.
valueToLabelRegexp = regexp.MustCompile(`\*?(\w+)\.(.*)`)
)
//go:generate go run ../scripts/metricsgen -struct=Metrics
// Metrics contains metrics exposed by this package.
@@ -29,3 +41,38 @@ type Metrics struct {
// Number of bytes of each message type sent.
MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"`
}
type metricsLabelCache struct {
mtx *sync.RWMutex
messageLabelNames map[reflect.Type]string
}
// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang
// type that is passed in.
// This method uses a map on the Metrics struct so that each label name only needs
// to be produced once to prevent expensive string operations.
func (m *metricsLabelCache) ValueToMetricLabel(i interface{}) string {
t := reflect.TypeOf(i)
m.mtx.RLock()
if s, ok := m.messageLabelNames[t]; ok {
m.mtx.RUnlock()
return s
}
m.mtx.RUnlock()
s := t.String()
ss := valueToLabelRegexp.FindStringSubmatch(s)
l := fmt.Sprintf("%s_%s", ss[1], ss[2])
m.mtx.Lock()
defer m.mtx.Unlock()
m.messageLabelNames[t] = l
return l
}
func newMetricsLabelCache() *metricsLabelCache {
return &metricsLabelCache{
mtx: &sync.RWMutex{},
messageLabelNames: map[reflect.Type]string{},
}
}

View File

@@ -122,6 +122,7 @@ type peer struct {
metrics *Metrics
metricsTicker *time.Ticker
mlc *metricsLabelCache
// When removal of a peer fails, we set this flag
removalAttemptFailed bool
@@ -137,6 +138,7 @@ func newPeer(
msgTypeByChID map[byte]proto.Message,
chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}),
mlc *metricsLabelCache,
options ...PeerOption,
) *peer {
p := &peer{
@@ -146,6 +148,7 @@ func newPeer(
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
mlc: mlc,
}
p.mconn = createMConnection(
@@ -262,6 +265,7 @@ func (p *peer) Send(e Envelope) bool {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
@@ -280,6 +284,7 @@ func (p *peer) Send(e Envelope) bool {
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
@@ -293,6 +298,7 @@ func (p *peer) TrySend(e Envelope) bool {
return false
}
msg := e.Message
metricLabelValue := p.mlc.ValueToMetricLabel(msg)
if w, ok := msg.(Wrapper); ok {
var err error
msg, err = w.Wrap()
@@ -311,6 +317,7 @@ func (p *peer) TrySend(e Envelope) bool {
"chID", fmt.Sprintf("%#x", e.ChannelID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
}
return res
}
@@ -444,7 +451,7 @@ func createMConnection(
}
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", "tmp").Add(float64(len(msgBytes)))
p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
reactor.Receive(Envelope{
ChannelID: chID,
Src: p,

View File

@@ -3,6 +3,7 @@ package p2p
import (
"fmt"
"math"
"reflect"
"sync"
"time"
@@ -92,6 +93,7 @@ type Switch struct {
rng *rand.Rand // seed for randomizing dial times and orders
metrics *Metrics
mlc *metricsLabelCache
}
// NetAddress returns the address the switch is listening on.
@@ -109,6 +111,10 @@ func NewSwitch(
transport Transport,
options ...SwitchOption,
) *Switch {
mlc := &metricsLabelCache{
mtx: &sync.RWMutex{},
messageLabelNames: make(map[reflect.Type]string),
}
sw := &Switch{
config: cfg,
reactors: make(map[string]Reactor),
@@ -123,6 +129,7 @@ func NewSwitch(
filterTimeout: defaultFilterTimeout,
persistentPeersAddrs: make([]*NetAddress, 0),
unconditionalPeerIDs: make(map[ID]struct{}),
mlc: mlc,
}
// Ensure we have a completely undeterministic PRNG.
@@ -632,6 +639,7 @@ func (sw *Switch) acceptRoutine() {
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
mlc: sw.mlc,
isPersistent: sw.IsPeerPersistent,
})
if err != nil {
@@ -737,6 +745,7 @@ func (sw *Switch) addOutboundPeerWithConfig(
reactorsByCh: sw.reactorsByCh,
msgTypeByChID: sw.msgTypeByChID,
metrics: sw.metrics,
mlc: sw.mlc,
})
if err != nil {
if e, ok := err.(ErrRejected); ok {

View File

@@ -152,6 +152,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
sw.msgTypeByChID,
sw.chDescs,
sw.StopPeerForError,
sw.mlc,
)
if err = sw.addPeer(p); err != nil {

View File

@@ -52,6 +52,7 @@ type peerConfig struct {
reactorsByCh map[byte]Reactor
msgTypeByChID map[byte]proto.Message
metrics *Metrics
mlc *metricsLabelCache
}
// Transport emits and connects to Peers. The implementation of Peer is left to
@@ -524,6 +525,7 @@ func (mt *MultiplexTransport) wrapPeer(
cfg.msgTypeByChID,
cfg.chDescs,
cfg.onPeerError,
cfg.mlc,
PeerMetrics(cfg.metrics),
)