From 4396224c1f3678dd2d994ef7ea1e23b6f2a3e32d Mon Sep 17 00:00:00 2001 From: William Banfield Date: Tue, 12 Oct 2021 13:33:38 -0400 Subject: [PATCH] internal/proxy: add initial set of abci metrics --- internal/consensus/replay_file.go | 2 +- internal/consensus/replay_stubs.go | 2 +- internal/consensus/wal_generator.go | 2 +- internal/proxy/app_conn.go | 65 +++++++++++++++++++++++++++-- internal/proxy/metrics.go | 39 +++++++++++++++++ internal/proxy/multi_app_conn.go | 16 +++---- node/node.go | 10 +++-- node/setup.go | 4 +- 8 files changed, 120 insertions(+), 20 deletions(-) create mode 100644 internal/proxy/metrics.go diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index d009533ba..f60dff531 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -312,7 +312,7 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu // Create proxyAppConn connection (consensus, mempool, query) clientCreator, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator) + proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics()) err = proxyApp.Start() if err != nil { tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err)) diff --git a/internal/consensus/replay_stubs.go b/internal/consensus/replay_stubs.go index bc8c11cd9..1235baccb 100644 --- a/internal/consensus/replay_stubs.go +++ b/internal/consensus/replay_stubs.go @@ -64,7 +64,7 @@ func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy if err != nil { panic(err) } - return proxy.NewAppConnConsensus(cli) + return proxy.NewAppConnConsensus(cli, proxy.NopMetrics()) } type mockProxyApp struct { diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index 7e31188fa..4b4375498 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -65,7 +65,7 @@ func WALGenerateNBlocks(t 
*testing.T, wr io.Writer, numBlocks int) (err error) { blockStore := store.NewBlockStore(blockStoreDB) - proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app)) + proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), proxy.NopMetrics()) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { return fmt.Errorf("failed to start proxy app connections: %w", err) diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go index ca2c7c109..4e79120cf 100644 --- a/internal/proxy/app_conn.go +++ b/internal/proxy/app_conn.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "time" abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/types" @@ -56,11 +57,13 @@ type AppConnSnapshot interface { // Implements AppConnConsensus (subset of abciclient.Client) type appConnConsensus struct { + metrics *Metrics appConn abciclient.Client } -func NewAppConnConsensus(appConn abciclient.Client) AppConnConsensus { +func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus { return &appConnConsensus{ + metrics: metrics, appConn: appConn, } } @@ -77,6 +80,9 @@ func (app *appConnConsensus) InitChainSync( ctx context.Context, req types.RequestInitChain, ) (*types.ResponseInitChain, error) { + defer func(start time.Time) { // defer evaluates arguments now; the closure reads the clock at return + app.metrics.MethodTiming.With("method", "init_chain", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.InitChainSync(ctx, req) } @@ -84,6 +90,9 @@ func (app *appConnConsensus) BeginBlockSync( ctx context.Context, req types.RequestBeginBlock, ) (*types.ResponseBeginBlock, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "begin_block", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.BeginBlockSync(ctx, req) } @@ -91,6 +100,9 @@ func (app *appConnConsensus) DeliverTxAsync( ctx context.Context, req types.RequestDeliverTx, ) (*abciclient.ReqRes, error) { + defer func(start time.Time) { 
+ app.metrics.MethodTiming.With("method", "deliver_tx", "type", "async").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.DeliverTxAsync(ctx, req) } @@ -98,10 +110,16 @@ func (app *appConnConsensus) EndBlockSync( ctx context.Context, req types.RequestEndBlock, ) (*types.ResponseEndBlock, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "end_block", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.EndBlockSync(ctx, req) } func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCommit, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "commit", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.CommitSync(ctx) } @@ -109,11 +127,13 @@ func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCom // Implements AppConnMempool (subset of abciclient.Client) type appConnMempool struct { + metrics *Metrics appConn abciclient.Client } -func NewAppConnMempool(appConn abciclient.Client) AppConnMempool { +func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool { return &appConnMempool{ + metrics: metrics, appConn: appConn, } } @@ -127,18 +147,30 @@ func (app *appConnMempool) Error() error { } func (app *appConnMempool) FlushAsync(ctx context.Context) (*abciclient.ReqRes, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "flush", "type", "async").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.FlushAsync(ctx) } func (app *appConnMempool) FlushSync(ctx context.Context) error { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "flush", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.FlushSync(ctx) } func (app *appConnMempool) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*abciclient.ReqRes, error) { + defer func(start time.Time) { 
+ app.metrics.MethodTiming.With("method", "check_tx", "type", "async").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.CheckTxAsync(ctx, req) } func (app *appConnMempool) CheckTxSync(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "check_tx", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.CheckTxSync(ctx, req) } @@ -146,11 +178,13 @@ func (app *appConnMempool) CheckTxSync(ctx context.Context, req types.RequestChe // Implements AppConnQuery (subset of abciclient.Client) type appConnQuery struct { + metrics *Metrics appConn abciclient.Client } -func NewAppConnQuery(appConn abciclient.Client) AppConnQuery { +func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery { return &appConnQuery{ + metrics: metrics, appConn: appConn, } } @@ -160,14 +194,23 @@ func (app *appConnQuery) Error() error { } func (app *appConnQuery) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "echo", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.EchoSync(ctx, msg) } func (app *appConnQuery) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "info", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.InfoSync(ctx, req) } func (app *appConnQuery) QuerySync(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "query", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.QuerySync(ctx, reqQuery) } @@ -175,11 +218,13 @@ func (app *appConnQuery) QuerySync(ctx context.Context, reqQuery types.RequestQu // Implements AppConnSnapshot (subset of abciclient.Client) type appConnSnapshot struct { + metrics *Metrics appConn abciclient.Client } 
-func NewAppConnSnapshot(appConn abciclient.Client) AppConnSnapshot { +func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot { return &appConnSnapshot{ + metrics: metrics, appConn: appConn, } } @@ -192,6 +237,9 @@ func (app *appConnSnapshot) ListSnapshotsSync( ctx context.Context, req types.RequestListSnapshots, ) (*types.ResponseListSnapshots, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.ListSnapshotsSync(ctx, req) } @@ -199,17 +247,26 @@ func (app *appConnSnapshot) OfferSnapshotSync( ctx context.Context, req types.RequestOfferSnapshot, ) (*types.ResponseOfferSnapshot, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.OfferSnapshotSync(ctx, req) } func (app *appConnSnapshot) LoadSnapshotChunkSync( ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.LoadSnapshotChunkSync(ctx, req) } func (app *appConnSnapshot) ApplySnapshotChunkSync( ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + defer func(start time.Time) { + app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync").Observe(time.Since(start).Seconds()) + }(time.Now()) return app.appConn.ApplySnapshotChunkSync(ctx, req) } diff --git a/internal/proxy/metrics.go b/internal/proxy/metrics.go new file mode 100644 index 000000000..b0dc9fa9e --- /dev/null +++ b/internal/proxy/metrics.go @@ -0,0 +1,39 @@ +package proxy + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus 
"github.com/prometheus/client_golang/prometheus" +) + +// MetricsSubsystem is a subsystem shared by all metrics exposed by this package. +const MetricsSubsystem = "abci_connection" + +// Metrics contains the metrics recorded for calls made over the ABCI connections. +type Metrics struct { + // MethodTiming is the duration of each ABCI call in seconds, labeled by "method" and "type". + MethodTiming metrics.Histogram +} + +// PrometheusMetrics returns Metrics built with the Prometheus provider. defaultLabelsAndValues +// is a flat list of alternating label names and values that is applied to the histogram. +func PrometheusMetrics(namespace string, defaultLabelsAndValues ...string) *Metrics { + labels := make([]string, 0, len(defaultLabelsAndValues)/2+2) + for i := 0; i < len(defaultLabelsAndValues); i += 2 { + labels = append(labels, defaultLabelsAndValues[i]) + } + return &Metrics{ + MethodTiming: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "method_timing", + Help: "ABCI Method Timing", + }, append(labels, "method", "type")).With(defaultLabelsAndValues...), + } +} + +// NopMetrics returns Metrics whose histogram comes from the go-kit discard provider. +func NopMetrics() *Metrics { + return &Metrics{MethodTiming: discard.NewHistogram()} +} diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go index df49df287..0bcc64af6 100644 --- a/internal/proxy/multi_app_conn.go +++ b/internal/proxy/multi_app_conn.go @@ -33,8 +33,8 @@ type AppConns interface { } // NewAppConns calls NewMultiAppConn. -func NewAppConns(clientCreator abciclient.Creator) AppConns { - return NewMultiAppConn(clientCreator) +func NewAppConns(clientCreator abciclient.Creator, metrics *Metrics) AppConns { + return NewMultiAppConn(clientCreator, metrics) } // multiAppConn implements AppConns. @@ -45,6 +45,7 @@ func NewAppConns(clientCreator abciclient.Creator) AppConns { type multiAppConn struct { service.BaseService + metrics *Metrics consensusConn AppConnConsensus mempoolConn AppConnMempool queryConn AppConnQuery @@ -59,8 +60,9 @@ type multiAppConn struct { } // NewMultiAppConn makes all necessary abci connections to the application. 
-func NewMultiAppConn(clientCreator abciclient.Creator) AppConns { +func NewMultiAppConn(clientCreator abciclient.Creator, metrics *Metrics) AppConns { multiAppConn := &multiAppConn{ + metrics: metrics, clientCreator: clientCreator, } multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn) @@ -89,7 +91,7 @@ func (app *multiAppConn) OnStart() error { return err } app.queryConnClient = c - app.queryConn = NewAppConnQuery(c) + app.queryConn = NewAppConnQuery(c, app.metrics) c, err = app.abciClientFor(connSnapshot) if err != nil { @@ -97,7 +99,7 @@ func (app *multiAppConn) OnStart() error { return err } app.snapshotConnClient = c - app.snapshotConn = NewAppConnSnapshot(c) + app.snapshotConn = NewAppConnSnapshot(c, app.metrics) c, err = app.abciClientFor(connMempool) if err != nil { @@ -105,7 +107,7 @@ func (app *multiAppConn) OnStart() error { return err } app.mempoolConnClient = c - app.mempoolConn = NewAppConnMempool(c) + app.mempoolConn = NewAppConnMempool(c, app.metrics) c, err = app.abciClientFor(connConsensus) if err != nil { @@ -113,7 +115,7 @@ func (app *multiAppConn) OnStart() error { return err } app.consensusConnClient = c - app.consensusConn = NewAppConnConsensus(c) + app.consensusConn = NewAppConnConsensus(c, app.metrics) // Kill Tendermint if the ABCI application crashes. go app.killTMOnClientError() diff --git a/node/node.go b/node/node.go index 87e9b0dbd..eb186619d 100644 --- a/node/node.go +++ b/node/node.go @@ -146,8 +146,10 @@ func makeNode(cfg *config.Config, return nil, err } + nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) + // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). 
- proxyApp, err := createAndStartProxyAppConns(clientCreator, logger) + proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy) if err != nil { return nil, err } @@ -240,9 +242,6 @@ func makeNode(cfg *config.Config, return nil, fmt.Errorf("failed to create peer manager: %w", err) } - nodeMetrics := - defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) - router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey, peerManager, transport, getRouterConfig(cfg, proxyApp)) if err != nil { @@ -928,6 +927,7 @@ type nodeMetrics struct { mempool *mempool.Metrics state *sm.Metrics statesync *statesync.Metrics + proxy *proxy.Metrics } // metricsProvider returns consensus, p2p, mempool, state, statesync Metrics. @@ -944,6 +944,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), + proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID), } } return &nodeMetrics{ @@ -952,6 +953,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider { mempool.NopMetrics(), sm.NopMetrics(), statesync.NopMetrics(), + proxy.NopMetrics(), } } } diff --git a/node/setup.go b/node/setup.go index f6b9c028d..10a420aea 100644 --- a/node/setup.go +++ b/node/setup.go @@ -47,8 +47,8 @@ func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *stor return } -func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger) (proxy.AppConns, error) { - proxyApp := proxy.NewAppConns(clientCreator) +func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { + proxyApp := proxy.NewAppConns(clientCreator, metrics) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err 
!= nil { return nil, fmt.Errorf("error starting proxy app connections: %v", err)