From 378a2f0aeda9b483bfd973f193fb8a890afea80b Mon Sep 17 00:00:00 2001 From: tycho garen Date: Thu, 3 Mar 2022 12:19:06 -0500 Subject: [PATCH] proxy: collapse triforcated client --- internal/blocksync/reactor_test.go | 8 +- internal/consensus/replay.go | 23 +-- internal/consensus/replay_file.go | 4 +- internal/consensus/replay_test.go | 24 +-- internal/consensus/wal_generator.go | 4 +- internal/mempool/mempool.go | 7 +- internal/proxy/app_conn.go | 211 +++++--------------------- internal/proxy/multi_app_conn.go | 104 +++++-------- internal/proxy/multi_app_conn_test.go | 4 +- internal/rpc/core/abci.go | 4 +- internal/rpc/core/env.go | 5 +- internal/rpc/core/mempool.go | 2 +- internal/state/execution.go | 10 +- internal/state/execution_test.go | 24 +-- internal/state/helpers_test.go | 4 +- internal/state/validation_test.go | 6 +- internal/statesync/reactor.go | 10 +- internal/statesync/reactor_test.go | 14 +- internal/statesync/syncer.go | 10 +- node/node.go | 20 +-- node/node_test.go | 18 +-- node/setup.go | 8 +- 22 files changed, 167 insertions(+), 357 deletions(-) diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index 68656fbc3..118286322 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -33,7 +33,7 @@ type reactorTestSuite struct { nodes []types.NodeID reactors map[types.NodeID]*Reactor - app map[types.NodeID]proxy.AppConns + app map[types.NodeID]abciclient.Client blockSyncChannels map[types.NodeID]*p2p.Channel peerChans map[types.NodeID]chan p2p.PeerUpdate @@ -64,7 +64,7 @@ func setup( network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}), nodes: make([]types.NodeID, 0, numNodes), reactors: make(map[types.NodeID]*Reactor, numNodes), - app: make(map[types.NodeID]proxy.AppConns, numNodes), + app: make(map[types.NodeID]abciclient.Client, numNodes), blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes), peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes), peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes), @@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode( logger := log.TestingLogger() rts.nodes = append(rts.nodes, nodeID) - rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics()) + rts.app[nodeID] = proxy.New(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics()) require.NoError(t, rts.app[nodeID].Start(ctx)) blockDB := dbm.NewMemDB() @@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode( blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - rts.app[nodeID].Consensus(), + rts.app[nodeID], mock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, diff --git a/internal/consensus/replay.go b/internal/consensus/replay.go index 6250ffc06..9381cdd29 100644 --- a/internal/consensus/replay.go +++ b/internal/consensus/replay.go @@ -10,6 +10,7 @@ import ( "reflect" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/internal/eventbus" @@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int { } // TODO: retry the handshake/replay if it fails ? -func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error { +func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client) error { // Handshake is done via ABCI Info on the query conn. - res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo) + res, err := proxyApp.Info(ctx, proxy.RequestInfo) if err != nil { return fmt.Errorf("error calling Info: %w", err) } @@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks( state sm.State, appHash []byte, appBlockHeight int64, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, ) ([]byte, error) { storeBlockBase := h.store.Base() storeBlockHeight := h.store.Height() @@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks( Validators: nextVals, AppStateBytes: h.genDoc.AppState, } - res, err := proxyApp.Consensus().InitChain(ctx, req) + res, err := proxyApp.InitChain(ctx, req) if err != nil { return nil, err } @@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks( // NOTE: We could instead use the cs.WAL on cs.Start, // but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT h.logger.Info("Replay last block using real app") - state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp) return state.AppHash, err case appBlockHeight == storeBlockHeight: @@ -445,7 +446,7 @@ func (h *Handshaker) ReplayBlocks( func (h *Handshaker) replayBlocks( ctx context.Context, state sm.State, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { @@ -481,16 +482,16 @@ func (h *Handshaker) replayBlocks( // We emit events for the index services at the final block due to the sync issue when // the node shutdown during the block committing status. blockExec := sm.NewBlockExecutor( - h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store) + h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store) blockExec.SetEventBus(h.eventBus) appHash, err = sm.ExecCommitBlock(ctx, - blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + blockExec, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err } } else { appHash, err = sm.ExecCommitBlock(ctx, - nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) + nil, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state) if err != nil { return nil, err } @@ -501,7 +502,7 @@ func (h *Handshaker) replayBlocks( if mutateState { // sync the final block - state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus()) + state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp) if err != nil { return nil, err } @@ -517,7 +518,7 @@ func (h *Handshaker) replayBlock( ctx context.Context, state sm.State, height int64, - proxyApp proxy.AppConnConsensus, + proxyApp abciclient.Client, ) (sm.State, error) { block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) diff --git a/internal/consensus/replay_file.go b/internal/consensus/replay_file.go index 310eb0ab6..deb510643 100644 --- a/internal/consensus/replay_file.go +++ b/internal/consensus/replay_file.go @@ -325,7 +325,7 @@ func newConsensusStateForReplay( // Create proxyAppConn connection (consensus, mempool, query) clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir()) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) if err != nil { return nil, fmt.Errorf("starting proxy app conns: %w", err) @@ -343,7 +343,7 @@ func newConsensusStateForReplay( } mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore) consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec, blockStore, mempool, evpool) diff --git a/internal/consensus/replay_test.go b/internal/consensus/replay_test.go index b9302d125..fd84e71d7 100644 --- a/internal/consensus/replay_test.go +++ b/internal/consensus/replay_test.go @@ -808,7 +808,7 @@ func testHandshakeReplay( if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state - proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) stateDB1 := dbm.NewMemDB() stateStore := sm.NewStore(stateDB1) err := stateStore.Save(genesisState) @@ -829,7 +829,7 @@ func testHandshakeReplay( genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile()) require.NoError(t, err) handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -842,7 +842,7 @@ func testHandshakeReplay( require.NoError(t, err, "Error on abci handshake") // get the latest app hash from the app - res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""}) + res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""}) if err != nil { t.Fatal(err) } @@ -875,11 +875,11 @@ func applyBlock( evpool sm.EvidencePool, st sm.State, blk *types.Block, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, blockStore *mockBlockStore, ) sm.State { testPartSize := types.BlockPartSizeBytes - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore) bps, err := blk.MakePartSet(testPartSize) require.NoError(t, err) @@ -892,7 +892,7 @@ func applyBlock( func buildAppStateFromChain( ctx context.Context, t *testing.T, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, stateStore sm.Store, mempool mempool.Mempool, evpool sm.EvidencePool, @@ -908,7 +908,7 @@ func buildAppStateFromChain( state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) - _, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{ + _, err := proxyApp.InitChain(ctx, abci.RequestInitChain{ Validators: validators, }) require.NoError(t, err) @@ -960,12 +960,12 @@ func buildTMStateFromChain( defer kvstoreApp.Close() clientCreator := abciclient.NewLocalCreator(kvstoreApp) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx)) state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) - _, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{ + _, err := proxyApp.InitChain(ctx, abci.RequestInitChain{ Validators: validators, }) require.NoError(t, err) @@ -1032,7 +1032,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { { app := &badApp{numBlocks: 3, allHashesAreWrong: true} clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -1052,7 +1052,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) { { app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true} clientCreator := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) t.Cleanup(func() { cancel(); proxyApp.Wait() }) @@ -1308,7 +1308,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) { logger := log.TestingLogger() handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc) - proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics()) + proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics()) require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections") require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake") diff --git a/internal/consensus/wal_generator.go b/internal/consensus/wal_generator.go index b10feb828..151b72e4a 100644 --- a/internal/consensus/wal_generator.go +++ b/internal/consensus/wal_generator.go @@ -68,7 +68,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr blockStore := store.NewBlockStore(blockStoreDB) - proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics()) + proxyApp := proxy.New(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics()) if err := proxyApp.Start(ctx); err != nil { t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err)) } @@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr mempool := emptyMempool{} evpool := sm.EmptyEvidencePool{} - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore) + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore) consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState.SetEventBus(eventBus) if privValidator != nil && privValidator != (*privval.FilePV)(nil) { diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 21429721d..f55ba2684 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -9,10 +9,10 @@ import ( "sync/atomic" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/libs/clist" - "github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/libs/log" tmmath "github.com/tendermint/tendermint/libs/math" "github.com/tendermint/tendermint/types" @@ -31,7 +31,7 @@ type TxMempool struct { logger log.Logger metrics *Metrics config *config.MempoolConfig - proxyAppConn proxy.AppConnMempool + proxyAppConn abciclient.Client // txsAvailable fires once for each height when the mempool is not empty txsAvailable chan struct{} @@ -93,7 +93,7 @@ type TxMempool struct { func NewTxMempool( logger log.Logger, cfg *config.MempoolConfig, - proxyAppConn proxy.AppConnMempool, + proxyAppConn abciclient.Client, height int64, options ...TxMempoolOption, ) *TxMempool { @@ -422,7 +422,6 @@ func (txmp *TxMempool) Update( newPreFn PreCheckFunc, newPostFn PostCheckFunc, ) error { - txmp.height = blockHeight txmp.notifiedTxsAvailable = false diff --git a/internal/proxy/app_conn.go b/internal/proxy/app_conn.go index f30757f45..99226a743 100644 --- a/internal/proxy/app_conn.go +++ b/internal/proxy/app_conn.go @@ -10,233 +10,96 @@ import ( "github.com/tendermint/tendermint/abci/types" ) -//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot - -//---------------------------------------------------------------------------------------- -// Enforce which abci msgs can be sent on a connection at the type level - -type AppConnConsensus interface { - Error() error - - InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error) - - PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) - ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error) - ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error) - VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) - FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) - Commit(context.Context) (*types.ResponseCommit, error) -} - -type AppConnMempool interface { - Error() error - - CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error) - - Flush(context.Context) error -} - -type AppConnQuery interface { - Error() error - - Echo(context.Context, string) (*types.ResponseEcho, error) - Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error) - Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error) -} - -type AppConnSnapshot interface { - Error() error - - ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error) - OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) - LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) - ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) -} - -//----------------------------------------------------------------------------------------- -// Implements AppConnConsensus (subset of abciclient.Client) - -type appConnConsensus struct { +type proxyClient struct { metrics *Metrics - appConn abciclient.Client + abciclient.Client } -var _ AppConnConsensus = (*appConnConsensus)(nil) - -func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus { - return &appConnConsensus{ +func newProxyClient(appConn abciclient.Client, metrics *Metrics) abciclient.Client { + return &proxyClient{ metrics: metrics, - appConn: appConn, + Client: appConn, } } -func (app *appConnConsensus) Error() error { - return app.appConn.Error() -} - -func (app *appConnConsensus) InitChain( - ctx context.Context, - req types.RequestInitChain, -) (*types.ResponseInitChain, error) { +func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))() - return app.appConn.InitChain(ctx, req) + return app.Client.InitChain(ctx, req) } -func (app *appConnConsensus) PrepareProposal( - ctx context.Context, - req types.RequestPrepareProposal, -) (*types.ResponsePrepareProposal, error) { +func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))() - return app.appConn.PrepareProposal(ctx, req) + return app.Client.PrepareProposal(ctx, req) } -func (app *appConnConsensus) ProcessProposal( - ctx context.Context, - req types.RequestProcessProposal, -) (*types.ResponseProcessProposal, error) { +func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))() - return app.appConn.ProcessProposal(ctx, req) + return app.Client.ProcessProposal(ctx, req) } -func (app *appConnConsensus) ExtendVote( - ctx context.Context, - req types.RequestExtendVote, -) (*types.ResponseExtendVote, error) { +func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))() - return app.appConn.ExtendVote(ctx, req) + return app.Client.ExtendVote(ctx, req) } -func (app *appConnConsensus) VerifyVoteExtension( - ctx context.Context, - req types.RequestVerifyVoteExtension, -) (*types.ResponseVerifyVoteExtension, error) { +func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))() - return app.appConn.VerifyVoteExtension(ctx, req) + return app.Client.VerifyVoteExtension(ctx, req) } -func (app *appConnConsensus) FinalizeBlock( - ctx context.Context, - req types.RequestFinalizeBlock, -) (*types.ResponseFinalizeBlock, error) { +func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))() - return app.appConn.FinalizeBlock(ctx, req) + return app.Client.FinalizeBlock(ctx, req) } -func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) { +func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))() - return app.appConn.Commit(ctx) + return app.Client.Commit(ctx) } -//------------------------------------------------ -// Implements AppConnMempool (subset of abciclient.Client) - -type appConnMempool struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool { - return &appConnMempool{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnMempool) Error() error { - return app.appConn.Error() -} - -func (app *appConnMempool) Flush(ctx context.Context) error { +func (app *proxyClient) Flush(ctx context.Context) error { defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))() - return app.appConn.Flush(ctx) + return app.Client.Flush(ctx) } -func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { +func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))() - return app.appConn.CheckTx(ctx, req) + return app.Client.CheckTx(ctx, req) } -//------------------------------------------------ -// Implements AppConnQuery (subset of abciclient.Client) - -type appConnQuery struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery { - return &appConnQuery{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnQuery) Error() error { - return app.appConn.Error() -} - -func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { +func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))() - return app.appConn.Echo(ctx, msg) + return app.Client.Echo(ctx, msg) } -func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { +func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))() - return app.appConn.Info(ctx, req) + return app.Client.Info(ctx, req) } -func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { +func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))() - return app.appConn.Query(ctx, reqQuery) + return app.Client.Query(ctx, reqQuery) } -//------------------------------------------------ -// Implements AppConnSnapshot (subset of abciclient.Client) - -type appConnSnapshot struct { - metrics *Metrics - appConn abciclient.Client -} - -func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot { - return &appConnSnapshot{ - metrics: metrics, - appConn: appConn, - } -} - -func (app *appConnSnapshot) Error() error { - return app.appConn.Error() -} - -func (app *appConnSnapshot) ListSnapshots( - ctx context.Context, - req types.RequestListSnapshots, -) (*types.ResponseListSnapshots, error) { +func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))() - return app.appConn.ListSnapshots(ctx, req) + return app.Client.ListSnapshots(ctx, req) } -func (app *appConnSnapshot) OfferSnapshot( - ctx context.Context, - req types.RequestOfferSnapshot, -) (*types.ResponseOfferSnapshot, error) { +func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))() - return app.appConn.OfferSnapshot(ctx, req) + return app.Client.OfferSnapshot(ctx, req) } -func (app *appConnSnapshot) LoadSnapshotChunk( - ctx context.Context, - req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { +func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))() - return app.appConn.LoadSnapshotChunk(ctx, req) + return app.Client.LoadSnapshotChunk(ctx, req) } -func (app *appConnSnapshot) ApplySnapshotChunk( - ctx context.Context, - req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { +func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) { defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))() - return app.appConn.ApplySnapshotChunk(ctx, req) + return app.Client.ApplySnapshotChunk(ctx, req) } // addTimeSample returns a function that, when called, adds an observation to m. diff --git a/internal/proxy/multi_app_conn.go b/internal/proxy/multi_app_conn.go index 61e9c9ff2..1683d79a7 100644 --- a/internal/proxy/multi_app_conn.go +++ b/internal/proxy/multi_app_conn.go @@ -10,57 +10,9 @@ import ( "github.com/tendermint/tendermint/libs/service" ) -// AppConns is the Tendermint's interface to the application that consists of -// multiple connections. -type AppConns interface { - service.Service - - // Mempool connection - Mempool() AppConnMempool - // Consensus connection - Consensus() AppConnConsensus - // Query connection - Query() AppConnQuery - // Snapshot connection - Snapshot() AppConnSnapshot -} - -// NewAppConns calls NewMultiAppConn. -func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { - return NewMultiAppConn(clientCreator, logger, metrics) -} - -// multiAppConn implements AppConns. -// -// A multiAppConn is made of a few appConns and manages their underlying abci -// clients. -// TODO: on app restart, clients must reboot together -type multiAppConn struct { - service.BaseService - logger log.Logger - - metrics *Metrics - consensusConn AppConnConsensus - mempoolConn AppConnMempool - queryConn AppConnQuery - snapshotConn AppConnSnapshot - - client stoppableClient - - clientCreator abciclient.Creator -} - -// TODO: this is a totally internal and quasi permanent shim for -// clients. eventually we can have a single client and have some kind -// of reasonable lifecycle witout needing an explicit stop method. -type stoppableClient interface { - abciclient.Client - Stop() -} - -// NewMultiAppConn makes all necessary abci connections to the application. -func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns { - multiAppConn := &multiAppConn{ +// New creates a proxy application interface. +func New(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) abciclient.Client { + multiAppConn := &proxyConn{ logger: logger, metrics: metrics, clientCreator: clientCreator, @@ -69,16 +21,38 @@ func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metric return multiAppConn } -func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn } -func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn } -func (app *multiAppConn) Query() AppConnQuery { return app.queryConn } -func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn } +// proxyConn implements provides the application connection. +type proxyConn struct { + service.BaseService + abciclient.Client -func (app *multiAppConn) OnStart(ctx context.Context) error { + logger log.Logger + + metrics *Metrics + + clientCreator abciclient.Creator +} + +func (app *proxyConn) OnStop() { tryCallStop(app.Client) } +func (app *proxyConn) IsRunning() bool { return app.Client.IsRunning() } +func (app *proxyConn) Start(ctx context.Context) error { return app.BaseService.Start(ctx) } +func (app *proxyConn) Wait() { app.BaseService.Wait() } + +func tryCallStop(client abciclient.Client) { + if client == nil { + return + } + + if sc, ok := client.(interface{ Stop() }); ok { + sc.Stop() + } +} + +func (app *proxyConn) OnStart(ctx context.Context) error { var err error defer func() { if err != nil { - app.client.Stop() + tryCallStop(app.Client) } }() @@ -88,24 +62,18 @@ func (app *multiAppConn) OnStart(ctx context.Context) error { return err } - app.queryConn = NewAppConnQuery(client, app.metrics) - app.snapshotConn = NewAppConnSnapshot(client, app.metrics) - app.mempoolConn = NewAppConnMempool(client, app.metrics) - app.consensusConn = NewAppConnConsensus(client, app.metrics) - - app.client = client.(stoppableClient) - + app.Client = newProxyClient(client, app.metrics) // Kill Tendermint if the ABCI application crashes. go func() { - if !client.IsRunning() { + if !app.Client.IsRunning() { return } - app.client.Wait() + app.Client.Wait() if ctx.Err() != nil { return } - if err := app.client.Error(); err != nil { + if err := app.Client.Error(); err != nil { app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint", "err", err) if killErr := kill(); killErr != nil { @@ -119,8 +87,6 @@ func (app *multiAppConn) OnStart(ctx context.Context) error { return client.Start(ctx) } -func (app *multiAppConn) OnStop() { app.client.Stop() } - func kill() error { p, err := os.FindProcess(os.Getpid()) if err != nil { diff --git a/internal/proxy/multi_app_conn_test.go b/internal/proxy/multi_app_conn_test.go index efbb3f56f..07f9d0f87 100644 --- a/internal/proxy/multi_app_conn_test.go +++ b/internal/proxy/multi_app_conn_test.go @@ -42,7 +42,7 @@ func TestAppConns_Start_Stop(t *testing.T) { return cl, nil } - appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) + appConns := New(creator, log.TestingLogger(), NopMetrics()) err := appConns.Start(ctx) require.NoError(t, err) @@ -84,7 +84,7 @@ func TestAppConns_Failure(t *testing.T) { return cl, nil } - appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics()) + appConns := New(creator, log.TestingLogger(), NopMetrics()) err := appConns.Start(ctx) require.NoError(t, err) diff --git a/internal/rpc/core/abci.go b/internal/rpc/core/abci.go index cbd27a09d..8f5e61d55 100644 --- a/internal/rpc/core/abci.go +++ b/internal/rpc/core/abci.go @@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery( height int64, prove bool, ) (*coretypes.ResultABCIQuery, error) { - resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{ + resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{ Path: path, Data: data, Height: height, @@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery( // ABCIInfo gets some info about the application. // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) { - resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo) + resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo) if err != nil { return nil, err } diff --git a/internal/rpc/core/env.go b/internal/rpc/core/env.go index 5a718b232..348ed0867 100644 --- a/internal/rpc/core/env.go +++ b/internal/rpc/core/env.go @@ -11,6 +11,7 @@ import ( "github.com/rs/cors" + abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -19,7 +20,6 @@ import ( "github.com/tendermint/tendermint/internal/eventlog" "github.com/tendermint/tendermint/internal/mempool" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/proxy" tmpubsub "github.com/tendermint/tendermint/internal/pubsub" "github.com/tendermint/tendermint/internal/pubsub/query" sm "github.com/tendermint/tendermint/internal/state" @@ -73,8 +73,7 @@ type peerManager interface { // to be setup once during startup. type Environment struct { // external, thread safe interfaces - ProxyAppQuery proxy.AppConnQuery - ProxyAppMempool proxy.AppConnMempool + ProxyApp abciclient.Client // interfaces defined in types and above StateStore sm.Store diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 61d36e93a..e679bfd4c 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul // be added to the mempool either. // More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) { - res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx}) + res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err } diff --git a/internal/state/execution.go b/internal/state/execution.go index cdd6e009b..05444c007 100644 --- a/internal/state/execution.go +++ b/internal/state/execution.go @@ -6,11 +6,11 @@ import ( "fmt" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/encoding" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/mempool" - "github.com/tendermint/tendermint/internal/proxy" "github.com/tendermint/tendermint/libs/log" tmstate "github.com/tendermint/tendermint/proto/tendermint/state" "github.com/tendermint/tendermint/types" @@ -30,7 +30,7 @@ type BlockExecutor struct { blockStore BlockStore // execute the app against this - proxyApp proxy.AppConnConsensus + proxyApp abciclient.Client // events eventBus types.BlockEventPublisher @@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption { func NewBlockExecutor( stateStore Store, logger log.Logger, - proxyApp proxy.AppConnConsensus, + proxyApp abciclient.Client, pool mempool.Mempool, evpool EvidencePool, blockStore BlockStore, @@ -375,7 +375,7 @@ func (blockExec *BlockExecutor) Commit( func execBlockOnProxyApp( ctx context.Context, logger log.Logger, - proxyAppConn proxy.AppConnConsensus, + proxyAppConn abciclient.Client, block *types.Block, store Store, initialHeight int64, @@ -617,7 +617,7 @@ func fireEvents( func ExecCommitBlock( ctx context.Context, be *BlockExecutor, - appConnConsensus proxy.AppConnConsensus, + appConnConsensus abciclient.Client, block *types.Block, logger log.Logger, store Store, diff --git a/internal/state/execution_test.go b/internal/state/execution_test.go index 636e654e7..be2392a96 100644 --- a/internal/state/execution_test.go +++ b/internal/state/execution_test.go @@ -41,7 +41,7 @@ func TestApplyBlock(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -52,7 +52,7 @@ func TestApplyBlock(t *testing.T) { state, stateDB, _ := makeState(t, 1, 1) stateStore := sm.NewStore(stateDB) blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), + blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore) block, err := sf.MakeBlock(state, 1, new(types.Commit)) @@ -75,7 +75,7 @@ func TestBeginBlockValidators(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) + proxyApp := proxy.New(cc, log.TestingLogger(), proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -121,7 +121,7 @@ func TestBeginBlockValidators(t *testing.T) { block, err := sf.MakeBlock(state, 2, lastCommit) require.NoError(t, err) - _, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state) + _, err = sm.ExecCommitBlock(ctx, nil, proxyApp, block, log.TestingLogger(), stateStore, 1, state) require.NoError(t, err, tc.desc) // -> app receives a list of validators with a bool indicating if they signed @@ -146,7 +146,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) - proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics()) + proxyApp := proxy.New(cc, log.TestingLogger(), proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -222,7 +222,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { blockStore := store.NewBlockStore(dbm.NewMemDB()) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), + blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mmock.Mempool{}, evpool, blockStore) block, err := sf.MakeBlock(state, 1, new(types.Commit)) @@ -250,7 +250,7 @@ func TestProcessProposal(t *testing.T) { app := abcimocks.NewBaseMock() cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -261,7 +261,7 @@ func TestProcessProposal(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -453,7 +453,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -464,7 +464,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -528,7 +528,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { app := &testApp{} cc := abciclient.NewLocalCreator(app) logger := log.TestingLogger() - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err := proxyApp.Start(ctx) require.NoError(t, err) @@ -538,7 +538,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, diff --git a/internal/state/helpers_test.go b/internal/state/helpers_test.go index a5720f183..af67a0232 100644 --- a/internal/state/helpers_test.go +++ b/internal/state/helpers_test.go @@ -33,10 +33,10 @@ type paramsChangeTestCase struct { params types.ConsensusParams } -func newTestApp() proxy.AppConns { +func newTestApp() abciclient.Client { app := &testApp{} cc := abciclient.NewLocalCreator(app) - return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics()) + return proxy.New(cc, log.NewNopLogger(), proxy.NopMetrics()) } func makeAndCommitGoodBlock( diff --git a/internal/state/validation_test.go b/internal/state/validation_test.go index 4d78fde74..b90f9b134 100644 --- a/internal/state/validation_test.go +++ b/internal/state/validation_test.go @@ -40,7 +40,7 @@ func TestValidateBlockHeader(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -128,7 +128,7 @@ func TestValidateBlockCommit(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, memmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore, @@ -263,7 +263,7 @@ func TestValidateBlockEvidence(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, log.TestingLogger(), - proxyApp.Consensus(), + proxyApp, memmock.Mempool{}, evpool, blockStore, diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 1f65a8c0c..51f626027 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -11,11 +11,11 @@ import ( "sync" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/eventbus" "github.com/tendermint/tendermint/internal/p2p" - "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/store" "github.com/tendermint/tendermint/libs/log" @@ -135,8 +135,7 @@ type Reactor struct { stateStore sm.Store blockStore *store.BlockStore - conn proxy.AppConnSnapshot - connQuery proxy.AppConnQuery + conn abciclient.Client tempDir string snapshotCh *p2p.Channel chunkCh *p2p.Channel @@ -173,8 +172,7 @@ func NewReactor( initialHeight int64, cfg config.StateSyncConfig, logger log.Logger, - conn proxy.AppConnSnapshot, - connQuery proxy.AppConnQuery, + conn abciclient.Client, channelCreator p2p.ChannelCreator, peerUpdates *p2p.PeerUpdates, stateStore sm.Store, @@ -209,7 +207,6 @@ func NewReactor( initialHeight: initialHeight, cfg: cfg, conn: conn, - connQuery: connQuery, snapshotCh: snapshotCh, chunkCh: chunkCh, blockCh: blockCh, @@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.cfg, r.logger, r.conn, - r.connQuery, r.stateProvider, r.snapshotCh, r.chunkCh, diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index f259cfa58..ac49c337d 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" + clientmocks "github.com/tendermint/tendermint/abci/client/mocks" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/p2p" @@ -71,21 +72,14 @@ type reactorTestSuite struct { func setup( ctx context.Context, t *testing.T, - conn *proxymocks.AppConnSnapshot, - connQuery *proxymocks.AppConnQuery, + conn *clientmocks.Client, stateProvider *mocks.StateProvider, chBuf uint, ) *reactorTestSuite { t.Helper() if conn == nil { - conn = &proxymocks.AppConnSnapshot{} - } - if connQuery == nil { - connQuery = &proxymocks.AppConnQuery{} - } - if stateProvider == nil { - stateProvider = &mocks.StateProvider{} + conn = &clientmocks.Client{} } rts := &reactorTestSuite{ @@ -102,7 +96,6 @@ func setup( paramsOutCh: make(chan p2p.Envelope, chBuf), paramsPeerErrCh: make(chan p2p.PeerError, chBuf), conn: conn, - connQuery: connQuery, stateProvider: stateProvider, } @@ -171,7 +164,6 @@ func setup( *cfg, logger.With("component", "reactor"), conn, - connQuery, chCreator, rts.peerUpdates, rts.stateStore, diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index e2e41586c..78eb8d53a 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -8,6 +8,7 @@ import ( "sync" "time" + abciclient "github.com/tendermint/tendermint/abci/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/internal/p2p" @@ -54,8 +55,7 @@ var ( type syncer struct { logger log.Logger stateProvider StateProvider - conn proxy.AppConnSnapshot - connQuery proxy.AppConnQuery + conn abciclient.Client snapshots *snapshotPool snapshotCh *p2p.Channel chunkCh *p2p.Channel @@ -76,8 +76,7 @@ type syncer struct { func newSyncer( cfg config.StateSyncConfig, logger log.Logger, - conn proxy.AppConnSnapshot, - connQuery proxy.AppConnQuery, + conn abciclient.Client, stateProvider StateProvider, snapshotCh *p2p.Channel, chunkCh *p2p.Channel, @@ -88,7 +87,6 @@ func newSyncer( logger: logger, stateProvider: stateProvider, conn: conn, - connQuery: connQuery, snapshots: newSnapshotPool(), snapshotCh: snapshotCh, chunkCh: chunkCh, @@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin // verifyApp verifies the sync, checking the app hash, last block height and app version func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error { - resp, err := s.connQuery.Info(ctx, proxy.RequestInfo) + resp, err := s.conn.Info(ctx, proxy.RequestInfo) if err != nil { return fmt.Errorf("failed to query ABCI app for appHash: %w", err) } diff --git a/node/node.go b/node/node.go index 7d7b75170..0c011ac7c 100644 --- a/node/node.go +++ b/node/node.go @@ -159,7 +159,7 @@ func makeNode( nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID) // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). - proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy) + proxyApp := proxy.New(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy) if err := proxyApp.Start(ctx); err != nil { return nil, fmt.Errorf("error starting proxy app connections: %w", err) } @@ -289,7 +289,7 @@ func makeNode( blockExec := sm.NewBlockExecutor( stateStore, logger.With("module", "state"), - proxyApp.Consensus(), + proxyApp, mp, evPool, blockStore, @@ -343,8 +343,8 @@ func makeNode( genDoc.InitialHeight, *cfg.StateSync, logger.With("module", "statesync"), - proxyApp.Snapshot(), - proxyApp.Query(), + proxyApp, + proxyApp, router.OpenChannel, peerManager.Subscribe(ctx), stateStore, @@ -394,11 +394,7 @@ func makeNode( shutdownOps: makeCloser(closers), rpcEnv: &rpccore.Environment{ - ProxyAppQuery: proxyApp.Query(), - ProxyAppMempool: proxyApp.Mempool(), - - StateStore: stateStore, - BlockStore: blockStore, + ProxyApp: proxyApp, EvidencePool: evPool, ConsensusState: csState, @@ -769,14 +765,14 @@ func loadStateFromDBOrGenesisDocProvider( return state, nil } -func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions { +func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.RouterOptions { opts := p2p.RouterOptions{ QueueType: conf.P2P.QueueType, } if conf.FilterPeers && proxyApp != nil { opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error { - res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{ + res, err := proxyApp.Query(ctx, abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/id/%s", id), }) if err != nil { @@ -790,7 +786,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt } opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error { - res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{ + res, err := proxyApp.Query(ctx, abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))), }) if err != nil { diff --git a/node/node_test.go b/node/node_test.go index 41cb1b6a9..34dd67373 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -274,7 +274,7 @@ func TestCreateProposalBlock(t *testing.T) { logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, state.LastBlockHeight, ) @@ -328,7 +328,7 @@ func TestCreateProposalBlock(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, evidencePool, blockStore, @@ -373,7 +373,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -391,7 +391,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, state.LastBlockHeight, ) @@ -404,7 +404,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, sm.EmptyEvidencePool{}, blockStore, @@ -441,7 +441,7 @@ func TestMaxProposalBlockSize(t *testing.T) { logger := log.NewNopLogger() cc := abciclient.NewLocalCreator(kvstore.NewApplication()) - proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics()) + proxyApp := proxy.New(cc, logger, proxy.NopMetrics()) err = proxyApp.Start(ctx) require.NoError(t, err) @@ -456,7 +456,7 @@ func TestMaxProposalBlockSize(t *testing.T) { mp := mempool.NewTxMempool( logger.With("module", "mempool"), cfg.Mempool, - proxyApp.Mempool(), + proxyApp, state.LastBlockHeight, ) @@ -476,7 +476,7 @@ func TestMaxProposalBlockSize(t *testing.T) { blockExec := sm.NewBlockExecutor( stateStore, logger, - proxyApp.Consensus(), + proxyApp, mp, sm.EmptyEvidencePool{}, blockStore, diff --git a/node/setup.go b/node/setup.go index e880cd5c4..31711bc9c 100644 --- a/node/setup.go +++ b/node/setup.go @@ -10,6 +10,7 @@ import ( dbm "github.com/tendermint/tm-db" + abciclient "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/internal/blocksync" @@ -20,7 +21,6 @@ import ( "github.com/tendermint/tendermint/internal/p2p" "github.com/tendermint/tendermint/internal/p2p/conn" "github.com/tendermint/tendermint/internal/p2p/pex" - "github.com/tendermint/tendermint/internal/proxy" sm "github.com/tendermint/tendermint/internal/state" "github.com/tendermint/tendermint/internal/state/indexer" "github.com/tendermint/tendermint/internal/state/indexer/sink" @@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { func createMempoolReactor( ctx context.Context, cfg *config.Config, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, state sm.State, memplMetrics *mempool.Metrics, peerManager *p2p.PeerManager, @@ -183,7 +183,7 @@ func createMempoolReactor( mp := mempool.NewTxMempool( logger, cfg.Mempool, - proxyApp.Mempool(), + proxyApp, state.LastBlockHeight, mempool.WithMetrics(memplMetrics), mempool.WithPreCheck(sm.TxPreCheck(state)), @@ -385,7 +385,7 @@ func createRouter( nodeKey types.NodeKey, peerManager *p2p.PeerManager, cfg *config.Config, - proxyApp proxy.AppConns, + proxyApp abciclient.Client, ) (*p2p.Router, error) { p2pLogger := logger.With("module", "p2p")