mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-23 07:31:29 +00:00
proxy: collapse triforcated client
This commit is contained in:
@@ -33,7 +33,7 @@ type reactorTestSuite struct {
|
||||
nodes []types.NodeID
|
||||
|
||||
reactors map[types.NodeID]*Reactor
|
||||
app map[types.NodeID]proxy.AppConns
|
||||
app map[types.NodeID]abciclient.Client
|
||||
|
||||
blockSyncChannels map[types.NodeID]*p2p.Channel
|
||||
peerChans map[types.NodeID]chan p2p.PeerUpdate
|
||||
@@ -64,7 +64,7 @@ func setup(
|
||||
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
|
||||
nodes: make([]types.NodeID, 0, numNodes),
|
||||
reactors: make(map[types.NodeID]*Reactor, numNodes),
|
||||
app: make(map[types.NodeID]proxy.AppConns, numNodes),
|
||||
app: make(map[types.NodeID]abciclient.Client, numNodes),
|
||||
blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
|
||||
peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
|
||||
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
|
||||
@@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
logger := log.TestingLogger()
|
||||
|
||||
rts.nodes = append(rts.nodes, nodeID)
|
||||
rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
|
||||
rts.app[nodeID] = proxy.New(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
|
||||
require.NoError(t, rts.app[nodeID].Start(ctx))
|
||||
|
||||
blockDB := dbm.NewMemDB()
|
||||
@@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode(
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
rts.app[nodeID].Consensus(),
|
||||
rts.app[nodeID],
|
||||
mock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/merkle"
|
||||
"github.com/tendermint/tendermint/internal/eventbus"
|
||||
@@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int {
|
||||
}
|
||||
|
||||
// TODO: retry the handshake/replay if it fails ?
|
||||
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
|
||||
func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client) error {
|
||||
|
||||
// Handshake is done via ABCI Info on the query conn.
|
||||
res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
|
||||
res, err := proxyApp.Info(ctx, proxy.RequestInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error calling Info: %w", err)
|
||||
}
|
||||
@@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks(
|
||||
state sm.State,
|
||||
appHash []byte,
|
||||
appBlockHeight int64,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
) ([]byte, error) {
|
||||
storeBlockBase := h.store.Base()
|
||||
storeBlockHeight := h.store.Height()
|
||||
@@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks(
|
||||
Validators: nextVals,
|
||||
AppStateBytes: h.genDoc.AppState,
|
||||
}
|
||||
res, err := proxyApp.Consensus().InitChain(ctx, req)
|
||||
res, err := proxyApp.InitChain(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks(
|
||||
// NOTE: We could instead use the cs.WAL on cs.Start,
|
||||
// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
|
||||
h.logger.Info("Replay last block using real app")
|
||||
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
|
||||
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
|
||||
return state.AppHash, err
|
||||
|
||||
case appBlockHeight == storeBlockHeight:
|
||||
@@ -445,7 +446,7 @@ func (h *Handshaker) ReplayBlocks(
|
||||
func (h *Handshaker) replayBlocks(
|
||||
ctx context.Context,
|
||||
state sm.State,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
appBlockHeight,
|
||||
storeBlockHeight int64,
|
||||
mutateState bool) ([]byte, error) {
|
||||
@@ -481,16 +482,16 @@ func (h *Handshaker) replayBlocks(
|
||||
// We emit events for the index services at the final block due to the sync issue when
|
||||
// the node shutdown during the block committing status.
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
|
||||
blockExec.SetEventBus(h.eventBus)
|
||||
appHash, err = sm.ExecCommitBlock(ctx,
|
||||
blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
blockExec, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
appHash, err = sm.ExecCommitBlock(ctx,
|
||||
nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
nil, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -501,7 +502,7 @@ func (h *Handshaker) replayBlocks(
|
||||
|
||||
if mutateState {
|
||||
// sync the final block
|
||||
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
|
||||
state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -517,7 +518,7 @@ func (h *Handshaker) replayBlock(
|
||||
ctx context.Context,
|
||||
state sm.State,
|
||||
height int64,
|
||||
proxyApp proxy.AppConnConsensus,
|
||||
proxyApp abciclient.Client,
|
||||
) (sm.State, error) {
|
||||
block := h.store.LoadBlock(height)
|
||||
meta := h.store.LoadBlockMeta(height)
|
||||
|
||||
@@ -325,7 +325,7 @@ func newConsensusStateForReplay(
|
||||
|
||||
// Create proxyAppConn connection (consensus, mempool, query)
|
||||
clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
|
||||
err = proxyApp.Start(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("starting proxy app conns: %w", err)
|
||||
@@ -343,7 +343,7 @@ func newConsensusStateForReplay(
|
||||
}
|
||||
|
||||
mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
|
||||
|
||||
consensusState := NewState(ctx, logger, csConfig, state.Copy(), blockExec,
|
||||
blockStore, mempool, evpool)
|
||||
|
||||
@@ -808,7 +808,7 @@ func testHandshakeReplay(
|
||||
if nBlocks > 0 {
|
||||
// run nBlocks against a new client to build up the app state.
|
||||
// use a throwaway tendermint state
|
||||
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
|
||||
stateDB1 := dbm.NewMemDB()
|
||||
stateStore := sm.NewStore(stateDB1)
|
||||
err := stateStore.Save(genesisState)
|
||||
@@ -829,7 +829,7 @@ func testHandshakeReplay(
|
||||
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
|
||||
|
||||
t.Cleanup(func() { cancel(); proxyApp.Wait() })
|
||||
@@ -842,7 +842,7 @@ func testHandshakeReplay(
|
||||
require.NoError(t, err, "Error on abci handshake")
|
||||
|
||||
// get the latest app hash from the app
|
||||
res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""})
|
||||
res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -875,11 +875,11 @@ func applyBlock(
|
||||
evpool sm.EvidencePool,
|
||||
st sm.State,
|
||||
blk *types.Block,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
blockStore *mockBlockStore,
|
||||
) sm.State {
|
||||
testPartSize := types.BlockPartSizeBytes
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
|
||||
|
||||
bps, err := blk.MakePartSet(testPartSize)
|
||||
require.NoError(t, err)
|
||||
@@ -892,7 +892,7 @@ func applyBlock(
|
||||
func buildAppStateFromChain(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
stateStore sm.Store,
|
||||
mempool mempool.Mempool,
|
||||
evpool sm.EvidencePool,
|
||||
@@ -908,7 +908,7 @@ func buildAppStateFromChain(
|
||||
|
||||
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
|
||||
validators := types.TM2PB.ValidatorUpdates(state.Validators)
|
||||
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
|
||||
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
|
||||
Validators: validators,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
@@ -960,12 +960,12 @@ func buildTMStateFromChain(
|
||||
defer kvstoreApp.Close()
|
||||
clientCreator := abciclient.NewLocalCreator(kvstoreApp)
|
||||
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx))
|
||||
|
||||
state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
|
||||
validators := types.TM2PB.ValidatorUpdates(state.Validators)
|
||||
_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
|
||||
_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
|
||||
Validators: validators,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
@@ -1032,7 +1032,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
{
|
||||
app := &badApp{numBlocks: 3, allHashesAreWrong: true}
|
||||
clientCreator := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { cancel(); proxyApp.Wait() })
|
||||
@@ -1052,7 +1052,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
|
||||
{
|
||||
app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
|
||||
clientCreator := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { cancel(); proxyApp.Wait() })
|
||||
@@ -1308,7 +1308,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
|
||||
|
||||
logger := log.TestingLogger()
|
||||
handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(clientCreator, logger, proxy.NopMetrics())
|
||||
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
|
||||
|
||||
require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake")
|
||||
|
||||
@@ -68,7 +68,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
|
||||
|
||||
blockStore := store.NewBlockStore(blockStoreDB)
|
||||
|
||||
proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
|
||||
proxyApp := proxy.New(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
|
||||
if err := proxyApp.Start(ctx); err != nil {
|
||||
t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
|
||||
}
|
||||
@@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
|
||||
|
||||
mempool := emptyMempool{}
|
||||
evpool := sm.EmptyEvidencePool{}
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
|
||||
consensusState := NewState(ctx, logger, cfg.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
|
||||
consensusState.SetEventBus(eventBus)
|
||||
if privValidator != nil && privValidator != (*privval.FilePV)(nil) {
|
||||
|
||||
@@ -9,10 +9,10 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/libs/clist"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmmath "github.com/tendermint/tendermint/libs/math"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -31,7 +31,7 @@ type TxMempool struct {
|
||||
logger log.Logger
|
||||
metrics *Metrics
|
||||
config *config.MempoolConfig
|
||||
proxyAppConn proxy.AppConnMempool
|
||||
proxyAppConn abciclient.Client
|
||||
|
||||
// txsAvailable fires once for each height when the mempool is not empty
|
||||
txsAvailable chan struct{}
|
||||
@@ -93,7 +93,7 @@ type TxMempool struct {
|
||||
func NewTxMempool(
|
||||
logger log.Logger,
|
||||
cfg *config.MempoolConfig,
|
||||
proxyAppConn proxy.AppConnMempool,
|
||||
proxyAppConn abciclient.Client,
|
||||
height int64,
|
||||
options ...TxMempoolOption,
|
||||
) *TxMempool {
|
||||
@@ -422,7 +422,6 @@ func (txmp *TxMempool) Update(
|
||||
newPreFn PreCheckFunc,
|
||||
newPostFn PostCheckFunc,
|
||||
) error {
|
||||
|
||||
txmp.height = blockHeight
|
||||
txmp.notifiedTxsAvailable = false
|
||||
|
||||
|
||||
@@ -10,233 +10,96 @@ import (
|
||||
"github.com/tendermint/tendermint/abci/types"
|
||||
)
|
||||
|
||||
//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
|
||||
|
||||
//----------------------------------------------------------------------------------------
|
||||
// Enforce which abci msgs can be sent on a connection at the type level
|
||||
|
||||
type AppConnConsensus interface {
|
||||
Error() error
|
||||
|
||||
InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
|
||||
|
||||
PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
|
||||
ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
|
||||
ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error)
|
||||
VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error)
|
||||
FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
|
||||
Commit(context.Context) (*types.ResponseCommit, error)
|
||||
}
|
||||
|
||||
type AppConnMempool interface {
|
||||
Error() error
|
||||
|
||||
CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
|
||||
|
||||
Flush(context.Context) error
|
||||
}
|
||||
|
||||
type AppConnQuery interface {
|
||||
Error() error
|
||||
|
||||
Echo(context.Context, string) (*types.ResponseEcho, error)
|
||||
Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
|
||||
Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
|
||||
}
|
||||
|
||||
type AppConnSnapshot interface {
|
||||
Error() error
|
||||
|
||||
ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
|
||||
OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
|
||||
LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
|
||||
ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------------------
|
||||
// Implements AppConnConsensus (subset of abciclient.Client)
|
||||
|
||||
type appConnConsensus struct {
|
||||
type proxyClient struct {
|
||||
metrics *Metrics
|
||||
appConn abciclient.Client
|
||||
abciclient.Client
|
||||
}
|
||||
|
||||
var _ AppConnConsensus = (*appConnConsensus)(nil)
|
||||
|
||||
func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
|
||||
return &appConnConsensus{
|
||||
func newProxyClient(appConn abciclient.Client, metrics *Metrics) abciclient.Client {
|
||||
return &proxyClient{
|
||||
metrics: metrics,
|
||||
appConn: appConn,
|
||||
Client: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) InitChain(
|
||||
ctx context.Context,
|
||||
req types.RequestInitChain,
|
||||
) (*types.ResponseInitChain, error) {
|
||||
func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
|
||||
return app.appConn.InitChain(ctx, req)
|
||||
return app.Client.InitChain(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) PrepareProposal(
|
||||
ctx context.Context,
|
||||
req types.RequestPrepareProposal,
|
||||
) (*types.ResponsePrepareProposal, error) {
|
||||
func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
|
||||
return app.appConn.PrepareProposal(ctx, req)
|
||||
return app.Client.PrepareProposal(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) ProcessProposal(
|
||||
ctx context.Context,
|
||||
req types.RequestProcessProposal,
|
||||
) (*types.ResponseProcessProposal, error) {
|
||||
func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
|
||||
return app.appConn.ProcessProposal(ctx, req)
|
||||
return app.Client.ProcessProposal(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) ExtendVote(
|
||||
ctx context.Context,
|
||||
req types.RequestExtendVote,
|
||||
) (*types.ResponseExtendVote, error) {
|
||||
func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
|
||||
return app.appConn.ExtendVote(ctx, req)
|
||||
return app.Client.ExtendVote(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) VerifyVoteExtension(
|
||||
ctx context.Context,
|
||||
req types.RequestVerifyVoteExtension,
|
||||
) (*types.ResponseVerifyVoteExtension, error) {
|
||||
func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
|
||||
return app.appConn.VerifyVoteExtension(ctx, req)
|
||||
return app.Client.VerifyVoteExtension(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) FinalizeBlock(
|
||||
ctx context.Context,
|
||||
req types.RequestFinalizeBlock,
|
||||
) (*types.ResponseFinalizeBlock, error) {
|
||||
func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
|
||||
return app.appConn.FinalizeBlock(ctx, req)
|
||||
return app.Client.FinalizeBlock(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) {
|
||||
func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
|
||||
return app.appConn.Commit(ctx)
|
||||
return app.Client.Commit(ctx)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnMempool (subset of abciclient.Client)
|
||||
|
||||
type appConnMempool struct {
|
||||
metrics *Metrics
|
||||
appConn abciclient.Client
|
||||
}
|
||||
|
||||
func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool {
|
||||
return &appConnMempool{
|
||||
metrics: metrics,
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnMempool) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnMempool) Flush(ctx context.Context) error {
|
||||
func (app *proxyClient) Flush(ctx context.Context) error {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
|
||||
return app.appConn.Flush(ctx)
|
||||
return app.Client.Flush(ctx)
|
||||
}
|
||||
|
||||
func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
|
||||
func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
|
||||
return app.appConn.CheckTx(ctx, req)
|
||||
return app.Client.CheckTx(ctx, req)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnQuery (subset of abciclient.Client)
|
||||
|
||||
type appConnQuery struct {
|
||||
metrics *Metrics
|
||||
appConn abciclient.Client
|
||||
}
|
||||
|
||||
func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery {
|
||||
return &appConnQuery{
|
||||
metrics: metrics,
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnQuery) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
|
||||
func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
|
||||
return app.appConn.Echo(ctx, msg)
|
||||
return app.Client.Echo(ctx, msg)
|
||||
}
|
||||
|
||||
func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
|
||||
return app.appConn.Info(ctx, req)
|
||||
return app.Client.Info(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
|
||||
func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
|
||||
return app.appConn.Query(ctx, reqQuery)
|
||||
return app.Client.Query(ctx, reqQuery)
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
// Implements AppConnSnapshot (subset of abciclient.Client)
|
||||
|
||||
type appConnSnapshot struct {
|
||||
metrics *Metrics
|
||||
appConn abciclient.Client
|
||||
}
|
||||
|
||||
func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot {
|
||||
return &appConnSnapshot{
|
||||
metrics: metrics,
|
||||
appConn: appConn,
|
||||
}
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) Error() error {
|
||||
return app.appConn.Error()
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) ListSnapshots(
|
||||
ctx context.Context,
|
||||
req types.RequestListSnapshots,
|
||||
) (*types.ResponseListSnapshots, error) {
|
||||
func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
|
||||
return app.appConn.ListSnapshots(ctx, req)
|
||||
return app.Client.ListSnapshots(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) OfferSnapshot(
|
||||
ctx context.Context,
|
||||
req types.RequestOfferSnapshot,
|
||||
) (*types.ResponseOfferSnapshot, error) {
|
||||
func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
|
||||
return app.appConn.OfferSnapshot(ctx, req)
|
||||
return app.Client.OfferSnapshot(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) LoadSnapshotChunk(
|
||||
ctx context.Context,
|
||||
req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
|
||||
func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
|
||||
return app.appConn.LoadSnapshotChunk(ctx, req)
|
||||
return app.Client.LoadSnapshotChunk(ctx, req)
|
||||
}
|
||||
|
||||
func (app *appConnSnapshot) ApplySnapshotChunk(
|
||||
ctx context.Context,
|
||||
req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
|
||||
func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
|
||||
defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
|
||||
return app.appConn.ApplySnapshotChunk(ctx, req)
|
||||
return app.Client.ApplySnapshotChunk(ctx, req)
|
||||
}
|
||||
|
||||
// addTimeSample returns a function that, when called, adds an observation to m.
|
||||
|
||||
@@ -10,57 +10,9 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
)
|
||||
|
||||
// AppConns is the Tendermint's interface to the application that consists of
|
||||
// multiple connections.
|
||||
type AppConns interface {
|
||||
service.Service
|
||||
|
||||
// Mempool connection
|
||||
Mempool() AppConnMempool
|
||||
// Consensus connection
|
||||
Consensus() AppConnConsensus
|
||||
// Query connection
|
||||
Query() AppConnQuery
|
||||
// Snapshot connection
|
||||
Snapshot() AppConnSnapshot
|
||||
}
|
||||
|
||||
// NewAppConns calls NewMultiAppConn.
|
||||
func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
|
||||
return NewMultiAppConn(clientCreator, logger, metrics)
|
||||
}
|
||||
|
||||
// multiAppConn implements AppConns.
|
||||
//
|
||||
// A multiAppConn is made of a few appConns and manages their underlying abci
|
||||
// clients.
|
||||
// TODO: on app restart, clients must reboot together
|
||||
type multiAppConn struct {
|
||||
service.BaseService
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
consensusConn AppConnConsensus
|
||||
mempoolConn AppConnMempool
|
||||
queryConn AppConnQuery
|
||||
snapshotConn AppConnSnapshot
|
||||
|
||||
client stoppableClient
|
||||
|
||||
clientCreator abciclient.Creator
|
||||
}
|
||||
|
||||
// TODO: this is a totally internal and quasi permanent shim for
|
||||
// clients. eventually we can have a single client and have some kind
|
||||
// of reasonable lifecycle witout needing an explicit stop method.
|
||||
type stoppableClient interface {
|
||||
abciclient.Client
|
||||
Stop()
|
||||
}
|
||||
|
||||
// NewMultiAppConn makes all necessary abci connections to the application.
|
||||
func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
|
||||
multiAppConn := &multiAppConn{
|
||||
// New creates a proxy application interface.
|
||||
func New(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) abciclient.Client {
|
||||
multiAppConn := &proxyConn{
|
||||
logger: logger,
|
||||
metrics: metrics,
|
||||
clientCreator: clientCreator,
|
||||
@@ -69,16 +21,38 @@ func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metric
|
||||
return multiAppConn
|
||||
}
|
||||
|
||||
func (app *multiAppConn) Mempool() AppConnMempool { return app.mempoolConn }
|
||||
func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn }
|
||||
func (app *multiAppConn) Query() AppConnQuery { return app.queryConn }
|
||||
func (app *multiAppConn) Snapshot() AppConnSnapshot { return app.snapshotConn }
|
||||
// proxyConn implements provides the application connection.
|
||||
type proxyConn struct {
|
||||
service.BaseService
|
||||
abciclient.Client
|
||||
|
||||
func (app *multiAppConn) OnStart(ctx context.Context) error {
|
||||
logger log.Logger
|
||||
|
||||
metrics *Metrics
|
||||
|
||||
clientCreator abciclient.Creator
|
||||
}
|
||||
|
||||
func (app *proxyConn) OnStop() { tryCallStop(app.Client) }
|
||||
func (app *proxyConn) IsRunning() bool { return app.Client.IsRunning() }
|
||||
func (app *proxyConn) Start(ctx context.Context) error { return app.BaseService.Start(ctx) }
|
||||
func (app *proxyConn) Wait() { app.BaseService.Wait() }
|
||||
|
||||
func tryCallStop(client abciclient.Client) {
|
||||
if client == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if sc, ok := client.(interface{ Stop() }); ok {
|
||||
sc.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
func (app *proxyConn) OnStart(ctx context.Context) error {
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
app.client.Stop()
|
||||
tryCallStop(app.Client)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -88,24 +62,18 @@ func (app *multiAppConn) OnStart(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
app.queryConn = NewAppConnQuery(client, app.metrics)
|
||||
app.snapshotConn = NewAppConnSnapshot(client, app.metrics)
|
||||
app.mempoolConn = NewAppConnMempool(client, app.metrics)
|
||||
app.consensusConn = NewAppConnConsensus(client, app.metrics)
|
||||
|
||||
app.client = client.(stoppableClient)
|
||||
|
||||
app.Client = newProxyClient(client, app.metrics)
|
||||
// Kill Tendermint if the ABCI application crashes.
|
||||
go func() {
|
||||
if !client.IsRunning() {
|
||||
if !app.Client.IsRunning() {
|
||||
return
|
||||
}
|
||||
app.client.Wait()
|
||||
app.Client.Wait()
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := app.client.Error(); err != nil {
|
||||
if err := app.Client.Error(); err != nil {
|
||||
app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
|
||||
"err", err)
|
||||
if killErr := kill(); killErr != nil {
|
||||
@@ -119,8 +87,6 @@ func (app *multiAppConn) OnStart(ctx context.Context) error {
|
||||
return client.Start(ctx)
|
||||
}
|
||||
|
||||
func (app *multiAppConn) OnStop() { app.client.Stop() }
|
||||
|
||||
func kill() error {
|
||||
p, err := os.FindProcess(os.Getpid())
|
||||
if err != nil {
|
||||
|
||||
@@ -42,7 +42,7 @@ func TestAppConns_Start_Stop(t *testing.T) {
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
|
||||
appConns := New(creator, log.TestingLogger(), NopMetrics())
|
||||
|
||||
err := appConns.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -84,7 +84,7 @@ func TestAppConns_Failure(t *testing.T) {
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
|
||||
appConns := New(creator, log.TestingLogger(), NopMetrics())
|
||||
|
||||
err := appConns.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery(
|
||||
height int64,
|
||||
prove bool,
|
||||
) (*coretypes.ResultABCIQuery, error) {
|
||||
resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{
|
||||
resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{
|
||||
Path: path,
|
||||
Data: data,
|
||||
Height: height,
|
||||
@@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery(
|
||||
// ABCIInfo gets some info about the application.
|
||||
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
|
||||
func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) {
|
||||
resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo)
|
||||
resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/rs/cors"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/internal/blocksync"
|
||||
@@ -19,7 +20,6 @@ import (
|
||||
"github.com/tendermint/tendermint/internal/eventlog"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
|
||||
"github.com/tendermint/tendermint/internal/pubsub/query"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
@@ -73,8 +73,7 @@ type peerManager interface {
|
||||
// to be setup once during startup.
|
||||
type Environment struct {
|
||||
// external, thread safe interfaces
|
||||
ProxyAppQuery proxy.AppConnQuery
|
||||
ProxyAppMempool proxy.AppConnMempool
|
||||
ProxyApp abciclient.Client
|
||||
|
||||
// interfaces defined in types and above
|
||||
StateStore sm.Store
|
||||
|
||||
@@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul
|
||||
// be added to the mempool either.
|
||||
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
|
||||
func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) {
|
||||
res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
|
||||
res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -6,11 +6,11 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/crypto/encoding"
|
||||
"github.com/tendermint/tendermint/internal/eventbus"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -30,7 +30,7 @@ type BlockExecutor struct {
|
||||
blockStore BlockStore
|
||||
|
||||
// execute the app against this
|
||||
proxyApp proxy.AppConnConsensus
|
||||
proxyApp abciclient.Client
|
||||
|
||||
// events
|
||||
eventBus types.BlockEventPublisher
|
||||
@@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
|
||||
func NewBlockExecutor(
|
||||
stateStore Store,
|
||||
logger log.Logger,
|
||||
proxyApp proxy.AppConnConsensus,
|
||||
proxyApp abciclient.Client,
|
||||
pool mempool.Mempool,
|
||||
evpool EvidencePool,
|
||||
blockStore BlockStore,
|
||||
@@ -375,7 +375,7 @@ func (blockExec *BlockExecutor) Commit(
|
||||
func execBlockOnProxyApp(
|
||||
ctx context.Context,
|
||||
logger log.Logger,
|
||||
proxyAppConn proxy.AppConnConsensus,
|
||||
proxyAppConn abciclient.Client,
|
||||
block *types.Block,
|
||||
store Store,
|
||||
initialHeight int64,
|
||||
@@ -617,7 +617,7 @@ func fireEvents(
|
||||
func ExecCommitBlock(
|
||||
ctx context.Context,
|
||||
be *BlockExecutor,
|
||||
appConnConsensus proxy.AppConnConsensus,
|
||||
appConnConsensus abciclient.Client,
|
||||
block *types.Block,
|
||||
logger log.Logger,
|
||||
store Store,
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestApplyBlock(t *testing.T) {
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
logger := log.TestingLogger()
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
@@ -52,7 +52,7 @@ func TestApplyBlock(t *testing.T) {
|
||||
state, stateDB, _ := makeState(t, 1, 1)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
|
||||
blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp,
|
||||
mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)
|
||||
|
||||
block, err := sf.MakeBlock(state, 1, new(types.Commit))
|
||||
@@ -75,7 +75,7 @@ func TestBeginBlockValidators(t *testing.T) {
|
||||
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, log.TestingLogger(), proxy.NopMetrics())
|
||||
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
@@ -121,7 +121,7 @@ func TestBeginBlockValidators(t *testing.T) {
|
||||
block, err := sf.MakeBlock(state, 2, lastCommit)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
|
||||
_, err = sm.ExecCommitBlock(ctx, nil, proxyApp, block, log.TestingLogger(), stateStore, 1, state)
|
||||
require.NoError(t, err, tc.desc)
|
||||
|
||||
// -> app receives a list of validators with a bool indicating if they signed
|
||||
@@ -146,7 +146,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
|
||||
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, log.TestingLogger(), proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -222,7 +222,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
|
||||
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
|
||||
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp,
|
||||
mmock.Mempool{}, evpool, blockStore)
|
||||
|
||||
block, err := sf.MakeBlock(state, 1, new(types.Commit))
|
||||
@@ -250,7 +250,7 @@ func TestProcessProposal(t *testing.T) {
|
||||
app := abcimocks.NewBaseMock()
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
logger := log.TestingLogger()
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -261,7 +261,7 @@ func TestProcessProposal(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
@@ -453,7 +453,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
logger := log.TestingLogger()
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -464,7 +464,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
@@ -528,7 +528,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
logger := log.TestingLogger()
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err := proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -538,7 +538,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
|
||||
@@ -33,10 +33,10 @@ type paramsChangeTestCase struct {
|
||||
params types.ConsensusParams
|
||||
}
|
||||
|
||||
func newTestApp() proxy.AppConns {
|
||||
func newTestApp() abciclient.Client {
|
||||
app := &testApp{}
|
||||
cc := abciclient.NewLocalCreator(app)
|
||||
return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics())
|
||||
return proxy.New(cc, log.NewNopLogger(), proxy.NopMetrics())
|
||||
}
|
||||
|
||||
func makeAndCommitGoodBlock(
|
||||
|
||||
@@ -40,7 +40,7 @@ func TestValidateBlockHeader(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
memmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
@@ -128,7 +128,7 @@ func TestValidateBlockCommit(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
memmock.Mempool{},
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
@@ -263,7 +263,7 @@ func TestValidateBlockEvidence(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
log.TestingLogger(),
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
memmock.Mempool{},
|
||||
evpool,
|
||||
blockStore,
|
||||
|
||||
@@ -11,11 +11,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/eventbus"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
"github.com/tendermint/tendermint/internal/store"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -135,8 +135,7 @@ type Reactor struct {
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore
|
||||
|
||||
conn proxy.AppConnSnapshot
|
||||
connQuery proxy.AppConnQuery
|
||||
conn abciclient.Client
|
||||
tempDir string
|
||||
snapshotCh *p2p.Channel
|
||||
chunkCh *p2p.Channel
|
||||
@@ -173,8 +172,7 @@ func NewReactor(
|
||||
initialHeight int64,
|
||||
cfg config.StateSyncConfig,
|
||||
logger log.Logger,
|
||||
conn proxy.AppConnSnapshot,
|
||||
connQuery proxy.AppConnQuery,
|
||||
conn abciclient.Client,
|
||||
channelCreator p2p.ChannelCreator,
|
||||
peerUpdates *p2p.PeerUpdates,
|
||||
stateStore sm.Store,
|
||||
@@ -209,7 +207,6 @@ func NewReactor(
|
||||
initialHeight: initialHeight,
|
||||
cfg: cfg,
|
||||
conn: conn,
|
||||
connQuery: connQuery,
|
||||
snapshotCh: snapshotCh,
|
||||
chunkCh: chunkCh,
|
||||
blockCh: blockCh,
|
||||
@@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
|
||||
r.cfg,
|
||||
r.logger,
|
||||
r.conn,
|
||||
r.connQuery,
|
||||
r.stateProvider,
|
||||
r.snapshotCh,
|
||||
r.chunkCh,
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
@@ -71,21 +72,14 @@ type reactorTestSuite struct {
|
||||
func setup(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
conn *proxymocks.AppConnSnapshot,
|
||||
connQuery *proxymocks.AppConnQuery,
|
||||
conn *clientmocks.Client,
|
||||
stateProvider *mocks.StateProvider,
|
||||
chBuf uint,
|
||||
) *reactorTestSuite {
|
||||
t.Helper()
|
||||
|
||||
if conn == nil {
|
||||
conn = &proxymocks.AppConnSnapshot{}
|
||||
}
|
||||
if connQuery == nil {
|
||||
connQuery = &proxymocks.AppConnQuery{}
|
||||
}
|
||||
if stateProvider == nil {
|
||||
stateProvider = &mocks.StateProvider{}
|
||||
conn = &clientmocks.Client{}
|
||||
}
|
||||
|
||||
rts := &reactorTestSuite{
|
||||
@@ -102,7 +96,6 @@ func setup(
|
||||
paramsOutCh: make(chan p2p.Envelope, chBuf),
|
||||
paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
|
||||
conn: conn,
|
||||
connQuery: connQuery,
|
||||
stateProvider: stateProvider,
|
||||
}
|
||||
|
||||
@@ -171,7 +164,6 @@ func setup(
|
||||
*cfg,
|
||||
logger.With("component", "reactor"),
|
||||
conn,
|
||||
connQuery,
|
||||
chCreator,
|
||||
rts.peerUpdates,
|
||||
rts.stateStore,
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
@@ -54,8 +55,7 @@ var (
|
||||
type syncer struct {
|
||||
logger log.Logger
|
||||
stateProvider StateProvider
|
||||
conn proxy.AppConnSnapshot
|
||||
connQuery proxy.AppConnQuery
|
||||
conn abciclient.Client
|
||||
snapshots *snapshotPool
|
||||
snapshotCh *p2p.Channel
|
||||
chunkCh *p2p.Channel
|
||||
@@ -76,8 +76,7 @@ type syncer struct {
|
||||
func newSyncer(
|
||||
cfg config.StateSyncConfig,
|
||||
logger log.Logger,
|
||||
conn proxy.AppConnSnapshot,
|
||||
connQuery proxy.AppConnQuery,
|
||||
conn abciclient.Client,
|
||||
stateProvider StateProvider,
|
||||
snapshotCh *p2p.Channel,
|
||||
chunkCh *p2p.Channel,
|
||||
@@ -88,7 +87,6 @@ func newSyncer(
|
||||
logger: logger,
|
||||
stateProvider: stateProvider,
|
||||
conn: conn,
|
||||
connQuery: connQuery,
|
||||
snapshots: newSnapshotPool(),
|
||||
snapshotCh: snapshotCh,
|
||||
chunkCh: chunkCh,
|
||||
@@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin
|
||||
|
||||
// verifyApp verifies the sync, checking the app hash, last block height and app version
|
||||
func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error {
|
||||
resp, err := s.connQuery.Info(ctx, proxy.RequestInfo)
|
||||
resp, err := s.conn.Info(ctx, proxy.RequestInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
|
||||
}
|
||||
|
||||
20
node/node.go
20
node/node.go
@@ -159,7 +159,7 @@ func makeNode(
|
||||
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
|
||||
|
||||
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
|
||||
proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
|
||||
proxyApp := proxy.New(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
|
||||
if err := proxyApp.Start(ctx); err != nil {
|
||||
return nil, fmt.Errorf("error starting proxy app connections: %w", err)
|
||||
}
|
||||
@@ -289,7 +289,7 @@ func makeNode(
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger.With("module", "state"),
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mp,
|
||||
evPool,
|
||||
blockStore,
|
||||
@@ -343,8 +343,8 @@ func makeNode(
|
||||
genDoc.InitialHeight,
|
||||
*cfg.StateSync,
|
||||
logger.With("module", "statesync"),
|
||||
proxyApp.Snapshot(),
|
||||
proxyApp.Query(),
|
||||
proxyApp,
|
||||
proxyApp,
|
||||
router.OpenChannel,
|
||||
peerManager.Subscribe(ctx),
|
||||
stateStore,
|
||||
@@ -394,11 +394,7 @@ func makeNode(
|
||||
shutdownOps: makeCloser(closers),
|
||||
|
||||
rpcEnv: &rpccore.Environment{
|
||||
ProxyAppQuery: proxyApp.Query(),
|
||||
ProxyAppMempool: proxyApp.Mempool(),
|
||||
|
||||
StateStore: stateStore,
|
||||
BlockStore: blockStore,
|
||||
ProxyApp: proxyApp,
|
||||
EvidencePool: evPool,
|
||||
ConsensusState: csState,
|
||||
|
||||
@@ -769,14 +765,14 @@ func loadStateFromDBOrGenesisDocProvider(
|
||||
return state, nil
|
||||
}
|
||||
|
||||
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
|
||||
func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.RouterOptions {
|
||||
opts := p2p.RouterOptions{
|
||||
QueueType: conf.P2P.QueueType,
|
||||
}
|
||||
|
||||
if conf.FilterPeers && proxyApp != nil {
|
||||
opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
|
||||
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
|
||||
res, err := proxyApp.Query(ctx, abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/id/%s", id),
|
||||
})
|
||||
if err != nil {
|
||||
@@ -790,7 +786,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
|
||||
}
|
||||
|
||||
opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
|
||||
res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
|
||||
res, err := proxyApp.Query(ctx, abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -274,7 +274,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
logger := log.NewNopLogger()
|
||||
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err = proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
mp := mempool.NewTxMempool(
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
@@ -328,7 +328,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mp,
|
||||
evidencePool,
|
||||
blockStore,
|
||||
@@ -373,7 +373,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
logger := log.NewNopLogger()
|
||||
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err = proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -391,7 +391,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
mp := mempool.NewTxMempool(
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
@@ -404,7 +404,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mp,
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
@@ -441,7 +441,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
logger := log.NewNopLogger()
|
||||
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
|
||||
proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
|
||||
err = proxyApp.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -456,7 +456,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
mp := mempool.NewTxMempool(
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
@@ -476,7 +476,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
stateStore,
|
||||
logger,
|
||||
proxyApp.Consensus(),
|
||||
proxyApp,
|
||||
mp,
|
||||
sm.EmptyEvidencePool{},
|
||||
blockStore,
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/internal/blocksync"
|
||||
@@ -20,7 +21,6 @@ import (
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/p2p/conn"
|
||||
"github.com/tendermint/tendermint/internal/p2p/pex"
|
||||
"github.com/tendermint/tendermint/internal/proxy"
|
||||
sm "github.com/tendermint/tendermint/internal/state"
|
||||
"github.com/tendermint/tendermint/internal/state/indexer"
|
||||
"github.com/tendermint/tendermint/internal/state/indexer/sink"
|
||||
@@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
|
||||
func createMempoolReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
state sm.State,
|
||||
memplMetrics *mempool.Metrics,
|
||||
peerManager *p2p.PeerManager,
|
||||
@@ -183,7 +183,7 @@ func createMempoolReactor(
|
||||
mp := mempool.NewTxMempool(
|
||||
logger,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
proxyApp,
|
||||
state.LastBlockHeight,
|
||||
mempool.WithMetrics(memplMetrics),
|
||||
mempool.WithPreCheck(sm.TxPreCheck(state)),
|
||||
@@ -385,7 +385,7 @@ func createRouter(
|
||||
nodeKey types.NodeKey,
|
||||
peerManager *p2p.PeerManager,
|
||||
cfg *config.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
proxyApp abciclient.Client,
|
||||
) (*p2p.Router, error) {
|
||||
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
|
||||
Reference in New Issue
Block a user