diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 690c08df9..f25c33ce0 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -1,6 +1,9 @@ package proxy import ( + "time" + + "github.com/go-kit/kit/metrics" abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/types" ) @@ -56,11 +59,13 @@ type AppConnSnapshot interface { // Implements AppConnConsensus (subset of abcicli.Client) type appConnConsensus struct { + metrics *Metrics appConn abcicli.Client } -func NewAppConnConsensus(appConn abcicli.Client) AppConnConsensus { +func NewAppConnConsensus(appConn abcicli.Client, metrics *Metrics) AppConnConsensus { return &appConnConsensus{ + metrics: metrics, appConn: appConn, } } @@ -74,22 +79,27 @@ func (app *appConnConsensus) Error() error { } func (app *appConnConsensus) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() return app.appConn.InitChainSync(req) } func (app *appConnConsensus) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "begin_block", "type", "sync"))() return app.appConn.BeginBlockSync(req) } func (app *appConnConsensus) DeliverTxAsync(req types.RequestDeliverTx) *abcicli.ReqRes { + defer addTimeSample(app.metrics.MethodTiming.With("method", "deliver_tx", "type", "async"))() return app.appConn.DeliverTxAsync(req) } func (app *appConnConsensus) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "end_block", "type", "sync"))() return app.appConn.EndBlockSync(req) } func (app *appConnConsensus) CommitSync() (*types.ResponseCommit, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() return app.appConn.CommitSync() } @@ -97,11 +107,13 @@ func (app *appConnConsensus) CommitSync() (*types.ResponseCommit, error) { // Implements AppConnMempool (subset of abcicli.Client) type appConnMempool struct { + metrics *Metrics appConn abcicli.Client } -func NewAppConnMempool(appConn abcicli.Client) AppConnMempool { +func NewAppConnMempool(appConn abcicli.Client, metrics *Metrics) AppConnMempool { return &appConnMempool{ + metrics: metrics, appConn: appConn, } } @@ -115,18 +127,22 @@ func (app *appConnMempool) Error() error { } func (app *appConnMempool) FlushAsync() *abcicli.ReqRes { + defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "async"))() return app.appConn.FlushAsync() } func (app *appConnMempool) FlushSync() error { + defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() return app.appConn.FlushSync() } func (app *appConnMempool) CheckTxAsync(req types.RequestCheckTx) *abcicli.ReqRes { + defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "async"))() return app.appConn.CheckTxAsync(req) } func (app *appConnMempool) CheckTxSync(req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() return app.appConn.CheckTxSync(req) } @@ -134,11 +150,13 @@ func (app *appConnMempool) CheckTxSync(req types.RequestCheckTx) (*types.Respons // Implements AppConnQuery (subset of abcicli.Client) type appConnQuery struct { + metrics *Metrics appConn abcicli.Client } -func NewAppConnQuery(appConn abcicli.Client) AppConnQuery { +func NewAppConnQuery(appConn abcicli.Client, metrics *Metrics) AppConnQuery { return &appConnQuery{ + metrics: metrics, appConn: appConn, } } @@ -148,14 +166,17 @@ func (app *appConnQuery) Error() error { } func (app *appConnQuery) EchoSync(msg string) (*types.ResponseEcho, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() return app.appConn.EchoSync(msg) } func (app *appConnQuery) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() return app.appConn.InfoSync(req) } func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (*types.ResponseQuery, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() return app.appConn.QuerySync(reqQuery) } @@ -163,11 +184,13 @@ func (app *appConnQuery) QuerySync(reqQuery types.RequestQuery) (*types.Response // Implements AppConnSnapshot (subset of abcicli.Client) type appConnSnapshot struct { + metrics *Metrics appConn abcicli.Client } -func NewAppConnSnapshot(appConn abcicli.Client) AppConnSnapshot { +func NewAppConnSnapshot(appConn abcicli.Client, metrics *Metrics) AppConnSnapshot { return &appConnSnapshot{ + metrics: metrics, appConn: appConn, } } @@ -177,19 +200,32 @@ func (app *appConnSnapshot) Error() error { } func (app *appConnSnapshot) ListSnapshotsSync(req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() return app.appConn.ListSnapshotsSync(req) } func (app *appConnSnapshot) OfferSnapshotSync(req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() return app.appConn.OfferSnapshotSync(req) } func (app *appConnSnapshot) LoadSnapshotChunkSync( req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() return app.appConn.LoadSnapshotChunkSync(req) } func (app *appConnSnapshot) ApplySnapshotChunkSync( req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() return app.appConn.ApplySnapshotChunkSync(req) } + +// addTimeSample returns a function that, when called, adds an observation to m. +// The observation added to m is the number of seconds ellapsed since addTimeSample +// was initially called. addTimeSample is meant to be called in a defer to calculate +// the amount of time a function takes to complete. +func addTimeSample(m metrics.Histogram) func() { + start := time.Now() + return func() { m.Observe(time.Since(start).Seconds()) } +} diff --git a/proxy/metrics.go b/proxy/metrics.go new file mode 100644 index 000000000..988c332a6 --- /dev/null +++ b/proxy/metrics.go @@ -0,0 +1,47 @@ +package proxy + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "abci_connection" +) + +// Metrics contains the prometheus metrics exposed by the proxy package. +type Metrics struct { + MethodTiming metrics.Histogram +} + +// PrometheusMetrics constructs a Metrics instance that collects metrics samples. +// The resulting metrics will be prefixed with namespace and labeled with the +// defaultLabelsAndValues. defaultLabelsAndValues must be a list of string pairs +// where the first of each pair is the label and the second is the value. +func PrometheusMetrics(namespace string, defaultLabelsAndValues ...string) *Metrics { + defaultLabels := []string{} + for i := 0; i < len(defaultLabelsAndValues); i += 2 { + defaultLabels = append(defaultLabels, defaultLabelsAndValues[i]) + } + return &Metrics{ + MethodTiming: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "method_timing_seconds", + Help: "ABCI Method Timing", + Buckets: []float64{.0001, .0004, .002, .009, .02, .1, .65, 2, 6, 25}, + }, append(defaultLabels, []string{"method", "type"}...)).With(defaultLabelsAndValues...), + } +} + +// NopMetrics constructs a Metrics instance that discards all samples and is suitable +// for testing. +func NopMetrics() *Metrics { + return &Metrics{ + MethodTiming: discard.NewHistogram(), + } +} diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index a7a6f7014..66d238388 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -32,8 +32,8 @@ type AppConns interface { } // NewAppConns calls NewMultiAppConn. -func NewAppConns(clientCreator ClientCreator) AppConns { - return NewMultiAppConn(clientCreator) +func NewAppConns(clientCreator abciclient.Creator, metrics *Metrics) AppConns { + return NewMultiAppConn(clientCreator, metrics) } // multiAppConn implements AppConns. @@ -44,6 +44,7 @@ func NewAppConns(clientCreator ClientCreator) AppConns { type multiAppConn struct { service.BaseService + metrics *Metrics consensusConn AppConnConsensus mempoolConn AppConnMempool queryConn AppConnQuery @@ -58,8 +59,9 @@ type multiAppConn struct { } // NewMultiAppConn makes all necessary abci connections to the application. -func NewMultiAppConn(clientCreator ClientCreator) AppConns { +func NewMultiAppConn(clientCreator ClientCreator, metrics *Metrics) AppConns { multiAppConn := &multiAppConn{ + metrics: metrics, clientCreator: clientCreator, } multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn) @@ -88,7 +90,7 @@ func (app *multiAppConn) OnStart() error { return err } app.queryConnClient = c - app.queryConn = NewAppConnQuery(c) + app.queryConn = NewAppConnQuery(c, app.metrics) c, err = app.abciClientFor(connSnapshot) if err != nil { @@ -96,7 +98,7 @@ func (app *multiAppConn) OnStart() error { return err } app.snapshotConnClient = c - app.snapshotConn = NewAppConnSnapshot(c) + app.snapshotConn = NewAppConnSnapshot(c, app.metrics) c, err = app.abciClientFor(connMempool) if err != nil { @@ -104,7 +106,7 @@ func (app *multiAppConn) OnStart() error { return err } app.mempoolConnClient = c - app.mempoolConn = NewAppConnMempool(c) + app.mempoolConn = NewAppConnMempool(c, app.metrics) c, err = app.abciClientFor(connConsensus) if err != nil { @@ -112,7 +114,7 @@ func (app *multiAppConn) OnStart() error { return err } app.consensusConnClient = c - app.consensusConn = NewAppConnConsensus(c) + app.consensusConn = NewAppConnConsensus(c, app.metrics) // Kill Tendermint if the ABCI application crashes. go app.killTMOnClientError() diff --git a/proxy/multi_app_conn_test.go b/proxy/multi_app_conn_test.go index 34b0d0830..4449b714a 100644 --- a/proxy/multi_app_conn_test.go +++ b/proxy/multi_app_conn_test.go @@ -28,7 +28,7 @@ func TestAppConns_Start_Stop(t *testing.T) { clientCreatorMock.On("NewABCIClient").Return(clientMock, nil).Times(4) - appConns := NewAppConns(clientCreatorMock) + appConns := NewAppConns(creator, NopMetrics()) err := appConns.Start() require.NoError(t, err) @@ -68,7 +68,7 @@ func TestAppConns_Failure(t *testing.T) { clientCreatorMock.On("NewABCIClient").Return(clientMock, nil) - appConns := NewAppConns(clientCreatorMock) + appConns := NewAppConns(clientCreatorMock, NopMetrics()) err := appConns.Start() require.NoError(t, err)