Compare commits

...

12 Commits

Author SHA1 Message Date
tycho garen
c5c088b1f8 reorg files 2022-03-04 15:41:58 -05:00
tycho garen
1c7c3a09ec remove file 2022-03-04 15:27:02 -05:00
tycho garen
11f3415a02 Merge remote-tracking branch 'origin/master' into proxy-remove-triforcated-client 2022-03-04 15:23:22 -05:00
tycho garen
21f93fa15e unwind client creator 2022-03-04 15:23:08 -05:00
tycho garen
fa1520dd90 more cleanup 2022-03-04 14:59:43 -05:00
Sam Kleinman
bacc38aced Update internal/proxy/multi_app_conn.go
Co-authored-by: M. J. Fromberger <michael.j.fromberger@gmail.com>
2022-03-04 12:29:39 -05:00
tycho garen
b5fecde705 Merge remote-tracking branch 'origin/master' into proxy-remove-triforcated-client 2022-03-04 12:29:15 -05:00
tycho garen
079d3e5e25 fix test 2022-03-04 09:41:52 -05:00
tycho garen
06b193c742 avoid skipping 2022-03-04 09:30:59 -05:00
tycho garen
5765ec96d4 Merge remote-tracking branch 'origin/master' into proxy-remove-triforcated-client 2022-03-04 09:01:51 -05:00
tycho garen
ce7d4174a9 fix build 2022-03-04 08:53:30 -05:00
tycho garen
378a2f0aed proxy: collapse triforcated client 2022-03-03 12:19:06 -05:00
33 changed files with 473 additions and 759 deletions

View File

@@ -1,33 +0,0 @@
package abciclient
import (
"fmt"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
)
// Creator creates new ABCI clients.
type Creator func(log.Logger) (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
return func(logger log.Logger) (Client, error) {
return NewLocalClient(logger, app), nil
}
}
// NewRemoteCreator returns a Creator for the given address (e.g.
// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
// want the client to connect before reporting success.
func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator {
return func(log.Logger) (Client, error) {
remoteApp, err := NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("failed to connect to proxy: %w", err)
}
return remoteApp, nil
}
}

8
go.mod
View File

