internal/proxy: add initial set of abci metrics

This commit is contained in:
William Banfield
2021-10-12 13:33:38 -04:00
parent 3646b635d3
commit 4396224c1f
8 changed files with 120 additions and 20 deletions

View File

@@ -312,7 +312,7 @@ func newConsensusStateForReplay(cfg config.BaseConfig, csConfig *config.Consensu
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.NewAppConns(clientCreator)
proxyApp := proxy.NewAppConns(clientCreator, proxy.NopMetrics())
err = proxyApp.Start()
if err != nil {
tmos.Exit(fmt.Sprintf("Error starting proxy app conns: %v", err))

View File

@@ -64,7 +64,7 @@ func newMockProxyApp(appHash []byte, abciResponses *tmstate.ABCIResponses) proxy
if err != nil {
panic(err)
}
return proxy.NewAppConnConsensus(cli)
return proxy.NewAppConnConsensus(cli, proxy.NopMetrics())
}
type mockProxyApp struct {

View File

@@ -65,7 +65,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app))
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), proxy.NopMetrics())
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return fmt.Errorf("failed to start proxy app connections: %w", err)

View File

@@ -2,6 +2,7 @@ package proxy
import (
"context"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
@@ -56,11 +57,13 @@ type AppConnSnapshot interface {
// Implements AppConnConsensus (subset of abciclient.Client)
type appConnConsensus struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnConsensus(appConn abciclient.Client) AppConnConsensus {
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
return &appConnConsensus{
metrics: metrics,
appConn: appConn,
}
}
@@ -77,6 +80,9 @@ func (app *appConnConsensus) InitChainSync(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "init_chain",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.InitChainSync(ctx, req)
}
@@ -84,6 +90,9 @@ func (app *appConnConsensus) BeginBlockSync(
ctx context.Context,
req types.RequestBeginBlock,
) (*types.ResponseBeginBlock, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "begin_block",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.BeginBlockSync(ctx, req)
}
@@ -91,6 +100,9 @@ func (app *appConnConsensus) DeliverTxAsync(
ctx context.Context,
req types.RequestDeliverTx,
) (*abciclient.ReqRes, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "deliver_tx",
"type", "aync").Observe(time.Since(start).Seconds())
return app.appConn.DeliverTxAsync(ctx, req)
}
@@ -98,10 +110,16 @@ func (app *appConnConsensus) EndBlockSync(
ctx context.Context,
req types.RequestEndBlock,
) (*types.ResponseEndBlock, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "deliver_tx",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.EndBlockSync(ctx, req)
}
func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "commit",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.CommitSync(ctx)
}
@@ -109,11 +127,13 @@ func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCom
// Implements AppConnMempool (subset of abciclient.Client)
type appConnMempool struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnMempool(appConn abciclient.Client) AppConnMempool {
func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool {
return &appConnMempool{
metrics: metrics,
appConn: appConn,
}
}
@@ -127,18 +147,30 @@ func (app *appConnMempool) Error() error {
}
func (app *appConnMempool) FlushAsync(ctx context.Context) (*abciclient.ReqRes, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "flush",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.FlushAsync(ctx)
}
func (app *appConnMempool) FlushSync(ctx context.Context) error {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "flush",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.FlushSync(ctx)
}
func (app *appConnMempool) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*abciclient.ReqRes, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "check_tx",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.CheckTxAsync(ctx, req)
}
func (app *appConnMempool) CheckTxSync(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "check_tx",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.CheckTxSync(ctx, req)
}
@@ -146,11 +178,13 @@ func (app *appConnMempool) CheckTxSync(ctx context.Context, req types.RequestChe
// Implements AppConnQuery (subset of abciclient.Client)
type appConnQuery struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnQuery(appConn abciclient.Client) AppConnQuery {
func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery {
return &appConnQuery{
metrics: metrics,
appConn: appConn,
}
}
@@ -160,14 +194,23 @@ func (app *appConnQuery) Error() error {
}
func (app *appConnQuery) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "echo",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.EchoSync(ctx, msg)
}
func (app *appConnQuery) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "info",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.InfoSync(ctx, req)
}
func (app *appConnQuery) QuerySync(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "query",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.QuerySync(ctx, reqQuery)
}
@@ -175,11 +218,13 @@ func (app *appConnQuery) QuerySync(ctx context.Context, reqQuery types.RequestQu
// Implements AppConnSnapshot (subset of abciclient.Client)
type appConnSnapshot struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnSnapshot(appConn abciclient.Client) AppConnSnapshot {
func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot {
return &appConnSnapshot{
metrics: metrics,
appConn: appConn,
}
}
@@ -192,6 +237,9 @@ func (app *appConnSnapshot) ListSnapshotsSync(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "list_snapshots",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.ListSnapshotsSync(ctx, req)
}
@@ -199,17 +247,26 @@ func (app *appConnSnapshot) OfferSnapshotSync(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "offer_snapshot",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.OfferSnapshotSync(ctx, req)
}
func (app *appConnSnapshot) LoadSnapshotChunkSync(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "load_snapshot_chunk",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.LoadSnapshotChunkSync(ctx, req)
}
func (app *appConnSnapshot) ApplySnapshotChunkSync(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
start := time.Now()
defer app.metrics.MethodTiming.With("method", "apply_snapshot_chunk",
"type", "sync").Observe(time.Since(start).Seconds())
return app.appConn.ApplySnapshotChunkSync(ctx, req)
}

39
internal/proxy/metrics.go Normal file
View File

@@ -0,0 +1,39 @@
package proxy
import (
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)
const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "abci_connection"
)
type Metrics struct {
MethodTiming metrics.Histogram
}
func PrometheusMetrics(namespace string, defaultLabelsAndValues ...string) *Metrics {
defaultLabels := []string{}
for i := 0; i < len(defaultLabelsAndValues); i += 2 {
defaultLabels = append(defaultLabels, defaultLabelsAndValues[i])
}
return &Metrics{
MethodTiming: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "method_timing",
Help: "ABCI Method Timing",
}, append(defaultLabels, []string{"method", "type"}...)).With(defaultLabelsAndValues...),
}
}
func NopMetrics() *Metrics {
return &Metrics{
MethodTiming: discard.NewHistogram(),
}
}

