mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-03 11:45:18 +00:00
node: reorder service construction (#8262)
This commit is contained in:
141
node/node.go
141
node/node.go
@@ -54,10 +54,10 @@ type nodeImpl struct {
|
||||
privValidator types.PrivValidator // local node's validator key
|
||||
|
||||
// network
|
||||
peerManager *p2p.PeerManager
|
||||
router *p2p.Router
|
||||
nodeInfoProducer func() *types.NodeInfo
|
||||
nodeKey types.NodeKey // our node privkey
|
||||
peerManager *p2p.PeerManager
|
||||
router *p2p.Router
|
||||
nodeInfo types.NodeInfo
|
||||
nodeKey types.NodeKey // our node privkey
|
||||
|
||||
// services
|
||||
eventSinks []indexer.EventSink
|
||||
@@ -249,25 +249,67 @@ func makeNode(
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
router, err := createRouter(logger, nodeMetrics.p2p, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, proxyApp)
|
||||
// TODO construct node here:
|
||||
node := &nodeImpl{
|
||||
config: cfg,
|
||||
logger: logger,
|
||||
genesisDoc: genDoc,
|
||||
privValidator: privValidator,
|
||||
|
||||
peerManager: peerManager,
|
||||
nodeInfo: nodeInfo,
|
||||
nodeKey: nodeKey,
|
||||
|
||||
eventSinks: eventSinks,
|
||||
|
||||
services: []service.Service{eventBus},
|
||||
|
||||
stateStore: stateStore,
|
||||
blockStore: blockStore,
|
||||
|
||||
shutdownOps: makeCloser(closers),
|
||||
|
||||
rpcEnv: &rpccore.Environment{
|
||||
ProxyApp: proxyApp,
|
||||
|
||||
StateStore: stateStore,
|
||||
BlockStore: blockStore,
|
||||
|
||||
PeerManager: peerManager,
|
||||
|
||||
GenDoc: genDoc,
|
||||
EventSinks: eventSinks,
|
||||
EventBus: eventBus,
|
||||
EventLog: eventLog,
|
||||
Logger: logger.With("module", "rpc"),
|
||||
Config: *cfg.RPC,
|
||||
},
|
||||
}
|
||||
|
||||
node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("failed to create router: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
|
||||
peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
|
||||
stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus)
|
||||
stateStore, blockStore, peerManager.Subscribe, node.router.OpenChannel, nodeMetrics.evidence, eventBus)
|
||||
closers = append(closers, edbCloser)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
node.services = append(node.services, evReactor)
|
||||
node.rpcEnv.EvidencePool = evPool
|
||||
node.evPool = evPool
|
||||
|
||||
mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
|
||||
peerManager.Subscribe, node.router.OpenChannel, peerManager.GetHeight)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
node.rpcEnv.Mempool = mp
|
||||
node.services = append(node.services, mpReactor)
|
||||
|
||||
// make block executor for consensus and blockchain reactors to execute blocks
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
@@ -296,11 +338,14 @@ func makeNode(
|
||||
csReactor, csState, err := createConsensusReactor(ctx,
|
||||
cfg, stateStore, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, waitSync, eventBus,
|
||||
peerManager, router.OpenChannel, logger,
|
||||
peerManager, node.router.OpenChannel, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
node.services = append(node.services, csReactor)
|
||||
node.rpcEnv.ConsensusState = csState
|
||||
node.rpcEnv.ConsensusReactor = csReactor
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start block sync if we're
|
||||
// doing a state sync first.
|
||||
@@ -310,12 +355,14 @@ func makeNode(
|
||||
blockExec,
|
||||
blockStore,
|
||||
csReactor,
|
||||
router.OpenChannel,
|
||||
node.router.OpenChannel,
|
||||
peerManager.Subscribe,
|
||||
blockSync && !stateSync,
|
||||
nodeMetrics.consensus,
|
||||
eventBus,
|
||||
)
|
||||
node.services = append(node.services, bcReactor)
|
||||
node.rpcEnv.BlockSyncReactor = bcReactor
|
||||
|
||||
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
|
||||
// FIXME We need to update metrics here, since other reactors don't have access to them.
|
||||
@@ -329,14 +376,15 @@ func makeNode(
|
||||
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
|
||||
// we should clean this whole thing up. See:
|
||||
// https://github.com/tendermint/tendermint/issues/4644
|
||||
stateSyncReactor := statesync.NewReactor(
|
||||
node.stateSync = stateSync
|
||||
node.stateSyncReactor = statesync.NewReactor(
|
||||
ctx,
|
||||
genDoc.ChainID,
|
||||
genDoc.InitialHeight,
|
||||
*cfg.StateSync,
|
||||
logger.With("module", "statesync"),
|
||||
proxyApp,
|
||||
router.OpenChannel,
|
||||
node.router.OpenChannel,
|
||||
peerManager.Subscribe,
|
||||
stateStore,
|
||||
blockStore,
|
||||
@@ -345,61 +393,8 @@ func makeNode(
|
||||
eventBus,
|
||||
)
|
||||
|
||||
var pexReactor service.Service = service.NopService{}
|
||||
if cfg.P2P.PexReactor {
|
||||
pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe)
|
||||
}
|
||||
node := &nodeImpl{
|
||||
config: cfg,
|
||||
logger: logger,
|
||||
genesisDoc: genDoc,
|
||||
privValidator: privValidator,
|
||||
|
||||
peerManager: peerManager,
|
||||
router: router,
|
||||
nodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
|
||||
nodeKey: nodeKey,
|
||||
|
||||
eventSinks: eventSinks,
|
||||
|
||||
services: []service.Service{
|
||||
eventBus,
|
||||
evReactor,
|
||||
mpReactor,
|
||||
csReactor,
|
||||
bcReactor,
|
||||
pexReactor,
|
||||
},
|
||||
|
||||
stateStore: stateStore,
|
||||
blockStore: blockStore,
|
||||
stateSyncReactor: stateSyncReactor,
|
||||
stateSync: stateSync,
|
||||
evPool: evPool,
|
||||
|
||||
shutdownOps: makeCloser(closers),
|
||||
|
||||
rpcEnv: &rpccore.Environment{
|
||||
ProxyApp: proxyApp,
|
||||
EvidencePool: evPool,
|
||||
ConsensusState: csState,
|
||||
|
||||
StateStore: stateStore,
|
||||
BlockStore: blockStore,
|
||||
|
||||
ConsensusReactor: csReactor,
|
||||
BlockSyncReactor: bcReactor,
|
||||
|
||||
PeerManager: peerManager,
|
||||
|
||||
GenDoc: genDoc,
|
||||
EventSinks: eventSinks,
|
||||
EventBus: eventBus,
|
||||
EventLog: eventLog,
|
||||
Mempool: mp,
|
||||
Logger: logger.With("module", "rpc"),
|
||||
Config: *cfg.RPC,
|
||||
},
|
||||
node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
|
||||
}
|
||||
|
||||
if cfg.Mode == config.ModeValidator {
|
||||
@@ -458,7 +453,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
n.rpcEnv.NodeInfo = n.nodeInfoProducer().Copy()
|
||||
n.rpcEnv.NodeInfo = n.nodeInfo
|
||||
// Start the RPC server before the P2P server
|
||||
// so we can eg. receive txs for the first block
|
||||
if n.config.RPC.ListenAddress != "" {
|
||||
@@ -644,6 +639,10 @@ func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http
|
||||
return srv
|
||||
}
|
||||
|
||||
func (n *nodeImpl) NodeInfo() *types.NodeInfo {
|
||||
return &n.nodeInfo
|
||||
}
|
||||
|
||||
// EventBus returns the Node's EventBus.
|
||||
func (n *nodeImpl) EventBus() *eventbus.EventBus {
|
||||
return n.rpcEnv.EventBus
|
||||
|
||||
@@ -147,7 +147,7 @@ func TestNodeSetAppVersion(t *testing.T) {
|
||||
assert.Equal(t, state.Version.Consensus.App, appVersion)
|
||||
|
||||
// check version is set in node info
|
||||
assert.Equal(t, n.nodeInfoProducer().ProtocolVersion.App, appVersion)
|
||||
assert.Equal(t, n.nodeInfo.ProtocolVersion.App, appVersion)
|
||||
}
|
||||
|
||||
func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user