@@ -39,6 +39,12 @@ require (
pgregory.net/rapid v0.4.7
)
require (
github.com/creachadair/atomicfile v0.2.4
github.com/google/go-cmp v0.5.7
gotest.tools v2.2.0+incompatible
)
require (
4d63.com/gochecknoglobals v0.1.0 // indirect
github.com/Antonboom/errname v0.1.5 // indirect
@@ -67,7 +73,6 @@ require (
github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect
github.com/containerd/continuity v0.2.1 // indirect
github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect
github.com/creachadair/atomicfile v0.2.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingajkin/go-header v0.4.2 // indirect
github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
@@ -107,7 +112,6 @@ require (
github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect
github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.4.2 // indirect

View File

@@ -33,7 +33,7 @@ type reactorTestSuite struct {
nodes []types.NodeID
reactors map[types.NodeID]*Reactor
app map[types.NodeID]proxy.AppConns
app map[types.NodeID]abciclient.Client
blockSyncChannels map[types.NodeID]*p2p.Channel
peerChans map[types.NodeID]chan p2p.PeerUpdate
@@ -64,7 +64,7 @@ func setup(
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]proxy.AppConns, numNodes),
app: make(map[types.NodeID]abciclient.Client, numNodes),
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
@@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode(
logger := log.TestingLogger()
rts.nodes = append(rts.nodes, nodeID)
rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
require.NoError(t, rts.app[nodeID].Start(ctx))
blockDB := dbm.NewMemDB()
@@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode(
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
rts.app[nodeID].Consensus(),
rts.app[nodeID],
mock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,

View File

@@ -10,6 +10,7 @@ import (
"reflect"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/internal/eventbus"
@@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int {
}
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client) error {
// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
res, err := proxyApp.Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %w", err)
}
@@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks(
state sm.State,
appHash []byte,
appBlockHeight int64,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
) ([]byte, error) {
storeBlockBase := h.store.Base()
storeBlockHeight := h.store.Height()
@@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks(
Validators: nextVals,
AppStateBytes: h.genDoc.AppState,
}
res, err := proxyApp.Consensus().InitChain(ctx, req)
res, err := proxyApp.InitChain(ctx, req)
if err != nil {
return nil, err
}
@@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks(
// NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
h.logger.Info("Replay last block using real app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
return state.AppHash, err
case appBlockHeight == storeBlockHeight:
@@ -426,6 +427,9 @@ func (h *Handshaker) ReplayBlocks(
if err != nil {
return nil, err
}
if err := mockApp.Start(ctx); err != nil {
return nil, err
}
h.logger.Info("Replay last block using mock app")
state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)
@@ -445,7 +449,7 @@ func (h *Handshaker) ReplayBlocks(
func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
appBlockHeight,
storeBlockHeight int64,
mutateState bool) ([]byte, error) {
@@ -481,16 +485,16 @@ func (h *Handshaker) replayBlocks(
// We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status.
blockExec := sm.NewBlockExecutor(
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
blockExec.SetEventBus(h.eventBus)
appHash, err = sm.ExecCommitBlock(ctx,
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
blockExec, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
} else {
appHash, err = sm.ExecCommitBlock(ctx,
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
nil, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
if err != nil {
return nil, err
}
@@ -501,7 +505,7 @@ func (h *Handshaker) replayBlocks(
if mutateState {
// sync the final block
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
if err != nil {
return nil, err
}
@@ -517,7 +521,7 @@ func (h *Handshaker) replayBlock(
ctx context.Context,
state sm.State,
height int64,
proxyApp proxy.AppConnConsensus,
proxyApp abciclient.Client,
) (sm.State, error) {
block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height)

View File

@@ -327,8 +327,12 @@ func newConsensusStateForReplay(
}
// Create proxyAppConn connection (consensus, mempool, query)
clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil {
return nil, err
}
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx)
if err != nil {
return nil, fmt.Errorf("starting proxy app conns: %w", err)
@@ -346,7 +350,7 @@ func newConsensusStateForReplay(
}
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
blockStore, mempool, evpool)

View File

@@ -61,22 +61,11 @@ func newMockProxyApp(
logger log.Logger,
appHash []byte,
abciResponses *tmstate.ABCIResponses,
) (proxy.AppConnConsensus, error) {
clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
) (abciclient.Client, error) {
return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{
appHash: appHash,
abciResponses: abciResponses,
})
cli, err := clientCreator(logger)
if err != nil {
return nil, err
}
if err = cli.Start(ctx); err != nil {
return nil, err
}
return proxy.NewAppConnConsensus(cli, proxy.NopMetrics()), nil
}), logger, proxy.NopMetrics()), nil
}
type mockProxyApp struct {

View File

@@ -804,11 +804,11 @@ func testHandshakeReplay(
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState)
@@ -829,9 +829,10 @@ func testHandshakeReplay(
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning())
require.NotNil(t, proxyApp)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
err = handshaker.Handshake(ctx, proxyApp)
@@ -842,7 +843,7 @@ func testHandshakeReplay(
require.NoError(t, err, "Error on abci handshake")
// get the latest app hash from the app
res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""})
res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""})
if err != nil {
t.Fatal(err)
}
@@ -875,11 +876,11 @@ func applyBlock(
evpool sm.EvidencePool,
st sm.State,
blk *types.Block,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
blockStore *mockBlockStore,
) sm.State {
testPartSize := types.BlockPartSizeBytes
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
bps, err := blk.MakePartSet(testPartSize)
require.NoError(t, err)
@@ -892,7 +893,7 @@ func applyBlock(
func buildAppStateFromChain(
ctx context.Context,
t *testing.T,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
stateStore sm.Store,
mempool mempool.Mempool,
evpool sm.EvidencePool,
@@ -908,7 +909,7 @@ func buildAppStateFromChain(
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators)
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
Validators: validators,
})
require.NoError(t, err)
@@ -958,14 +959,14 @@ func buildTMStateFromChain(
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close()
clientCreator := abciclient.NewLocalCreator(kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstoreApp)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
validators := types.TM2PB.ValidatorUpdates(state.Validators)
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
Validators: validators,
})
require.NoError(t, err)
@@ -1031,8 +1032,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - 0x03
{
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
@@ -1051,8 +1052,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
// - RANDOM HASH
{
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
clientCreator := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
client := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); proxyApp.Wait() })
@@ -1282,12 +1283,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.NewNopLogger()
votePower := 10 + int64(rand.Uint32())
val, _, err := factory.Validator(ctx, votePower)
require.NoError(t, err)
vals := types.NewValidatorSet([]*types.Validator{val})
app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
clientCreator := abciclient.NewLocalCreator(app)
client := abciclient.NewLocalClient(logger, app)
cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
require.NoError(t, err)
@@ -1306,9 +1308,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
logger := log.TestingLogger()
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake")

View File

@@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
}
blockStore := store.NewBlockStore(blockStoreDB)
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
proxyLogger := logger.With("module", "proxy")
proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics())
if err := proxyApp.Start(ctx); err != nil {
t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
}
@@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
if err != nil {
t.Fatal(err)

View File

@@ -9,10 +9,10 @@ import (
"sync/atomic"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/libs/clist"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log"
tmmath "github.com/tendermint/tendermint/libs/math"
"github.com/tendermint/tendermint/types"
@@ -31,7 +31,7 @@ type TxMempool struct {
logger log.Logger
metrics *Metrics
config *config.MempoolConfig
proxyAppConn proxy.AppConnMempool
proxyAppConn abciclient.Client
// txsAvailable fires once for each height when the mempool is not empty
txsAvailable chan struct{}
@@ -93,7 +93,7 @@ type TxMempool struct {
func NewTxMempool(
logger log.Logger,
cfg *config.MempoolConfig,
proxyAppConn proxy.AppConnMempool,
proxyAppConn abciclient.Client,
options ...TxMempoolOption,
) *TxMempool {
@@ -421,7 +421,6 @@ func (txmp *TxMempool) Update(
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error {
txmp.height = blockHeight
txmp.notifiedTxsAvailable = false

View File

@@ -78,24 +78,23 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
app := &application{kvstore.NewApplication()}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
app := &application{kvstore.NewApplication()}
conn := abciclient.NewLocalClient(logger, app)
cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
require.NoError(t, err)
cfg.Mempool.CacheSize = cacheSize
appConnMem, err := cc(logger)
require.NoError(t, err)
require.NoError(t, appConnMem.Start(ctx))
require.NoError(t, conn.Start(ctx))
t.Cleanup(func() {
os.RemoveAll(cfg.RootDir)
cancel()
appConnMem.Wait()
conn.Wait()
})
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...)
}
func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {

View File

@@ -1,249 +0,0 @@
package proxy
import (
"context"
"time"
"github.com/go-kit/kit/metrics"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/types"
)
//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
//----------------------------------------------------------------------------------------
// Enforce which abci msgs can be sent on a connection at the type level
type AppConnConsensus interface {
Error() error
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error)
VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error)
FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
Commit(context.Context) (*types.ResponseCommit, error)
}
type AppConnMempool interface {
Error() error
CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
Flush(context.Context) error
}
type AppConnQuery interface {
Error() error
Echo(context.Context, string) (*types.ResponseEcho, error)
Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
}
type AppConnSnapshot interface {
Error() error
ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
}
//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of abciclient.Client)
type appConnConsensus struct {
metrics *Metrics
appConn abciclient.Client
}
var _ AppConnConsensus = (*appConnConsensus)(nil)
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
return &appConnConsensus{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChain(
ctx context.Context,
req types.RequestInitChain,
) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
return app.appConn.InitChain(ctx, req)
}
func (app *appConnConsensus) PrepareProposal(
ctx context.Context,
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.appConn.PrepareProposal(ctx, req)
}
func (app *appConnConsensus) ProcessProposal(
ctx context.Context,
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
return app.appConn.ProcessProposal(ctx, req)
}
func (app *appConnConsensus) ExtendVote(
ctx context.Context,
req types.RequestExtendVote,
) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
return app.appConn.ExtendVote(ctx, req)
}
func (app *appConnConsensus) VerifyVoteExtension(
ctx context.Context,
req types.RequestVerifyVoteExtension,
) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
return app.appConn.VerifyVoteExtension(ctx, req)
}
func (app *appConnConsensus) FinalizeBlock(
ctx context.Context,
req types.RequestFinalizeBlock,
) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
return app.appConn.FinalizeBlock(ctx, req)
}
func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
return app.appConn.Commit(ctx)
}
//------------------------------------------------
// Implements AppConnMempool (subset of abciclient.Client)
type appConnMempool struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool {
return &appConnMempool{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}
func (app *appConnMempool) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
return app.appConn.Flush(ctx)
}
func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.appConn.CheckTx(ctx, req)
}
//------------------------------------------------
// Implements AppConnQuery (subset of abciclient.Client)
type appConnQuery struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery {
return &appConnQuery{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnQuery) Error() error {
return app.appConn.Error()
}
func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
return app.appConn.Echo(ctx, msg)
}
func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
return app.appConn.Info(ctx, req)
}
func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
return app.appConn.Query(ctx, reqQuery)
}
//------------------------------------------------
// Implements AppConnSnapshot (subset of abciclient.Client)
type appConnSnapshot struct {
metrics *Metrics
appConn abciclient.Client
}
func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot {
return &appConnSnapshot{
metrics: metrics,
appConn: appConn,
}
}
func (app *appConnSnapshot) Error() error {
return app.appConn.Error()
}
func (app *appConnSnapshot) ListSnapshots(
ctx context.Context,
req types.RequestListSnapshots,
) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
return app.appConn.ListSnapshots(ctx, req)
}
func (app *appConnSnapshot) OfferSnapshot(
ctx context.Context,
req types.RequestOfferSnapshot,
) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
return app.appConn.OfferSnapshot(ctx, req)
}
func (app *appConnSnapshot) LoadSnapshotChunk(
ctx context.Context,
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
return app.appConn.LoadSnapshotChunk(ctx, req)
}
func (app *appConnSnapshot) ApplySnapshotChunk(
ctx context.Context,
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
return app.appConn.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }
}

