From 85364a9ba87e1fcd8fcaed71ad2c98983e3702ce Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Wed, 6 Apr 2022 14:59:30 -0400
Subject: [PATCH] node: reorder service construction (#8262)

---
 node/node.go      | 141 +++++++++++++++++++++++-----------------------
 node/node_test.go |   2 +-
 2 files changed, 71 insertions(+), 72 deletions(-)

diff --git a/node/node.go b/node/node.go
index e1f4205ad..9b608d6f0 100644
--- a/node/node.go
+++ b/node/node.go
@@ -54,10 +54,10 @@ type nodeImpl struct {
 	privValidator types.PrivValidator // local node's validator key
 
 	// network
-	peerManager      *p2p.PeerManager
-	router           *p2p.Router
-	nodeInfoProducer func() *types.NodeInfo
-	nodeKey          types.NodeKey // our node privkey
+	peerManager *p2p.PeerManager
+	router      *p2p.Router
+	nodeInfo    types.NodeInfo
+	nodeKey     types.NodeKey // our node privkey
 
 	// services
 	eventSinks []indexer.EventSink
@@ -249,25 +249,67 @@ func makeNode(
 			makeCloser(closers))
 	}
 
-	router, err := createRouter(logger, nodeMetrics.p2p, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, proxyApp)
+	// TODO construct node here:
+	node := &nodeImpl{
+		config:        cfg,
+		logger:        logger,
+		genesisDoc:    genDoc,
+		privValidator: privValidator,
+
+		peerManager: peerManager,
+		nodeInfo:    nodeInfo,
+		nodeKey:     nodeKey,
+
+		eventSinks: eventSinks,
+
+		services: []service.Service{eventBus},
+
+		stateStore: stateStore,
+		blockStore: blockStore,
+
+		shutdownOps: makeCloser(closers),
+
+		rpcEnv: &rpccore.Environment{
+			ProxyApp: proxyApp,
+
+			StateStore: stateStore,
+			BlockStore: blockStore,
+
+			PeerManager: peerManager,
+
+			GenDoc:     genDoc,
+			EventSinks: eventSinks,
+			EventBus:   eventBus,
+			EventLog:   eventLog,
+			Logger:     logger.With("module", "rpc"),
+			Config:     *cfg.RPC,
+		},
+	}
+
+	node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
 	if err != nil {
 		return nil, combineCloseError(
 			fmt.Errorf("failed to create router: %w", err),
 			makeCloser(closers))
 	}
 
-	mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
-		peerManager.Subscribe, router.OpenChannel, peerManager.GetHeight)
-	if err != nil {
-		return nil, combineCloseError(err, makeCloser(closers))
-	}
-
 	evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
-		stateStore, blockStore, peerManager.Subscribe, router.OpenChannel, nodeMetrics.evidence, eventBus)
+		stateStore, blockStore, peerManager.Subscribe, node.router.OpenChannel, nodeMetrics.evidence, eventBus)
 	closers = append(closers, edbCloser)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
 	}
