mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-12 06:41:31 +00:00
proxy: move proxy package to internal
This commit is contained in:
212
internal/proxy/app_conn.go
Normal file
212
internal/proxy/app_conn.go
Normal file
@@ -0,0 +1,212 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
//go:generate ../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
|
||||
|
||||
//----------------------------------------------------------------------------------------
|
||||
// Enforce which abci msgs can be sent on a connection at the type level
|
||||
|
||||
type AppConnConsensus interface {
|
||||
SetResponseCallback(abcicli.Callback)
|
||||
Error() error
|
||||
|
||||
InitChainSync(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
|
||||
|
||||
BeginBlockSync(context.Context, types.RequestBeginBlock) (*types.ResponseBeginBlock, error)
|
||||
DeliverTxAsync(context.Context, types.RequestDeliverTx) (*abcicli.ReqRes, error)
|
||||
EndBlockSync(context.Context, types.RequestEndBlock) (*types.ResponseEndBlock, error)
|
||||
CommitSync(context.Context) (*types.ResponseCommit, error)
|
||||
}
|
||||
|
||||
type AppConnMempool interface {
|
||||
SetResponseCallback(abcicli.Callback)
|
||||
Error() error
|
||||
|
||||
CheckTxAsync(context.Context, types.RequestCheckTx) (*abcicli.ReqRes, error)
|
||||
CheckTxSync(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
|
||||
|
||||
FlushAsync(context.Context) (*abcicli.ReqRes, error)
|
||||
FlushSync(context.Context) error
|
||||
}
|
||||
|
||||
type AppConnQuery interface {
|
||||
Error() error
|
||||
|
||||
EchoSync(context.Context, string) (*types.ResponseEcho, error)
|
||||
InfoSync(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
|
||||
QuerySync(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
|
||||
}
|
||||
|
||||
type AppConnSnapshot interface {
|
||||
Error() error
|
||||
|
||||
ListSnapshotsSync(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
|
||||
OfferSnapshotSync(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
|
||||
LoadSnapshotChunkSync(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
|
||||
ApplySnapshotChunkSync(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------------------
|
||||
// Implements AppConnConsensus (subset of abcicli.Client)
|
||||
|
||||
type appConnConsensus struct {
|
||||
appConn abcicli.Client
|
||||
}
|
||||
|
||||
func NewAppConnConsensus(appConn abcicli.Client) AppConnConsensus {
|
||||
return &appConnConsensus{
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) SetResponseCallback(cb abcicli.Callback) {
|
||||
app.appConn.SetResponseCallback(cb)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) InitChainSync(
|
||||
ctx context.Context,
|
||||
req types.RequestInitChain,
|
||||
) (*types.ResponseInitChain, error) {
|
||||
return app.appConn.InitChainSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) BeginBlockSync(
|
||||
ctx context.Context,
|
||||
req types.RequestBeginBlock,
|
||||
) (*types.ResponseBeginBlock, error) {
|
||||
return app.appConn.BeginBlockSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) DeliverTxAsync(ctx context.Context, req types.RequestDeliverTx) (*abcicli.ReqRes, error) {
|
||||
return app.appConn.DeliverTxAsync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) EndBlockSync(
|
||||
ctx context.Context,
|
||||
req types.RequestEndBlock,
|
||||
) (*types.ResponseEndBlock, error) {
|
||||
return app.appConn.EndBlockSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) CommitSync(ctx context.Context) (*types.ResponseCommit, error) {
|
||||
return app.appConn.CommitSync(ctx)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnMempool (subset of abcicli.Client)
|
||||
|
||||
type appConnMempool struct {
|
||||
appConn abcicli.Client
|
||||
}
|
||||
|
||||
func NewAppConnMempool(appConn abcicli.Client) AppConnMempool {
|
||||
return &appConnMempool{
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnMempool) SetResponseCallback(cb abcicli.Callback) {
|
||||
app.appConn.SetResponseCallback(cb)
|
||||
}
|
||||
|
||||
func (app *appConnMempool) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnMempool) FlushAsync(ctx context.Context) (*abcicli.ReqRes, error) {
|
||||
return app.appConn.FlushAsync(ctx)
|
||||
}
|
||||
|
||||
func (app *appConnMempool) FlushSync(ctx context.Context) error {
|
||||
return app.appConn.FlushSync(ctx)
|
||||
}
|
||||
|
||||
func (app *appConnMempool) CheckTxAsync(ctx context.Context, req types.RequestCheckTx) (*abcicli.ReqRes, error) {
|
||||
return app.appConn.CheckTxAsync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnMempool) CheckTxSync(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
|
||||
return app.appConn.CheckTxSync(ctx, req)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnQuery (subset of abcicli.Client)
|
||||
|
||||
type appConnQuery struct {
|
||||
appConn abcicli.Client
|
||||
}
|
||||
|
||||
func NewAppConnQuery(appConn abcicli.Client) AppConnQuery {
|
||||
return &appConnQuery{
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnQuery) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnQuery) EchoSync(ctx context.Context, msg string) (*types.ResponseEcho, error) {
|
||||
return app.appConn.EchoSync(ctx, msg)
|
||||
}
|
||||
|
||||
func (app *appConnQuery) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
return app.appConn.InfoSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnQuery) QuerySync(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
|
||||
return app.appConn.QuerySync(ctx, reqQuery)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnSnapshot (subset of abcicli.Client)
|
||||
|
||||
type appConnSnapshot struct {
|
||||
appConn abcicli.Client
|
||||
}
|
||||
|
||||
func NewAppConnSnapshot(appConn abcicli.Client) AppConnSnapshot {
|
||||
return &appConnSnapshot{
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) ListSnapshotsSync(
|
||||
ctx context.Context,
|
||||
req types.RequestListSnapshots,
|
||||
) (*types.ResponseListSnapshots, error) {
|
||||
return app.appConn.ListSnapshotsSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) OfferSnapshotSync(
|
||||
ctx context.Context,
|
||||
req types.RequestOfferSnapshot,
|
||||
) (*types.ResponseOfferSnapshot, error) {
|
||||
return app.appConn.OfferSnapshotSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) LoadSnapshotChunkSync(
|
||||
ctx context.Context,
|
||||
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
|
||||
return app.appConn.LoadSnapshotChunkSync(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) ApplySnapshotChunkSync(
|
||||
ctx context.Context,
|
||||
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
|
||||
return app.appConn.ApplySnapshotChunkSync(ctx, req)
|
||||
}
|
||||
186
internal/proxy/app_conn_test.go
Normal file
186
internal/proxy/app_conn_test.go
Normal file
@@ -0,0 +1,186 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
"github.com/tendermint/tendermint/abci/server"
|
||||
"github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
)
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
type appConnTestI interface {
|
||||
EchoAsync(ctx context.Context, msg string) (*abcicli.ReqRes, error)
|
||||
FlushSync(context.Context) error
|
||||
InfoSync(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
|
||||
}
|
||||
|
||||
type appConnTest struct {
|
||||
appConn abcicli.Client
|
||||
}
|
||||
|
||||
func newAppConnTest(appConn abcicli.Client) appConnTestI {
|
||||
return &appConnTest{appConn}
|
||||
}
|
||||
|
||||
func (app *appConnTest) EchoAsync(ctx context.Context, msg string) (*abcicli.ReqRes, error) {
|
||||
return app.appConn.EchoAsync(ctx, msg)
|
||||
}
|
||||
|
||||
func (app *appConnTest) FlushSync(ctx context.Context) error {
|
||||
return app.appConn.FlushSync(ctx)
|
||||
}
|
||||
|
||||
func (app *appConnTest) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
return app.appConn.InfoSync(ctx, req)
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
var SOCKET = "socket"
|
||||
|
||||
func TestEcho(t *testing.T) {
|
||||
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
|
||||
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
|
||||
|
||||
// Start server
|
||||
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
|
||||
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
|
||||
if err := s.Start(); err != nil {
|
||||
t.Fatalf("Error starting socket server: %v", err.Error())
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := s.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Start client
|
||||
cli, err := clientCreator.NewABCIClient()
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating ABCI client: %v", err.Error())
|
||||
}
|
||||
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
|
||||
if err := cli.Start(); err != nil {
|
||||
t.Fatalf("Error starting ABCI client: %v", err.Error())
|
||||
}
|
||||
|
||||
proxy := newAppConnTest(cli)
|
||||
t.Log("Connected")
|
||||
|
||||
ctx := context.Background()
|
||||
for i := 0; i < 1000; i++ {
|
||||
_, err = proxy.EchoAsync(ctx, fmt.Sprintf("echo-%v", i))
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
// flush sometimes
|
||||
if i%128 == 0 {
|
||||
if err := proxy.FlushSync(ctx); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := proxy.FlushSync(ctx); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEcho(b *testing.B) {
|
||||
b.StopTimer() // Initialize
|
||||
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
|
||||
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
|
||||
|
||||
// Start server
|
||||
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
|
||||
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
|
||||
if err := s.Start(); err != nil {
|
||||
b.Fatalf("Error starting socket server: %v", err.Error())
|
||||
}
|
||||
b.Cleanup(func() {
|
||||
if err := s.Stop(); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Start client
|
||||
cli, err := clientCreator.NewABCIClient()
|
||||
if err != nil {
|
||||
b.Fatalf("Error creating ABCI client: %v", err.Error())
|
||||
}
|
||||
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
|
||||
if err := cli.Start(); err != nil {
|
||||
b.Fatalf("Error starting ABCI client: %v", err.Error())
|
||||
}
|
||||
|
||||
proxy := newAppConnTest(cli)
|
||||
b.Log("Connected")
|
||||
echoString := strings.Repeat(" ", 200)
|
||||
b.StartTimer() // Start benchmarking tests
|
||||
|
||||
ctx := context.Background()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_, err = proxy.EchoAsync(ctx, echoString)
|
||||
if err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
// flush sometimes
|
||||
if i%128 == 0 {
|
||||
if err := proxy.FlushSync(ctx); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if err := proxy.FlushSync(ctx); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
|
||||
b.StopTimer()
|
||||
// info := proxy.InfoSync(types.RequestInfo{""})
|
||||
// b.Log("N: ", b.N, info)
|
||||
}
|
||||
|
||||
func TestInfo(t *testing.T) {
|
||||
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
|
||||
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
|
||||
|
||||
// Start server
|
||||
s := server.NewSocketServer(sockPath, kvstore.NewApplication())
|
||||
s.SetLogger(log.TestingLogger().With("module", "abci-server"))
|
||||
if err := s.Start(); err != nil {
|
||||
t.Fatalf("Error starting socket server: %v", err.Error())
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
if err := s.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// Start client
|
||||
cli, err := clientCreator.NewABCIClient()
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating ABCI client: %v", err.Error())
|
||||
}
|
||||
cli.SetLogger(log.TestingLogger().With("module", "abci-client"))
|
||||
if err := cli.Start(); err != nil {
|
||||
t.Fatalf("Error starting ABCI client: %v", err.Error())
|
||||
}
|
||||
|
||||
proxy := newAppConnTest(cli)
|
||||
t.Log("Connected")
|
||||
|
||||
resInfo, err := proxy.InfoSync(context.Background(), RequestInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if resInfo.Data != "{\"size\":0}" {
|
||||
t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else")
|
||||
}
|
||||
}
|
||||
86
internal/proxy/client.go
Normal file
86
internal/proxy/client.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
"github.com/tendermint/tendermint/abci/types"
|
||||
tmsync "github.com/tendermint/tendermint/internal/libs/sync"
|
||||
)
|
||||
|
||||
//----------------------------------------------------
|
||||
// local proxy uses a mutex on an in-proc app
|
||||
|
||||
type localClientCreator struct {
|
||||
mtx *tmsync.RWMutex
|
||||
app types.Application
|
||||
}
|
||||
|
||||
// NewLocalClientCreator returns a ClientCreator for the given app,
|
||||
// which will be running locally.
|
||||
func NewLocalClientCreator(app types.Application) abcicli.ClientCreator {
|
||||
return &localClientCreator{
|
||||
mtx: new(tmsync.RWMutex),
|
||||
app: app,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *localClientCreator) NewABCIClient() (abcicli.Client, error) {
|
||||
return abcicli.NewLocalClient(l.mtx, l.app), nil
|
||||
}
|
||||
|
||||
//---------------------------------------------------------------
|
||||
// remote proxy opens new connections to an external app process
|
||||
|
||||
type remoteClientCreator struct {
|
||||
addr string
|
||||
transport string
|
||||
mustConnect bool
|
||||
}
|
||||
|
||||
// NewRemoteClientCreator returns a ClientCreator for the given address (e.g.
|
||||
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
|
||||
// want the client to connect before reporting success.
|
||||
func NewRemoteClientCreator(addr, transport string, mustConnect bool) abcicli.ClientCreator {
|
||||
return &remoteClientCreator{
|
||||
addr: addr,
|
||||
transport: transport,
|
||||
mustConnect: mustConnect,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *remoteClientCreator) NewABCIClient() (abcicli.Client, error) {
|
||||
remoteApp, err := abcicli.NewClient(r.addr, r.transport, r.mustConnect)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
|
||||
}
|
||||
|
||||
return remoteApp, nil
|
||||
}
|
||||
|
||||
// DefaultClientCreator returns a default ClientCreator, which will create a
|
||||
// local client if addr is one of: 'kvstore',
|
||||
// 'persistent_kvstore' or 'noop', otherwise - a remote client.
|
||||
//
|
||||
// The Closer is a noop except for persistent_kvstore applications,
|
||||
// which will clean up the store.
|
||||
func DefaultClientCreator(addr, transport, dbDir string) (abcicli.ClientCreator, io.Closer) {
|
||||
switch addr {
|
||||
case "kvstore":
|
||||
return NewLocalClientCreator(kvstore.NewApplication()), noopCloser{}
|
||||
case "persistent_kvstore":
|
||||
app := kvstore.NewPersistentKVStoreApplication(dbDir)
|
||||
return NewLocalClientCreator(app), app
|
||||
case "noop":
|
||||
return NewLocalClientCreator(types.NewBaseApplication()), noopCloser{}
|
||||
default:
|
||||
mustConnect := false // loop retrying
|
||||
return NewRemoteClientCreator(addr, transport, mustConnect), noopCloser{}
|
||||
}
|
||||
}
|
||||
|
||||
type noopCloser struct{}
|
||||
|
||||
func (noopCloser) Close() error { return nil }
|
||||
152
internal/proxy/mocks/app_conn_consensus.go
Normal file
152
internal/proxy/mocks/app_conn_consensus.go
Normal file
@@ -0,0 +1,152 @@
|
||||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
types "github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
// AppConnConsensus is an autogenerated mock type for the AppConnConsensus type
|
||||
type AppConnConsensus struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// BeginBlockSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnConsensus) BeginBlockSync(_a0 context.Context, _a1 types.RequestBeginBlock) (*types.ResponseBeginBlock, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseBeginBlock
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestBeginBlock) *types.ResponseBeginBlock); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseBeginBlock)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestBeginBlock) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// CommitSync provides a mock function with given fields: _a0
|
||||
func (_m *AppConnConsensus) CommitSync(_a0 context.Context) (*types.ResponseCommit, error) {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 *types.ResponseCommit
|
||||
if rf, ok := ret.Get(0).(func(context.Context) *types.ResponseCommit); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseCommit)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
|
||||
r1 = rf(_a0)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// DeliverTxAsync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnConsensus) DeliverTxAsync(_a0 context.Context, _a1 types.RequestDeliverTx) (*abcicli.ReqRes, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *abcicli.ReqRes
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestDeliverTx) *abcicli.ReqRes); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*abcicli.ReqRes)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestDeliverTx) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// EndBlockSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnConsensus) EndBlockSync(_a0 context.Context, _a1 types.RequestEndBlock) (*types.ResponseEndBlock, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseEndBlock
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestEndBlock) *types.ResponseEndBlock); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseEndBlock)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestEndBlock) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Error provides a mock function with given fields:
|
||||
func (_m *AppConnConsensus) Error() error {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// InitChainSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnConsensus) InitChainSync(_a0 context.Context, _a1 types.RequestInitChain) (*types.ResponseInitChain, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseInitChain
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestInitChain) *types.ResponseInitChain); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseInitChain)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestInitChain) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// SetResponseCallback provides a mock function with given fields: _a0
|
||||
func (_m *AppConnConsensus) SetResponseCallback(_a0 abcicli.Callback) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
120
internal/proxy/mocks/app_conn_mempool.go
Normal file
120
internal/proxy/mocks/app_conn_mempool.go
Normal file
@@ -0,0 +1,120 @@
|
||||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
types "github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
// AppConnMempool is an autogenerated mock type for the AppConnMempool type
|
||||
type AppConnMempool struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// CheckTxAsync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnMempool) CheckTxAsync(_a0 context.Context, _a1 types.RequestCheckTx) (*abcicli.ReqRes, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *abcicli.ReqRes
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestCheckTx) *abcicli.ReqRes); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*abcicli.ReqRes)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestCheckTx) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// CheckTxSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnMempool) CheckTxSync(_a0 context.Context, _a1 types.RequestCheckTx) (*types.ResponseCheckTx, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseCheckTx
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestCheckTx) *types.ResponseCheckTx); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseCheckTx)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestCheckTx) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Error provides a mock function with given fields:
|
||||
func (_m *AppConnMempool) Error() error {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// FlushAsync provides a mock function with given fields: _a0
|
||||
func (_m *AppConnMempool) FlushAsync(_a0 context.Context) (*abcicli.ReqRes, error) {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 *abcicli.ReqRes
|
||||
if rf, ok := ret.Get(0).(func(context.Context) *abcicli.ReqRes); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*abcicli.ReqRes)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context) error); ok {
|
||||
r1 = rf(_a0)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// FlushSync provides a mock function with given fields: _a0
|
||||
func (_m *AppConnMempool) FlushSync(_a0 context.Context) error {
|
||||
ret := _m.Called(_a0)
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
|
||||
r0 = rf(_a0)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// SetResponseCallback provides a mock function with given fields: _a0
|
||||
func (_m *AppConnMempool) SetResponseCallback(_a0 abcicli.Callback) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
99
internal/proxy/mocks/app_conn_query.go
Normal file
99
internal/proxy/mocks/app_conn_query.go
Normal file
@@ -0,0 +1,99 @@
|
||||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
types "github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
// AppConnQuery is an autogenerated mock type for the AppConnQuery type
|
||||
type AppConnQuery struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// EchoSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnQuery) EchoSync(_a0 context.Context, _a1 string) (*types.ResponseEcho, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseEcho
|
||||
if rf, ok := ret.Get(0).(func(context.Context, string) *types.ResponseEcho); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseEcho)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Error provides a mock function with given fields:
|
||||
func (_m *AppConnQuery) Error() error {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// InfoSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnQuery) InfoSync(_a0 context.Context, _a1 types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseInfo
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestInfo) *types.ResponseInfo); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseInfo)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestInfo) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// QuerySync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnQuery) QuerySync(_a0 context.Context, _a1 types.RequestQuery) (*types.ResponseQuery, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseQuery
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestQuery) *types.ResponseQuery); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseQuery)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestQuery) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
122
internal/proxy/mocks/app_conn_snapshot.go
Normal file
122
internal/proxy/mocks/app_conn_snapshot.go
Normal file
@@ -0,0 +1,122 @@
|
||||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
types "github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
// AppConnSnapshot is an autogenerated mock type for the AppConnSnapshot type
|
||||
type AppConnSnapshot struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// ApplySnapshotChunkSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnSnapshot) ApplySnapshotChunkSync(_a0 context.Context, _a1 types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseApplySnapshotChunk
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestApplySnapshotChunk) *types.ResponseApplySnapshotChunk); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseApplySnapshotChunk)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestApplySnapshotChunk) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// Error provides a mock function with given fields:
|
||||
func (_m *AppConnSnapshot) Error() error {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func() error); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// ListSnapshotsSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnSnapshot) ListSnapshotsSync(_a0 context.Context, _a1 types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseListSnapshots
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestListSnapshots) *types.ResponseListSnapshots); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseListSnapshots)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestListSnapshots) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// LoadSnapshotChunkSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnSnapshot) LoadSnapshotChunkSync(_a0 context.Context, _a1 types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseLoadSnapshotChunk
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestLoadSnapshotChunk) *types.ResponseLoadSnapshotChunk); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseLoadSnapshotChunk)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestLoadSnapshotChunk) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
|
||||
// OfferSnapshotSync provides a mock function with given fields: _a0, _a1
|
||||
func (_m *AppConnSnapshot) OfferSnapshotSync(_a0 context.Context, _a1 types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
|
||||
ret := _m.Called(_a0, _a1)
|
||||
|
||||
var r0 *types.ResponseOfferSnapshot
|
||||
if rf, ok := ret.Get(0).(func(context.Context, types.RequestOfferSnapshot) *types.ResponseOfferSnapshot); ok {
|
||||
r0 = rf(_a0, _a1)
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(*types.ResponseOfferSnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func(context.Context, types.RequestOfferSnapshot) error); ok {
|
||||
r1 = rf(_a0, _a1)
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
36
internal/proxy/mocks/client_creator.go
Normal file
36
internal/proxy/mocks/client_creator.go
Normal file
@@ -0,0 +1,36 @@
|
||||
// Code generated by mockery. DO NOT EDIT.
|
||||
|
||||
package mocks
|
||||
|
||||
import (
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
)
|
||||
|
||||
// ClientCreator is an autogenerated mock type for the ClientCreator type
|
||||
type ClientCreator struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// NewABCIClient provides a mock function with given fields:
|
||||
func (_m *ClientCreator) NewABCIClient() (abcicli.Client, error) {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 abcicli.Client
|
||||
if rf, ok := ret.Get(0).(func() abcicli.Client); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(abcicli.Client)
|
||||
}
|
||||
}
|
||||
|
||||
var r1 error
|
||||
if rf, ok := ret.Get(1).(func() error); ok {
|
||||
r1 = rf()
|
||||
} else {
|
||||
r1 = ret.Error(1)
|
||||
}
|
||||
|
||||
return r0, r1
|
||||
}
|
||||
200
internal/proxy/multi_app_conn.go
Normal file
200
internal/proxy/multi_app_conn.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
||||
tmlog "github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
)
|
||||
|
||||
const (
|
||||
connConsensus = "consensus"
|
||||
connMempool = "mempool"
|
||||
connQuery = "query"
|
||||
connSnapshot = "snapshot"
|
||||
)
|
||||
|
||||
// AppConns is the Tendermint's interface to the application that consists of
|
||||
// multiple connections.
|
||||
type AppConns interface {
|
||||
service.Service
|
||||
|
||||
// Mempool connection
|
||||
Mempool() AppConnMempool
|
||||
// Consensus connection
|
||||
Consensus() AppConnConsensus
|
||||
// Query connection
|
||||
Query() AppConnQuery
|
||||
// Snapshot connection
|
||||
Snapshot() AppConnSnapshot
|
||||
}
|
||||
|
||||
// NewAppConns calls NewMultiAppConn.
|
||||
func NewAppConns(clientCreator abcicli.ClientCreator) AppConns {
|
||||
return NewMultiAppConn(clientCreator)
|
||||
}
|
||||
|
||||
// multiAppConn implements AppConns.
|
||||
//
|
||||
// A multiAppConn is made of a few appConns and manages their underlying abci
|
||||
// clients.
|
||||
// TODO: on app restart, clients must reboot together
|
||||
type multiAppConn struct {
|
||||
service.BaseService
|
||||
|
||||
consensusConn AppConnConsensus
|
||||
mempoolConn AppConnMempool
|
||||
queryConn AppConnQuery
|
||||
snapshotConn AppConnSnapshot
|
||||
|
||||
consensusConnClient abcicli.Client
|
||||
mempoolConnClient abcicli.Client
|
||||
queryConnClient abcicli.Client
|
||||
snapshotConnClient abcicli.Client
|
||||
|
||||
clientCreator abcicli.ClientCreator
|
||||
}
|
||||
|
||||
// NewMultiAppConn makes all necessary abci connections to the application.
|
||||
func NewMultiAppConn(clientCreator abcicli.ClientCreator) AppConns {
|
||||
multiAppConn := &multiAppConn{
|
||||
clientCreator: clientCreator,
|
||||
}
|
||||
multiAppConn.BaseService = *service.NewBaseService(nil, "multiAppConn", multiAppConn)
|
||||
return multiAppConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) Mempool() AppConnMempool {
|
||||
return app.mempoolConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) Consensus() AppConnConsensus {
|
||||
return app.consensusConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) Query() AppConnQuery {
|
||||
return app.queryConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) Snapshot() AppConnSnapshot {
|
||||
return app.snapshotConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) OnStart() error {
|
||||
c, err := app.abciClientFor(connQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
app.queryConnClient = c
|
||||
app.queryConn = NewAppConnQuery(c)
|
||||
|
||||
c, err = app.abciClientFor(connSnapshot)
|
||||
if err != nil {
|
||||
app.stopAllClients()
|
||||
return err
|
||||
}
|
||||
app.snapshotConnClient = c
|
||||
app.snapshotConn = NewAppConnSnapshot(c)
|
||||
|
||||
c, err = app.abciClientFor(connMempool)
|
||||
if err != nil {
|
||||
app.stopAllClients()
|
||||
return err
|
||||
}
|
||||
app.mempoolConnClient = c
|
||||
app.mempoolConn = NewAppConnMempool(c)
|
||||
|
||||
c, err = app.abciClientFor(connConsensus)
|
||||
if err != nil {
|
||||
app.stopAllClients()
|
||||
return err
|
||||
}
|
||||
app.consensusConnClient = c
|
||||
app.consensusConn = NewAppConnConsensus(c)
|
||||
|
||||
// Kill Tendermint if the ABCI application crashes.
|
||||
go app.killTMOnClientError()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (app *multiAppConn) OnStop() {
|
||||
app.stopAllClients()
|
||||
}
|
||||
|
||||
func (app *multiAppConn) killTMOnClientError() {
|
||||
killFn := func(conn string, err error, logger tmlog.Logger) {
|
||||
logger.Error(
|
||||
fmt.Sprintf("%s connection terminated. Did the application crash? Please restart tendermint", conn),
|
||||
"err", err)
|
||||
if killErr := kill(); killErr != nil {
|
||||
logger.Error("Failed to kill this process - please do so manually", "err", killErr)
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case <-app.consensusConnClient.Quit():
|
||||
if err := app.consensusConnClient.Error(); err != nil {
|
||||
killFn(connConsensus, err, app.Logger)
|
||||
}
|
||||
case <-app.mempoolConnClient.Quit():
|
||||
if err := app.mempoolConnClient.Error(); err != nil {
|
||||
killFn(connMempool, err, app.Logger)
|
||||
}
|
||||
case <-app.queryConnClient.Quit():
|
||||
if err := app.queryConnClient.Error(); err != nil {
|
||||
killFn(connQuery, err, app.Logger)
|
||||
}
|
||||
case <-app.snapshotConnClient.Quit():
|
||||
if err := app.snapshotConnClient.Error(); err != nil {
|
||||
killFn(connSnapshot, err, app.Logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (app *multiAppConn) stopAllClients() {
|
||||
if app.consensusConnClient != nil {
|
||||
if err := app.consensusConnClient.Stop(); err != nil {
|
||||
app.Logger.Error("error while stopping consensus client", "error", err)
|
||||
}
|
||||
}
|
||||
if app.mempoolConnClient != nil {
|
||||
if err := app.mempoolConnClient.Stop(); err != nil {
|
||||
app.Logger.Error("error while stopping mempool client", "error", err)
|
||||
}
|
||||
}
|
||||
if app.queryConnClient != nil {
|
||||
if err := app.queryConnClient.Stop(); err != nil {
|
||||
app.Logger.Error("error while stopping query client", "error", err)
|
||||
}
|
||||
}
|
||||
if app.snapshotConnClient != nil {
|
||||
if err := app.snapshotConnClient.Stop(); err != nil {
|
||||
app.Logger.Error("error while stopping snapshot client", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (app *multiAppConn) abciClientFor(conn string) (abcicli.Client, error) {
|
||||
c, err := app.clientCreator.NewABCIClient()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating ABCI client (%s connection): %w", conn, err)
|
||||
}
|
||||
c.SetLogger(app.Logger.With("module", "abci-client", "connection", conn))
|
||||
if err := c.Start(); err != nil {
|
||||
return nil, fmt.Errorf("error starting ABCI client (%s connection): %w", conn, err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func kill() error {
|
||||
p, err := os.FindProcess(os.Getpid())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.Signal(syscall.SIGTERM)
|
||||
}
|
||||
90
internal/proxy/multi_app_conn_test.go
Normal file
90
internal/proxy/multi_app_conn_test.go
Normal file
@@ -0,0 +1,90 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
|
||||
"github.com/tendermint/tendermint/internal/proxy/mocks"
|
||||
)
|
||||
|
||||
func TestAppConns_Start_Stop(t *testing.T) {
|
||||
quitCh := make(<-chan struct{})
|
||||
|
||||
clientCreatorMock := &mocks.ClientCreator{}
|
||||
|
||||
clientMock := &abcimocks.Client{}
|
||||
clientMock.On("SetLogger", mock.Anything).Return().Times(4)
|
||||
clientMock.On("Start").Return(nil).Times(4)
|
||||
clientMock.On("Stop").Return(nil).Times(4)
|
||||
clientMock.On("Quit").Return(quitCh).Times(4)
|
||||
|
||||
clientCreatorMock.On("NewABCIClient").Return(clientMock, nil).Times(4)
|
||||
|
||||
appConns := NewAppConns(clientCreatorMock)
|
||||
|
||||
err := appConns.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
err = appConns.Stop()
|
||||
require.NoError(t, err)
|
||||
|
||||
clientMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
// Upon failure, we call tmos.Kill
|
||||
func TestAppConns_Failure(t *testing.T) {
|
||||
ok := make(chan struct{})
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGTERM)
|
||||
go func() {
|
||||
for range c {
|
||||
close(ok)
|
||||
}
|
||||
}()
|
||||
|
||||
quitCh := make(chan struct{})
|
||||
var recvQuitCh <-chan struct{} // nolint:gosimple
|
||||
recvQuitCh = quitCh
|
||||
|
||||
clientCreatorMock := &mocks.ClientCreator{}
|
||||
|
||||
clientMock := &abcimocks.Client{}
|
||||
clientMock.On("SetLogger", mock.Anything).Return()
|
||||
clientMock.On("Start").Return(nil)
|
||||
clientMock.On("Stop").Return(nil)
|
||||
|
||||
clientMock.On("Quit").Return(recvQuitCh)
|
||||
clientMock.On("Error").Return(errors.New("EOF")).Once()
|
||||
|
||||
clientCreatorMock.On("NewABCIClient").Return(clientMock, nil)
|
||||
|
||||
appConns := NewAppConns(clientCreatorMock)
|
||||
|
||||
err := appConns.Start()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := appConns.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
// simulate failure
|
||||
close(quitCh)
|
||||
|
||||
select {
|
||||
case <-ok:
|
||||
t.Log("SIGTERM successfully received")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("expected process to receive SIGTERM signal")
|
||||
}
|
||||
}
|
||||
16
internal/proxy/version.go
Normal file
16
internal/proxy/version.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
)
|
||||
|
||||
// RequestInfo contains all the information for sending
|
||||
// the abci.RequestInfo message during handshake with the app.
|
||||
// It contains only compile-time version information.
|
||||
var RequestInfo = abci.RequestInfo{
|
||||
Version: version.TMVersion,
|
||||
BlockVersion: version.BlockProtocol,
|
||||
P2PVersion: version.P2PProtocol,
|
||||
AbciVersion: version.ABCIVersion,
|
||||
}
|
||||
Reference in New Issue
Block a user