View File

@@ -33,8 +33,8 @@ type AppConns interface {
}
// NewAppConns calls NewMultiAppConn.
func NewAppConns(clientCreator abciclient.Creator) AppConns {
return NewMultiAppConn(clientCreator)
func NewAppConns(clientCreator abciclient.Creator, metrics *Metrics) AppConns {
return NewMultiAppConn(clientCreator, metrics)
}
// multiAppConn implements AppConns.
@@ -45,6 +45,7 @@ func NewAppConns(clientCreator abciclient.Creator) AppConns {
type multiAppConn struct {
service.BaseService
metrics *Metrics
consensusConn AppConnConsensus
mempoolConn AppConnMempool
queryConn AppConnQuery
@@ -59,8 +60,9 @@ type multiAppConn struct {
}
// NewMultiAppConn makes all necessary abci connections to the application.
func NewMultiAppConn(clientCreator abciclient.Creator) AppConns {
func NewMultiAppConn(clientCreator abciclient.Creator, metrics *Metrics) AppConns {
multiAppConn := &multiAppConn{
metrics: metrics,
clientCreator: clientCreator,
}
multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn)
@@ -89,7 +91,7 @@ func (app *multiAppConn) OnStart() error {
return err
}
app.queryConnClient = c
app.queryConn = NewAppConnQuery(c)
app.queryConn = NewAppConnQuery(c, app.metrics)
c, err = app.abciClientFor(connSnapshot)
if err != nil {
@@ -97,7 +99,7 @@ func (app *multiAppConn) OnStart() error {
return err
}
app.snapshotConnClient = c
app.snapshotConn = NewAppConnSnapshot(c)
app.snapshotConn = NewAppConnSnapshot(c, app.metrics)
c, err = app.abciClientFor(connMempool)
if err != nil {
@@ -105,7 +107,7 @@ func (app *multiAppConn) OnStart() error {
return err
}
app.mempoolConnClient = c
app.mempoolConn = NewAppConnMempool(c)
app.mempoolConn = NewAppConnMempool(c, app.metrics)
c, err = app.abciClientFor(connConsensus)
if err != nil {
@@ -113,7 +115,7 @@ func (app *multiAppConn) OnStart() error {
return err
}
app.consensusConnClient = c
app.consensusConn = NewAppConnConsensus(c)
app.consensusConn = NewAppConnConsensus(c, app.metrics)
// Kill Tendermint if the ABCI application crashes.
go app.killTMOnClientError()

View File

@@ -146,8 +146,10 @@ func makeNode(cfg *config.Config,
return nil, err
}
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
if err != nil {
return nil, err
}
@@ -240,9 +242,6 @@ func makeNode(cfg *config.Config,
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
nodeMetrics :=
defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, proxyApp))
if err != nil {
@@ -928,6 +927,7 @@ type nodeMetrics struct {
mempool *mempool.Metrics
state *sm.Metrics
statesync *statesync.Metrics
proxy *proxy.Metrics
}
// metricsProvider returns consensus, p2p, mempool, state, statesync Metrics.
@@ -944,6 +944,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
proxy.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
}
}
return &nodeMetrics{
@@ -952,6 +953,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
mempool.NopMetrics(),
sm.NopMetrics(),
statesync.NopMetrics(),
proxy.NopMetrics(),
}
}
}

View File

@@ -47,8 +47,8 @@ func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *stor
return
}
func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator)
func createAndStartProxyAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator, metrics)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)