package node import ( "bytes" "context" "errors" "fmt" "net" "strings" _ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port "time" dbm "github.com/tendermint/tm-db" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/blocksync" cfg "github.com/tendermint/tendermint/config" cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/evidence" "github.com/tendermint/tendermint/statesync" tmjson "github.com/tendermint/tendermint/libs/json" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/light" mempl "github.com/tendermint/tendermint/mempool" mempoolv0 "github.com/tendermint/tendermint/mempool/v0" mempoolv1 "github.com/tendermint/tendermint/mempool/v1" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/pex" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/state/indexer/block" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" _ "github.com/lib/pq" // provide the psql db driver ) const readHeaderTimeout = 10 * time.Second // GenesisDocProvider returns a GenesisDoc. // It allows the GenesisDoc to be pulled from sources other than the // filesystem, for instance from a distributed key-value store cluster. type GenesisDocProvider func() (*types.GenesisDoc, error) // DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads // the GenesisDoc from the config.GenesisFile() on the filesystem. 
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
	// The genesis file path is read from config each time the provider is
	// invoked, not when the provider is constructed.
	var provider GenesisDocProvider = func() (*types.GenesisDoc, error) {
		path := config.GenesisFile()
		return types.GenesisDocFromFile(path)
	}
	return provider
}

// Provider builds a Node from a config and a logger.
type Provider func(*cfg.Config, log.Logger) (*Node, error)

// DefaultNewNode builds a Tendermint node using the default PrivValidator,
// ClientCreator, GenesisDoc provider, DBProvider and MetricsProvider derived
// from config. Its signature matches Provider.
//
// An error is returned when the node key cannot be loaded or generated, or
// when NewNode itself fails.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
	keyFile := config.NodeKeyFile()
	nodeKey, err := p2p.LoadOrGenNodeKey(keyFile)
	if err != nil {
		return nil, fmt.Errorf("failed to load or gen node key %s: %w", keyFile, err)
	}

	// Build the remaining collaborators in the same order in which they are
	// handed to NewNode.
	privValidator := privval.LoadOrGenFilePV(
		config.PrivValidatorKeyFile(),
		config.PrivValidatorStateFile(),
	)
	clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
	genesisProvider := DefaultGenesisDocProviderFunc(config)
	metricsProvider := DefaultMetricsProvider(config.Instrumentation)

	return NewNode(
		config,
		privValidator,
		nodeKey,
		clientCreator,
		genesisProvider,
		cfg.DefaultDBProvider,
		metricsProvider,
		logger,
	)
}

// MetricsProvider returns the consensus, p2p, mempool, state, proxy,
// blocksync and statesync Metrics for the given chain ID.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics)

