mirror of https://github.com/tendermint/tendermint.git
synced 2026-01-14 16:52:49 +00:00

Compare commits: master...proxy-remo (12 commits)

Commits:
c5c088b1f8
1c7c3a09ec
11f3415a02
21f93fa15e
fa1520dd90
bacc38aced
b5fecde705
079d3e5e25
06b193c742
5765ec96d4
ce7d4174a9
378a2f0aed
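Taken together, these commits remove the abciclient.Creator indirection and the proxy.AppConns multi-connection wrapper: callers now construct an abciclient.Client directly and wrap it once with proxy.New. A minimal before/after sketch of the wiring, assuming the kvstore example application and a context already in hand (note the internal/proxy import path is only usable from inside the tendermint module):

	logger := log.NewNopLogger()
	app := kvstore.NewApplication()

	// Before: a Creator built the client lazily, and AppConns fanned it
	// out into Consensus()/Mempool()/Query()/Snapshot() sub-connections:
	//   cc := abciclient.NewLocalCreator(app)
	//   conns := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
	//   conns.Consensus().InitChain(ctx, req)

	// After: one client, one wrapper, one interface everywhere.
	client := abciclient.NewLocalClient(logger, app)
	proxyApp := proxy.New(client, logger, proxy.NopMetrics())
	if err := proxyApp.Start(ctx); err != nil {
		panic(err) // illustration only; real callers return the error
	}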
@@ -1,33 +0,0 @@
-package abciclient
-
-import (
-	"fmt"
-
-	"github.com/tendermint/tendermint/abci/types"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-// Creator creates new ABCI clients.
-type Creator func(log.Logger) (Client, error)
-
-// NewLocalCreator returns a Creator for the given app,
-// which will be running locally.
-func NewLocalCreator(app types.Application) Creator {
-	return func(logger log.Logger) (Client, error) {
-		return NewLocalClient(logger, app), nil
-	}
-}
-
-// NewRemoteCreator returns a Creator for the given address (e.g.
-// "192.168.0.1") and transport (e.g. "tcp"). Set mustConnect to true if you
-// want the client to connect before reporting success.
-func NewRemoteCreator(logger log.Logger, addr, transport string, mustConnect bool) Creator {
-	return func(log.Logger) (Client, error) {
-		remoteApp, err := NewClient(logger, addr, transport, mustConnect)
-		if err != nil {
-			return nil, fmt.Errorf("failed to connect to proxy: %w", err)
-		}
-
-		return remoteApp, nil
-	}
-}
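The remote path changes shape the same way: NewRemoteCreator deferred both construction and the connection error to the returned closure, while abciclient.NewClient (already called inside the deleted NewRemoteCreator) returns the client and error immediately. A sketch under the same signatures; dialRemote and the address are hypothetical, for illustration only:

	// dialRemote is a hypothetical helper, not part of this diff.
	func dialRemote(logger log.Logger) (abciclient.Client, error) {
		// mustConnect=false preserves the old retry-until-connected behavior.
		client, err := abciclient.NewClient(logger, "127.0.0.1:26658", "tcp", false)
		if err != nil {
			return nil, fmt.Errorf("failed to connect to proxy: %w", err)
		}
		return client, nil
	}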
go.mod (8 lines changed)
@@ -39,6 +39,12 @@ require (
 	pgregory.net/rapid v0.4.7
 )
 
+require (
+	github.com/creachadair/atomicfile v0.2.4
+	github.com/google/go-cmp v0.5.7
+	gotest.tools v2.2.0+incompatible
+)
+
 require (
 	4d63.com/gochecknoglobals v0.1.0 // indirect
 	github.com/Antonboom/errname v0.1.5 // indirect
@@ -67,7 +73,6 @@ require (
 	github.com/chavacava/garif v0.0.0-20210405164556-e8a0a408d6af // indirect
 	github.com/containerd/continuity v0.2.1 // indirect
-	github.com/creachadair/atomicfile v0.2.4 // indirect
 	github.com/daixiang0/gci v0.3.1-0.20220208004058-76d765e3ab48 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/denis-tingajkin/go-header v0.4.2 // indirect
 	github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
@@ -107,7 +112,6 @@ require (
 	github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect
 	github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
 	github.com/google/btree v1.0.0 // indirect
-	github.com/google/go-cmp v0.5.7 // indirect
 	github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect
 	github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
 	github.com/gostaticanalysis/comment v1.4.2 // indirect
@@ -33,7 +33,7 @@ type reactorTestSuite struct {
 	nodes []types.NodeID
 
 	reactors map[types.NodeID]*Reactor
-	app      map[types.NodeID]proxy.AppConns
+	app      map[types.NodeID]abciclient.Client
 
 	blockSyncChannels map[types.NodeID]*p2p.Channel
 	peerChans         map[types.NodeID]chan p2p.PeerUpdate
@@ -64,7 +64,7 @@ func setup(
 		network:  p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
 		nodes:    make([]types.NodeID, 0, numNodes),
 		reactors: make(map[types.NodeID]*Reactor, numNodes),
-		app:      make(map[types.NodeID]proxy.AppConns, numNodes),
+		app:      make(map[types.NodeID]abciclient.Client, numNodes),
 		blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
 		peerChans:         make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
 		peerUpdates:       make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
@@ -109,7 +109,7 @@ func (rts *reactorTestSuite) addNode(
 	logger := log.TestingLogger()
 
 	rts.nodes = append(rts.nodes, nodeID)
-	rts.app[nodeID] = proxy.NewAppConns(abciclient.NewLocalCreator(&abci.BaseApplication{}), logger, proxy.NopMetrics())
+	rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
 	require.NoError(t, rts.app[nodeID].Start(ctx))
 
 	blockDB := dbm.NewMemDB()
@@ -124,7 +124,7 @@ func (rts *reactorTestSuite) addNode(
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		log.TestingLogger(),
-		rts.app[nodeID].Consensus(),
+		rts.app[nodeID],
 		mock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -10,6 +10,7 @@ import (
 	"reflect"
 	"time"
 
+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/crypto/merkle"
 	"github.com/tendermint/tendermint/internal/eventbus"
@@ -237,10 +238,10 @@ func (h *Handshaker) NBlocks() int {
 }
 
 // TODO: retry the handshake/replay if it fails ?
-func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {
+func (h *Handshaker) Handshake(ctx context.Context, proxyApp abciclient.Client) error {
 
 	// Handshake is done via ABCI Info on the query conn.
-	res, err := proxyApp.Query().Info(ctx, proxy.RequestInfo)
+	res, err := proxyApp.Info(ctx, proxy.RequestInfo)
 	if err != nil {
 		return fmt.Errorf("error calling Info: %w", err)
 	}
@@ -285,7 +286,7 @@ func (h *Handshaker) ReplayBlocks(
 	state sm.State,
 	appHash []byte,
 	appBlockHeight int64,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 ) ([]byte, error) {
 	storeBlockBase := h.store.Base()
 	storeBlockHeight := h.store.Height()
@@ -316,7 +317,7 @@ func (h *Handshaker) ReplayBlocks(
 			Validators:    nextVals,
 			AppStateBytes: h.genDoc.AppState,
 		}
-		res, err := proxyApp.Consensus().InitChain(ctx, req)
+		res, err := proxyApp.InitChain(ctx, req)
 		if err != nil {
 			return nil, err
 		}
@@ -413,7 +414,7 @@ func (h *Handshaker) ReplayBlocks(
 			// NOTE: We could instead use the cs.WAL on cs.Start,
 			// but we'd have to allow the WAL to replay a block that wrote it's #ENDHEIGHT
 			h.logger.Info("Replay last block using real app")
-			state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
+			state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
 			return state.AppHash, err
 
 		case appBlockHeight == storeBlockHeight:
@@ -426,6 +427,9 @@ func (h *Handshaker) ReplayBlocks(
 			if err != nil {
 				return nil, err
 			}
+			if err := mockApp.Start(ctx); err != nil {
+				return nil, err
+			}
 
 			h.logger.Info("Replay last block using mock app")
 			state, err = h.replayBlock(ctx, state, storeBlockHeight, mockApp)
@@ -445,7 +449,7 @@ func (h *Handshaker) ReplayBlocks(
 func (h *Handshaker) replayBlocks(
 	ctx context.Context,
 	state sm.State,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 	appBlockHeight,
 	storeBlockHeight int64,
 	mutateState bool) ([]byte, error) {
@@ -481,16 +485,16 @@ func (h *Handshaker) replayBlocks(
 			// We emit events for the index services at the final block due to the sync issue when
 			// the node shutdown during the block committing status.
 			blockExec := sm.NewBlockExecutor(
-				h.stateStore, h.logger, proxyApp.Consensus(), emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
+				h.stateStore, h.logger, proxyApp, emptyMempool{}, sm.EmptyEvidencePool{}, h.store)
 			blockExec.SetEventBus(h.eventBus)
 			appHash, err = sm.ExecCommitBlock(ctx,
-				blockExec, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
+				blockExec, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
 			if err != nil {
 				return nil, err
 			}
 		} else {
 			appHash, err = sm.ExecCommitBlock(ctx,
-				nil, proxyApp.Consensus(), block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
+				nil, proxyApp, block, h.logger, h.stateStore, h.genDoc.InitialHeight, state)
 			if err != nil {
 				return nil, err
 			}
@@ -501,7 +505,7 @@ func (h *Handshaker) replayBlocks(
 
 	if mutateState {
 		// sync the final block
-		state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp.Consensus())
+		state, err = h.replayBlock(ctx, state, storeBlockHeight, proxyApp)
 		if err != nil {
 			return nil, err
 		}
@@ -517,7 +521,7 @@ func (h *Handshaker) replayBlock(
 	ctx context.Context,
 	state sm.State,
 	height int64,
-	proxyApp proxy.AppConnConsensus,
+	proxyApp abciclient.Client,
 ) (sm.State, error) {
 	block := h.store.LoadBlock(height)
 	meta := h.store.LoadBlockMeta(height)
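With the sub-connections gone, the handshake reduces to plain method calls on the client. A condensed sketch of the first step the Handshaker now performs, assuming a started abciclient.Client and the abci import alias used above (infoHandshake itself is a hypothetical wrapper, not part of the diff):

	// infoHandshake mirrors the Handshaker's opening move: ABCI Info
	// directly on the client, with no Query() sub-connection in between.
	func infoHandshake(ctx context.Context, client abciclient.Client) (*abci.ResponseInfo, error) {
		res, err := client.Info(ctx, proxy.RequestInfo)
		if err != nil {
			return nil, fmt.Errorf("error calling Info: %w", err)
		}
		return res, nil
	}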
@@ -327,8 +327,12 @@ func newConsensusStateForReplay(
 	}
 
 	// Create proxyAppConn connection (consensus, mempool, query)
-	clientCreator, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
-	proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
+	client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
+	if err != nil {
+		return nil, err
+	}
+
+	proxyApp := proxy.New(client, logger, proxy.NopMetrics())
 	err = proxyApp.Start(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("starting proxy app conns: %w", err)
@@ -346,7 +350,7 @@ func newConsensusStateForReplay(
 	}
 
 	mempool, evpool := emptyMempool{}, sm.EmptyEvidencePool{}
-	blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(), mempool, evpool, blockStore)
+	blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp, mempool, evpool, blockStore)
 
 	consensusState, err := NewState(ctx, logger, csConfig, stateStore, blockExec,
 		blockStore, mempool, evpool)
@@ -61,22 +61,11 @@ func newMockProxyApp(
 	logger log.Logger,
 	appHash []byte,
 	abciResponses *tmstate.ABCIResponses,
-) (proxy.AppConnConsensus, error) {
-
-	clientCreator := abciclient.NewLocalCreator(&mockProxyApp{
+) (abciclient.Client, error) {
+	return proxy.New(abciclient.NewLocalClient(logger, &mockProxyApp{
 		appHash:       appHash,
 		abciResponses: abciResponses,
-	})
-	cli, err := clientCreator(logger)
-	if err != nil {
-		return nil, err
-	}
-
-	if err = cli.Start(ctx); err != nil {
-		return nil, err
-	}
-
-	return proxy.NewAppConnConsensus(cli, proxy.NopMetrics()), nil
+	}), logger, proxy.NopMetrics()), nil
 }
 
 type mockProxyApp struct {
@@ -804,11 +804,11 @@ func testHandshakeReplay(
 		filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
 	t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
 
-	clientCreator2 := abciclient.NewLocalCreator(kvstoreApp)
+	clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
 	if nBlocks > 0 {
 		// run nBlocks against a new client to build up the app state.
 		// use a throwaway tendermint state
-		proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
+		proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
 		stateDB1 := dbm.NewMemDB()
 		stateStore := sm.NewStore(stateDB1)
 		err := stateStore.Save(genesisState)
@@ -829,9 +829,10 @@ func testHandshakeReplay(
 	genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
 	require.NoError(t, err)
 	handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
-	proxyApp := proxy.NewAppConns(clientCreator2, logger, proxy.NopMetrics())
+	proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
 
+	require.True(t, proxyApp.IsRunning())
 	require.NotNil(t, proxyApp)
 	t.Cleanup(func() { cancel(); proxyApp.Wait() })
 
 	err = handshaker.Handshake(ctx, proxyApp)
@@ -842,7 +843,7 @@ func testHandshakeReplay(
 	require.NoError(t, err, "Error on abci handshake")
 
 	// get the latest app hash from the app
-	res, err := proxyApp.Query().Info(ctx, abci.RequestInfo{Version: ""})
+	res, err := proxyApp.Info(ctx, abci.RequestInfo{Version: ""})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -875,11 +876,11 @@ func applyBlock(
 	evpool sm.EvidencePool,
 	st sm.State,
 	blk *types.Block,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 	blockStore *mockBlockStore,
 ) sm.State {
 	testPartSize := types.BlockPartSizeBytes
-	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
+	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
 
 	bps, err := blk.MakePartSet(testPartSize)
 	require.NoError(t, err)
@@ -892,7 +893,7 @@ func applyBlock(
 func buildAppStateFromChain(
 	ctx context.Context,
 	t *testing.T,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 	stateStore sm.Store,
 	mempool mempool.Mempool,
 	evpool sm.EvidencePool,
@@ -908,7 +909,7 @@ func buildAppStateFromChain(
 
 	state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
 	validators := types.TM2PB.ValidatorUpdates(state.Validators)
-	_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
+	_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
 		Validators: validators,
 	})
 	require.NoError(t, err)
@@ -958,14 +959,14 @@ func buildTMStateFromChain(
 	kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
 		filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
 	defer kvstoreApp.Close()
-	clientCreator := abciclient.NewLocalCreator(kvstoreApp)
+	client := abciclient.NewLocalClient(logger, kvstoreApp)
 
-	proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
+	proxyApp := proxy.New(client, logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx))
 
 	state.Version.Consensus.App = kvstore.ProtocolVersion // simulate handshake, receive app version
 	validators := types.TM2PB.ValidatorUpdates(state.Validators)
-	_, err := proxyApp.Consensus().InitChain(ctx, abci.RequestInitChain{
+	_, err := proxyApp.InitChain(ctx, abci.RequestInitChain{
 		Validators: validators,
 	})
 	require.NoError(t, err)
@@ -1031,8 +1032,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
 	// - 0x03
 	{
 		app := &badApp{numBlocks: 3, allHashesAreWrong: true}
-		clientCreator := abciclient.NewLocalCreator(app)
-		proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
+		client := abciclient.NewLocalClient(logger, app)
+		proxyApp := proxy.New(client, logger, proxy.NopMetrics())
 		err := proxyApp.Start(ctx)
 		require.NoError(t, err)
 		t.Cleanup(func() { cancel(); proxyApp.Wait() })
@@ -1051,8 +1052,8 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {
 	// - RANDOM HASH
 	{
 		app := &badApp{numBlocks: 3, onlyLastHashIsWrong: true}
-		clientCreator := abciclient.NewLocalCreator(app)
-		proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
+		client := abciclient.NewLocalClient(logger, app)
+		proxyApp := proxy.New(client, logger, proxy.NopMetrics())
 		err := proxyApp.Start(ctx)
 		require.NoError(t, err)
 		t.Cleanup(func() { cancel(); proxyApp.Wait() })
@@ -1282,12 +1283,13 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
+	logger := log.NewNopLogger()
 	votePower := 10 + int64(rand.Uint32())
 	val, _, err := factory.Validator(ctx, votePower)
 	require.NoError(t, err)
 	vals := types.NewValidatorSet([]*types.Validator{val})
 	app := &initChainApp{vals: types.TM2PB.ValidatorUpdates(vals)}
-	clientCreator := abciclient.NewLocalCreator(app)
+	client := abciclient.NewLocalClient(logger, app)
 
 	cfg, err := ResetConfig(t.TempDir(), "handshake_test_")
 	require.NoError(t, err)
@@ -1306,9 +1308,8 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
 	genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
 	require.NoError(t, err)
 
-	logger := log.TestingLogger()
 	handshaker := NewHandshaker(logger, stateStore, state, store, eventbus.NopEventBus{}, genDoc)
-	proxyApp := proxy.NewAppConns(clientCreator, logger, proxy.NopMetrics())
+	proxyApp := proxy.New(client, logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
 
 	require.NoError(t, handshaker.Handshake(ctx, proxyApp), "error on abci handshake")
@@ -67,8 +67,8 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 	}
 
 	blockStore := store.NewBlockStore(blockStoreDB)
 
-	proxyApp := proxy.NewAppConns(abciclient.NewLocalCreator(app), logger.With("module", "proxy"), proxy.NopMetrics())
+	proxyLogger := logger.With("module", "proxy")
+	proxyApp := proxy.New(abciclient.NewLocalClient(logger, app), proxyLogger, proxy.NopMetrics())
 	if err := proxyApp.Start(ctx); err != nil {
 		t.Fatal(fmt.Errorf("failed to start proxy app connections: %w", err))
 	}
@@ -82,7 +82,7 @@ func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr
 
 	mempool := emptyMempool{}
 	evpool := sm.EmptyEvidencePool{}
-	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool, blockStore)
+	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp, mempool, evpool, blockStore)
 	consensusState, err := NewState(ctx, logger, cfg.Consensus, stateStore, blockExec, blockStore, mempool, evpool)
 	if err != nil {
 		t.Fatal(err)
@@ -9,10 +9,10 @@ import (
 	"sync/atomic"
 	"time"
 
+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/libs/clist"
-	"github.com/tendermint/tendermint/internal/proxy"
 	"github.com/tendermint/tendermint/libs/log"
 	tmmath "github.com/tendermint/tendermint/libs/math"
 	"github.com/tendermint/tendermint/types"
@@ -31,7 +31,7 @@ type TxMempool struct {
 	logger       log.Logger
 	metrics      *Metrics
 	config       *config.MempoolConfig
-	proxyAppConn proxy.AppConnMempool
+	proxyAppConn abciclient.Client
 
 	// txsAvailable fires once for each height when the mempool is not empty
 	txsAvailable chan struct{}
@@ -93,7 +93,7 @@ type TxMempool struct {
 func NewTxMempool(
 	logger log.Logger,
 	cfg *config.MempoolConfig,
-	proxyAppConn proxy.AppConnMempool,
+	proxyAppConn abciclient.Client,
 	options ...TxMempoolOption,
 ) *TxMempool {
@@ -421,7 +421,6 @@ func (txmp *TxMempool) Update(
 	newPreFn PreCheckFunc,
 	newPostFn PostCheckFunc,
 ) error {
-
 	txmp.height = blockHeight
 	txmp.notifiedTxsAvailable = false

@@ -78,24 +78,23 @@ func setup(ctx context.Context, t testing.TB, cacheSize int, options ...TxMempoo
 	var cancel context.CancelFunc
 	ctx, cancel = context.WithCancel(ctx)
 
-	app := &application{kvstore.NewApplication()}
-	cc := abciclient.NewLocalCreator(app)
 	logger := log.TestingLogger()
 
+	app := &application{kvstore.NewApplication()}
+	conn := abciclient.NewLocalClient(logger, app)
+
 	cfg, err := config.ResetTestRoot(t.TempDir(), strings.ReplaceAll(t.Name(), "/", "|"))
 	require.NoError(t, err)
 	cfg.Mempool.CacheSize = cacheSize
-	appConnMem, err := cc(logger)
-	require.NoError(t, err)
-	require.NoError(t, appConnMem.Start(ctx))
+	require.NoError(t, conn.Start(ctx))
 
 	t.Cleanup(func() {
 		os.RemoveAll(cfg.RootDir)
 		cancel()
-		appConnMem.Wait()
+		conn.Wait()
 	})
 
-	return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, appConnMem, options...)
+	return NewTxMempool(logger.With("test", t.Name()), cfg.Mempool, conn, options...)
 }
 
 func checkTxs(ctx context.Context, t *testing.T, txmp *TxMempool, numTxs int, peerID uint16) []testTx {
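Because NewTxMempool now takes the client itself, production and test wiring collapse to the same few lines. A sketch from inside the mempool package, assuming a *config.MempoolConfig in hand and the kvstore app standing in for a real application (newKVStoreMempool is a hypothetical constructor for illustration):

	// newKVStoreMempool wires a local ABCI client straight into the mempool.
	func newKVStoreMempool(ctx context.Context, logger log.Logger, cfg *config.MempoolConfig) (*TxMempool, error) {
		conn := abciclient.NewLocalClient(logger, kvstore.NewApplication())
		if err := conn.Start(ctx); err != nil {
			return nil, err
		}
		return NewTxMempool(logger, cfg, conn), nil
	}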
@@ -1,249 +0,0 @@
-package proxy
-
-import (
-	"context"
-	"time"
-
-	"github.com/go-kit/kit/metrics"
-
-	abciclient "github.com/tendermint/tendermint/abci/client"
-	"github.com/tendermint/tendermint/abci/types"
-)
-
-//go:generate ../../scripts/mockery_generate.sh AppConnConsensus|AppConnMempool|AppConnQuery|AppConnSnapshot
-
-//----------------------------------------------------------------------------------------
-// Enforce which abci msgs can be sent on a connection at the type level
-
-type AppConnConsensus interface {
-	Error() error
-
-	InitChain(context.Context, types.RequestInitChain) (*types.ResponseInitChain, error)
-
-	PrepareProposal(context.Context, types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
-	ProcessProposal(context.Context, types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
-	ExtendVote(context.Context, types.RequestExtendVote) (*types.ResponseExtendVote, error)
-	VerifyVoteExtension(context.Context, types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error)
-	FinalizeBlock(context.Context, types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error)
-	Commit(context.Context) (*types.ResponseCommit, error)
-}
-
-type AppConnMempool interface {
-	Error() error
-
-	CheckTx(context.Context, types.RequestCheckTx) (*types.ResponseCheckTx, error)
-
-	Flush(context.Context) error
-}
-
-type AppConnQuery interface {
-	Error() error
-
-	Echo(context.Context, string) (*types.ResponseEcho, error)
-	Info(context.Context, types.RequestInfo) (*types.ResponseInfo, error)
-	Query(context.Context, types.RequestQuery) (*types.ResponseQuery, error)
-}
-
-type AppConnSnapshot interface {
-	Error() error
-
-	ListSnapshots(context.Context, types.RequestListSnapshots) (*types.ResponseListSnapshots, error)
-	OfferSnapshot(context.Context, types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
-	LoadSnapshotChunk(context.Context, types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
-	ApplySnapshotChunk(context.Context, types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
-}
-
-//-----------------------------------------------------------------------------------------
-// Implements AppConnConsensus (subset of abciclient.Client)
-
-type appConnConsensus struct {
-	metrics *Metrics
-	appConn abciclient.Client
-}
-
-var _ AppConnConsensus = (*appConnConsensus)(nil)
-
-func NewAppConnConsensus(appConn abciclient.Client, metrics *Metrics) AppConnConsensus {
-	return &appConnConsensus{
-		metrics: metrics,
-		appConn: appConn,
-	}
-}
-
-func (app *appConnConsensus) Error() error {
-	return app.appConn.Error()
-}
-
-func (app *appConnConsensus) InitChain(
-	ctx context.Context,
-	req types.RequestInitChain,
-) (*types.ResponseInitChain, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
-	return app.appConn.InitChain(ctx, req)
-}
-
-func (app *appConnConsensus) PrepareProposal(
-	ctx context.Context,
-	req types.RequestPrepareProposal,
-) (*types.ResponsePrepareProposal, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
-	return app.appConn.PrepareProposal(ctx, req)
-}
-
-func (app *appConnConsensus) ProcessProposal(
-	ctx context.Context,
-	req types.RequestProcessProposal,
-) (*types.ResponseProcessProposal, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
-	return app.appConn.ProcessProposal(ctx, req)
-}
-
-func (app *appConnConsensus) ExtendVote(
-	ctx context.Context,
-	req types.RequestExtendVote,
-) (*types.ResponseExtendVote, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
-	return app.appConn.ExtendVote(ctx, req)
-}
-
-func (app *appConnConsensus) VerifyVoteExtension(
-	ctx context.Context,
-	req types.RequestVerifyVoteExtension,
-) (*types.ResponseVerifyVoteExtension, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
-	return app.appConn.VerifyVoteExtension(ctx, req)
-}
-
-func (app *appConnConsensus) FinalizeBlock(
-	ctx context.Context,
-	req types.RequestFinalizeBlock,
-) (*types.ResponseFinalizeBlock, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
-	return app.appConn.FinalizeBlock(ctx, req)
-}
-
-func (app *appConnConsensus) Commit(ctx context.Context) (*types.ResponseCommit, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
-	return app.appConn.Commit(ctx)
-}
-
-//------------------------------------------------
-// Implements AppConnMempool (subset of abciclient.Client)
-
-type appConnMempool struct {
-	metrics *Metrics
-	appConn abciclient.Client
-}
-
-func NewAppConnMempool(appConn abciclient.Client, metrics *Metrics) AppConnMempool {
-	return &appConnMempool{
-		metrics: metrics,
-		appConn: appConn,
-	}
-}
-
-func (app *appConnMempool) Error() error {
-	return app.appConn.Error()
-}
-
-func (app *appConnMempool) Flush(ctx context.Context) error {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
-	return app.appConn.Flush(ctx)
-}
-
-func (app *appConnMempool) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
-	return app.appConn.CheckTx(ctx, req)
-}
-
-//------------------------------------------------
-// Implements AppConnQuery (subset of abciclient.Client)
-
-type appConnQuery struct {
-	metrics *Metrics
-	appConn abciclient.Client
-}
-
-func NewAppConnQuery(appConn abciclient.Client, metrics *Metrics) AppConnQuery {
-	return &appConnQuery{
-		metrics: metrics,
-		appConn: appConn,
-	}
-}
-
-func (app *appConnQuery) Error() error {
-	return app.appConn.Error()
-}
-
-func (app *appConnQuery) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
-	return app.appConn.Echo(ctx, msg)
-}
-
-func (app *appConnQuery) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
-	return app.appConn.Info(ctx, req)
-}
-
-func (app *appConnQuery) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
-	return app.appConn.Query(ctx, reqQuery)
-}
-
-//------------------------------------------------
-// Implements AppConnSnapshot (subset of abciclient.Client)
-
-type appConnSnapshot struct {
-	metrics *Metrics
-	appConn abciclient.Client
-}
-
-func NewAppConnSnapshot(appConn abciclient.Client, metrics *Metrics) AppConnSnapshot {
-	return &appConnSnapshot{
-		metrics: metrics,
-		appConn: appConn,
-	}
-}
-
-func (app *appConnSnapshot) Error() error {
-	return app.appConn.Error()
-}
-
-func (app *appConnSnapshot) ListSnapshots(
-	ctx context.Context,
-	req types.RequestListSnapshots,
-) (*types.ResponseListSnapshots, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
-	return app.appConn.ListSnapshots(ctx, req)
-}
-
-func (app *appConnSnapshot) OfferSnapshot(
-	ctx context.Context,
-	req types.RequestOfferSnapshot,
-) (*types.ResponseOfferSnapshot, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
-	return app.appConn.OfferSnapshot(ctx, req)
-}
-
-func (app *appConnSnapshot) LoadSnapshotChunk(
-	ctx context.Context,
-	req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
-	return app.appConn.LoadSnapshotChunk(ctx, req)
-}
-
-func (app *appConnSnapshot) ApplySnapshotChunk(
-	ctx context.Context,
-	req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
-	defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
-	return app.appConn.ApplySnapshotChunk(ctx, req)
-}
-
-// addTimeSample returns a function that, when called, adds an observation to m.
-// The observation added to m is the number of seconds ellapsed since addTimeSample
-// was initially called. addTimeSample is meant to be called in a defer to calculate
-// the amount of time a function takes to complete.
-func addTimeSample(m metrics.Histogram) func() {
-	start := time.Now()
-	return func() { m.Observe(time.Since(start).Seconds()) }
-}
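The addTimeSample helper deleted here reappears unchanged in the rewritten proxy client below, and its idiom is worth spelling out: in `defer addTimeSample(m)()` the outer call runs immediately, capturing the start time, and only the returned closure is deferred, so the histogram observes the elapsed seconds when the surrounding function returns. A generic sketch of the same pattern (timed and doWork are illustrative names):

	// timed returns a closure that records seconds elapsed since timed was called.
	func timed(h metrics.Histogram) func() {
		start := time.Now()
		return func() { h.Observe(time.Since(start).Seconds()) }
	}

	func doWork(h metrics.Histogram) {
		defer timed(h)() // trailing (): timed runs now, the closure runs at return
		// ... body being measured ...
	}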
@@ -1,42 +1,218 @@
 package proxy
 
 import (
+	"context"
 	"io"
+	"os"
+	"syscall"
+	"time"
+
+	"github.com/go-kit/kit/metrics"
 	abciclient "github.com/tendermint/tendermint/abci/client"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	"github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/libs/service"
 	e2e "github.com/tendermint/tendermint/test/e2e/app"
 )
 
-// DefaultClientCreator returns a default ClientCreator, which will create a
+// ClientFactory returns a default ClientCreator, which will create a
 // local client if addr is one of: 'kvstore',
 // 'persistent_kvstore', 'e2e', or 'noop', otherwise - a remote client.
 //
 // The Closer is a noop except for persistent_kvstore applications,
 // which will clean up the store.
-func DefaultClientCreator(logger log.Logger, addr, transport, dbDir string) (abciclient.Creator, io.Closer) {
+func ClientFactory(logger log.Logger, addr, transport, dbDir string) (abciclient.Client, io.Closer, error) {
 	switch addr {
 	case "kvstore":
-		return abciclient.NewLocalCreator(kvstore.NewApplication()), noopCloser{}
+		return abciclient.NewLocalClient(logger, kvstore.NewApplication()), noopCloser{}, nil
 	case "persistent_kvstore":
 		app := kvstore.NewPersistentKVStoreApplication(logger, dbDir)
-		return abciclient.NewLocalCreator(app), app
+		return abciclient.NewLocalClient(logger, app), app, nil
 	case "e2e":
 		app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
 		if err != nil {
-			panic(err)
+			return nil, noopCloser{}, err
 		}
-		return abciclient.NewLocalCreator(app), noopCloser{}
+		return abciclient.NewLocalClient(logger, app), noopCloser{}, nil
 	case "noop":
-		return abciclient.NewLocalCreator(types.NewBaseApplication()), noopCloser{}
+		return abciclient.NewLocalClient(logger, types.NewBaseApplication()), noopCloser{}, nil
 	default:
 		mustConnect := false // loop retrying
-		return abciclient.NewRemoteCreator(logger, addr, transport, mustConnect), noopCloser{}
+		client, err := abciclient.NewClient(logger, addr, transport, mustConnect)
+		if err != nil {
+			return nil, noopCloser{}, err
+		}
+
+		return client, noopCloser{}, nil
 	}
 }
 
 type noopCloser struct{}
 
 func (noopCloser) Close() error { return nil }
+
+////////////////////////////////////////////////////////////////////////
+
+// proxyClient provides the application connection.
+type proxyClient struct {
+	service.BaseService
+	logger log.Logger
+
+	client  abciclient.Client
+	metrics *Metrics
+}
+
+// New creates a proxy application interface.
+func New(client abciclient.Client, logger log.Logger, metrics *Metrics) abciclient.Client {
+	conn := &proxyClient{
+		logger:  logger,
+		metrics: metrics,
+		client:  client,
+	}
+	conn.BaseService = *service.NewBaseService(logger, "proxyClient", conn)
+	return conn
+}
+
+func (app *proxyClient) OnStop()      { tryCallStop(app.client) }
+func (app *proxyClient) Error() error { return app.client.Error() }
+
+func tryCallStop(client abciclient.Client) {
+	switch c := client.(type) {
+	case nil:
+		return
+	case interface{ Stop() }:
+		c.Stop()
+	}
+}
+
+func (app *proxyClient) OnStart(ctx context.Context) error {
+	var err error
+	defer func() {
+		if err != nil {
+			tryCallStop(app.client)
+		}
+	}()
+
+	// Kill Tendermint if the ABCI application crashes.
+	go func() {
+		if !app.client.IsRunning() {
+			return
+		}
+		app.client.Wait()
+		if ctx.Err() != nil {
+			return
+		}
+
+		if err := app.client.Error(); err != nil {
+			app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
+				"err", err)
+
+			if killErr := kill(); killErr != nil {
+				app.logger.Error("Failed to kill this process - please do so manually",
+					"err", killErr)
+			}
+		}
+	}()
+
+	return app.client.Start(ctx)
+}
+
+func kill() error {
+	p, err := os.FindProcess(os.Getpid())
+	if err != nil {
+		return err
+	}
+
+	return p.Signal(syscall.SIGTERM)
+}
+
+func (app *proxyClient) InitChain(ctx context.Context, req types.RequestInitChain) (*types.ResponseInitChain, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "init_chain", "type", "sync"))()
+	return app.client.InitChain(ctx, req)
+}
+
+func (app *proxyClient) PrepareProposal(ctx context.Context, req types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "prepare_proposal", "type", "sync"))()
+	return app.client.PrepareProposal(ctx, req)
+}
+
+func (app *proxyClient) ProcessProposal(ctx context.Context, req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "process_proposal", "type", "sync"))()
+	return app.client.ProcessProposal(ctx, req)
+}
+
+func (app *proxyClient) ExtendVote(ctx context.Context, req types.RequestExtendVote) (*types.ResponseExtendVote, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "extend_vote", "type", "sync"))()
+	return app.client.ExtendVote(ctx, req)
+}
+
+func (app *proxyClient) VerifyVoteExtension(ctx context.Context, req types.RequestVerifyVoteExtension) (*types.ResponseVerifyVoteExtension, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "verify_vote_extension", "type", "sync"))()
+	return app.client.VerifyVoteExtension(ctx, req)
+}
+
+func (app *proxyClient) FinalizeBlock(ctx context.Context, req types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "finalize_block", "type", "sync"))()
+	return app.client.FinalizeBlock(ctx, req)
+}
+
+func (app *proxyClient) Commit(ctx context.Context) (*types.ResponseCommit, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "commit", "type", "sync"))()
+	return app.client.Commit(ctx)
+}
+
+func (app *proxyClient) Flush(ctx context.Context) error {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "flush", "type", "sync"))()
+	return app.client.Flush(ctx)
+}
+
+func (app *proxyClient) CheckTx(ctx context.Context, req types.RequestCheckTx) (*types.ResponseCheckTx, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "check_tx", "type", "sync"))()
+	return app.client.CheckTx(ctx, req)
+}
+
+func (app *proxyClient) Echo(ctx context.Context, msg string) (*types.ResponseEcho, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "echo", "type", "sync"))()
+	return app.client.Echo(ctx, msg)
+}
+
+func (app *proxyClient) Info(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "info", "type", "sync"))()
+	return app.client.Info(ctx, req)
+}
+
+func (app *proxyClient) Query(ctx context.Context, reqQuery types.RequestQuery) (*types.ResponseQuery, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "query", "type", "sync"))()
+	return app.client.Query(ctx, reqQuery)
+}
+
+func (app *proxyClient) ListSnapshots(ctx context.Context, req types.RequestListSnapshots) (*types.ResponseListSnapshots, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "list_snapshots", "type", "sync"))()
+	return app.client.ListSnapshots(ctx, req)
+}
+
+func (app *proxyClient) OfferSnapshot(ctx context.Context, req types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "offer_snapshot", "type", "sync"))()
+	return app.client.OfferSnapshot(ctx, req)
+}
+
+func (app *proxyClient) LoadSnapshotChunk(ctx context.Context, req types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "load_snapshot_chunk", "type", "sync"))()
+	return app.client.LoadSnapshotChunk(ctx, req)
+}
+
+func (app *proxyClient) ApplySnapshotChunk(ctx context.Context, req types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error) {
+	defer addTimeSample(app.metrics.MethodTiming.With("method", "apply_snapshot_chunk", "type", "sync"))()
+	return app.client.ApplySnapshotChunk(ctx, req)
+}
+
+// addTimeSample returns a function that, when called, adds an observation to m.
+// The observation added to m is the number of seconds elapsed since addTimeSample
+// was initially called. addTimeSample is meant to be called in a defer to calculate
+// the amount of time a function takes to complete.
+func addTimeSample(m metrics.Histogram) func() {
+	start := time.Now()
+	return func() { m.Observe(time.Since(start).Seconds()) }
+}
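End to end, the replacement construction path is ClientFactory, then proxy.New, then Start. A hedged usage sketch using the built-in "kvstore" address (the transport and dbDir values are illustrative; the returned closer is a noop here and only matters for persistent_kvstore):

	func startProxyApp(ctx context.Context, logger log.Logger) (abciclient.Client, error) {
		client, closer, err := proxy.ClientFactory(logger, "kvstore", "socket", "/tmp/abci-db")
		if err != nil {
			return nil, err
		}
		_ = closer // noop for "kvstore"; Close() it on shutdown for persistent_kvstore

		proxyApp := proxy.New(client, logger, proxy.NopMetrics())
		if err := proxyApp.Start(ctx); err != nil {
			return nil, err
		}
		return proxyApp, nil
	}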
@@ -2,18 +2,26 @@ package proxy
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"os"
+	"os/signal"
 	"strings"
+	"syscall"
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	abciclient "github.com/tendermint/tendermint/abci/client"
+	abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
 	"github.com/tendermint/tendermint/abci/server"
 	"github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/libs/log"
 	tmrand "github.com/tendermint/tendermint/libs/rand"
+	"gotest.tools/assert"
 )
 
 //----------------------------------------
@@ -51,7 +59,10 @@ var SOCKET = "socket"
 func TestEcho(t *testing.T) {
 	sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
 	logger := log.TestingLogger()
-	clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
+	client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -62,12 +73,9 @@ func TestEcho(t *testing.T) {
 	t.Cleanup(func() { cancel(); s.Wait() })
 
 	// Start client
-	cli, err := clientCreator(logger.With("module", "abci-client"))
-	require.NoError(t, err, "Error creating ABCI client:")
-
-	require.NoError(t, cli.Start(ctx), "Error starting ABCI client")
-
-	proxy := newAppConnTest(cli)
+	require.NoError(t, client.Start(ctx), "Error starting ABCI client")
+
+	proxy := newAppConnTest(client)
 	t.Log("Connected")
 
 	for i := 0; i < 1000; i++ {
@@ -91,7 +99,10 @@ func BenchmarkEcho(b *testing.B) {
 	b.StopTimer() // Initialize
 	sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
 	logger := log.TestingLogger()
-	clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
+	client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
+	if err != nil {
+		b.Fatal(err)
+	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -102,12 +113,9 @@ func BenchmarkEcho(b *testing.B) {
 	b.Cleanup(func() { cancel(); s.Wait() })
 
 	// Start client
-	cli, err := clientCreator(logger.With("module", "abci-client"))
-	require.NoError(b, err, "Error creating ABCI client")
-
-	require.NoError(b, cli.Start(ctx), "Error starting ABCI client")
-
-	proxy := newAppConnTest(cli)
+	require.NoError(b, client.Start(ctx), "Error starting ABCI client")
+
+	proxy := newAppConnTest(client)
 	b.Log("Connected")
 	echoString := strings.Repeat(" ", 200)
 	b.StartTimer() // Start benchmarking tests
@@ -139,7 +147,10 @@ func TestInfo(t *testing.T) {
 
 	sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", tmrand.Str(6))
 	logger := log.TestingLogger()
-	clientCreator := abciclient.NewRemoteCreator(logger, sockPath, SOCKET, true)
+	client, err := abciclient.NewClient(logger, sockPath, SOCKET, true)
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// Start server
 	s := server.NewSocketServer(logger.With("module", "abci-server"), sockPath, kvstore.NewApplication())
@@ -147,12 +158,9 @@ func TestInfo(t *testing.T) {
 	t.Cleanup(func() { cancel(); s.Wait() })
 
 	// Start client
-	cli, err := clientCreator(logger.With("module", "abci-client"))
-	require.NoError(t, err, "Error creating ABCI client")
-
-	require.NoError(t, cli.Start(ctx), "Error starting ABCI client")
-
-	proxy := newAppConnTest(cli)
+	require.NoError(t, client.Start(ctx), "Error starting ABCI client")
+
+	proxy := newAppConnTest(client)
 	t.Log("Connected")
 
 	resInfo, err := proxy.Info(ctx, RequestInfo)
@@ -162,3 +170,72 @@ func TestInfo(t *testing.T) {
 		t.Error("Expected ResponseInfo with one element '{\"size\":0}' but got something else")
 	}
 }
+
+type noopStoppableClientImpl struct {
+	abciclient.Client
+	count int
+}
+
+func (c *noopStoppableClientImpl) Stop() { c.count++ }
+
+func TestAppConns_Start_Stop(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	clientMock := &abcimocks.Client{}
+	clientMock.On("Start", mock.Anything).Return(nil)
+	clientMock.On("Error").Return(nil)
+	clientMock.On("IsRunning").Return(true)
+	clientMock.On("Wait").Return(nil).Times(1)
+	cl := &noopStoppableClientImpl{Client: clientMock}
+
+	appConns := New(cl, log.TestingLogger(), NopMetrics())
+
+	err := appConns.Start(ctx)
+	require.NoError(t, err)
+
+	time.Sleep(200 * time.Millisecond)
+
+	cancel()
+	appConns.Wait()
+
+	clientMock.AssertExpectations(t)
+	assert.Equal(t, 1, cl.count)
+}
+
+// Upon failure, we call tmos.Kill
+func TestAppConns_Failure(t *testing.T) {
+	ok := make(chan struct{})
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, syscall.SIGTERM)
+	go func() {
+		for range c {
+			close(ok)
+			return
+		}
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	clientMock := &abcimocks.Client{}
+	clientMock.On("SetLogger", mock.Anything).Return()
+	clientMock.On("Start", mock.Anything).Return(nil)
+	clientMock.On("IsRunning").Return(true)
+	clientMock.On("Wait").Return(nil)
+	clientMock.On("Error").Return(errors.New("EOF"))
+	cl := &noopStoppableClientImpl{Client: clientMock}
+
+	appConns := New(cl, log.TestingLogger(), NopMetrics())
+
+	err := appConns.Start(ctx)
+	require.NoError(t, err)
+	t.Cleanup(func() { cancel(); appConns.Wait() })
+
+	select {
+	case <-ok:
+		t.Log("SIGTERM successfully received")
+	case <-time.After(5 * time.Second):
+		t.Fatal("expected process to receive SIGTERM signal")
+	}
+}
@@ -1,131 +0,0 @@
-package proxy
-
-import (
-	"context"
-	"os"
-	"syscall"
-
-	abciclient "github.com/tendermint/tendermint/abci/client"
-	"github.com/tendermint/tendermint/libs/log"
-	"github.com/tendermint/tendermint/libs/service"
-)
-
-// AppConns is the Tendermint's interface to the application that consists of
-// multiple connections.
-type AppConns interface {
-	service.Service
-
-	// Mempool connection
-	Mempool() AppConnMempool
-	// Consensus connection
-	Consensus() AppConnConsensus
-	// Query connection
-	Query() AppConnQuery
-	// Snapshot connection
-	Snapshot() AppConnSnapshot
-}
-
-// NewAppConns calls NewMultiAppConn.
-func NewAppConns(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
-	return NewMultiAppConn(clientCreator, logger, metrics)
-}
-
-// multiAppConn implements AppConns.
-//
-// A multiAppConn is made of a few appConns and manages their underlying abci
-// clients.
-// TODO: on app restart, clients must reboot together
-type multiAppConn struct {
-	service.BaseService
-	logger log.Logger
-
-	metrics       *Metrics
-	consensusConn AppConnConsensus
-	mempoolConn   AppConnMempool
-	queryConn     AppConnQuery
-	snapshotConn  AppConnSnapshot
-
-	client stoppableClient
-
-	clientCreator abciclient.Creator
-}
-
-// TODO: this is a totally internal and quasi permanent shim for
-// clients. eventually we can have a single client and have some kind
-// of reasonable lifecycle witout needing an explicit stop method.
-type stoppableClient interface {
-	abciclient.Client
-	Stop()
-}
-
-// NewMultiAppConn makes all necessary abci connections to the application.
-func NewMultiAppConn(clientCreator abciclient.Creator, logger log.Logger, metrics *Metrics) AppConns {
-	multiAppConn := &multiAppConn{
-		logger:        logger,
-		metrics:       metrics,
-		clientCreator: clientCreator,
-	}
-	multiAppConn.BaseService = *service.NewBaseService(logger, "multiAppConn", multiAppConn)
-	return multiAppConn
-}
-
-func (app *multiAppConn) Mempool() AppConnMempool     { return app.mempoolConn }
-func (app *multiAppConn) Consensus() AppConnConsensus { return app.consensusConn }
-func (app *multiAppConn) Query() AppConnQuery         { return app.queryConn }
-func (app *multiAppConn) Snapshot() AppConnSnapshot   { return app.snapshotConn }
-
-func (app *multiAppConn) OnStart(ctx context.Context) error {
-	var err error
-	defer func() {
-		if err != nil {
-			app.client.Stop()
-		}
-	}()
-
-	var client abciclient.Client
-	client, err = app.clientCreator(app.logger)
-	if err != nil {
-		return err
-	}
-
-	app.queryConn = NewAppConnQuery(client, app.metrics)
-	app.snapshotConn = NewAppConnSnapshot(client, app.metrics)
-	app.mempoolConn = NewAppConnMempool(client, app.metrics)
-	app.consensusConn = NewAppConnConsensus(client, app.metrics)
-
-	app.client = client.(stoppableClient)
-
-	// Kill Tendermint if the ABCI application crashes.
-	go func() {
-		if !client.IsRunning() {
-			return
-		}
-		app.client.Wait()
-		if ctx.Err() != nil {
-			return
-		}
-
-		if err := app.client.Error(); err != nil {
-			app.logger.Error("client connection terminated. Did the application crash? Please restart tendermint",
-				"err", err)
-			if killErr := kill(); killErr != nil {
-				app.logger.Error("Failed to kill this process - please do so manually",
-					"err", killErr)
-			}
-		}
-	}()
-
-	return client.Start(ctx)
-}
-
-func (app *multiAppConn) OnStop() { app.client.Stop() }
-
-func kill() error {
-	p, err := os.FindProcess(os.Getpid())
-	if err != nil {
-		return err
-	}
-
-	return p.Signal(syscall.SIGTERM)
-}
@@ -1,99 +0,0 @@
-package proxy
-
-import (
-	"context"
-	"errors"
-	"os"
-	"os/signal"
-	"syscall"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
-	"github.com/stretchr/testify/require"
-
-	abciclient "github.com/tendermint/tendermint/abci/client"
-	abcimocks "github.com/tendermint/tendermint/abci/client/mocks"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-type noopStoppableClientImpl struct {
-	abciclient.Client
-	count int
-}
-
-func (c *noopStoppableClientImpl) Stop() { c.count++ }
-
-func TestAppConns_Start_Stop(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	clientMock := &abcimocks.Client{}
-	clientMock.On("Start", mock.Anything).Return(nil)
-	clientMock.On("Error").Return(nil)
-	clientMock.On("IsRunning").Return(true)
-	clientMock.On("Wait").Return(nil).Times(1)
-	cl := &noopStoppableClientImpl{Client: clientMock}
-
-	creatorCallCount := 0
-	creator := func(logger log.Logger) (abciclient.Client, error) {
-		creatorCallCount++
-		return cl, nil
-	}
-
-	appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
-
-	err := appConns.Start(ctx)
-	require.NoError(t, err)
-
-	time.Sleep(200 * time.Millisecond)
-
-	cancel()
-	appConns.Wait()
-
-	clientMock.AssertExpectations(t)
-	assert.Equal(t, 1, cl.count)
-	assert.Equal(t, 1, creatorCallCount)
-}
-
-// Upon failure, we call tmos.Kill
-func TestAppConns_Failure(t *testing.T) {
-	ok := make(chan struct{})
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, syscall.SIGTERM)
-	go func() {
-		for range c {
-			close(ok)
-			return
-		}
-	}()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	clientMock := &abcimocks.Client{}
-	clientMock.On("SetLogger", mock.Anything).Return()
-	clientMock.On("Start", mock.Anything).Return(nil)
-	clientMock.On("IsRunning").Return(true)
-	clientMock.On("Wait").Return(nil)
-	clientMock.On("Error").Return(errors.New("EOF"))
-	cl := &noopStoppableClientImpl{Client: clientMock}
-
-	creator := func(log.Logger) (abciclient.Client, error) {
-		return cl, nil
-	}
-
-	appConns := NewAppConns(creator, log.TestingLogger(), NopMetrics())
-
-	err := appConns.Start(ctx)
-	require.NoError(t, err)
-	t.Cleanup(func() { cancel(); appConns.Wait() })
-
-	select {
-	case <-ok:
-		t.Log("SIGTERM successfully received")
-	case <-time.After(5 * time.Second):
-		t.Fatal("expected process to receive SIGTERM signal")
-	}
-}
@@ -18,7 +18,7 @@ func (env *Environment) ABCIQuery(
 	height int64,
 	prove bool,
 ) (*coretypes.ResultABCIQuery, error) {
-	resQuery, err := env.ProxyAppQuery.Query(ctx, abci.RequestQuery{
+	resQuery, err := env.ProxyApp.Query(ctx, abci.RequestQuery{
 		Path:   path,
 		Data:   data,
 		Height: height,
@@ -34,7 +34,7 @@ func (env *Environment) ABCIQuery(
 // ABCIInfo gets some info about the application.
 // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
 func (env *Environment) ABCIInfo(ctx context.Context) (*coretypes.ResultABCIInfo, error) {
-	resInfo, err := env.ProxyAppQuery.Info(ctx, proxy.RequestInfo)
+	resInfo, err := env.ProxyApp.Info(ctx, proxy.RequestInfo)
 	if err != nil {
 		return nil, err
 	}

@@ -11,6 +11,7 @@ import (

 	"github.com/rs/cors"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/crypto"
 	"github.com/tendermint/tendermint/internal/blocksync"
@@ -19,7 +20,6 @@ import (
 	"github.com/tendermint/tendermint/internal/eventlog"
 	"github.com/tendermint/tendermint/internal/mempool"
 	"github.com/tendermint/tendermint/internal/p2p"
-	"github.com/tendermint/tendermint/internal/proxy"
 	tmpubsub "github.com/tendermint/tendermint/internal/pubsub"
 	"github.com/tendermint/tendermint/internal/pubsub/query"
 	sm "github.com/tendermint/tendermint/internal/state"
@@ -67,8 +67,7 @@ type peerManager interface {
 // to be setup once during startup.
 type Environment struct {
 	// external, thread safe interfaces
-	ProxyAppQuery   proxy.AppConnQuery
-	ProxyAppMempool proxy.AppConnMempool
+	ProxyApp abciclient.Client

 	// interfaces defined in types and above
 	StateStore sm.Store

@@ -158,7 +158,7 @@ func (env *Environment) NumUnconfirmedTxs(ctx context.Context) (*coretypes.Resul
 // be added to the mempool either.
 // More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
 func (env *Environment) CheckTx(ctx context.Context, tx types.Tx) (*coretypes.ResultCheckTx, error) {
-	res, err := env.ProxyAppMempool.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
+	res, err := env.ProxyApp.CheckTx(ctx, abci.RequestCheckTx{Tx: tx})
 	if err != nil {
 		return nil, err
 	}
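
With Environment holding a single ProxyApp client, the query and mempool RPC paths share one handle. A hedged sketch of the consolidated call pattern (the kvstore app, paths, and payloads are illustrative; Query and CheckTx are the calls shown in the hunks above):

	package main

	import (
		"context"

		abciclient "github.com/tendermint/tendermint/abci/client"
		"github.com/tendermint/tendermint/abci/example/kvstore"
		abci "github.com/tendermint/tendermint/abci/types"
		"github.com/tendermint/tendermint/libs/log"
	)

	func main() {
		ctx := context.Background()
		logger := log.NewNopLogger()

		// One client serves what used to be ProxyAppQuery and ProxyAppMempool.
		client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
		if err := client.Start(ctx); err != nil {
			panic(err)
		}

		// Query path (was env.ProxyAppQuery.Query):
		_, _ = client.Query(ctx, abci.RequestQuery{Path: "/key", Data: []byte("k")})
		// Mempool path (was env.ProxyAppMempool.CheckTx):
		_, _ = client.CheckTx(ctx, abci.RequestCheckTx{Tx: []byte("k=v")})
	}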

@@ -6,11 +6,11 @@ import (
 	"fmt"
 	"time"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/crypto/encoding"
 	"github.com/tendermint/tendermint/internal/eventbus"
 	"github.com/tendermint/tendermint/internal/mempool"
-	"github.com/tendermint/tendermint/internal/proxy"
 	"github.com/tendermint/tendermint/libs/log"
 	tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
 	"github.com/tendermint/tendermint/types"
@@ -30,7 +30,7 @@ type BlockExecutor struct {
 	blockStore BlockStore

 	// execute the app against this
-	proxyApp proxy.AppConnConsensus
+	proxyApp abciclient.Client

 	// events
 	eventBus types.BlockEventPublisher
@@ -60,7 +60,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
 func NewBlockExecutor(
 	stateStore Store,
 	logger log.Logger,
-	proxyApp proxy.AppConnConsensus,
+	proxyApp abciclient.Client,
 	pool mempool.Mempool,
 	evpool EvidencePool,
 	blockStore BlockStore,
@@ -375,7 +375,7 @@ func (blockExec *BlockExecutor) Commit(
 func execBlockOnProxyApp(
 	ctx context.Context,
 	logger log.Logger,
-	proxyAppConn proxy.AppConnConsensus,
+	proxyAppConn abciclient.Client,
 	block *types.Block,
 	store Store,
 	initialHeight int64,
@@ -617,7 +617,7 @@ func fireEvents(
 func ExecCommitBlock(
 	ctx context.Context,
 	be *BlockExecutor,
-	appConnConsensus proxy.AppConnConsensus,
+	appConnConsensus abciclient.Client,
 	block *types.Block,
 	logger log.Logger,
 	store Store,
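
NewBlockExecutor and its helpers now take the ABCI client where they used to take a proxy.AppConnConsensus. A minimal wiring sketch under that signature (stores and the app are parameters here; mmock and the proxy wrapper follow the test imports elsewhere in this diff):

	package statetest

	import (
		abciclient "github.com/tendermint/tendermint/abci/client"
		abci "github.com/tendermint/tendermint/abci/types"
		mmock "github.com/tendermint/tendermint/internal/mempool/mock"
		"github.com/tendermint/tendermint/internal/proxy"
		sm "github.com/tendermint/tendermint/internal/state"
		"github.com/tendermint/tendermint/libs/log"
	)

	// newExecutor is a sketch: it builds a local client, wraps it with the
	// proxy layer, and passes the result straight into NewBlockExecutor.
	func newExecutor(stateStore sm.Store, blockStore sm.BlockStore, app abci.Application) *sm.BlockExecutor {
		logger := log.NewNopLogger()
		client := abciclient.NewLocalClient(logger, app)
		proxyApp := proxy.New(client, logger, proxy.NopMetrics())
		return sm.NewBlockExecutor(
			stateStore,
			logger,
			proxyApp, // abciclient.Client, was proxy.AppConnConsensus
			mmock.Mempool{},
			sm.EmptyEvidencePool{},
			blockStore,
		)
	}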

@@ -39,9 +39,9 @@ var (

 func TestApplyBlock(t *testing.T) {
 	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
 	logger := log.TestingLogger()
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())

 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
@@ -52,7 +52,7 @@ func TestApplyBlock(t *testing.T) {
 	state, stateDB, _ := makeState(t, 1, 1)
 	stateStore := sm.NewStore(stateDB)
 	blockStore := store.NewBlockStore(dbm.NewMemDB())
-	blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp.Consensus(),
+	blockExec := sm.NewBlockExecutor(stateStore, logger, proxyApp,
 		mmock.Mempool{}, sm.EmptyEvidencePool{}, blockStore)

 	block, err := sf.MakeBlock(state, 1, new(types.Commit))
@@ -73,9 +73,10 @@ func TestBeginBlockValidators(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

+	logger := log.TestingLogger()
 	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
-	proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())

 	err := proxyApp.Start(ctx)
 	require.NoError(t, err)
@@ -121,7 +122,7 @@ func TestBeginBlockValidators(t *testing.T) {
 	block, err := sf.MakeBlock(state, 2, lastCommit)
 	require.NoError(t, err)

-	_, err = sm.ExecCommitBlock(ctx, nil, proxyApp.Consensus(), block, log.TestingLogger(), stateStore, 1, state)
+	_, err = sm.ExecCommitBlock(ctx, nil, proxyApp, block, log.TestingLogger(), stateStore, 1, state)
 	require.NoError(t, err, tc.desc)

 	// -> app receives a list of validators with a bool indicating if they signed
@@ -145,8 +146,9 @@ func TestBeginBlockByzantineValidators(t *testing.T) {
 	defer cancel()

 	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
-	proxyApp := proxy.NewAppConns(cc, log.TestingLogger(), proxy.NopMetrics())
+	logger := log.TestingLogger()
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err := proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -222,7 +224,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) {

 	blockStore := store.NewBlockStore(dbm.NewMemDB())

-	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(),
+	blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp,
 		mmock.Mempool{}, evpool, blockStore)

 	block, err := sf.MakeBlock(state, 1, new(types.Commit))
@@ -248,9 +250,9 @@ func TestProcessProposal(t *testing.T) {
 	defer cancel()

 	app := abcimocks.NewBaseMock()
-	cc := abciclient.NewLocalCreator(app)
 	logger := log.TestingLogger()
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err := proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -261,7 +263,7 @@ func TestProcessProposal(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger,
-		proxyApp.Consensus(),
+		proxyApp,
 		mmock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -451,9 +453,9 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
 	defer cancel()

 	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
 	logger := log.TestingLogger()
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err := proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -464,7 +466,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger,
-		proxyApp.Consensus(),
+		proxyApp,
 		mmock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -526,9 +528,9 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
 	defer cancel()

 	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
 	logger := log.TestingLogger()
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, app)
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err := proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -538,7 +540,7 @@ func TestEndBlockValidatorUpdatesResultingInEmptySet(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		log.TestingLogger(),
-		proxyApp.Consensus(),
+		proxyApp,
 		mmock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,

@@ -11,16 +11,13 @@ import (
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"

-	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/crypto"
 	"github.com/tendermint/tendermint/crypto/ed25519"
 	"github.com/tendermint/tendermint/crypto/encoding"
-	"github.com/tendermint/tendermint/internal/proxy"
 	sm "github.com/tendermint/tendermint/internal/state"
 	sf "github.com/tendermint/tendermint/internal/state/test/factory"
 	"github.com/tendermint/tendermint/internal/test/factory"
-	"github.com/tendermint/tendermint/libs/log"
 	tmrand "github.com/tendermint/tendermint/libs/rand"
 	tmtime "github.com/tendermint/tendermint/libs/time"
 	tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@@ -33,12 +30,6 @@ type paramsChangeTestCase struct {
 	params types.ConsensusParams
 }

-func newTestApp() proxy.AppConns {
-	app := &testApp{}
-	cc := abciclient.NewLocalCreator(app)
-	return proxy.NewAppConns(cc, log.NewNopLogger(), proxy.NopMetrics())
-}
-
 func makeAndCommitGoodBlock(
 	ctx context.Context,
 	t *testing.T,

@@ -10,10 +10,12 @@ import (
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/crypto/ed25519"
 	"github.com/tendermint/tendermint/crypto/tmhash"
 	memmock "github.com/tendermint/tendermint/internal/mempool/mock"
+	"github.com/tendermint/tendermint/internal/proxy"
 	sm "github.com/tendermint/tendermint/internal/state"
 	"github.com/tendermint/tendermint/internal/state/mocks"
 	statefactory "github.com/tendermint/tendermint/internal/state/test/factory"
@@ -30,8 +32,8 @@ const validationTestsStopHeight int64 = 10
 func TestValidateBlockHeader(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	proxyApp := newTestApp()
+	logger := log.TestingLogger()
+	proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx))

 	state, stateDB, privVals := makeState(t, 3, 1)
@@ -39,8 +41,8 @@ func TestValidateBlockHeader(t *testing.T) {
 	blockStore := store.NewBlockStore(dbm.NewMemDB())
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
-		log.TestingLogger(),
-		proxyApp.Consensus(),
+		logger,
+		proxyApp,
 		memmock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -119,7 +121,8 @@ func TestValidateBlockCommit(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	proxyApp := newTestApp()
+	logger := log.TestingLogger()
+	proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx))

 	state, stateDB, privVals := makeState(t, 1, 1)
@@ -127,8 +130,8 @@ func TestValidateBlockCommit(t *testing.T) {
 	blockStore := store.NewBlockStore(dbm.NewMemDB())
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
-		log.TestingLogger(),
-		proxyApp.Consensus(),
+		logger,
+		proxyApp,
 		memmock.Mempool{},
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -245,7 +248,8 @@ func TestValidateBlockEvidence(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	proxyApp := newTestApp()
+	logger := log.TestingLogger()
+	proxyApp := proxy.New(abciclient.NewLocalClient(logger, &testApp{}), logger, proxy.NopMetrics())
 	require.NoError(t, proxyApp.Start(ctx))

 	state, stateDB, privVals := makeState(t, 4, 1)
@@ -263,7 +267,7 @@ func TestValidateBlockEvidence(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		log.TestingLogger(),
-		proxyApp.Consensus(),
+		proxyApp,
 		memmock.Mempool{},
 		evpool,
 		blockStore,

@@ -11,11 +11,11 @@ import (
 	"sync"
 	"time"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/eventbus"
 	"github.com/tendermint/tendermint/internal/p2p"
-	"github.com/tendermint/tendermint/internal/proxy"
 	sm "github.com/tendermint/tendermint/internal/state"
 	"github.com/tendermint/tendermint/internal/store"
 	"github.com/tendermint/tendermint/libs/log"
@@ -135,8 +135,7 @@ type Reactor struct {
 	stateStore sm.Store
 	blockStore *store.BlockStore

-	conn      proxy.AppConnSnapshot
-	connQuery proxy.AppConnQuery
+	conn       abciclient.Client
 	tempDir    string
 	snapshotCh *p2p.Channel
 	chunkCh    *p2p.Channel
@@ -173,8 +172,7 @@ func NewReactor(
 	initialHeight int64,
 	cfg config.StateSyncConfig,
 	logger log.Logger,
-	conn proxy.AppConnSnapshot,
-	connQuery proxy.AppConnQuery,
+	conn abciclient.Client,
 	channelCreator p2p.ChannelCreator,
 	peerUpdates *p2p.PeerUpdates,
 	stateStore sm.Store,
@@ -209,7 +207,6 @@ func NewReactor(
 		initialHeight: initialHeight,
 		cfg:           cfg,
 		conn:          conn,
-		connQuery:     connQuery,
 		snapshotCh:    snapshotCh,
 		chunkCh:       chunkCh,
 		blockCh:       blockCh,
@@ -287,7 +284,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
 		r.cfg,
 		r.logger,
 		r.conn,
-		r.connQuery,
 		r.stateProvider,
 		r.snapshotCh,
 		r.chunkCh,

@@ -13,11 +13,11 @@ import (
 	"github.com/stretchr/testify/require"
 	dbm "github.com/tendermint/tm-db"

+	clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/proxy"
-	proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
 	smmocks "github.com/tendermint/tendermint/internal/state/mocks"
 	"github.com/tendermint/tendermint/internal/statesync/mocks"
 	"github.com/tendermint/tendermint/internal/store"
@@ -37,8 +37,7 @@ type reactorTestSuite struct {
 	reactor *Reactor
 	syncer  *syncer

-	conn      *proxymocks.AppConnSnapshot
-	connQuery *proxymocks.AppConnQuery
+	conn          *clientmocks.Client
 	stateProvider *mocks.StateProvider

 	snapshotChannel *p2p.Channel
@@ -71,21 +70,14 @@ type reactorTestSuite struct {
 func setup(
 	ctx context.Context,
 	t *testing.T,
-	conn *proxymocks.AppConnSnapshot,
-	connQuery *proxymocks.AppConnQuery,
+	conn *clientmocks.Client,
 	stateProvider *mocks.StateProvider,
 	chBuf uint,
 ) *reactorTestSuite {
 	t.Helper()

 	if conn == nil {
-		conn = &proxymocks.AppConnSnapshot{}
-	}
-	if connQuery == nil {
-		connQuery = &proxymocks.AppConnQuery{}
-	}
-	if stateProvider == nil {
-		stateProvider = &mocks.StateProvider{}
+		conn = &clientmocks.Client{}
 	}

 	rts := &reactorTestSuite{
@@ -102,7 +94,6 @@ func setup(
 		paramsOutCh:     make(chan p2p.Envelope, chBuf),
 		paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
 		conn:            conn,
-		connQuery:       connQuery,
 		stateProvider:   stateProvider,
 	}

@@ -171,7 +162,6 @@ func setup(
 		*cfg,
 		logger.With("component", "reactor"),
 		conn,
-		connQuery,
 		chCreator,
 		rts.peerUpdates,
 		rts.stateStore,
@@ -186,7 +176,6 @@ func setup(
 		*cfg,
 		logger.With("component", "syncer"),
 		conn,
-		connQuery,
 		stateProvider,
 		rts.snapshotChannel,
 		rts.chunkChannel,
@@ -211,7 +200,7 @@ func TestReactor_Sync(t *testing.T) {
 	defer cancel()

 	const snapshotHeight = 7
-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)
 	chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
 	// app accepts any snapshot
 	rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
@@ -222,7 +211,7 @@ func TestReactor_Sync(t *testing.T) {
 		Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)

 	// app query returns valid state app hash
-	rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
+	rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
 		AppVersion:       testAppVersion,
 		LastBlockHeight:  snapshotHeight,
 		LastBlockAppHash: chain[snapshotHeight+1].AppHash,
@@ -265,7 +254,7 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)

 	rts.chunkInCh <- p2p.Envelope{
 		From: types.NodeID("aa"),
@@ -316,14 +305,14 @@ func TestReactor_ChunkRequest(t *testing.T) {
 			defer cancel()

 			// mock ABCI connection to return local snapshots
-			conn := &proxymocks.AppConnSnapshot{}
+			conn := &clientmocks.Client{}
 			conn.On("LoadSnapshotChunk", mock.Anything, abci.RequestLoadSnapshotChunk{
 				Height: tc.request.Height,
 				Format: tc.request.Format,
 				Chunk:  tc.request.Index,
 			}).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)

-			rts := setup(ctx, t, conn, nil, nil, 2)
+			rts := setup(ctx, t, conn, nil, 2)

 			rts.chunkInCh <- p2p.Envelope{
 				From: types.NodeID("aa"),
@@ -343,7 +332,7 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)

 	rts.snapshotInCh <- p2p.Envelope{
 		From: types.NodeID("aa"),
@@ -403,12 +392,12 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
 			defer cancel()

 			// mock ABCI connection to return local snapshots
-			conn := &proxymocks.AppConnSnapshot{}
+			conn := &clientmocks.Client{}
 			conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
 				Snapshots: tc.snapshots,
 			}, nil)

-			rts := setup(ctx, t, conn, nil, nil, 100)
+			rts := setup(ctx, t, conn, nil, 100)

 			rts.snapshotInCh <- p2p.Envelope{
 				From: types.NodeID("aa"),
@@ -435,7 +424,7 @@ func TestReactor_LightBlockResponse(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)

 	var height int64 = 10
 	// generates a random header
@@ -492,7 +481,7 @@ func TestReactor_BlockProviders(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)
 	rts.peerUpdateCh <- p2p.PeerUpdate{
 		NodeID: types.NodeID("aa"),
 		Status: p2p.PeerStatusUp,
@@ -559,7 +548,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, nil, 2)
+	rts := setup(ctx, t, nil, nil, 2)
 	// make syncer non nil else test won't think we are state syncing
 	rts.reactor.syncer = rts.syncer
 	peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
@@ -636,7 +625,7 @@ func TestReactor_Backfill(t *testing.T) {
 	defer cancel()

 	t.Cleanup(leaktest.CheckTimeout(t, 1*time.Minute))
-	rts := setup(ctx, t, nil, nil, nil, 21)
+	rts := setup(ctx, t, nil, nil, 21)

 	var (
 		startHeight int64 = 20
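
With one mock standing in for the whole client, snapshot- and query-side expectations land on the same object. A hedged sketch of the test pattern (imports as in the test file above; no code is run against the mock here, so treat AssertExpectations as illustrative):

	func TestMockClientSketch(t *testing.T) {
		conn := &clientmocks.Client{}

		// Snapshot-side expectation (previously on proxymocks.AppConnSnapshot):
		conn.On("ListSnapshots", mock.Anything, abci.RequestListSnapshots{}).
			Return(&abci.ResponseListSnapshots{}, nil)

		// Query-side expectation (previously on proxymocks.AppConnQuery):
		conn.On("Info", mock.Anything, proxy.RequestInfo).
			Return(&abci.ResponseInfo{LastBlockHeight: 7}, nil)

		// ...exercise the reactor or syncer under test with conn here...

		conn.AssertExpectations(t)
	}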

@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/p2p"
@@ -54,8 +55,7 @@ var (
 type syncer struct {
 	logger        log.Logger
 	stateProvider StateProvider
-	conn          proxy.AppConnSnapshot
-	connQuery     proxy.AppConnQuery
+	conn          abciclient.Client
 	snapshots     *snapshotPool
 	snapshotCh    *p2p.Channel
 	chunkCh       *p2p.Channel
@@ -76,8 +76,7 @@ type syncer struct {
 func newSyncer(
 	cfg config.StateSyncConfig,
 	logger log.Logger,
-	conn proxy.AppConnSnapshot,
-	connQuery proxy.AppConnQuery,
+	conn abciclient.Client,
 	stateProvider StateProvider,
 	snapshotCh *p2p.Channel,
 	chunkCh *p2p.Channel,
@@ -88,7 +87,6 @@ func newSyncer(
 		logger:        logger,
 		stateProvider: stateProvider,
 		conn:          conn,
-		connQuery:     connQuery,
 		snapshots:     newSnapshotPool(),
 		snapshotCh:    snapshotCh,
 		chunkCh:       chunkCh,
@@ -547,7 +545,7 @@ func (s *syncer) requestChunk(ctx context.Context, snapshot *snapshot, chunk uin

 // verifyApp verifies the sync, checking the app hash, last block height and app version
 func (s *syncer) verifyApp(ctx context.Context, snapshot *snapshot, appVersion uint64) error {
-	resp, err := s.connQuery.Info(ctx, proxy.RequestInfo)
+	resp, err := s.conn.Info(ctx, proxy.RequestInfo)
 	if err != nil {
 		return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
 	}
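
verifyApp now issues its ABCI Info call on the syncer's single client. A standalone sketch of the same check, assuming only the calls visible in this hunk (the wantHash comparison condenses what verifyApp does with the snapshot's trusted app hash):

	package statesync

	import (
		"bytes"
		"context"
		"fmt"

		abciclient "github.com/tendermint/tendermint/abci/client"
		"github.com/tendermint/tendermint/internal/proxy"
	)

	// verifyAppHash is a sketch of the verifyApp flow: one Info round trip
	// on the shared client, then a comparison against the trusted app hash.
	func verifyAppHash(ctx context.Context, conn abciclient.Client, wantHash []byte) error {
		resp, err := conn.Info(ctx, proxy.RequestInfo)
		if err != nil {
			return fmt.Errorf("failed to query ABCI app for appHash: %w", err)
		}
		if !bytes.Equal(resp.LastBlockAppHash, wantHash) {
			return fmt.Errorf("app hash mismatch; expected %X, got %X", wantHash, resp.LastBlockAppHash)
		}
		return nil
	}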

@@ -11,9 +11,9 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"

+	clientmocks "github.com/tendermint/tendermint/abci/client/mocks"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/internal/proxy"
-	proxymocks "github.com/tendermint/tendermint/internal/proxy/mocks"
 	sm "github.com/tendermint/tendermint/internal/state"
 	"github.com/tendermint/tendermint/internal/statesync/mocks"
 	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
@@ -62,13 +62,12 @@ func TestSyncer_SyncAny(t *testing.T) {
 	stateProvider.On("AppHash", mock.Anything, uint64(2)).Return([]byte("app_hash_2"), nil)
 	stateProvider.On("Commit", mock.Anything, uint64(1)).Return(commit, nil)
 	stateProvider.On("State", mock.Anything, uint64(1)).Return(state, nil)
-	connSnapshot := &proxymocks.AppConnSnapshot{}
-	connQuery := &proxymocks.AppConnQuery{}
+	conn := &clientmocks.Client{}

 	peerAID := types.NodeID("aa")
 	peerBID := types.NodeID("bb")
 	peerCID := types.NodeID("cc")
-	rts := setup(ctx, t, connSnapshot, connQuery, stateProvider, 4)
+	rts := setup(ctx, t, conn, stateProvider, 4)

 	rts.reactor.syncer = rts.syncer

@@ -110,7 +109,7 @@ func TestSyncer_SyncAny(t *testing.T) {

 	// We start a sync, with peers sending back chunks when requested. We first reject the snapshot
 	// with height 2 format 2, and accept the snapshot at height 1.
-	connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
+	conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
 		Snapshot: &abci.Snapshot{
 			Height: 2,
 			Format: 2,
@@ -119,7 +118,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 		},
 		AppHash: []byte("app_hash_2"),
 	}).Return(&abci.ResponseOfferSnapshot{Result: abci.ResponseOfferSnapshot_REJECT_FORMAT}, nil)
-	connSnapshot.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
+	conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
 		Snapshot: &abci.Snapshot{
 			Height: s.Height,
 			Format: s.Format,
@@ -171,7 +170,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 	// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
 	// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
 	// beginning. We also wait for a little while, to exercise the retry logic in fetchChunks().
-	connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
+	conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
 		Index: 2, Chunk: []byte{1, 1, 2},
 	}).Once().Run(func(args mock.Arguments) { time.Sleep(1 * time.Second) }).Return(
 		&abci.ResponseApplySnapshotChunk{
@@ -179,16 +178,16 @@ func TestSyncer_SyncAny(t *testing.T) {
 			RefetchChunks: []uint32{1},
 		}, nil)

-	connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
+	conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
 		Index: 0, Chunk: []byte{1, 1, 0},
 	}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
-	connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
+	conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
 		Index: 1, Chunk: []byte{1, 1, 1},
 	}).Times(2).Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
-	connSnapshot.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
+	conn.On("ApplySnapshotChunk", mock.Anything, abci.RequestApplySnapshotChunk{
 		Index: 2, Chunk: []byte{1, 1, 2},
 	}).Once().Return(&abci.ResponseApplySnapshotChunk{Result: abci.ResponseApplySnapshotChunk_ACCEPT}, nil)
-	connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
+	conn.On("Info", mock.Anything, proxy.RequestInfo).Return(&abci.ResponseInfo{
 		AppVersion:       testAppVersion,
 		LastBlockHeight:  1,
 		LastBlockAppHash: []byte("app_hash"),
@@ -217,8 +216,7 @@ func TestSyncer_SyncAny(t *testing.T) {
 	require.Equal(t, int64(len(rts.syncer.snapshots.snapshots)), rts.reactor.TotalSnapshots())
 	require.Equal(t, int64(0), rts.reactor.SnapshotChunksCount())

-	connSnapshot.AssertExpectations(t)
-	connQuery.AssertExpectations(t)
+	conn.AssertExpectations(t)
 }

 func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
@@ -228,7 +226,7 @@ func TestSyncer_SyncAny_noSnapshots(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	_, _, err := rts.syncer.SyncAny(ctx, 0, func() error { return nil })
 	require.Equal(t, errNoSnapshots, err)
@@ -241,7 +239,7 @@ func TestSyncer_SyncAny_abort(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
 	peerID := types.NodeID("aa")
@@ -265,7 +263,7 @@ func TestSyncer_SyncAny_reject(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	// s22 is tried first, then s12, then s11, then errNoSnapshots
 	s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -307,7 +305,7 @@ func TestSyncer_SyncAny_reject_format(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	// s22 is tried first, which reject s22 and s12, then s11 will abort.
 	s22 := &snapshot{Height: 2, Format: 2, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -345,7 +343,7 @@ func TestSyncer_SyncAny_reject_sender(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	peerAID := types.NodeID("aa")
 	peerBID := types.NodeID("bb")
@@ -394,7 +392,7 @@ func TestSyncer_SyncAny_abciError(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	errBoom := errors.New("boom")
 	s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}}
@@ -444,7 +442,7 @@ func TestSyncer_offerSnapshot(t *testing.T) {
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	s := &snapshot{Height: 1, Format: 1, Chunks: 3, Hash: []byte{1, 2, 3}, trustedAppHash: []byte("app_hash")}
 	rts.conn.On("OfferSnapshot", mock.Anything, abci.RequestOfferSnapshot{
@@ -497,7 +495,7 @@ func TestSyncer_applyChunks_Results(t *testing.T) {
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	body := []byte{1, 2, 3}
 	chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 1}, t.TempDir())
@@ -557,7 +555,7 @@ func TestSyncer_applyChunks_RefetchChunks(t *testing.T) {
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	chunks, err := newChunkQueue(&snapshot{Height: 1, Format: 1, Chunks: 3}, t.TempDir())
 	require.NoError(t, err)
@@ -628,7 +626,7 @@ func TestSyncer_applyChunks_RejectSenders(t *testing.T) {
 	stateProvider := &mocks.StateProvider{}
 	stateProvider.On("AppHash", mock.Anything, mock.Anything).Return([]byte("app_hash"), nil)

-	rts := setup(ctx, t, nil, nil, stateProvider, 2)
+	rts := setup(ctx, t, nil, stateProvider, 2)

 	// Set up three peers across two snapshots, and ask for one of them to be banned.
 	// It should be banned from all snapshots.
@@ -761,9 +759,9 @@ func TestSyncer_verifyApp(t *testing.T) {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()

-		rts := setup(ctx, t, nil, nil, nil, 2)
+		rts := setup(ctx, t, nil, nil, 2)

-		rts.connQuery.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
+		rts.conn.On("Info", mock.Anything, proxy.RequestInfo).Return(tc.response, tc.err)
 		err := rts.syncer.verifyApp(ctx, s, appVersion)
 		unwrapped := errors.Unwrap(err)
 		if unwrapped != nil {

node/node.go
@@ -100,7 +100,10 @@ func newDefaultNode(
 		return nil, err
 	}

-	appClient, _ := proxy.DefaultClientCreator(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
+	appClient, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
+	if err != nil {
+		return nil, err
+	}

 	return makeNode(
 		ctx,
@@ -120,7 +123,7 @@ func makeNode(
 	cfg *config.Config,
 	filePrivval *privval.FilePV,
 	nodeKey types.NodeKey,
-	clientCreator abciclient.Creator,
+	client abciclient.Client,
 	genesisDocProvider genesisDocProvider,
 	dbProvider config.DBProvider,
 	logger log.Logger,
@@ -155,7 +158,7 @@ func makeNode(
 	nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)

 	// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
-	proxyApp := proxy.NewAppConns(clientCreator, logger.With("module", "proxy"), nodeMetrics.proxy)
+	proxyApp := proxy.New(client, logger.With("module", "proxy"), nodeMetrics.proxy)
 	if err := proxyApp.Start(ctx); err != nil {
 		return nil, fmt.Errorf("error starting proxy app connections: %w", err)
 	}
@@ -281,7 +284,7 @@ func makeNode(
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger.With("module", "state"),
-		proxyApp.Consensus(),
+		proxyApp,
 		mp,
 		evPool,
 		blockStore,
@@ -339,8 +342,7 @@ func makeNode(
 		genDoc.InitialHeight,
 		*cfg.StateSync,
 		logger.With("module", "statesync"),
-		proxyApp.Snapshot(),
-		proxyApp.Query(),
+		proxyApp,
 		router.OpenChannel,
 		peerManager.Subscribe(ctx),
 		stateStore,
@@ -390,14 +392,13 @@ func makeNode(
 		shutdownOps: makeCloser(closers),

 		rpcEnv: &rpccore.Environment{
-			ProxyAppQuery:   proxyApp.Query(),
-			ProxyAppMempool: proxyApp.Mempool(),
-
-			StateStore: stateStore,
-			BlockStore: blockStore,
+			ProxyApp: proxyApp,
 			EvidencePool:   evPool,
 			ConsensusState: csState,

+			StateStore: stateStore,
+			BlockStore: blockStore,
+
 			ConsensusReactor: csReactor,
 			BlockSyncReactor: bcReactor,
@@ -752,7 +753,7 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
 	return state, nil
 }

-func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
+func getRouterConfig(conf *config.Config, proxyApp abciclient.Client) p2p.RouterOptions {
 	opts := p2p.RouterOptions{
 		QueueType: conf.P2P.QueueType,
 	}

 	if conf.FilterPeers && proxyApp != nil {
 		opts.FilterPeerByID = func(ctx context.Context, id types.NodeID) error {
-			res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
+			res, err := proxyApp.Query(ctx, abci.RequestQuery{
 				Path: fmt.Sprintf("/p2p/filter/id/%s", id),
 			})
 			if err != nil {
@@ -773,7 +774,7 @@ func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOpt
 	}

 	opts.FilterPeerByIP = func(ctx context.Context, ip net.IP, port uint16) error {
-		res, err := proxyApp.Query().Query(ctx, abci.RequestQuery{
+		res, err := proxyApp.Query(ctx, abci.RequestQuery{
 			Path: fmt.Sprintf("/p2p/filter/addr/%s", net.JoinHostPort(ip.String(), strconv.Itoa(int(port)))),
 		})
 		if err != nil {
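
Two things change shape in node/node.go: client construction moves to proxy.ClientFactory, which reports errors immediately instead of deferring them to a Creator call, and the peer-filter hooks query the client directly. A hedged sketch of both, assuming only the signatures visible in the hunks above:

	// buildAppClient is a sketch of the newDefaultNode flow: resolve the
	// ABCI client from config and fail fast on construction errors.
	func buildAppClient(logger log.Logger, cfg *config.Config) (abciclient.Client, error) {
		client, _, err := proxy.ClientFactory(logger, cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
		if err != nil {
			return nil, err
		}
		return client, nil
	}

	// filterPeerByID is a sketch of the router hook: the filter query now
	// goes straight through the client (was proxyApp.Query().Query(...)).
	func filterPeerByID(ctx context.Context, client abciclient.Client, id types.NodeID) error {
		res, err := client.Query(ctx, abci.RequestQuery{
			Path: fmt.Sprintf("/p2p/filter/id/%s", id),
		})
		if err != nil {
			return err
		}
		if res.IsErr() {
			return fmt.Errorf("peer %s rejected by filter", id)
		}
		return nil
	}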

@@ -273,8 +273,8 @@ func TestCreateProposalBlock(t *testing.T) {

 	logger := log.NewNopLogger()

-	cc := abciclient.NewLocalCreator(kvstore.NewApplication())
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err = proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -291,7 +291,7 @@ func TestCreateProposalBlock(t *testing.T) {
 	mp := mempool.NewTxMempool(
 		logger.With("module", "mempool"),
 		cfg.Mempool,
-		proxyApp.Mempool(),
+		proxyApp,
 	)

 	// Make EvidencePool
@@ -327,7 +327,7 @@ func TestCreateProposalBlock(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger,
-		proxyApp.Consensus(),
+		proxyApp,
 		mp,
 		evidencePool,
 		blockStore,
@@ -371,8 +371,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {

 	logger := log.NewNopLogger()

-	cc := abciclient.NewLocalCreator(kvstore.NewApplication())
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err = proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -390,7 +390,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
 	mp := mempool.NewTxMempool(
 		logger.With("module", "mempool"),
 		cfg.Mempool,
-		proxyApp.Mempool(),
+		proxyApp,
 	)

 	// fill the mempool with one txs just below the maximum size
@@ -402,7 +402,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger,
-		proxyApp.Consensus(),
+		proxyApp,
 		mp,
 		sm.EmptyEvidencePool{},
 		blockStore,
@@ -438,8 +438,8 @@ func TestMaxProposalBlockSize(t *testing.T) {

 	logger := log.NewNopLogger()

-	cc := abciclient.NewLocalCreator(kvstore.NewApplication())
-	proxyApp := proxy.NewAppConns(cc, logger, proxy.NopMetrics())
+	cc := abciclient.NewLocalClient(logger, kvstore.NewApplication())
+	proxyApp := proxy.New(cc, logger, proxy.NopMetrics())
 	err = proxyApp.Start(ctx)
 	require.NoError(t, err)

@@ -454,7 +454,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
 	mp := mempool.NewTxMempool(
 		logger.With("module", "mempool"),
 		cfg.Mempool,
-		proxyApp.Mempool(),
+		proxyApp,
 	)

 	// fill the mempool with one txs just below the maximum size
@@ -473,7 +473,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
 	blockExec := sm.NewBlockExecutor(
 		stateStore,
 		logger,
-		proxyApp.Consensus(),
+		proxyApp,
 		mp,
 		sm.EmptyEvidencePool{},
 		blockStore,

@@ -35,7 +35,7 @@ func New(
 	ctx context.Context,
 	conf *config.Config,
 	logger log.Logger,
-	cf abciclient.Creator,
+	cf abciclient.Client,
 	gen *types.GenesisDoc,
 ) (service.Service, error) {
 	nodeKey, err := types.LoadOrGenNodeKey(conf.NodeKeyFile())
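
node.New therefore accepts the client value itself; the harness hunks further down show the call shape. A hedged sketch of starting an in-process node with a local app (service and config plumbing as in this diff, error handling condensed):

	// startInProcessNode is a sketch: build a local ABCI client for app
	// and hand it to node.New, which no longer takes a Creator.
	func startInProcessNode(
		ctx context.Context,
		conf *config.Config,
		logger log.Logger,
		app abci.Application,
	) (service.Service, error) {
		client := abciclient.NewLocalClient(logger, app)
		return node.New(ctx, conf, logger, client, nil)
	}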

@@ -10,6 +10,7 @@ import (

 	dbm "github.com/tendermint/tm-db"

+	abciclient "github.com/tendermint/tendermint/abci/client"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/crypto"
 	"github.com/tendermint/tendermint/internal/blocksync"
@@ -20,7 +21,6 @@ import (
 	"github.com/tendermint/tendermint/internal/p2p"
 	"github.com/tendermint/tendermint/internal/p2p/conn"
 	"github.com/tendermint/tendermint/internal/p2p/pex"
-	"github.com/tendermint/tendermint/internal/proxy"
 	sm "github.com/tendermint/tendermint/internal/state"
 	"github.com/tendermint/tendermint/internal/state/indexer"
 	"github.com/tendermint/tendermint/internal/state/indexer/sink"
@@ -171,7 +171,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
 func createMempoolReactor(
 	ctx context.Context,
 	cfg *config.Config,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 	store sm.Store,
 	memplMetrics *mempool.Metrics,
 	peerManager *p2p.PeerManager,
@@ -183,7 +183,7 @@ func createMempoolReactor(
 	mp := mempool.NewTxMempool(
 		logger,
 		cfg.Mempool,
-		proxyApp.Mempool(),
+		proxyApp,
 		mempool.WithMetrics(memplMetrics),
 		mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
 		mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
@@ -387,7 +387,7 @@ func createRouter(
 	nodeKey types.NodeKey,
 	peerManager *p2p.PeerManager,
 	cfg *config.Config,
-	proxyApp proxy.AppConns,
+	proxyApp abciclient.Client,
 ) (*p2p.Router, error) {

 	p2pLogger := logger.With("module", "p2p")

@@ -98,7 +98,7 @@ func StartTendermint(
 		}

 	}
-	papp := abciclient.NewLocalCreator(app)
+	papp := abciclient.NewLocalClient(logger, app)
 	tmNode, err := node.New(ctx, conf, logger, papp, nil)
 	if err != nil {
 		return nil, func(_ context.Context) error { cancel(); return nil }, err

@@ -136,7 +136,7 @@ func startNode(ctx context.Context, cfg *Config) error {
 		ctx,
 		tmcfg,
 		nodeLogger,
-		abciclient.NewLocalCreator(app),
+		abciclient.NewLocalClient(nodeLogger, app),
 		nil,
 	)
 	if err != nil {

@@ -15,9 +15,9 @@ var getMp func() mempool.Mempool

 func init() {
 	app := kvstore.NewApplication()
-	cc := abciclient.NewLocalCreator(app)
-	appConnMem, _ := cc(log.NewNopLogger())
-	err := appConnMem.Start(context.TODO())
+	logger := log.NewNopLogger()
+	conn := abciclient.NewLocalClient(logger, app)
+	err := conn.Start(context.TODO())
 	if err != nil {
 		panic(err)
 	}
@@ -27,12 +27,7 @@ func init() {

 	getMp = func() mempool.Mempool {
 		if mp == nil {
-			mp = mempool.NewTxMempool(
-				log.NewNopLogger(),
-				cfg,
-				appConnMem,
-			)
-
+			mp = mempool.NewTxMempool(logger, cfg, conn)
 		}
 		return mp
 	}
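
mempool.NewTxMempool takes the client directly as its third argument, so a minimal pool needs only a started client. A hedged sketch (the config value and kvstore app are illustrative):

	// newPool is a sketch: start a local client and build a mempool on it.
	func newPool(ctx context.Context, cfg *config.MempoolConfig) (mempool.Mempool, error) {
		logger := log.NewNopLogger()
		conn := abciclient.NewLocalClient(logger, kvstore.NewApplication())
		if err := conn.Start(ctx); err != nil {
			return nil, err
		}
		// Was NewTxMempool(logger, cfg, appConnMem) on a dedicated mempool
		// connection; the single client is now passed straight through.
		return mempool.NewTxMempool(logger, cfg, conn), nil
	}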