View File

@@ -1,42 +1,218 @@
package proxy
import (
"context"
"io"
"os"
"syscall"
"time"
"github.com/go-kit/kit/metrics"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
e2e "github.com/tendermint/tendermint/test/e2e/app"
)
// DefaultClientCreator returns a default ClientCreator, which will create a
// ClientFactory returns a default ClientCreator, which will create a
// local client if addr is one of: 'kvstore',
// 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client.
//
// The Closer is a noop except for persistent_kvstore applications,
// which will clean up the store.
func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) {
switch addr {
case "kvstore":
return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil
case "persistent_kvstore":
app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
return abciclient.NewLocalCreator(app), app
return abciclient.NewLocalClient(logger, app), app, nil
case "e2e":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
return nil, noopCloser{}, err
}
return abciclient.NewLocalCreator(app), noopCloser{}
return abciclient.NewLocalClient(logger, app), noopCloser{}, nil
case "noop":
return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{}
return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil
default:
mustConnect := false // loop retrying
return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{}
client, err := abciclient.NewClient(logger, addr, transport, mustConnect)
if err != nil {
return nil, noopCloser{}, err
}
return client, noopCloser{}, nil
}
}
type noopCloser struct{}
func (noopCloser) Close() error { return nil }
////////////////////////////////////////////////////////////////////////
// proxyClient implements provides the application connection.
type proxyClient struct {
service.BaseService
logger log.Logger
client abciclient.Client
metrics *Metrics
}
// New creates a proxy application interface.
func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client {
conn := &proxyClient{
logger: logger,
metrics: metrics,
client: client,
}
conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn)
return conn
}
func (app *proxyClient) OnStop() { tryCallStop(app.client) }
func (app *proxyClient) Error() error { return app.client.Error() }
func tryCallStop(client abciclient.Client) {
switch c := client.(type) {
case nil:
return
case interface{ Stop() }:
c.Stop()
}
}
func (app *proxyClient) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
tryCallStop(app.client)
}
}()
// Kill Tendermint if the ABCI application crashes.
go func() {
if !app.client.IsRunning() {
return
}
app.client.Wait()
if ctx.Err() != nil {
return
}
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
app.logger.Error("Failed to kill this process - please do so manually",
"err", killErr)
}
}
}()
return app.client.Start(ctx)
}
func kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}
return p.Signal(syscall.SIGTERM)
}
func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
return app.client.InitChain(ctx, req)
}
func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
return app.client.PrepareProposal(ctx, req)
}
func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
return app.client.ProcessProposal(ctx, req)
}
func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
return app.client.ExtendVote(ctx, req)
}
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
return app.client.VerifyVoteExtension(ctx, req)
}
func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
return app.client.FinalizeBlock(ctx, req)
}
func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
return app.client.Commit(ctx)
}
func (app *proxyClient) Flush(ctx context.Context) error {
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
return app.client.Flush(ctx)
}
func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
return app.client.CheckTx(ctx, req)
}
func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
return app.client.Echo(ctx, msg)
}
func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
return app.client.Info(ctx, req)
}
func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
return app.client.Query(ctx, reqQuery)
}
func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
return app.client.ListSnapshots(ctx, req)
}
func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
return app.client.OfferSnapshot(ctx, req)
}
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
return app.client.LoadSnapshotChunk(ctx, req)
}
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
return app.client.ApplySnapshotChunk(ctx, req)
}
// addTimeSample returns a function that, when called, adds an observation to m.
// The observation added to m is the number of seconds ellapsed since addTimeSample
// was initially called. addTimeSample is meant to be called in a defer to calculate
// the amount of time a function takes to complete.
func addTimeSample(m metrics.Histogram) func() {
start := time.Now()
return func() { m.Observe(time.Since(start).Seconds()) }
}

