Files
tendermint/proxy/app_conn.go
Sergio Mena a749afe8fb Divergences in comparison with #9620. Part 1: easy/obvious (#9888)
* Obvious changes (including bugs)

* Update privval/file.go

Co-authored-by: Thane Thomson <connect@thanethomson.com>

Co-authored-by: Thane Thomson <connect@thanethomson.com>
2022-12-16 12:16:53 +01:00

231 lines
8.6 KiB
Go

package proxy
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
abcicli "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
)
//go:generate ../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
//----------------------------------------------------------------------------------------
// Enforce which abci msgs can be sent on a connection at the type level
type AppConnConsensus interface {
Error() error
InitChain(context.Context, *types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposal(context.Context, *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
ProcessProposal(context.Context, *types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
ExtendVote(context.Context, *types.RequestExtendVote) (*types.ResponseExtendVote, error)
VerifyVoteExtension(context.Context, *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error)
FinalizeBlock(context.Context, *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
Commit(context.Context) (*types.ResponseCommit, error)
}
type AppConnMempool interface {
SetResponseCallback(abcicli.Callback)
Error() error
CheckTx(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
CheckTxAsync(context.Context, *types.RequestCheckTx) (*abcicli.ReqRes, error)
Flush(context.Context) error
}
type AppConnQuery interface {
Error() error
Echo(context.Context, string) (*types.ResponseEcho, error)
Info(context.Context, *types.RequestInfo) (*types.ResponseInfo, error)
Query(context.Context, *types.RequestQuery) (*types.ResponseQuery, error)
}
type AppConnSnapshot interface {
Error() error
ListSnapshots(context.Context, *types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
OfferSnapshot(context.Context, *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunk(context.Context, *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunk(context.Context, *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
}
//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of abcicli.Client)
type appConnConsensus struct {
metrics *Metrics
appConn abcicli.Client
}
var _ AppConnConsensus = (*appConnConsensus)(nil)
func NewAppConnConsensus(appConn abcicli.Client, metrics *Metrics) AppConnConsensus {
return &appConnConsensus{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChain(ctx context.Context, req *types.RequestInitChain) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "init_chain"))()
return app.appConn.InitChain(ctx, req)
}
func (app *appConnConsensus) PrepareProposal(ctx context.Context,
req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "prepare_proposal"))()
return app.appConn.PrepareProposal(ctx, req)
}
func (app *appConnConsensus) ProcessProposal(ctx context.Context, req *types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "process_proposal"))()
return app.appConn.ProcessProposal(ctx, req)
}
func (app *appConnConsensus) ExtendVote(ctx context.Context, req *types.RequestExtendVote) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "extend_vote"))()
return app.appConn.ExtendVote(ctx, req)
}
func (app *appConnConsensus) VerifyVoteExtension(ctx context.Context, req *types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "verify_vote_extension"))()
return app.appConn.VerifyVoteExtension(ctx, req)
}
func (app *appConnConsensus) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "finalize_block"))()
return app.appConn.FinalizeBlock(ctx, req)
}
func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "commit"))()
return app.appConn.Commit(ctx, &types.RequestCommit{})
}
//------------------------------------------------
// Implements AppConnMempool (subset of abcicli.Client)
type appConnMempool struct {
metrics *Metrics
appConn abcicli.Client
}
func NewAppConnMempool(appConn abcicli.Client, metrics *Metrics) AppConnMempool {
return &appConnMempool{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnMempool) SetResponseCallback(cb abcicli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}
func (app *appConnMempool) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "flush"))()
return app.appConn.Flush(ctx)
}
func (app *appConnMempool) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "check_tx", "type", "sync"))()
return app.appConn.CheckTx(ctx, req)
}
func (app *appConnMempool) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*abcicli.ReqRes, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "check_tx", "type", "async"))()
return app.appConn.CheckTxAsync(ctx, req)
}
//------------------------------------------------
// Implements AppConnQuery (subset of abcicli.Client)
type appConnQuery struct {
metrics *Metrics
appConn abcicli.Client
}
func NewAppConnQuery(appConn abcicli.Client, metrics *Metrics) AppConnQuery {
return &appConnQuery{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnQuery) Error() error {
return app.appConn.Error()
}
func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "echo"))()
return app.appConn.Echo(ctx, msg)
}
func (app *appConnQuery) Info(ctx context.Context, req *types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "info"))()
return app.appConn.Info(ctx, req)
}
func (app *appConnQuery) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "query"))()
return app.appConn.Query(ctx, req)
}
//------------------------------------------------
// Implements AppConnSnapshot (subset of abcicli.Client)
type appConnSnapshot struct {
metrics *Metrics
appConn abcicli.Client
}
func NewAppConnSnapshot(appConn abcicli.Client, metrics *Metrics) AppConnSnapshot {
return &appConnSnapshot{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnSnapshot) Error() error {
return app.appConn.Error()
}
func (app *appConnSnapshot) ListSnapshots(ctx context.Context, req *types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "list_snapshots"))()
return app.appConn.ListSnapshots(ctx, req)
}
func (app *appConnSnapshot) OfferSnapshot(ctx context.Context, req *types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "offer_snapshot"))()
return app.appConn.OfferSnapshot(ctx, req)
}
func (app *appConnSnapshot) LoadSnapshotChunk(ctx context.Context, req *types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "load_snapshot_chunk"))()
return app.appConn.LoadSnapshotChunk(ctx, req)
}
func (app *appConnSnapshot) ApplySnapshotChunk(ctx context.Context, req *types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTimingSeconds.With("method", "apply_snapshot_chunk"))()
return app.appConn.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }
}