// DefaultMetricsProvider returns Metrics built using the Prometheus client
// library if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider { return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics, *blocksync.Metrics, *statesync.Metrics) { if config.Prometheus { return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID), p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID), mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID), sm.PrometheusMetrics(config.Namespace, "chain_id", chainID), proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID), blocksync.PrometheusMetrics(config.Namespace, "chain_id", chainID), statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID) } return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics(), blocksync.NopMetrics(), statesync.NopMetrics() } } type blockSyncReactor interface { SwitchToBlockSync(sm.State) error } //------------------------------------------------------------------------------ func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { var blockStoreDB dbm.DB blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config}) if err != nil { return } blockStore = store.NewBlockStore(blockStoreDB) stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config}) if err != nil { return } return } func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) { proxyApp := proxy.NewAppConns(clientCreator, metrics) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { return nil, fmt.Errorf("error starting proxy app connections: %v", err) } return proxyApp, nil } func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { eventBus := types.NewEventBus() eventBus.SetLogger(logger.With("module", "events")) if err := eventBus.Start(); err != nil { return nil, 
err } return eventBus, nil } func createAndStartIndexerService( config *cfg.Config, chainID string, dbProvider cfg.DBProvider, eventBus *types.EventBus, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { var ( txIndexer txindex.TxIndexer blockIndexer indexer.BlockIndexer ) txIndexer, blockIndexer, err := block.IndexerFromConfig(config, dbProvider, chainID) if err != nil { return nil, nil, nil, err } indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { return nil, nil, nil, err } return indexerService, txIndexer, blockIndexer, nil } func doHandshake( stateStore sm.Store, state sm.State, blockStore sm.BlockStore, genDoc *types.GenesisDoc, eventBus types.BlockEventPublisher, proxyApp proxy.AppConns, consensusLogger log.Logger, ) error { handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) if err := handshaker.Handshake(proxyApp); err != nil { return fmt.Errorf("error during handshake: %v", err) } return nil } func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) { // Log the version info. logger.Info("Version info", "tendermint_version", version.TMCoreSemVer, "abci", version.ABCISemVer, "block", version.BlockProtocol, "p2p", version.P2PProtocol, "commit_hash", version.TMGitCommitHash, ) // If the state and software differ in block version, at least log it. 
if state.Version.Consensus.Block != version.BlockProtocol { logger.Info("Software and state have different block protocols", "software", version.BlockProtocol, "state", state.Version.Consensus.Block, ) } addr := pubKey.Address() // Log whether this node is a validator or an observer if state.Validators.HasAddress(addr) { consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey) } else { consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) } } func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool { if state.Validators.Size() > 1 { return false } addr, _ := state.Validators.GetByIndex(0) return bytes.Equal(pubKey.Address(), addr) } func createMempoolAndMempoolReactor( config *cfg.Config, proxyApp proxy.AppConns, state sm.State, memplMetrics *mempl.Metrics, logger log.Logger, ) (mempl.Mempool, p2p.Reactor) { logger = logger.With("module", "mempool") switch config.Mempool.Version { case cfg.MempoolV1: mp := mempoolv1.NewTxMempool( logger, config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, mempoolv1.WithMetrics(memplMetrics), mempoolv1.WithPreCheck(sm.TxPreCheck(state)), mempoolv1.WithPostCheck(sm.TxPostCheck(state)), ) reactor := mempoolv1.NewReactor( config.Mempool, mp, ) if config.Consensus.WaitForTxs() { mp.EnableTxsAvailable() } reactor.SetLogger(logger) return mp, reactor case cfg.MempoolV0: mp := mempoolv0.NewCListMempool( config.Mempool, proxyApp.Mempool(), state.LastBlockHeight, mempoolv0.WithMetrics(memplMetrics), mempoolv0.WithPreCheck(sm.TxPreCheck(state)), mempoolv0.WithPostCheck(sm.TxPostCheck(state)), ) mp.SetLogger(logger) reactor := mempoolv0.NewReactor( config.Mempool, mp, ) if config.Consensus.WaitForTxs() { mp.EnableTxsAvailable() } reactor.SetLogger(logger) return mp, reactor default: return nil, nil } } func createEvidenceReactor(config *cfg.Config, dbProvider cfg.DBProvider, stateStore sm.Store, blockStore *store.BlockStore, logger log.Logger, ) (*evidence.Reactor, 
*evidence.Pool, error) { evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config}) if err != nil { return nil, nil, err } evidenceLogger := logger.With("module", "evidence") evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore) if err != nil { return nil, nil, err } evidenceReactor := evidence.NewReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) return evidenceReactor, evidencePool, nil } func createBlocksyncReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore *store.BlockStore, blockSync bool, logger log.Logger, metrics *blocksync.Metrics, ) (bcReactor p2p.Reactor, err error) { switch config.BlockSync.Version { case "v0": bcReactor = blocksync.NewReactor(state.Copy(), blockExec, blockStore, blockSync, metrics) case "v1", "v2": return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version) default: return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version) } bcReactor.SetLogger(logger.With("module", "blocksync")) return bcReactor, nil } func createConsensusReactor(config *cfg.Config, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mempool mempl.Mempool, evidencePool *evidence.Pool, privValidator types.PrivValidator, csMetrics *cs.Metrics, waitSync bool, eventBus *types.EventBus, consensusLogger log.Logger, ) (*cs.Reactor, *cs.State) { consensusState := cs.NewState( config.Consensus, state.Copy(), blockExec, blockStore, mempool, evidencePool, cs.StateMetrics(csMetrics), ) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) } consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor 
consensusReactor.SetEventBus(eventBus) return consensusReactor, consensusState } func createTransport( config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey, proxyApp proxy.AppConns, ) ( *p2p.MultiplexTransport, []p2p.PeerFilterFunc, ) { var ( mConnConfig = p2p.MConnConfig(config.P2P) transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) connFilters = []p2p.ConnFilterFunc{} peerFilters = []p2p.PeerFilterFunc{} ) if !config.P2P.AllowDuplicateIP { connFilters = append(connFilters, p2p.ConnDuplicateIPFilter()) } // Filter peers by addr or pubkey with an ABCI query. // If the query return code is OK, add peer. if config.FilterPeers { connFilters = append( connFilters, // ABCI query for address filtering. func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error { res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()), }) if err != nil { return err } if res.IsErr() { return fmt.Errorf("error querying abci app: %v", res) } return nil }, ) peerFilters = append( peerFilters, // ABCI query for ID filtering. func(_ p2p.IPeerSet, p p2p.Peer) error { res, err := proxyApp.Query().QuerySync(abci.RequestQuery{ Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()), }) if err != nil { return err } if res.IsErr() { return fmt.Errorf("error querying abci app: %v", res) } return nil }, ) } p2p.MultiplexTransportConnFilters(connFilters...)(transport) // Limit the number of incoming connections. 
max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")) p2p.MultiplexTransportMaxIncomingConnections(max)(transport) return transport, peerFilters } func createSwitch(config *cfg.Config, transport p2p.Transport, p2pMetrics *p2p.Metrics, peerFilters []p2p.PeerFilterFunc, mempoolReactor p2p.Reactor, bcReactor p2p.Reactor, stateSyncReactor *statesync.Reactor, consensusReactor *cs.Reactor, evidenceReactor *evidence.Reactor, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey, p2pLogger log.Logger, ) *p2p.Switch { sw := p2p.NewSwitch( config.P2P, transport, p2p.WithMetrics(p2pMetrics), p2p.SwitchPeerFilters(peerFilters...), ) sw.SetLogger(p2pLogger) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKSYNC", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) sw.AddReactor("EVIDENCE", evidenceReactor) sw.AddReactor("STATESYNC", stateSyncReactor) sw.SetNodeInfo(nodeInfo) sw.SetNodeKey(nodeKey) p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) return sw } func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, p2pLogger log.Logger, nodeKey *p2p.NodeKey, ) (pex.AddrBook, error) { addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) // Add ourselves to addrbook to prevent dialing ourselves if config.P2P.ExternalAddress != "" { addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress)) if err != nil { return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err) } addrBook.AddOurAddress(addr) } if config.P2P.ListenAddress != "" { addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress)) if err != nil { return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err) } addrBook.AddOurAddress(addr) } sw.SetAddrBook(addrBook) return addrBook, nil } func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, 
config *cfg.Config, sw *p2p.Switch, logger log.Logger, ) *pex.Reactor { // TODO persistent peers ? so we can have their DNS addrs saved pexReactor := pex.NewReactor(addrBook, &pex.ReactorConfig{ Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), SeedMode: config.P2P.SeedMode, // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 // blocks assuming 10s blocks ~ 28 hours. // TODO (melekes): make it dynamic based on the actual block latencies // from the live network. // https://github.com/tendermint/tendermint/issues/3523 SeedDisconnectWaitPeriod: 28 * time.Hour, PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod, }) pexReactor.SetLogger(logger.With("module", "pex")) sw.AddReactor("PEX", pexReactor) return pexReactor } // startStateSync starts an asynchronous state sync process, then switches to block sync mode. func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.Reactor, stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, blockSync bool, stateStore sm.Store, blockStore *store.BlockStore, state sm.State, ) error { ssR.Logger.Info("Starting state sync") if stateProvider == nil { var err error ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() stateProvider, err = statesync.NewLightClientStateProvider( ctx, state.ChainID, state.Version, state.InitialHeight, config.RPCServers, light.TrustOptions{ Period: config.TrustPeriod, Height: config.TrustHeight, Hash: config.TrustHashBytes(), }, ssR.Logger.With("module", "light")) if err != nil { return fmt.Errorf("failed to set up light client state provider: %w", err) } } go func() { state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime) if err != nil { ssR.Logger.Error("State sync failed", "err", err) return } err = stateStore.Bootstrap(state) if err != nil { ssR.Logger.Error("Failed to bootstrap node with new state", "err", err) return } err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit) if 
err != nil { ssR.Logger.Error("Failed to store last seen commit", "err", err) return } if blockSync { err = bcR.SwitchToBlockSync(state) if err != nil { ssR.Logger.Error("Failed to switch to block sync", "err", err) return } } else { conR.SwitchToConsensus(state, true) } }() return nil } //------------------------------------------------------------------------------ var genesisDocKey = []byte("genesisDoc") // LoadStateFromDBOrGenesisDocProvider attempts to load the state from the // database, or creates one using the given genesisDocProvider. On success this also // returns the genesis doc loaded through the given provider. func LoadStateFromDBOrGenesisDocProvider( stateDB dbm.DB, genesisDocProvider GenesisDocProvider, ) (sm.State, *types.GenesisDoc, error) { // Get genesis doc genDoc, err := loadGenesisDoc(stateDB) if err != nil { genDoc, err = genesisDocProvider() if err != nil { return sm.State{}, nil, err } err = genDoc.ValidateAndComplete() if err != nil { return sm.State{}, nil, fmt.Errorf("error in genesis doc: %w", err) } // save genesis doc to prevent a certain class of user errors (e.g. when it // was changed, accidentally or not). Also good for audit trail. 
if err := saveGenesisDoc(stateDB, genDoc); err != nil { return sm.State{}, nil, err } } stateStore := sm.NewStore(stateDB, sm.StoreOptions{ DiscardABCIResponses: false, }) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) if err != nil { return sm.State{}, nil, err } return state, genDoc, nil } // panics if failed to unmarshal bytes func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) { b, err := db.Get(genesisDocKey) if err != nil { panic(err) } if len(b) == 0 { return nil, errors.New("genesis doc not found") } var genDoc *types.GenesisDoc err = tmjson.Unmarshal(b, &genDoc) if err != nil { panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b)) } return genDoc, nil } // panics if failed to marshal the given genesis document func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error { b, err := tmjson.Marshal(genDoc) if err != nil { return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err) } if err := db.SetSync(genesisDocKey, b); err != nil { return err } return nil } func createAndStartPrivValidatorSocketClient( listenAddr, chainID string, logger log.Logger, ) (types.PrivValidator, error) { pve, err := privval.NewSignerListener(listenAddr, logger) if err != nil { return nil, fmt.Errorf("failed to start private validator: %w", err) } pvsc, err := privval.NewSignerClient(pve, chainID) if err != nil { return nil, fmt.Errorf("failed to start private validator: %w", err) } // try to get a pubkey from private validate first time _, err = pvsc.GetPubKey() if err != nil { return nil, fmt.Errorf("can't get pubkey: %w", err) } const ( retries = 50 // 50 * 100ms = 5s total timeout = 100 * time.Millisecond ) pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout) return pvscWithRetries, nil } // splitAndTrimEmpty slices s into all subslices separated by sep and returns a // slice of the string s with all leading and trailing Unicode code points // contained in cutset removed. 
// If sep is empty, s is split after each UTF-8 sequence, exactly as
// strings.Split does. Elements that are empty once trimmed are dropped, so
// only non-empty strings are returned; the result is never nil.
func splitAndTrimEmpty(s, sep, cutset string) []string {
	if len(s) == 0 {
		return []string{}
	}

	parts := strings.Split(s, sep)
	kept := make([]string, 0, len(parts))
	for _, part := range parts {
		trimmed := strings.Trim(part, cutset)
		if trimmed == "" {
			// Nothing left after trimming: skip this element.
			continue
		}
		kept = append(kept, trimmed)
	}
	return kept
}