View File

@@ -2,18 +2,26 @@ package proxy
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/abci/server"
"github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"gotest.tools/assert"
)
//----------------------------------------
@@ -51,7 +59,10 @@ var SOCKET = "socket"
func TestEcho(t *testing.T) {
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -62,12 +73,9 @@ func TestEcho(t *testing.T) {
t.Cleanup(func() { cancel(); s.Wait() })
// Start client
cli, err := clientCreator(logger.With("module", "abci-client"))
require.NoError(t, err, "Error creating ABCI client:")
require.NoError(t, client.Start(ctx), "Error starting ABCI client")
require.NoError(t, cli.Start(ctx), "Error starting ABCI client")
proxy := newAppConnTest(cli)
proxy := newAppConnTest(client)
t.Log("Connected")
for i := 0; i < 1000; i++ {
@@ -91,7 +99,10 @@ func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
if err != nil {
b.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -102,12 +113,9 @@ func BenchmarkEcho(b *testing.B) {
b.Cleanup(func() { cancel(); s.Wait() })
// Start client
cli, err := clientCreator(logger.With("module", "abci-client"))
require.NoError(b, err, "Error creating ABCI client")
require.NoError(b, client.Start(ctx), "Error starting ABCI client")
require.NoError(b, cli.Start(ctx), "Error starting ABCI client")
proxy := newAppConnTest(cli)
proxy := newAppConnTest(client)
b.Log("Connected")
echoString := strings.Repeat(" ", 200)
b.StartTimer() // Start benchmarking tests
@@ -139,7 +147,10 @@ func TestInfo(t *testing.T) {
sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
logger := log.TestingLogger()
clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
if err != nil {
t.Fatal(err)
}
// Start server
s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication())
@@ -147,12 +158,9 @@ func TestInfo(t *testing.T) {
t.Cleanup(func() { cancel(); s.Wait() })
// Start client
cli, err := clientCreator(logger.With("module", "abci-client"))
require.NoError(t, err, "Error creating ABCI client")
require.NoError(t, client.Start(ctx), "Error starting ABCI client")
require.NoError(t, cli.Start(ctx), "Error starting ABCI client")
proxy := newAppConnTest(cli)
proxy := newAppConnTest(client)
t.Log("Connected")
resInfo, err := proxy.Info(ctx, RequestInfo)
@@ -162,3 +170,72 @@ func TestInfo(t *testing.T) {
t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else")
}
}
type noopStoppableClientImpl struct {
abciclient.Client
count int
}
func (c *noopStoppableClientImpl) Stop() { c.count++ }
func TestAppConns_Start_Stop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("Error").Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil).Times(1)
cl := &noopStoppableClientImpl{Client: clientMock}
appConns := New(cl, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
cancel()
appConns.Wait()
clientMock.AssertExpectations(t)
assert.Equal(t, 1, cl.count)
}
// Upon failure, we call tmos.Kill
func TestAppConns_Failure(t *testing.T) {
ok := make(chan struct{})
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
go func() {
for range c {
close(ok)
return
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return()
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil)
clientMock.On("Error").Return(errors.New("EOF"))
cl := &noopStoppableClientImpl{Client: clientMock}
appConns := New(cl, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); appConns.Wait() })
select {
case <-ok:
t.Log("SIGTERM successfully received")
case <-time.After(5 * time.Second):
t.Fatal("expected process to receive SIGTERM signal")
}
}

View File

@@ -1,131 +0,0 @@
package proxy
import (
"context"
"os"
"syscall"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
// AppConns is the Tendermint's interface to the application that consists of
// multiple connections.
type AppConns interface {
service.Service
// Mempool connection
Mempool() AppConnMempool
// Consensus connection
Consensus() AppConnConsensus
// Query connection
Query() AppConnQuery
// Snapshot connection
Snapshot() AppConnSnapshot
}
// NewAppConns calls NewMultiAppConn.
func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
return NewMultiAppConn(clientCreator, logger, metrics)
}
// multiAppConn implements AppConns.
//
// A multiAppConn is made of a few appConns and manages their underlying abci
// clients.
// TODO: on app restart, clients must reboot together
type multiAppConn struct {
service.BaseService
logger log.Logger
metrics *Metrics
consensusConn AppConnConsensus
mempoolConn AppConnMempool
queryConn AppConnQuery
snapshotConn AppConnSnapshot
client stoppableClient
clientCreator abciclient.Creator
}
// TODO: this is a totally internal and quasi permanent shim for
// clients. eventually we can have a single client and have some kind
// of reasonable lifecycle witout needing an explicit stop method.
type stoppableClient interface {
abciclient.Client
Stop()
}
// NewMultiAppConn makes all necessary abci connections to the application.
func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
multiAppConn := &multiAppConn{
logger: logger,
metrics: metrics,
clientCreator: clientCreator,
}
multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn)
return multiAppConn
}
func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn }
func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn }
func (app *multiAppConn) Query() AppConnQuery { return app.queryConn }
func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn }
func (app *multiAppConn) OnStart(ctx context.Context) error {
var err error
defer func() {
if err != nil {
app.client.Stop()
}
}()
var client abciclient.Client
client, err = app.clientCreator(app.logger)
if err != nil {
return err
}
app.queryConn = NewAppConnQuery(client, app.metrics)
app.snapshotConn = NewAppConnSnapshot(client, app.metrics)
app.mempoolConn = NewAppConnMempool(client, app.metrics)
app.consensusConn = NewAppConnConsensus(client, app.metrics)
app.client = client.(stoppableClient)
// Kill Tendermint if the ABCI application crashes.
go func() {
if !client.IsRunning() {
return
}
app.client.Wait()
if ctx.Err() != nil {
return
}
if err := app.client.Error(); err != nil {
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
"err", err)
if killErr := kill(); killErr != nil {
app.logger.Error("Failed to kill this process - please do so manually",
"err", killErr)
}
}
}()
return client.Start(ctx)
}
func (app *multiAppConn) OnStop() { app.client.Stop() }
func kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}
return p.Signal(syscall.SIGTERM)
}

