From d5865af1f454cc238272b7cab4c5fe3d50c2445e Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Fri, 5 Nov 2021 12:50:53 -0700 Subject: [PATCH] Add basic metrics to the indexer package. (#7250) This follows the same model as we did in the p2p package. Rework the indexer service constructor to take a struct of arguments, that makes it easier to construct the optional settings. Deprecate but do not remove the existing constructor. Clean up node initialization a little bit. --- internal/inspect/inspect.go | 7 ++- internal/state/indexer/indexer_service.go | 41 +++++++++++-- internal/state/indexer/metrics.go | 73 +++++++++++++++++++++++ node/node.go | 34 ++++++----- node/node_test.go | 3 +- node/setup.go | 9 ++- 6 files changed, 143 insertions(+), 24 deletions(-) create mode 100644 internal/state/indexer/metrics.go diff --git a/internal/inspect/inspect.go b/internal/inspect/inspect.go index 0a92ef3f2..66e9c9421 100644 --- a/internal/inspect/inspect.go +++ b/internal/inspect/inspect.go @@ -47,8 +47,11 @@ func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexe routes := rpc.Routes(*cfg, ss, bs, es, logger) eb := eventbus.NewDefault() eb.SetLogger(logger.With("module", "events")) - is := indexer.NewIndexerService(es, eb) - is.SetLogger(logger.With("module", "txindex")) + is := indexer.NewService(indexer.ServiceArgs{ + Sinks: es, + EventBus: eb, + Logger: logger.With("module", "txindex"), + }) return &Inspector{ routes: routes, config: cfg, diff --git a/internal/state/indexer/indexer_service.go b/internal/state/indexer/indexer_service.go index 5952050f2..8f69f488b 100644 --- a/internal/state/indexer/indexer_service.go +++ b/internal/state/indexer/indexer_service.go @@ -2,8 +2,10 @@ package indexer import ( "context" + "time" "github.com/tendermint/tendermint/internal/eventbus" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" @@ -16,6 +18,7 @@ type Service struct { eventSinks []EventSink eventBus *eventbus.EventBus + metrics *Metrics currentBlock struct { header types.EventDataNewBlockHeader @@ -24,13 +27,29 @@ type Service struct { } } -// NewIndexerService returns a new service instance. -func NewIndexerService(es []EventSink, eventBus *eventbus.EventBus) *Service { - is := &Service{eventSinks: es, eventBus: eventBus} - is.BaseService = *service.NewBaseService(nil, "IndexerService", is) +// NewService constructs a new indexer service from the given arguments. +func NewService(args ServiceArgs) *Service { + is := &Service{ + eventSinks: args.Sinks, + eventBus: args.EventBus, + metrics: args.Metrics, + } + if is.metrics == nil { + is.metrics = NopMetrics() + } + is.BaseService = *service.NewBaseService(args.Logger, "IndexerService", is) return is } +// NewIndexerService returns a new service instance. +// Deprecated: Use NewService instead. +func NewIndexerService(es []EventSink, eventBus *eventbus.EventBus) *Service { + return NewService(ServiceArgs{ + Sinks: es, + EventBus: eventBus, + }) +} + // publish publishes a pubsub message to the service. The service blocks until // the message has been fully processed. func (is *Service) publish(msg pubsub.Message) error { @@ -71,20 +90,26 @@ func (is *Service) publish(msg pubsub.Message) error { if curr.Pending == 0 { // INDEX: We have all the transactions we expect for the current block. for _, sink := range is.eventSinks { + start := time.Now() if err := sink.IndexBlockEvents(is.currentBlock.header); err != nil { is.Logger.Error("failed to index block header", "height", is.currentBlock.height, "err", err) } else { + is.metrics.BlockEventsSeconds.Observe(time.Since(start).Seconds()) + is.metrics.BlocksIndexed.Add(1) is.Logger.Debug("indexed block", "height", is.currentBlock.height, "sink", sink.Type()) } if curr.Size() != 0 { + start := time.Now() err := sink.IndexTxEvents(curr.Ops) if err != nil { is.Logger.Error("failed to index block txs", "height", is.currentBlock.height, "err", err) } else { + is.metrics.TxEventsSeconds.Observe(time.Since(start).Seconds()) + is.metrics.TransactionsIndexed.Add(float64(curr.Size())) is.Logger.Debug("indexed txs", "height", is.currentBlock.height, "sink", sink.Type()) } @@ -122,6 +147,14 @@ func (is *Service) OnStop() { } } +// ServiceArgs are arguments for constructing a new indexer service. +type ServiceArgs struct { + Sinks []EventSink + EventBus *eventbus.EventBus + Metrics *Metrics + Logger log.Logger +} + // KVSinkEnabled returns the given eventSinks is containing KVEventSink. func KVSinkEnabled(sinks []EventSink) bool { for _, sink := range sinks { diff --git a/internal/state/indexer/metrics.go b/internal/state/indexer/metrics.go new file mode 100644 index 000000000..aa64a4bb2 --- /dev/null +++ b/internal/state/indexer/metrics.go @@ -0,0 +1,73 @@ +package indexer + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + + prometheus "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +// MetricsSubsystem is a the subsystem label for the indexer package. +const MetricsSubsystem = "indexer" + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // Latency for indexing block events. + BlockEventsSeconds metrics.Histogram + + // Latency for indexing transaction events. + TxEventsSeconds metrics.Histogram + + // Number of complete blocks indexed. + BlocksIndexed metrics.Counter + + // Number of transactions indexed. + TransactionsIndexed metrics.Counter +} + +// PrometheusMetrics returns Metrics build using Prometheus client library. +// Optionally, labels can be provided along with their values ("foo", +// "fooValue"). +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + BlockEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "block_events_seconds", + Help: "Latency for indexing block events.", + }, labels).With(labelsAndValues...), + TxEventsSeconds: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "tx_events_seconds", + Help: "Latency for indexing transaction events.", + }, labels).With(labelsAndValues...), + BlocksIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "blocks_indexed", + Help: "Number of complete blocks indexed.", + }, labels).With(labelsAndValues...), + TransactionsIndexed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "transactions_indexed", + Help: "Number of transactions indexed.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns an indexer metrics stub that discards all samples. +func NopMetrics() *Metrics { + return &Metrics{ + BlockEventsSeconds: discard.NewHistogram(), + TxEventsSeconds: discard.NewHistogram(), + BlocksIndexed: discard.NewCounter(), + TransactionsIndexed: discard.NewCounter(), + } +} diff --git a/node/node.go b/node/node.go index 840eaac76..fa703f1d6 100644 --- a/node/node.go +++ b/node/node.go @@ -171,7 +171,8 @@ func makeNode(cfg *config.Config, return nil, combineCloseError(err, makeCloser(closers)) } - indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID) + indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, + logger, genDoc.ChainID, nodeMetrics.indexer) if err != nil { return nil, combineCloseError(err, makeCloser(closers)) } @@ -900,11 +901,12 @@ func defaultGenesisDocProviderFunc(cfg *config.Config) genesisDocProvider { type nodeMetrics struct { consensus *consensus.Metrics - p2p *p2p.Metrics + indexer *indexer.Metrics mempool *mempool.Metrics + p2p *p2p.Metrics + proxy *proxy.Metrics state *sm.Metrics statesync *statesync.Metrics - proxy *proxy.Metrics } // metricsProvider returns consensus, p2p, mempool, state, statesync Metrics. @@ -916,21 +918,23 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { return func(chainID string) *nodeMetrics { if cfg.Prometheus { return &nodeMetrics{ - consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), - proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + consensus: consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + indexer: indexer.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + mempool: mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + p2p: p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + proxy: proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + state: sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + statesync: statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), } } return &nodeMetrics{ - consensus.NopMetrics(), - p2p.NopMetrics(), - mempool.NopMetrics(), - sm.NopMetrics(), - statesync.NopMetrics(), - proxy.NopMetrics(), + consensus: consensus.NopMetrics(), + indexer: indexer.NopMetrics(), + mempool: mempool.NopMetrics(), + p2p: p2p.NopMetrics(), + proxy: proxy.NopMetrics(), + state: sm.NopMetrics(), + statesync: statesync.NopMetrics(), } } } diff --git a/node/node_test.go b/node/node_test.go index f0591a165..90a585a63 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -558,7 +558,8 @@ func TestNodeSetEventSink(t *testing.T) { require.NoError(t, err) indexService, eventSinks, err := createAndStartIndexerService(cfg, - config.DefaultDBProvider, eventBus, logger, genDoc.ChainID) + config.DefaultDBProvider, eventBus, logger, genDoc.ChainID, + indexer.NopMetrics()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, indexService.Stop()) }) return eventSinks diff --git a/node/setup.go b/node/setup.go index 6ee509c28..8d5fb21b9 100644 --- a/node/setup.go +++ b/node/setup.go @@ -113,14 +113,19 @@ func createAndStartIndexerService( eventBus *eventbus.EventBus, logger log.Logger, chainID string, + metrics *indexer.Metrics, ) (*indexer.Service, []indexer.EventSink, error) { eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID) if err != nil { return nil, nil, err } - indexerService := indexer.NewIndexerService(eventSinks, eventBus) - indexerService.SetLogger(logger.With("module", "txindex")) + indexerService := indexer.NewService(indexer.ServiceArgs{ + Sinks: eventSinks, + EventBus: eventBus, + Logger: logger.With("module", "txindex"), + Metrics: metrics, + }) if err := indexerService.Start(); err != nil { return nil, nil, err