node: update bootstrapping for blockchain reactors

This commit is contained in:
Aleksandr Bezobchuk
2021-01-11 09:36:06 -05:00
parent 95b08bb775
commit 42049faed6
2 changed files with 111 additions and 46 deletions

View File

@@ -191,7 +191,7 @@ type Node struct {
eventBus *types.EventBus // pub/sub for services
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor p2p.Reactor // for fast-syncing
bcReactor service.Service // for fast-syncing
mempoolReactor *mempl.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
@@ -369,24 +369,43 @@ func createEvidenceReactor(
return evidenceReactorShim, evidenceReactor, evidencePool, nil
}
func createBlockchainReactor(config *cfg.Config,
func createBlockchainReactor(
logger log.Logger,
config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
csReactor *cs.Reactor,
fastSync bool,
logger log.Logger) (bcReactor p2p.Reactor, err error) {
) (*p2p.ReactorShim, service.Service, error) {
logger = logger.With("module", "blockchain")
switch config.FastSync.Version {
case "v0":
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
case "v2":
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}
reactorShim := p2p.NewReactorShim(logger, "BlokchainShim", bcv0.ChannelShims)
reactor := bcv0.NewReactor(
logger,
state.Copy(),
blockExec,
blockStore,
csReactor,
reactorShim.GetChannel(bcv0.BlockchainChannel),
reactorShim.PeerUpdates,
fastSync,
)
bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor, nil
return reactorShim, reactor, nil
case "v2":
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor.SetLogger(logger)
return nil, reactor, nil
default:
return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}
}
func createConsensusReactor(config *cfg.Config,
@@ -737,12 +756,28 @@ func NewNode(config *cfg.Config,
sm.BlockExecutorWithMetrics(smMetrics),
)
// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
csReactor, csState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
// Create the blockchain reactor. Note, we do not start fast sync if we're
// doing a state sync first.
bcReactorShim, bcReactor, err := createBlockchainReactor(
logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync,
)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}
// TODO: Remove this once the switch is removed.
var bcReactorForSwitch p2p.Reactor
if bcReactorShim != nil {
bcReactorForSwitch = bcReactorShim
} else {
bcReactorForSwitch = bcReactor.(p2p.Reactor)
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
@@ -751,11 +786,6 @@ func NewNode(config *cfg.Config,
csMetrics.FastSyncing.Set(1)
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evPool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)
// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
@@ -781,8 +811,8 @@ func NewNode(config *cfg.Config,
p2pLogger := logger.With("module", "p2p")
transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp)
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch,
stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -840,8 +870,8 @@ func NewNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
mempool: mempool,
consensusState: consensusState,
consensusReactor: consensusReactor,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
@@ -1276,9 +1306,11 @@ func makeNodeInfo(
var bcChannel byte
switch config.FastSync.Version {
case "v0":
bcChannel = bcv0.BlockchainChannel
bcChannel = byte(bcv0.BlockchainChannel)
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}