diff --git a/node/node.go b/node/node.go index b79aa1d6c..ae6535d5f 100644 --- a/node/node.go +++ b/node/node.go @@ -191,7 +191,7 @@ type Node struct { eventBus *types.EventBus // pub/sub for services stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for fast-syncing + bcReactor service.Service // for fast-syncing mempoolReactor *mempl.Reactor // for gossipping transactions mempool mempl.Mempool stateSync bool // whether the node should state sync on startup @@ -369,24 +369,43 @@ func createEvidenceReactor( return evidenceReactorShim, evidenceReactor, evidencePool, nil } -func createBlockchainReactor(config *cfg.Config, +func createBlockchainReactor( + logger log.Logger, + config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore, + csReactor *cs.Reactor, fastSync bool, - logger log.Logger) (bcReactor p2p.Reactor, err error) { +) (*p2p.ReactorShim, service.Service, error) { + + logger = logger.With("module", "blockchain") switch config.FastSync.Version { case "v0": - bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - case "v2": - bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - default: - return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) - } + reactorShim := p2p.NewReactorShim(logger, "BlokchainShim", bcv0.ChannelShims) + reactor := bcv0.NewReactor( + logger, + state.Copy(), + blockExec, + blockStore, + csReactor, + reactorShim.GetChannel(bcv0.BlockchainChannel), + reactorShim.PeerUpdates, + fastSync, + ) - bcReactor.SetLogger(logger.With("module", "blockchain")) - return bcReactor, nil + return reactorShim, reactor, nil + + case "v2": + reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + reactor.SetLogger(logger) + + return nil, reactor, nil + + default: + return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + } } func createConsensusReactor(config *cfg.Config, @@ -737,12 +756,28 @@ func NewNode(config *cfg.Config, sm.BlockExecutorWithMetrics(smMetrics), ) - // Make BlockchainReactor. Don't start fast sync if we're doing a state sync first. - bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger) + csReactor, csState := createConsensusReactor( + config, state, blockExec, blockStore, mempool, evPool, + privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, + ) + + // Create the blockchain reactor. Note, we do not start fast sync if we're + // doing a state sync first. + bcReactorShim, bcReactor, err := createBlockchainReactor( + logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync, + ) if err != nil { return nil, fmt.Errorf("could not create blockchain reactor: %w", err) } + // TODO: Remove this once the switch is removed. + var bcReactorForSwitch p2p.Reactor + if bcReactorShim != nil { + bcReactorForSwitch = bcReactorShim + } else { + bcReactorForSwitch = bcReactor.(p2p.Reactor) + } + // Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first. // FIXME We need to update metrics here, since other reactors don't have access to them. if stateSync { @@ -751,11 +786,6 @@ func NewNode(config *cfg.Config, csMetrics.FastSyncing.Set(1) } - consensusReactor, consensusState := createConsensusReactor( - config, state, blockExec, blockStore, mempool, evPool, - privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, - ) - // Set up state sync reactor, and schedule a sync if requested. // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy, // we should clean this whole thing up. See: @@ -781,8 +811,8 @@ func NewNode(config *cfg.Config, p2pLogger := logger.With("module", "p2p") transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, - stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, + config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch, + stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, ) err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) @@ -840,8 +870,8 @@ func NewNode(config *cfg.Config, bcReactor: bcReactor, mempoolReactor: mempoolReactor, mempool: mempool, - consensusState: consensusState, - consensusReactor: consensusReactor, + consensusState: csState, + consensusReactor: csReactor, stateSyncReactor: stateSyncReactor, stateSync: stateSync, stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state @@ -1276,9 +1306,11 @@ func makeNodeInfo( var bcChannel byte switch config.FastSync.Version { case "v0": - bcChannel = bcv0.BlockchainChannel + bcChannel = byte(bcv0.BlockchainChannel) + case "v2": bcChannel = bcv2.BlockchainChannel + default: return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) } diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index dd3b684af..74c1173f5 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -233,7 +233,7 @@ type Node struct { eventBus *types.EventBus // pub/sub for services stateStore sm.Store blockStore *store.BlockStore // store the blockchain to disk - bcReactor p2p.Reactor // for fast-syncing + bcReactor service.Service // for fast-syncing mempoolReactor *mempl.Reactor // for gossipping transactions mempool mempl.Mempool stateSync bool // whether the node should state sync on startup @@ -411,24 +411,43 @@ func createEvidenceReactor( return evidenceReactorShim, evidenceReactor, evidencePool, nil } -func createBlockchainReactor(config *cfg.Config, +func createBlockchainReactor( + logger log.Logger, + config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore, + csReactor *cs.Reactor, fastSync bool, - logger log.Logger) (bcReactor p2p.Reactor, err error) { +) (*p2p.ReactorShim, service.Service, error) { + + logger = logger.With("module", "blockchain") switch config.FastSync.Version { case "v0": - bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - case "v2": - bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - default: - return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) - } + reactorShim := p2p.NewReactorShim(logger, "BlokchainShim", bcv0.ChannelShims) + reactor := bcv0.NewReactor( + logger, + state.Copy(), + blockExec, + blockStore, + csReactor, + reactorShim.GetChannel(bcv0.BlockchainChannel), + reactorShim.PeerUpdates, + fastSync, + ) - bcReactor.SetLogger(logger.With("module", "blockchain")) - return bcReactor, nil + return reactorShim, reactor, nil + + case "v2": + reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + reactor.SetLogger(logger) + + return nil, reactor, nil + + default: + return nil, nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) + } } func createConsensusReactor(config *cfg.Config, @@ -780,12 +799,29 @@ func NewNode(config *cfg.Config, sm.BlockExecutorWithMetrics(smMetrics), ) - // Make BlockchainReactor. Don't start fast sync if we're doing a state sync first. - bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger) + logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors) + csReactor, csState := createConsensusReactor( + config, state, blockExec, blockStore, mempool, evPool, + privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors, + ) + + // Create the blockchain reactor. Note, we do not start fast sync if we're + // doing a state sync first. + bcReactorShim, bcReactor, err := createBlockchainReactor( + logger, config, state, blockExec, blockStore, csReactor, fastSync && !stateSync, + ) if err != nil { return nil, fmt.Errorf("could not create blockchain reactor: %w", err) } + // TODO: Remove this once the switch is removed. + var bcReactorForSwitch p2p.Reactor + if bcReactorShim != nil { + bcReactorForSwitch = bcReactorShim + } else { + bcReactorForSwitch = bcReactor.(p2p.Reactor) + } + // Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first. // FIXME We need to update metrics here, since other reactors don't have access to them. if stateSync { @@ -794,11 +830,6 @@ func NewNode(config *cfg.Config, csMetrics.FastSyncing.Set(1) } - logger.Info("Setting up maverick consensus reactor", "Misbehaviors", misbehaviors) - consensusReactor, consensusState := createConsensusReactor( - config, state, blockExec, blockStore, mempool, evPool, - privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, misbehaviors) - // Set up state sync reactor, and schedule a sync if requested. // FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy, // we should clean this whole thing up. See: @@ -824,8 +855,8 @@ func NewNode(config *cfg.Config, p2pLogger := logger.With("module", "p2p") transport, peerFilters := createTransport(p2pLogger, config, nodeInfo, nodeKey, proxyApp) sw := createSwitch( - config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, - stateSyncReactorShim, consensusReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, + config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactorForSwitch, + stateSyncReactorShim, csReactor, evReactorShim, nodeInfo, nodeKey, p2pLogger, ) err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ")) @@ -883,8 +914,8 @@ func NewNode(config *cfg.Config, bcReactor: bcReactor, mempoolReactor: mempoolReactor, mempool: mempool, - consensusState: consensusState, - consensusReactor: consensusReactor, + consensusState: csState, + consensusReactor: csReactor, stateSyncReactor: stateSyncReactor, stateSync: stateSync, stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state @@ -1317,9 +1348,11 @@ func makeNodeInfo( var bcChannel byte switch config.FastSync.Version { case "v0": - bcChannel = bcv0.BlockchainChannel + bcChannel = byte(bcv0.BlockchainChannel) + case "v2": bcChannel = bcv2.BlockchainChannel + default: return p2p.NodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version) }