mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-12 03:37:04 +00:00
Merge remote-tracking branch 'origin/master' into wb/abci-finalize-block-synchronize
This commit is contained in:
57
node/node.go
57
node/node.go
@@ -58,7 +58,6 @@ type nodeImpl struct {
|
||||
router *p2p.Router
|
||||
nodeInfo types.NodeInfo
|
||||
nodeKey types.NodeKey // our node privkey
|
||||
isListening bool
|
||||
|
||||
// services
|
||||
eventSinks []indexer.EventSink
|
||||
@@ -144,11 +143,8 @@ func makeNode(
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
err = genDoc.ValidateAndComplete()
|
||||
if err != nil {
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("error in genesis doc: %w", err),
|
||||
makeCloser(closers))
|
||||
if err = genDoc.ValidateAndComplete(); err != nil {
|
||||
return nil, combineCloseError(fmt.Errorf("error in genesis doc: %w", err), makeCloser(closers))
|
||||
}
|
||||
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
|
||||
@@ -242,10 +238,6 @@ func makeNode(
|
||||
}
|
||||
}
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, cfg.Mode)
|
||||
|
||||
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
|
||||
@@ -272,14 +264,14 @@ func makeNode(
|
||||
}
|
||||
|
||||
mpReactor, mp, err := createMempoolReactor(ctx,
|
||||
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
evReactor, evPool, err := createEvidenceReactor(ctx,
|
||||
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
|
||||
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
@@ -296,8 +288,12 @@ func makeNode(
|
||||
sm.BlockExecutorWithMetrics(nodeMetrics.state),
|
||||
)
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
csReactor, csState, err := createConsensusReactor(ctx,
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
cfg, stateStore, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
peerManager, router, logger,
|
||||
)
|
||||
@@ -309,7 +305,7 @@ func makeNode(
|
||||
// doing a state sync first.
|
||||
bcReactor, err := blocksync.NewReactor(ctx,
|
||||
logger.With("module", "blockchain"),
|
||||
state.Copy(),
|
||||
stateStore,
|
||||
blockExec,
|
||||
blockStore,
|
||||
csReactor,
|
||||
@@ -421,8 +417,6 @@ func makeNode(
|
||||
node.rpcEnv.PubKey = pubKey
|
||||
}
|
||||
|
||||
node.rpcEnv.P2PTransport = node
|
||||
|
||||
node.BaseService = *service.NewBaseService(logger, "Node", node)
|
||||
|
||||
return node, nil
|
||||
@@ -467,6 +461,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
n.rpcEnv.NodeInfo = n.nodeInfo
|
||||
// Start the RPC server before the P2P server
|
||||
// so we can eg. receive txs for the first block
|
||||
if n.config.RPC.ListenAddress != "" {
|
||||
@@ -485,7 +480,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
|
||||
if err := n.router.Start(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
n.isListening = true
|
||||
n.rpcEnv.IsListening = true
|
||||
|
||||
for _, reactor := range n.services {
|
||||
if err := reactor.Start(ctx); err != nil {
|
||||
@@ -580,7 +575,7 @@ func (n *nodeImpl) OnStop() {
|
||||
|
||||
n.stateSyncReactor.Wait()
|
||||
n.router.Wait()
|
||||
n.isListening = false
|
||||
n.rpcEnv.IsListening = false
|
||||
|
||||
// finally stop the listeners / external services
|
||||
for _, l := range n.rpcListeners {
|
||||
@@ -669,21 +664,6 @@ func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
func (n *nodeImpl) Listeners() []string {
|
||||
return []string{
|
||||
fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *nodeImpl) IsListening() bool {
|
||||
return n.isListening
|
||||
}
|
||||
|
||||
// NodeInfo returns the Node's Info from the Switch.
|
||||
func (n *nodeImpl) NodeInfo() types.NodeInfo {
|
||||
return n.nodeInfo
|
||||
}
|
||||
|
||||
// genesisDocProvider returns a GenesisDoc.
|
||||
// It allows the GenesisDoc to be pulled from sources other than the
|
||||
// filesystem, for instance from a distributed key-value store cluster.
|
||||
@@ -747,10 +727,7 @@ func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
|
||||
// loadStateFromDBOrGenesisDocProvider attempts to load the state from the
|
||||
// database, or creates one using the given genesisDocProvider. On success this also
|
||||
// returns the genesis doc loaded through the given provider.
|
||||
func loadStateFromDBOrGenesisDocProvider(
|
||||
stateStore sm.Store,
|
||||
genDoc *types.GenesisDoc,
|
||||
) (sm.State, error) {
|
||||
func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.GenesisDoc) (sm.State, error) {
|
||||
|
||||
// 1. Attempt to load state form the database
|
||||
state, err := stateStore.Load()
|
||||
@@ -764,6 +741,12 @@ func loadStateFromDBOrGenesisDocProvider(
|
||||
if err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
|
||||
// 3. save the gensis document to the state store so
|
||||
// its fetchable by other callers.
|
||||
if err := stateStore.Save(state); err != nil {
|
||||
return sm.State{}, err
|
||||
}
|
||||
}
|
||||
|
||||
return state, nil
|
||||
|
||||
@@ -292,7 +292,6 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// Make EvidencePool
|
||||
@@ -392,7 +391,6 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// fill the mempool with one txs just below the maximum size
|
||||
@@ -457,7 +455,6 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
logger.With("module", "mempool"),
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
)
|
||||
|
||||
// fill the mempool with one txs just below the maximum size
|
||||
|
||||
@@ -172,7 +172,7 @@ func createMempoolReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
state sm.State,
|
||||
store sm.Store,
|
||||
memplMetrics *mempool.Metrics,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
@@ -184,10 +184,9 @@ func createMempoolReactor(
|
||||
logger,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempool.WithMetrics(memplMetrics),
|
||||
mempool.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempool.WithPostCheck(sm.TxPostCheck(state)),
|
||||
mempool.WithPreCheck(sm.TxPreCheckFromStore(store)),
|
||||
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
|
||||
)
|
||||
|
||||
reactor, err := mempool.NewReactor(
|
||||
@@ -214,7 +213,7 @@ func createEvidenceReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
stateDB dbm.DB,
|
||||
store sm.Store,
|
||||
blockStore *store.BlockStore,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
@@ -229,7 +228,7 @@ func createEvidenceReactor(
|
||||
|
||||
logger = logger.With("module", "evidence")
|
||||
|
||||
evidencePool, err := evidence.NewPool(logger, evidenceDB, sm.NewStore(stateDB), blockStore, metrics)
|
||||
evidencePool, err := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("creating evidence pool: %w", err)
|
||||
}
|
||||
@@ -253,7 +252,7 @@ func createEvidenceReactor(
|
||||
func createConsensusReactor(
|
||||
ctx context.Context,
|
||||
cfg *config.Config,
|
||||
state sm.State,
|
||||
store sm.Store,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
mp mempool.Mempool,
|
||||
@@ -268,16 +267,19 @@ func createConsensusReactor(
|
||||
) (*consensus.Reactor, *consensus.State, error) {
|
||||
logger = logger.With("module", "consensus")
|
||||
|
||||
consensusState := consensus.NewState(ctx,
|
||||
consensusState, err := consensus.NewState(ctx,
|
||||
logger,
|
||||
cfg.Consensus,
|
||||
state.Copy(),
|
||||
store,
|
||||
blockExec,
|
||||
blockStore,
|
||||
mp,
|
||||
evidencePool,
|
||||
consensus.StateMetrics(csMetrics),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if privValidator != nil && cfg.Mode == config.ModeValidator {
|
||||
consensusState.SetPrivValidator(ctx, privValidator)
|
||||
|
||||
Reference in New Issue
Block a user