+	node.services = append(node.services, evReactor)
+	node.rpcEnv.EvidencePool = evPool
+	node.evPool = evPool
+
+	mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
+		peerManager.Subscribe, node.router.OpenChannel, peerManager.GetHeight)
+	if err != nil {
+		return nil, combineCloseError(err, makeCloser(closers))
+	}
+	node.rpcEnv.Mempool = mp
+	node.services = append(node.services, mpReactor)
 
 	// make block executor for consensus and blockchain reactors to execute blocks
 	blockExec := sm.NewBlockExecutor(
@@ -296,11 +338,14 @@ func makeNode(
 	csReactor, csState, err := createConsensusReactor(ctx,
 		cfg, stateStore, blockExec, blockStore, mp, evPool,
 		privValidator, nodeMetrics.consensus, waitSync, eventBus,
-		peerManager, router.OpenChannel, logger,
+		peerManager, node.router.OpenChannel, logger,
 	)
 	if err != nil {
 		return nil, combineCloseError(err, makeCloser(closers))
 	}
+	node.services = append(node.services, csReactor)
+	node.rpcEnv.ConsensusState = csState
+	node.rpcEnv.ConsensusReactor = csReactor
 
 	// Create the blockchain reactor. Note, we do not start block sync if we're
 	// doing a state sync first.
@@ -310,12 +355,14 @@ func makeNode(
 		blockExec,
 		blockStore,
 		csReactor,
-		router.OpenChannel,
+		node.router.OpenChannel,
 		peerManager.Subscribe,
 		blockSync && !stateSync,
 		nodeMetrics.consensus,
 		eventBus,
 	)
+	node.services = append(node.services, bcReactor)
+	node.rpcEnv.BlockSyncReactor = bcReactor
 
 	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
 	// FIXME We need to update metrics here, since other reactors don't have access to them.
@@ -329,14 +376,15 @@ func makeNode(
 	// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
 	// we should clean this whole thing up. See:
 	// https://github.com/tendermint/tendermint/issues/4644
-	stateSyncReactor := statesync.NewReactor(
+	node.stateSync = stateSync
+	node.stateSyncReactor = statesync.NewReactor(
 		ctx,
 		genDoc.ChainID,
 		genDoc.InitialHeight,
 		*cfg.StateSync,
 		logger.With("module", "statesync"),
 		proxyApp,
-		router.OpenChannel,
+		node.router.OpenChannel,
 		peerManager.Subscribe,
 		stateStore,
 		blockStore,
@@ -345,61 +393,8 @@ func makeNode(
 		eventBus,
 	)
 
-	var pexReactor service.Service = service.NopService{}
 	if cfg.P2P.PexReactor {
-		pexReactor = pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe)
-	}
-	node := &nodeImpl{
-		config:        cfg,
-		logger:        logger,
-		genesisDoc:    genDoc,
-		privValidator: privValidator,
-
-		peerManager:      peerManager,
-		router:           router,
-		nodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
-		nodeKey:          nodeKey,
-
-		eventSinks: eventSinks,
-
-		services: []service.Service{
-			eventBus,
-			evReactor,
-			mpReactor,
-			csReactor,
-			bcReactor,
-			pexReactor,
-		},
-
-		stateStore:       stateStore,
-		blockStore:       blockStore,
-		stateSyncReactor: stateSyncReactor,
-		stateSync:        stateSync,
-		evPool:           evPool,
-
-		shutdownOps: makeCloser(closers),
-
-		rpcEnv: &rpccore.Environment{
-			ProxyApp:       proxyApp,
-			EvidencePool:   evPool,
-			ConsensusState: csState,
-
-			StateStore: stateStore,
-			BlockStore: blockStore,
-
-			ConsensusReactor: csReactor,
-			BlockSyncReactor: bcReactor,
-
-			PeerManager: peerManager,
-
-			GenDoc:     genDoc,
-			EventSinks: eventSinks,
-			EventBus:   eventBus,
-			EventLog:   eventLog,
-			Mempool:    mp,
-			Logger:     logger.With("module", "rpc"),
-			Config:     *cfg.RPC,
-		},
+		node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
 	}
 
 	if cfg.Mode == config.ModeValidator {
@@ -458,7 +453,7 @@ func (n *nodeImpl) OnStart(ctx context.Context) error {
 		return err
 	}
 
-	n.rpcEnv.NodeInfo = n.nodeInfoProducer().Copy()
+	n.rpcEnv.NodeInfo = n.nodeInfo
 	// Start the RPC server before the P2P server
 	// so we can eg. receive txs for the first block
 	if n.config.RPC.ListenAddress != "" {
@@ -644,6 +639,10 @@ func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http
 	return srv
 }
 
+func (n *nodeImpl) NodeInfo() *types.NodeInfo {
+	return &n.nodeInfo
+}
+
 // EventBus returns the Node's EventBus.
 func (n *nodeImpl) EventBus() *eventbus.EventBus {
 	return n.rpcEnv.EventBus
diff --git a/node/node_test.go b/node/node_test.go
index 86ed7960c..1a1fa6f81 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -147,7 +147,7 @@ func TestNodeSetAppVersion(t *testing.T) {
 	assert.Equal(t, state.Version.Consensus.App, appVersion)
 
 	// check version is set in node info
-	assert.Equal(t, n.nodeInfoProducer().ProtocolVersion.App, appVersion)
+	assert.Equal(t, n.nodeInfo.ProtocolVersion.App, appVersion)
 }
 
 func TestNodeSetPrivValTCP(t *testing.T) {