package node

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/cors"

	"github.com/tendermint/tendermint/blocksync"
	cfg "github.com/tendermint/tendermint/config"
	cs "github.com/tendermint/tendermint/consensus"
	"github.com/tendermint/tendermint/evidence"
	"github.com/tendermint/tendermint/libs/log"
	tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
	"github.com/tendermint/tendermint/libs/service"
	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/pex"
	"github.com/tendermint/tendermint/proxy"
	rpccore "github.com/tendermint/tendermint/rpc/core"
	grpccore "github.com/tendermint/tendermint/rpc/grpc"
	rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
	sm "github.com/tendermint/tendermint/state"
	"github.com/tendermint/tendermint/state/indexer"
	"github.com/tendermint/tendermint/state/txindex"
	"github.com/tendermint/tendermint/state/txindex/null"
	"github.com/tendermint/tendermint/statesync"
	"github.com/tendermint/tendermint/store"
	"github.com/tendermint/tendermint/types"
	tmtime "github.com/tendermint/tendermint/types/time"
	"github.com/tendermint/tendermint/version"

	_ "net/http/pprof" //nolint: gosec
)

// Node is the highest level interface to a full Tendermint node.
// It includes all configuration information and running services.
type Node struct {
	service.BaseService

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc   // initial validator set
	privValidator types.PrivValidator // local node's validator key

	// network
	sw          *p2p.Switch  // p2p connections
	addrBook    pex.AddrBook // known peers
	nodeInfo    p2p.NodeInfo
	nodeKey     *p2p.NodeKey // our node privkey
	isListening bool

	// services
	eventBus          *types.EventBus // pub/sub for services
	stateStore        sm.Store
	blockStore        *store.BlockStore       // store the blockchain to disk
	bcReactor         *blocksync.Reactor      // for block-syncing
	mempoolReactor    p2p.Reactor             // for gossipping transactions
	mempool           mempl.Mempool
	stateSyncReactor  *statesync.Reactor      // for hosting and restoring state sync snapshots
	stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
	stateSyncGenesis  sm.State                // provides the genesis state for state sync
	consensusState    *cs.State               // latest consensus state
	consensusReactor  *cs.Reactor             // for participating in the consensus
	pexReactor        *pex.Reactor            // for exchanging peer addresses
	evidencePool      *evidence.Pool          // tracking evidence
	proxyApp          proxy.AppConns          // connection to the application
	rpcListeners      []net.Listener          // rpc servers
	txIndexer         txindex.TxIndexer
	blockIndexer      indexer.BlockIndexer
	indexerService    *txindex.IndexerService
	prometheusSrv     *http.Server
	pprofSrv          *http.Server

	customReactors map[string]p2p.Reactor
}

// Option sets a parameter for the node.
type Option func(*Node)

// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
// the node's Switch.
//
// WARNING: using any name from the below list of the existing reactors will
// result in replacing it with the custom one.
//
//   - MEMPOOL
//   - BLOCKSYNC
//   - CONSENSUS
//   - EVIDENCE
//   - PEX
//   - STATESYNC
func CustomReactors(reactors map[string]p2p.Reactor) Option {
	return func(n *Node) {
		n.customReactors = reactors
	}
}
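// For illustration, a sketch of wiring a custom reactor into the node via
// the option above (myReactor stands in for any p2p.Reactor implementation
// and is an assumption, not part of this package):
//
//	n, err := node.NewNode(config, privVal, nodeKey, clientCreator,
//	    genesisDocProvider, dbProvider, metricsProvider, logger,
//	    node.CustomReactors(map[string]p2p.Reactor{"MYREACTOR": myReactor}),
//	)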
// StateProvider overrides the state provider used by state sync to retrieve trusted app hashes and
// build a State object for bootstrapping the node.
// WARNING: this interface is considered unstable and subject to change.
func StateProvider(stateProvider statesync.StateProvider) Option {
	return func(n *Node) {
		n.stateSyncProvider = stateProvider
	}
}

//------------------------------------------------------------------------------

// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
	privValidator types.PrivValidator,
	nodeKey *p2p.NodeKey,
	clientCreator proxy.ClientCreator,
	genesisDocProvider GenesisDocProvider,
	dbProvider DBProvider,
	metricsProvider MetricsProvider,
	logger log.Logger,
	options ...Option,
) (*Node, error) {
	blockStore, stateDB, err := initDBs(config, dbProvider)
	if err != nil {
		return nil, err
	}

	stateStore := sm.NewStore(stateDB, sm.StoreOptions{
		DiscardABCIResponses: config.Storage.DiscardABCIResponses,
	})

	state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
	if err != nil {
		return nil, err
	}

	csMetrics, p2pMetrics, memplMetrics, smMetrics, abciMetrics, bsMetrics, ssMetrics := metricsProvider(genDoc.ChainID)

	// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
	proxyApp, err := createProxyAppConns(clientCreator, logger, abciMetrics)
	if err != nil {
		return nil, fmt.Errorf("error starting proxy app connections: %v", err)
	}

	// EventBus and IndexerService must be started before the handshake because
	// we might need to index the txs of the replayed block as this might not have happened
	// when the node stopped last time (i.e. the node stopped after it saved the block
	// but before it indexed the txs, or, endblocker panicked)
	eventBus := createEventBus(logger)

	indexerService, txIndexer, blockIndexer, err := createIndexerService(config, genDoc.ChainID, dbProvider, eventBus, logger)
	if err != nil {
		return nil, err
	}

	// Make MempoolReactor
	mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)

	// Make Evidence Reactor
	evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger)
	if err != nil {
		return nil, err
	}

	// make block executor for consensus and blocksync reactors to execute blocks
	blockExec := sm.NewBlockExecutor(
		stateStore,
		logger.With("module", "state"),
		proxyApp.Consensus(),
		mempool,
		evidencePool,
		blockStore,
		sm.BlockExecutorWithMetrics(smMetrics),
	)

	// Make BlocksyncReactor. Don't start block sync if we're doing a state sync first.
	bcReactor, err := createBlocksyncReactor(config, state, blockExec, blockStore, bsMetrics, logger)
	if err != nil {
		return nil, fmt.Errorf("could not create blocksync reactor: %w", err)
	}

	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
	consensusLogger := logger.With("module", "consensus")
	consensusReactor, consensusState := createConsensusReactor(
		config, blockExec, blockStore, mempool, evidencePool,
		privValidator, csMetrics, eventBus, consensusLogger,
	)

	// Set up state sync reactor, and schedule a sync if requested.
	// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
	// we should clean this whole thing up. See:
	// https://github.com/tendermint/tendermint/issues/4644
	stateSyncReactor := statesync.NewReactor(
		*config.StateSync,
		proxyApp.Snapshot(),
		proxyApp.Query(),
		config.StateSync.TempDir,
		ssMetrics,
	)
	stateSyncReactor.SetLogger(logger.With("module", "statesync"))
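	// The Switch created below multiplexes every registered reactor over the
	// node's peer connections, routing each inbound p2p message to a reactor
	// by its channel ID.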
	// Setup Switch.
	p2pLogger := logger.With("module", "p2p")
	sw := createSwitch(
		config, p2pMetrics, mempoolReactor, bcReactor,
		stateSyncReactor, consensusReactor, evidenceReactor, p2pLogger,
	)

	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
	}

	err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
	}

	addrBook, err := createAddrBook(config, p2pLogger, nodeKey)
	if err != nil {
		return nil, fmt.Errorf("could not create addrbook: %w", err)
	}

	// Optionally, start the pex reactor
	//
	// TODO:
	//
	// We need to set Seeds and PersistentPeers on the switch,
	// since it needs to be able to use these (and their DNS names)
	// even if the PEX is off. We can include the DNS name in the NetAddress,
	// but it would still be nice to have a clear list of the current "PersistentPeers"
	// somewhere that we can return with net_info.
	//
	// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
	// Note we currently use the addrBook regardless at least for AddOurAddress
	var pexReactor *pex.Reactor
	if config.P2P.PexReactor {
		pexReactor = createPEXReactor(addrBook, config, logger)
		sw.AddReactor("PEX", pexReactor)
	}

	// Add private IDs to addrbook to block those peers being added
	addrBook.AddPrivateIDs(splitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " "))

	node := &Node{
		config:        config,
		genesisDoc:    genDoc,
		privValidator: privValidator,

		sw:       sw,
		addrBook: addrBook,
		nodeKey:  nodeKey,

		stateStore:       stateStore,
		blockStore:       blockStore,
		bcReactor:        bcReactor,
		mempoolReactor:   mempoolReactor,
		mempool:          mempool,
		consensusState:   consensusState,
		consensusReactor: consensusReactor,
		stateSyncReactor: stateSyncReactor,
		stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
		pexReactor:       pexReactor,
		evidencePool:     evidencePool,
		proxyApp:         proxyApp,
		txIndexer:        txIndexer,
		indexerService:   indexerService,
		blockIndexer:     blockIndexer,
		eventBus:         eventBus,
	}
	node.BaseService = *service.NewBaseService(logger, "Node", node)

	for _, option := range options {
		option(node)
	}

	for name, reactor := range node.customReactors {
		if existingReactor := node.sw.Reactor(name); existingReactor != nil {
			node.sw.Logger.Info("Replacing existing reactor with a custom one",
				"name", name, "existing", existingReactor, "custom", reactor)
			node.sw.RemoveReactor(name, existingReactor)
		}
		node.sw.AddReactor(name, reactor)
	}

	return node, nil
}
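// For reference, a minimal construction sketch using the default providers
// (the concrete app, key-file paths, and logger are assumptions for the
// example; the helper names come from this package, privval, and proxy):
//
//	n, err := node.NewNode(config,
//	    privval.LoadFilePV(keyFile, stateFile),
//	    nodeKey,
//	    proxy.NewLocalClientCreator(app),
//	    node.DefaultGenesisDocProviderFunc(config),
//	    node.DefaultDBProvider,
//	    node.DefaultMetricsProvider(config.Instrumentation),
//	    logger,
//	)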
// OnStart starts the Node. It implements service.Service.
func (n *Node) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// If an address is provided, listen on the socket for a connection from an
	// external signing process. This will overwrite the privValidator provided
	// in the constructor.
	if n.config.PrivValidatorListenAddr != "" {
		var err error
		n.privValidator, err = createPrivValidatorSocketClient(n.config.PrivValidatorListenAddr, n.genesisDoc.ChainID, n.Logger)
		if err != nil {
			return fmt.Errorf("error with private validator socket client: %w", err)
		}
	}

	pubKey, err := n.privValidator.GetPubKey()
	if err != nil {
		return fmt.Errorf("can't get pubkey: %w", err)
	}

	state, err := n.stateStore.LoadFromDBOrGenesisDoc(n.genesisDoc)
	if err != nil {
		return fmt.Errorf("cannot load state: %w", err)
	}

	// Determine whether we should attempt state sync.
	stateSync := n.config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
	if stateSync && state.LastBlockHeight > 0 {
		n.Logger.Info("Found local state with non-zero height, skipping state sync")
		stateSync = false
	}

	// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
	// and replays any blocks as necessary to sync tendermint with the app.
	if !stateSync {
		if err := doHandshake(n.stateStore, state, n.blockStore, n.genesisDoc, n.eventBus, n.proxyApp, n.Logger); err != nil {
			return err
		}

		// Reload the state. It will have the Version.Consensus.App set by the
		// Handshake, and may have other modifications as well (ie. depending on
		// what happened during block replay).
		state, err = n.stateStore.Load()
		if err != nil {
			return fmt.Errorf("cannot load state: %w", err)
		}
	}

	nodeInfo, err := makeNodeInfo(n.config, n.nodeKey, n.txIndexer, n.genesisDoc, state)
	if err != nil {
		return err
	}
	for _, reactor := range n.customReactors {
		for _, chDesc := range reactor.GetChannels() {
			if !nodeInfo.HasChannel(chDesc.ID) {
				nodeInfo.Channels = append(nodeInfo.Channels, chDesc.ID)
			}
		}
	}
	n.nodeInfo = nodeInfo

	addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}

	// Setup Transport.
	transport, peerFilters := createTransport(n.config, n.nodeInfo, n.nodeKey, addr, n.proxyApp)

	n.sw.SetTransport(transport)
	n.sw.SetPeerFilters(peerFilters...)
	n.sw.SetNodeInfo(n.nodeInfo)
	n.sw.SetNodeKey(n.nodeKey)

	// run pprof server if it is enabled
	if n.config.RPC.IsPprofEnabled() {
		n.pprofSrv = n.startPprofServer()
	}

	// begin prometheus metrics gathering if it is enabled
	if n.config.Instrumentation.IsPrometheusEnabled() {
		n.prometheusSrv = n.startPrometheusServer()
	}

	// Start the RPC server before the P2P server
	// so we can eg. receive txs for the first block
	if n.config.RPC.ListenAddress != "" {
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if err := n.eventBus.Start(); err != nil {
		return err
	}

	if err := n.indexerService.Start(); err != nil {
		return err
	}

	// Start the switch (the P2P server).
	err = n.sw.Start()
	if err != nil {
		return err
	}
	n.isListening = true

	// Always connect to persistent peers
	err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
	}
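	// Exactly one of three startup phases runs next: state sync (restore a
	// recent snapshot from peers), block sync (replay blocks from peers), or
	// immediate consensus participation. On completion, state sync hands off
	// to block sync (or directly to consensus), which in turn hands off to
	// consensus.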
	// Determine whether we should do block sync. This must happen after the handshake, since the
	// app may modify the validator set, specifying ourself as the only validator.
	blockSync := n.config.BlockSyncMode && !onlyValidatorIsUs(state, pubKey)

	logNodeStartupInfo(state, pubKey, n.Logger)

	// Run start up phases
	if stateSync {
		err := startStateSync(n.stateSyncReactor, n.bcReactor, n.consensusReactor, n.stateSyncProvider,
			n.config.StateSync, n.config.BlockSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
		if err != nil {
			return fmt.Errorf("failed to start state sync: %w", err)
		}
	} else if blockSync {
		err := n.bcReactor.SwitchToBlockSync(state)
		if err != nil {
			return fmt.Errorf("failed to start block sync: %w", err)
		}
	} else {
		err := n.consensusReactor.SwitchToConsensus(state, false)
		if err != nil {
			return fmt.Errorf("failed to switch to consensus: %w", err)
		}
	}

	return nil
}

// OnStop stops the Node. It implements service.Service.
func (n *Node) OnStop() {
	n.BaseService.OnStop()

	n.Logger.Info("Stopping Node")

	// first stop the non-reactor services
	if err := n.eventBus.Stop(); err != nil {
		n.Logger.Error("Error closing eventBus", "err", err)
	}
	if err := n.indexerService.Stop(); err != nil {
		n.Logger.Error("Error closing indexerService", "err", err)
	}

	// now stop the reactors
	if err := n.sw.Stop(); err != nil {
		n.Logger.Error("Error closing switch", "err", err)
	}

	n.isListening = false

	// finally stop the listeners / external services
	for _, l := range n.rpcListeners {
		n.Logger.Info("Closing rpc listener", "listener", l)
		if err := l.Close(); err != nil {
			n.Logger.Error("Error closing listener", "listener", l, "err", err)
		}
	}

	if pvsc, ok := n.privValidator.(service.Service); ok {
		if err := pvsc.Stop(); err != nil {
			n.Logger.Error("Error closing private validator", "err", err)
		}
	}

	if n.prometheusSrv != nil {
		if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
			// Error from closing listeners, or context timeout:
			n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
		}
	}
	if n.pprofSrv != nil {
		if err := n.pprofSrv.Shutdown(context.Background()); err != nil {
			n.Logger.Error("Pprof HTTP server Shutdown", "err", err)
		}
	}
	if n.blockStore != nil {
		if err := n.blockStore.Close(); err != nil {
			n.Logger.Error("problem closing blockstore", "err", err)
		}
	}
	if n.stateStore != nil {
		if err := n.stateStore.Close(); err != nil {
			n.Logger.Error("problem closing statestore", "err", err)
		}
	}
}
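// OnStop is not called directly; embedders go through the BaseService API,
// which invokes it exactly once (a sketch; the logger is an assumption):
//
//	if err := n.Stop(); err != nil {
//	    logger.Error("failed to stop node", "err", err)
//	}
//	n.Wait() // blocks until OnStop has returned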
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *Node) ConfigureRPC() error {
	pubKey, err := n.privValidator.GetPubKey()
	if err != nil {
		return fmt.Errorf("can't get pubkey: %w", err)
	}
	rpccore.SetEnvironment(&rpccore.Environment{
		ProxyAppQuery:   n.proxyApp.Query(),
		ProxyAppMempool: n.proxyApp.Mempool(),

		StateStore:     n.stateStore,
		BlockStore:     n.blockStore,
		EvidencePool:   n.evidencePool,
		ConsensusState: n.consensusState,
		P2PPeers:       n.sw,
		P2PTransport:   n,
		PubKey:         pubKey,

		GenDoc:           n.genesisDoc,
		TxIndexer:        n.txIndexer,
		BlockIndexer:     n.blockIndexer,
		ConsensusReactor: n.consensusReactor,
		BlocksyncReactor: n.bcReactor,
		StatesyncReactor: n.stateSyncReactor,
		EventBus:         n.eventBus,
		Mempool:          n.mempool,

		Logger: n.Logger.With("module", "rpc"),

		Config: *n.config.RPC,
	})
	if err := rpccore.InitGenesisChunks(); err != nil {
		return err
	}

	return nil
}

func (n *Node) startRPC() ([]net.Listener, error) {
	err := n.ConfigureRPC()
	if err != nil {
		return nil, err
	}

	listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")

	if n.config.RPC.Unsafe {
		rpccore.AddUnsafeRoutes()
	}

	config := rpcserver.DefaultConfig()
	config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
	config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
	config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
	// If necessary adjust global WriteTimeout to ensure it's greater than
	// TimeoutBroadcastTxCommit.
	// See https://github.com/tendermint/tendermint/issues/3435
	if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
		config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
	}

	// we may expose the rpc over both a unix and tcp socket
	listeners := make([]net.Listener, len(listenAddrs))
	for i, listenAddr := range listenAddrs {
		mux := http.NewServeMux()
		rpcLogger := n.Logger.With("module", "rpc-server")
		wmLogger := rpcLogger.With("protocol", "websocket")
		wm := rpcserver.NewWebsocketManager(rpccore.Routes,
			rpcserver.OnDisconnect(func(remoteAddr string) {
				err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
				if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
					wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
				}
			}),
			rpcserver.ReadLimit(config.MaxBodyBytes),
			rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
		)
		wm.SetLogger(wmLogger)
		mux.HandleFunc("/websocket", wm.WebsocketHandler)
		rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
		listener, err := rpcserver.Listen(listenAddr, config)
		if err != nil {
			return nil, err
		}

		var rootHandler http.Handler = mux
		if n.config.RPC.IsCorsEnabled() {
			corsMiddleware := cors.New(cors.Options{
				AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
				AllowedMethods: n.config.RPC.CORSAllowedMethods,
				AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
			})
			rootHandler = corsMiddleware.Handler(mux)
		}
		if n.config.RPC.IsTLSEnabled() {
			go func() {
				if err := rpcserver.ServeTLS(
					listener,
					rootHandler,
					n.config.RPC.CertFile(),
					n.config.RPC.KeyFile(),
					rpcLogger,
					config,
				); err != nil {
					n.Logger.Error("Error serving server with TLS", "err", err)
				}
			}()
		} else {
			go func() {
				if err := rpcserver.Serve(
					listener,
					rootHandler,
					rpcLogger,
					config,
				); err != nil {
					n.Logger.Error("Error serving server", "err", err)
				}
			}()
		}

		listeners[i] = listener
	}
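	// The gRPC endpoint configured below exposes only the simplified
	// broadcast-oriented API from rpc/grpc, not the full JSON-RPC route table
	// served above; see that package for the exact service definition.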
	// we expose a simplified api over grpc for convenience to app devs
	grpcListenAddr := n.config.RPC.GRPCListenAddress
	if grpcListenAddr != "" {
		config := rpcserver.DefaultConfig()
		config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
		config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
		// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
		config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
		// If necessary adjust global WriteTimeout to ensure it's greater than
		// TimeoutBroadcastTxCommit.
		// See https://github.com/tendermint/tendermint/issues/3435
		if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
			config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
		}
		listener, err := rpcserver.Listen(grpcListenAddr, config)
		if err != nil {
			return nil, err
		}
		go func() {
			if err := grpccore.StartGRPCServer(listener); err != nil {
				n.Logger.Error("Error starting gRPC server", "err", err)
			}
		}()
		listeners = append(listeners, listener)
	}

	return listeners, nil
}

// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on the provided address.
func (n *Node) startPrometheusServer() *http.Server {
	srv := &http.Server{
		Addr: n.config.Instrumentation.PrometheusListenAddr,
		Handler: promhttp.InstrumentMetricHandler(
			prometheus.DefaultRegisterer, promhttp.HandlerFor(
				prometheus.DefaultGatherer,
				promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
			),
		),
		ReadHeaderTimeout: readHeaderTimeout,
	}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
		}
	}()
	return srv
}

// starts a pprof server at the specified listen address
func (n *Node) startPprofServer() *http.Server {
	srv := &http.Server{
		Addr:              n.config.RPC.PprofListenAddress,
		Handler:           nil,
		ReadHeaderTimeout: readHeaderTimeout,
	}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			n.Logger.Error("pprof HTTP server ListenAndServe", "err", err)
		}
	}()
	return srv
}
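// For reference, the Prometheus server above is enabled through config.toml
// (a sketch; the port value is only an example):
//
//	[instrumentation]
//	prometheus = true
//	prometheus_listen_addr = ":26660"
//
// after which metrics can be scraped from http://<host>:26660/metrics.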
// Switch returns the Node's Switch.
func (n *Node) Switch() *p2p.Switch {
	return n.sw
}

// BlockStore returns the Node's BlockStore.
func (n *Node) BlockStore() *store.BlockStore {
	return n.blockStore
}

// ConsensusState returns the Node's ConsensusState.
func (n *Node) ConsensusState() *cs.State {
	return n.consensusState
}

// ConsensusReactor returns the Node's ConsensusReactor.
func (n *Node) ConsensusReactor() *cs.Reactor {
	return n.consensusReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() p2p.Reactor {
	return n.mempoolReactor
}

// Mempool returns the Node's mempool.
func (n *Node) Mempool() mempl.Mempool {
	return n.mempool
}

// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
func (n *Node) PEXReactor() *pex.Reactor {
	return n.pexReactor
}

// EvidencePool returns the Node's EvidencePool.
func (n *Node) EvidencePool() *evidence.Pool {
	return n.evidencePool
}

// EventBus returns the Node's EventBus.
func (n *Node) EventBus() *types.EventBus {
	return n.eventBus
}

// PrivValidator returns the Node's PrivValidator.
// XXX: for convenience only!
func (n *Node) PrivValidator() types.PrivValidator {
	return n.privValidator
}

// GenesisDoc returns the Node's GenesisDoc.
func (n *Node) GenesisDoc() *types.GenesisDoc {
	return n.genesisDoc
}

// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
func (n *Node) ProxyApp() proxy.AppConns {
	return n.proxyApp
}

// Config returns the Node's config.
func (n *Node) Config() *cfg.Config {
	return n.config
}

//------------------------------------------------------------------------------

func (n *Node) Listeners() []string {
	return []string{
		fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
	}
}

func (n *Node) IsListening() bool {
	return n.isListening
}

// NodeInfo returns the Node's Info from the Switch.
func (n *Node) NodeInfo() p2p.NodeInfo {
	return n.nodeInfo
}

func makeNodeInfo(
	config *cfg.Config,
	nodeKey *p2p.NodeKey,
	txIndexer txindex.TxIndexer,
	genDoc *types.GenesisDoc,
	state sm.State,
) (p2p.DefaultNodeInfo, error) {
	txIndexerStatus := "on"
	if _, ok := txIndexer.(*null.TxIndex); ok {
		txIndexerStatus = "off"
	}

	nodeInfo := p2p.DefaultNodeInfo{
		ProtocolVersion: p2p.NewProtocolVersion(
			version.P2PProtocol, // global
			state.Version.Consensus.Block,
			state.Version.Consensus.App,
		),
		DefaultNodeID: nodeKey.ID(),
		Network:       genDoc.ChainID,
		Version:       version.TMCoreSemVer,
		Channels: []byte{
			blocksync.BlocksyncChannel,
			cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel,
			mempl.MempoolChannel,
			evidence.EvidenceChannel,
			statesync.SnapshotChannel, statesync.ChunkChannel,
		},
		Moniker: config.Moniker,
		Other: p2p.DefaultNodeInfoOther{
			TxIndex:    txIndexerStatus,
			RPCAddress: config.RPC.ListenAddress,
		},
	}

	if config.P2P.PexReactor {
		nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
	}

	lAddr := config.P2P.ExternalAddress
	if lAddr == "" {
		lAddr = config.P2P.ListenAddress
	}
	nodeInfo.ListenAddr = lAddr

	err := nodeInfo.Validate()
	return nodeInfo, err
}