View File

@@ -1,99 +0,0 @@
package proxy
import (
"context"
"errors"
"os"
"os/signal"
"syscall"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abciclient "github.com/tendermint/tendermint/abci/client"
abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
"github.com/tendermint/tendermint/libs/log"
)
type noopStoppableClientImpl struct {
abciclient.Client
count int
}
func (c *noopStoppableClientImpl) Stop() { c.count++ }
func TestAppConns_Start_Stop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("Error").Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil).Times(1)
cl := &noopStoppableClientImpl{Client: clientMock}
creatorCallCount := 0
creator := func(logger log.Logger) (abciclient.Client, error) {
creatorCallCount++
return cl, nil
}
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
time.Sleep(200 * time.Millisecond)
cancel()
appConns.Wait()
clientMock.AssertExpectations(t)
assert.Equal(t, 1, cl.count)
assert.Equal(t, 1, creatorCallCount)
}
// Upon failure, we call tmos.Kill
func TestAppConns_Failure(t *testing.T) {
ok := make(chan struct{})
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM)
go func() {
for range c {
close(ok)
return
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientMock := &abcimocks.Client{}
clientMock.On("SetLogger", mock.Anything).Return()
clientMock.On("Start", mock.Anything).Return(nil)
clientMock.On("IsRunning").Return(true)
clientMock.On("Wait").Return(nil)
clientMock.On("Error").Return(errors.New("EOF"))
cl := &noopStoppableClientImpl{Client: clientMock}
creator := func(log.Logger) (abciclient.Client, error) {
return cl, nil
}
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
err := appConns.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() { cancel(); appConns.Wait() })
select {
case <-ok:
t.Log("SIGTERM successfully received")
case <-time.After(5 * time.Second):
t.Fatal("expected process to receive SIGTERM signal")
}
}

View File

