From c5c088b1f87bf7fdb0fcb9d01b30da1e1264d0fa Mon Sep 17 00:00:00 2001 From: tycho garen Date: Fri, 4 Mar 2022 15:41:58 -0500 Subject: [PATCH] reorg files --- go.mod | 8 +- internal/proxy/app_conn.go | 99 ---------- internal/proxy/client.go | 171 ++++++++++++++++++ .../{app_conn_test.go => client_test.go} | 77 ++++++++ internal/proxy/multi_app_conn.go | 84 --------- internal/proxy/multi_app_conn_test.go | 88 --------- 6 files changed, 254 insertions(+), 273 deletions(-) delete mode 100644 internal/proxy/app_conn.go rename internal/proxy/{app_conn_test.go => client_test.go} (69%) delete mode 100644 internal/proxy/multi_app_conn.go delete mode 100644 internal/proxy/multi_app_conn_test.go diff --git a/go.mod b/go.mod index 136dd3af3..c795f1319 100644 --- a/go.mod +++ b/go.mod @@ -39,6 +39,12 @@ require ( pgregory.net/rapid v0.4.7 ) +require ( + github.com/creachadair/atomicfile v0.2.4 + github.com/google/go-cmp v0.5.7 + gotest.tools v2.2.0+incompatible +) + require ( 4d63.com/gochecknoglobals v0.1.0 // indirect github.com/Antonboom/errname v0.1.5 // indirect @@ -67,7 +73,6 @@ require ( github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect github.com/containerd/continuity v0.2.1 // indirect github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect - github.com/creachadair/atomicfile v0.2.4 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/denis-tingajkin/go-header v0.4.2 // indirect github.com/dgraph-io/badger/v2 v2.2007.2 // indirect @@ -107,7 +112,6 @@ require ( github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect github.com/google/btree v1.0.0 // indirect - github.com/google/go-cmp v0.5.7 // indirect github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect github.com/gostaticanalysis/comment v1.4.2 // indirect diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go deleted file mode 100644 index 3d023564f..000000000 --- a/internal/proxy/app_conn.go +++ /dev/null @@ -1,99 +0,0 @@ -package proxy - -import ( - "context" - "time" - - "github.com/go-kit/kit/metrics" - - "github.com/tendermint/tendermint/abci/types" -) - -func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() - return app.client.InitChain(ctx, req) -} - -func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() - return app.client.PrepareProposal(ctx, req) -} - -func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() - return app.client.ProcessProposal(ctx, req) -} - -func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() - return app.client.ExtendVote(ctx, req) -} - -func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() - return app.client.VerifyVoteExtension(ctx, req) -} - -func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() - return app.client.FinalizeBlock(ctx, req) -} - -func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() - return app.client.Commit(ctx) -} - -func (app *proxyClient) Flush(ctx context.Context) error { - defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() - return app.client.Flush(ctx) -} - -func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() - return app.client.CheckTx(ctx, req) -} - -func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() - return app.client.Echo(ctx, msg) -} - -func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() - return app.client.Info(ctx, req) -} - -func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() - return app.client.Query(ctx, reqQuery) -} - -func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() - return app.client.ListSnapshots(ctx, req) -} - -func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() - return app.client.OfferSnapshot(ctx, req) -} - -func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() - return app.client.LoadSnapshotChunk(ctx, req) -} - -func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { - defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() - return app.client.ApplySnapshotChunk(ctx, req) -} - -// addTimeSample returns a function that, when called, adds an observation to m. -// The observation added to m is the number of seconds ellapsed since addTimeSample -// was initially called. addTimeSample is meant to be called in a defer to calculate -// the amount of time a function takes to complete. -func addTimeSample(m metrics.Histogram) func() { - start := time.Now() - return func() { m.Observe(time.Since(start).Seconds()) } -} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index 9983c725e..256dd4065 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -1,12 +1,18 @@ package proxy import ( + "context" "io" + "os" + "syscall" + "time" + "github.com/go-kit/kit/metrics" abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" e2e "github.com/tendermint/tendermint/test/e2e/app" ) @@ -45,3 +51,168 @@ func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient type noopCloser struct{} func (noopCloser) Close() error { return nil } + +//////////////////////////////////////////////////////////////////////// + +// proxyClient implements provides the application connection. +type proxyClient struct { + service.BaseService + logger log.Logger + + client abciclient.Client + metrics *Metrics +} + +// New creates a proxy application interface. +func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client { + conn := &proxyClient{ + logger: logger, + metrics: metrics, + client: client, + } + conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn) + return conn +} + +func (app *proxyClient) OnStop() { tryCallStop(app.client) } +func (app *proxyClient) Error() error { return app.client.Error() } + +func tryCallStop(client abciclient.Client) { + switch c := client.(type) { + case nil: + return + case interface{ Stop() }: + c.Stop() + } +} + +func (app *proxyClient) OnStart(ctx context.Context) error { + var err error + defer func() { + if err != nil { + tryCallStop(app.client) + } + }() + + // Kill Tendermint if the ABCI application crashes. + go func() { + if !app.client.IsRunning() { + return + } + app.client.Wait() + if ctx.Err() != nil { + return + } + + if err := app.client.Error(); err != nil { + app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", + "err", err) + + if killErr := kill(); killErr != nil { + app.logger.Error("Failed to kill this process - please do so manually", + "err", killErr) + } + } + + }() + + return app.client.Start(ctx) +} + +func kill() error { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + return err + } + + return p.Signal(syscall.SIGTERM) +} + +func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() + return app.client.InitChain(ctx, req) +} + +func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() + return app.client.PrepareProposal(ctx, req) +} + +func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() + return app.client.ProcessProposal(ctx, req) +} + +func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() + return app.client.ExtendVote(ctx, req) +} + +func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() + return app.client.VerifyVoteExtension(ctx, req) +} + +func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() + return app.client.FinalizeBlock(ctx, req) +} + +func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() + return app.client.Commit(ctx) +} + +func (app *proxyClient) Flush(ctx context.Context) error { + defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() + return app.client.Flush(ctx) +} + +func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() + return app.client.CheckTx(ctx, req) +} + +func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() + return app.client.Echo(ctx, msg) +} + +func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() + return app.client.Info(ctx, req) +} + +func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() + return app.client.Query(ctx, reqQuery) +} + +func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() + return app.client.ListSnapshots(ctx, req) +} + +func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() + return app.client.OfferSnapshot(ctx, req) +} + +func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() + return app.client.LoadSnapshotChunk(ctx, req) +} + +func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { + defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() + return app.client.ApplySnapshotChunk(ctx, req) +} + +// addTimeSample returns a function that, when called, adds an observation to m. +// The observation added to m is the number of seconds ellapsed since addTimeSample +// was initially called. addTimeSample is meant to be called in a defer to calculate +// the amount of time a function takes to complete. +func addTimeSample(m metrics.Histogram) func() { + start := time.Now() + return func() { m.Observe(time.Since(start).Seconds()) } +} diff --git a/internal/proxy/app_conn_test.go b/internal/proxy/client_test.go similarity index 69% rename from internal/proxy/app_conn_test.go rename to internal/proxy/client_test.go index 526d5819f..fab3f16a4 100644 --- a/internal/proxy/app_conn_test.go +++ b/internal/proxy/client_test.go @@ -2,18 +2,26 @@ package proxy import ( "context" + "errors" "fmt" + "os" + "os/signal" "strings" + "syscall" "testing" + "time" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" abciclient "github.com/tendermint/tendermint/abci/client" + abcimocks "github.com/tendermint/tendermint/abci/client/mocks" "github.com/tendermint/tendermint/abci/example/kvstore" "github.com/tendermint/tendermint/abci/server" "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" + "gotest.tools/assert" ) //---------------------------------------- @@ -162,3 +170,72 @@ func TestInfo(t *testing.T) { t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else") } } + +type noopStoppableClientImpl struct { + abciclient.Client + count int +} + +func (c *noopStoppableClientImpl) Stop() { c.count++ } + +func TestAppConns_Start_Stop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clientMock := &abcimocks.Client{} + clientMock.On("Start", mock.Anything).Return(nil) + clientMock.On("Error").Return(nil) + clientMock.On("IsRunning").Return(true) + clientMock.On("Wait").Return(nil).Times(1) + cl := &noopStoppableClientImpl{Client: clientMock} + + appConns := New(cl, log.TestingLogger(), NopMetrics()) + + err := appConns.Start(ctx) + require.NoError(t, err) + + time.Sleep(200 * time.Millisecond) + + cancel() + appConns.Wait() + + clientMock.AssertExpectations(t) + assert.Equal(t, 1, cl.count) +} + +// Upon failure, we call tmos.Kill +func TestAppConns_Failure(t *testing.T) { + ok := make(chan struct{}) + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGTERM) + go func() { + for range c { + close(ok) + return + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + clientMock := &abcimocks.Client{} + clientMock.On("SetLogger", mock.Anything).Return() + clientMock.On("Start", mock.Anything).Return(nil) + clientMock.On("IsRunning").Return(true) + clientMock.On("Wait").Return(nil) + clientMock.On("Error").Return(errors.New("EOF")) + cl := &noopStoppableClientImpl{Client: clientMock} + + appConns := New(cl, log.TestingLogger(), NopMetrics()) + + err := appConns.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { cancel(); appConns.Wait() }) + + select { + case <-ok: + t.Log("SIGTERM successfully received") + case <-time.After(5 * time.Second): + t.Fatal("expected process to receive SIGTERM signal") + } +} diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go deleted file mode 100644 index 8ab27b99a..000000000 --- a/internal/proxy/multi_app_conn.go +++ /dev/null @@ -1,84 +0,0 @@ -package proxy - -import ( - "context" - "os" - "syscall" - - abciclient "github.com/tendermint/tendermint/abci/client" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" -) - -// New creates a proxy application interface. -func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client { - conn := &proxyClient{ - logger: logger, - metrics: metrics, - client: client, - } - conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn) - return conn -} - -// proxyClient implements provides the application connection. -type proxyClient struct { - service.BaseService - logger log.Logger - - client abciclient.Client - metrics *Metrics -} - -func (app *proxyClient) OnStop() { tryCallStop(app.client) } -func (app *proxyClient) Error() error { return app.client.Error() } - -func tryCallStop(client abciclient.Client) { - switch c := client.(type) { - case nil: - return - case interface{ Stop() }: - c.Stop() - } -} - -func (app *proxyClient) OnStart(ctx context.Context) error { - var err error - defer func() { - if err != nil { - tryCallStop(app.client) - } - }() - - // Kill Tendermint if the ABCI application crashes. - go func() { - if !app.client.IsRunning() { - return - } - app.client.Wait() - if ctx.Err() != nil { - return - } - - if err := app.client.Error(); err != nil { - app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", - "err", err) - if killErr := kill(); killErr != nil { - app.logger.Error("Failed to kill this process - please do so manually", - "err", killErr) - } - } - - }() - - return app.client.Start(ctx) -} - -func kill() error { - p, err := os.FindProcess(os.Getpid()) - if err != nil { - return err - } - - return p.Signal(syscall.SIGTERM) -} diff --git a/internal/proxy/multi_app_conn_test.go b/internal/proxy/multi_app_conn_test.go deleted file mode 100644 index f3f96e3ac..000000000 --- a/internal/proxy/multi_app_conn_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package proxy - -import ( - "context" - "errors" - "os" - "os/signal" - "syscall" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - abciclient "github.com/tendermint/tendermint/abci/client" - abcimocks "github.com/tendermint/tendermint/abci/client/mocks" - "github.com/tendermint/tendermint/libs/log" -) - -type noopStoppableClientImpl struct { - abciclient.Client - count int -} - -func (c *noopStoppableClientImpl) Stop() { c.count++ } - -func TestAppConns_Start_Stop(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clientMock := &abcimocks.Client{} - clientMock.On("Start", mock.Anything).Return(nil) - clientMock.On("Error").Return(nil) - clientMock.On("IsRunning").Return(true) - clientMock.On("Wait").Return(nil).Times(1) - cl := &noopStoppableClientImpl{Client: clientMock} - - appConns := New(cl, log.TestingLogger(), NopMetrics()) - - err := appConns.Start(ctx) - require.NoError(t, err) - - time.Sleep(200 * time.Millisecond) - - cancel() - appConns.Wait() - - clientMock.AssertExpectations(t) - assert.Equal(t, 1, cl.count) -} - -// Upon failure, we call tmos.Kill -func TestAppConns_Failure(t *testing.T) { - ok := make(chan struct{}) - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGTERM) - go func() { - for range c { - close(ok) - return - } - }() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clientMock := &abcimocks.Client{} - clientMock.On("SetLogger", mock.Anything).Return() - clientMock.On("Start", mock.Anything).Return(nil) - clientMock.On("IsRunning").Return(true) - clientMock.On("Wait").Return(nil) - clientMock.On("Error").Return(errors.New("EOF")) - cl := &noopStoppableClientImpl{Client: clientMock} - - appConns := New(cl, log.TestingLogger(), NopMetrics()) - - err := appConns.Start(ctx) - require.NoError(t, err) - t.Cleanup(func() { cancel(); appConns.Wait() }) - - select { - case <-ok: - t.Log("SIGTERM successfully received") - case <-time.After(5 * time.Second): - t.Fatal("expected process to receive SIGTERM signal") - } -}