@@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery(
height int64,
prove bool,
) (*coretypes.ResultABCIQuery, error) {
resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{
resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{
Path: path,
Data: data,
Height: height,
@@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery(
// ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo)
resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo)
if err != nil {
return nil, err
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/rs/cors"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync"
@@ -19,7 +20,6 @@ import (
"github.com/tendermint/tendermint/internal/eventlog"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
"github.com/tendermint/tendermint/internal/pubsub/query"
sm "github.com/tendermint/tendermint/internal/state"
@@ -67,8 +67,7 @@ type peerManager interface {
// to be setup once during startup.
type Environment struct {
// external, thread safe interfaces
ProxyAppQuery proxy.AppConnQuery
ProxyAppMempool proxy.AppConnMempool
ProxyApp abciclient.Client
// interfaces defined in types and above
StateStore sm.Store

View File

@@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul
// be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}

View File

@@ -6,11 +6,11 @@ import (
"fmt"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/proxy"
"github.com/tendermint/tendermint/libs/log"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
"github.com/tendermint/tendermint/types"
@@ -30,7 +30,7 @@ type BlockExecutor struct {
blockStore BlockStore
// execute the app against this
proxyApp proxy.AppConnConsensus
proxyApp abciclient.Client
// events
eventBus types.BlockEventPublisher
@@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
func NewBlockExecutor(
stateStore Store,
logger log.Logger,
proxyApp proxy.AppConnConsensus,
proxyApp abciclient.Client,
pool mempool.Mempool,
evpool EvidencePool,
blockStore BlockStore,
@@ -375,7 +375,7 @@ func (blockExec *BlockExecutor) Commit(
func execBlockOnProxyApp(
ctx context.Context,
logger log.Logger,
proxyAppConn proxy.AppConnConsensus,
proxyAppConn abciclient.Client,
block *types.Block,
store Store,
initialHeight int64,
@@ -617,7 +617,7 @@ func fireEvents(
func ExecCommitBlock(
ctx context.Context,
be *BlockExecutor,
appConnConsensus proxy.AppConnConsensus,
appConnConsensus abciclient.Client,
block *types.Block,
logger log.Logger,
store Store,

View File

@@ -39,9 +39,9 @@ var (
func TestApplyBlock(t *testing.T) {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -52,7 +52,7 @@ func TestApplyBlock(t *testing.T) {
state, stateDB, _ := makeState(t, 1, 1)
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp,
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
block, err := sf.MakeBlock(state, 1, new(types.Commit))
@@ -73,9 +73,10 @@ func TestBeginBlockValidators(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
logger := log.TestingLogger()
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
@@ -121,7 +122,7 @@ func TestBeginBlockValidators(t *testing.T) {
block, err := sf.MakeBlock(state, 2, lastCommit)
require.NoError(t, err)
_, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
_, err = sm.ExecCommitBlock(ctx, nil, proxyApp, block, log.TestingLogger(), stateStore, 1, state)
require.NoError(t, err, tc.desc)
// -> app receives a list of validators with a bool indicating if they signed
@@ -145,8 +146,9 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
defer cancel()
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
logger := log.TestingLogger()
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
@@ -222,7 +224,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp,
mmock.Mempool{}, evpool, blockStore)
block, err := sf.MakeBlock(state, 1, new(types.Commit))
@@ -248,9 +250,9 @@ func TestProcessProposal(t *testing.T) {
defer cancel()
app := abcimocks.NewBaseMock()
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
@@ -261,7 +263,7 @@ func TestProcessProposal(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
@@ -451,9 +453,9 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
defer cancel()
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
@@ -464,7 +466,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
@@ -526,9 +528,9 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
defer cancel()
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
logger := log.TestingLogger()
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, app)
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err := proxyApp.Start(ctx)
require.NoError(t, err)
@@ -538,7 +540,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
proxyApp,
mmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,

View File

@@ -11,16 +11,13 @@ import (
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/encoding"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
sf "github.com/tendermint/tendermint/internal/state/test/factory"
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
tmtime "github.com/tendermint/tendermint/libs/time"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@@ -33,12 +30,6 @@ type paramsChangeTestCase struct {
params types.ConsensusParams
}
func newTestApp() proxy.AppConns {
app := &testApp{}
cc := abciclient.NewLocalCreator(app)
return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics())
}
func makeAndCommitGoodBlock(
ctx context.Context,
t *testing.T,

View File

@@ -10,10 +10,12 @@ import (
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
memmock "github.com/tendermint/tendermint/internal/mempool/mock"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/mocks"
statefactory "github.com/tendermint/tendermint/internal/state/test/factory"
@@ -30,8 +32,8 @@ const validationTestsStopHeight int64 = 10
func TestValidateBlockHeader(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 3, 1)
@@ -39,8 +41,8 @@ func TestValidateBlockHeader(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
logger,
proxyApp,
memmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
@@ -119,7 +121,8 @@ func TestValidateBlockCommit(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 1, 1)
@@ -127,8 +130,8 @@ func TestValidateBlockCommit(t *testing.T) {
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
logger,
proxyApp,
memmock.Mempool{},
sm.EmptyEvidencePool{},
blockStore,
@@ -245,7 +248,8 @@ func TestValidateBlockEvidence(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
proxyApp := newTestApp()
logger := log.TestingLogger()
proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
state, stateDB, privVals := makeState(t, 4, 1)
@@ -263,7 +267,7 @@ func TestValidateBlockEvidence(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
log.TestingLogger(),
proxyApp.Consensus(),
proxyApp,
memmock.Mempool{},
evpool,
blockStore,

View File

@@ -11,11 +11,11 @@ import (
"sync"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -135,8 +135,7 @@ type Reactor struct {
stateStore sm.Store
blockStore *store.BlockStore
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
conn abciclient.Client
tempDir string
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
@@ -173,8 +172,7 @@ func NewReactor(
initialHeight int64,
cfg config.StateSyncConfig,
logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
conn abciclient.Client,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
stateStore sm.Store,
@@ -209,7 +207,6 @@ func NewReactor(
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
connQuery: connQuery,
snapshotCh: snapshotCh,
chunkCh: chunkCh,
blockCh: blockCh,
@@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.cfg,
r.logger,
r.conn,
r.connQuery,
r.stateProvider,
r.snapshotCh,
r.chunkCh,

View File

@@ -13,11 +13,11 @@ import (
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
smmocks "github.com/tendermint/tendermint/internal/state/mocks"
"github.com/tendermint/tendermint/internal/statesync/mocks"
"github.com/tendermint/tendermint/internal/store"
@@ -37,8 +37,7 @@ type reactorTestSuite struct {
reactor *Reactor
syncer *syncer
conn *proxymocks.AppConnSnapshot
connQuery *proxymocks.AppConnQuery
conn *clientmocks.Client
stateProvider *mocks.StateProvider
snapshotChannel *p2p.Channel
@@ -71,21 +70,14 @@ type reactorTestSuite struct {
func setup(
ctx context.Context,
t *testing.T,
conn *proxymocks.AppConnSnapshot,
connQuery *proxymocks.AppConnQuery,
conn *clientmocks.Client,
stateProvider *mocks.StateProvider,
chBuf uint,
) *reactorTestSuite {
t.Helper()
if conn == nil {
conn = &proxymocks.AppConnSnapshot{}
}
if connQuery == nil {
connQuery = &proxymocks.AppConnQuery{}
}
if stateProvider == nil {
stateProvider = &mocks.StateProvider{}
conn = &clientmocks.Client{}
}
rts := &reactorTestSuite{
@@ -102,7 +94,6 @@ func setup(
paramsOutCh: make(chan p2p.Envelope, chBuf),
paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
conn: conn,
connQuery: connQuery,
stateProvider: stateProvider,
}
@@ -171,7 +162,6 @@ func setup(
*cfg,
logger.With("component", "reactor"),
conn,
connQuery,
chCreator,
rts.peerUpdates,
rts.stateStore,
@@ -186,7 +176,6 @@ func setup(
*cfg,
logger.With("component", "syncer"),
conn,
connQuery,
stateProvider,
rts.snapshotChannel,
rts.chunkChannel,
@@ -211,7 +200,7 @@ func TestReactor_Sync(t *testing.T) {
defer cancel()
const snapshotHeight = 7
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
// app accepts any snapshot
rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
@@ -222,7 +211,7 @@ func TestReactor_Sync(t *testing.T) {
Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
// app query returns valid state app hash
rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: testAppVersion,
LastBlockHeight: snapshotHeight,
LastBlockAppHash: chain[snapshotHeight+1].AppHash,
@@ -265,7 +254,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"),
@@ -316,14 +305,14 @@ func TestReactor_ChunkRequest(t *testing.T) {
defer cancel()
// mock ABCI connection to return local snapshots
conn := &proxymocks.AppConnSnapshot{}
conn := &clientmocks.Client{}
conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{
Height: tc.request.Height,
Format: tc.request.Format,
Chunk: tc.request.Index,
}).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
rts := setup(ctx, t, conn, nil, nil, 2)
rts := setup(ctx, t, conn, nil, 2)
rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"),
@@ -343,7 +332,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"),
@@ -403,12 +392,12 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
defer cancel()
// mock ABCI connection to return local snapshots
conn := &proxymocks.AppConnSnapshot{}
conn := &clientmocks.Client{}
conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
Snapshots: tc.snapshots,
}, nil)
rts := setup(ctx, t, conn, nil, nil, 100)
rts := setup(ctx, t, conn, nil, 100)
rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"),
@@ -435,7 +424,7 @@ func TestReactor_LightBlockResponse(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
var height int64 = 10
// generates a random header
@@ -492,7 +481,7 @@ func TestReactor_BlockProviders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("aa"),
Status: p2p.PeerStatusUp,
@@ -559,7 +548,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
// make syncer non nil else test won't think we are state syncing
rts.reactor.syncer = rts.syncer
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
@@ -636,7 +625,7 @@ func TestReactor_Backfill(t *testing.T) {
defer cancel()
t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
rts := setup(ctx, t, nil, nil, nil, 21)
rts := setup(ctx, t, nil, nil, 21)
var (
startHeight int64 = 20

View File

@@ -8,6 +8,7 @@ import (
"sync"
"time"
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
@@ -54,8 +55,7 @@ var (
type syncer struct {
logger log.Logger
stateProvider StateProvider
conn proxy.AppConnSnapshot
connQuery proxy.AppConnQuery
conn abciclient.Client
snapshots *snapshotPool
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
@@ -76,8 +76,7 @@ type syncer struct {
func newSyncer(
cfg config.StateSyncConfig,
logger log.Logger,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
conn abciclient.Client,
stateProvider StateProvider,
snapshotCh *p2p.Channel,
chunkCh *p2p.Channel,
@@ -88,7 +87,6 @@ func newSyncer(
logger: logger,
stateProvider: stateProvider,
conn: conn,
connQuery: connQuery,
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
@@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin
// verifyApp verifies the sync, checking the app hash, last block height and app version
func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error {
resp, err := s.connQuery.Info(ctx, proxy.RequestInfo)
resp, err := s.conn.Info(ctx, proxy.RequestInfo)
if err != nil {
return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
}

View File

@@ -11,9 +11,9 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/internal/proxy"
proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/statesync/mocks"
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -62,13 +62,12 @@ func TestSyncer_SyncAny(t *testing.T) {
stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
connSnapshot := &proxymocks.AppConnSnapshot{}
connQuery := &proxymocks.AppConnQuery{}
conn := &clientmocks.Client{}
peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb")
peerCID := types.NodeID("cc")
rts := setup(ctx, t, connSnapshot, connQuery, stateProvider, 4)
rts := setup(ctx, t, conn, stateProvider, 4)
rts.reactor.syncer = rts.syncer
@@ -110,7 +109,7 @@ func TestSyncer_SyncAny(t *testing.T) {
// We start a sync, with peers sending back chunks when requested. We first reject the snapshot
// with height 2 format 2, and accept the snapshot at height 1.
connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{
Height: 2,
Format: 2,
@@ -119,7 +118,7 @@ func TestSyncer_SyncAny(t *testing.T) {
},
AppHash: []byte("app_hash_2"),
}).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
Snapshot: &abci.Snapshot{
Height: s.Height,
Format: s.Format,
@@ -171,7 +170,7 @@ func TestSyncer_SyncAny(t *testing.T) {
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
// beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 2, Chunk: []byte{1, 1, 2},
}).Once().Run(func(args mock.Arguments) { time.Sleep(1 * time.Second) }).Return(
&abci.ResponseApplySnapshotChunk{
@@ -179,16 +178,16 @@ func TestSyncer_SyncAny(t *testing.T) {
RefetchChunks: []uint32{1},
}, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 0, Chunk: []byte{1, 1, 0},
}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 1, Chunk: []byte{1, 1, 1},
}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
Index: 2, Chunk: []byte{1, 1, 2},
}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
AppVersion: testAppVersion,
LastBlockHeight: 1,
LastBlockAppHash: []byte("app_hash"),
@@ -217,8 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) {
require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots())
require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount())
connSnapshot.AssertExpectations(t)
connQuery.AssertExpectations(t)
conn.AssertExpectations(t)
}
func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
@@ -228,7 +226,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
_, _, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil })
require.Equal(t, errNoSnapshots, err)
@@ -241,7 +239,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
peerID := types.NodeID("aa")
@@ -265,7 +263,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// s22 is tried first, then s12, then s11, then errNoSnapshots
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -307,7 +305,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// s22 is tried first, which reject s22 and s12, then s11 will abort.
s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -345,7 +343,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
peerAID := types.NodeID("aa")
peerBID := types.NodeID("bb")
@@ -394,7 +392,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
errBoom := errors.New("boom")
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -444,7 +442,7 @@ func TestSyncer_offerSnapshot(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
rts.conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
@@ -497,7 +495,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
body := []byte{1, 2, 3}
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, t.TempDir())
@@ -557,7 +555,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, t.TempDir())
require.NoError(t, err)
@@ -628,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
stateProvider := &mocks.StateProvider{}
stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)
rts := setup(ctx, t, nil, nil, stateProvider, 2)
rts := setup(ctx, t, nil, stateProvider, 2)
// Set up three peers across two snapshots, and ask for one of them to be banned.
// It should be banned from all snapshots.
@@ -761,9 +759,9 @@ func TestSyncer_verifyApp(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
rts := setup(ctx, t, nil, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 2)
rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
err := rts.syncer.verifyApp(ctx, s, appVersion)
unwrapped := errors.Unwrap(err)
if unwrapped != nil {

View File

@@ -100,7 +100,10 @@ func newDefaultNode(
return nil, err
}
appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
appClient, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
if err != nil {
return nil, err
}
return makeNode(
ctx,
@@ -120,7 +123,7 @@ func makeNode(
cfg *config.Config,
filePrivval *privval.FilePV,
nodeKey types.NodeKey,
clientCreator abciclient.Creator,
client abciclient.Client,
genesisDocProvider genesisDocProvider,
dbProvider config.DBProvider,
logger log.Logger,
@@ -155,7 +158,7 @@ func makeNode(
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy)
if err := proxyApp.Start(ctx); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %w", err)
}
@@ -281,7 +284,7 @@ func makeNode(
blockExec := sm.NewBlockExecutor(
stateStore,
logger.With("module", "state"),
proxyApp.Consensus(),
proxyApp,
mp,
evPool,
blockStore,
@@ -339,8 +342,7 @@ func makeNode(
genDoc.InitialHeight,
*cfg.StateSync,
logger.With("module", "statesync"),
proxyApp.Snapshot(),
proxyApp.Query(),
proxyApp,
router.OpenChannel,
peerManager.Subscribe(ctx),
stateStore,
@@ -390,14 +392,13 @@ func makeNode(
shutdownOps: makeCloser(closers),
rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),
StateStore: stateStore,
BlockStore: blockStore,
ProxyApp: proxyApp,
EvidencePool: evPool,
ConsensusState: csState,
StateStore: stateStore,
BlockStore: blockStore,
ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor,
@@ -752,14 +753,14 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
return state, nil
}
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
}
if conf.FilterPeers && proxyApp != nil {
opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
res, err := proxyApp.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", id),
})
if err != nil {
@@ -773,7 +774,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
}
opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
res, err := proxyApp.Query(ctx, abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
})
if err != nil {

View File

@@ -273,8 +273,8 @@ func TestCreateProposalBlock(t *testing.T) {
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.NoError(t, err)
@@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) {
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
)
// Make EvidencePool
@@ -327,7 +327,7 @@ func TestCreateProposalBlock(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp.Consensus(),
proxyApp,
mp,
evidencePool,
blockStore,
@@ -371,8 +371,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.NoError(t, err)
@@ -390,7 +390,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
)
// fill the mempool with one txs just below the maximum size
@@ -402,7 +402,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp.Consensus(),
proxyApp,
mp,
sm.EmptyEvidencePool{},
blockStore,
@@ -438,8 +438,8 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger := log.NewNopLogger()
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
err = proxyApp.Start(ctx)
require.NoError(t, err)
@@ -454,7 +454,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
mp := mempool.NewTxMempool(
logger.With("module", "mempool"),
cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
)
// fill the mempool with one txs just below the maximum size
@@ -473,7 +473,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
blockExec := sm.NewBlockExecutor(
stateStore,
logger,
proxyApp.Consensus(),
proxyApp,
mp,
sm.EmptyEvidencePool{},
blockStore,

View File

@@ -35,7 +35,7 @@ func New(
ctx context.Context,
conf *config.Config,
logger log.Logger,
cf abciclient.Creator,
cf abciclient.Client,
gen *types.GenesisDoc,
) (service.Service, error) {
nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile())

View File

@@ -10,6 +10,7 @@ import (
dbm "github.com/tendermint/tm-db"
abciclient "github.com/tendermint/tendermint/abci/client"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/internal/blocksync"
@@ -20,7 +21,6 @@ import (
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
"github.com/tendermint/tendermint/internal/p2p/pex"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/state/indexer/sink"
@@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
func createMempoolReactor(
ctx context.Context,
cfg *config.Config,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
@@ -183,7 +183,7 @@ func createMempoolReactor(
mp := mempool.NewTxMempool(
logger,
cfg.Mempool,
proxyApp.Mempool(),
proxyApp,
mempool.WithMetrics(memplMetrics),
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
@@ -387,7 +387,7 @@ func createRouter(
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
cfg *config.Config,
proxyApp proxy.AppConns,
proxyApp abciclient.Client,
) (*p2p.Router, error) {
p2pLogger := logger.With("module", "p2p")

View File

@@ -98,7 +98,7 @@ func StartTendermint(
}
}
papp := abciclient.NewLocalCreator(app)
papp := abciclient.NewLocalClient(logger, app)
tmNode, err := node.New(ctx, conf, logger, papp, nil)
if err != nil {
return nil, func(_ context.Context) error { cancel(); return nil }, err

View File

@@ -136,7 +136,7 @@ func startNode(ctx context.Context, cfg *Config) error {
ctx,
tmcfg,
nodeLogger,
abciclient.NewLocalCreator(app),
abciclient.NewLocalClient(nodeLogger, app),
nil,
)
if err != nil {

View File

@@ -15,9 +15,9 @@ var getMp func() mempool.Mempool
func init() {
app := kvstore.NewApplication()
cc := abciclient.NewLocalCreator(app)
appConnMem, _ := cc(log.NewNopLogger())
err := appConnMem.Start(context.TODO())
logger := log.NewNopLogger()
conn := abciclient.NewLocalClient(logger, app)
err := conn.Start(context.TODO())
if err != nil {
panic(err)
}
@@ -27,12 +27,7 @@ func init() {
getMp = func() mempool.Mempool {
if mp == nil {
mp = mempool.NewTxMempool(
log.NewNopLogger(),
cfg,
appConnMem,
)
mp = mempool.NewTxMempool(logger, cfg, conn)
}
return mp
}