From 7b162f5c54267d716ef2e66912d59a0bd993a84f Mon Sep 17 00:00:00 2001 From: Thane Thomson Date: Fri, 26 Apr 2019 08:05:39 -0400 Subject: [PATCH] node: refactor node.NewNode (#3456) The node.NewNode method is pretty complex at the moment, an in order to address issues like #3156, we need to simplify the interface for partial node instantiation. In some places, we don't need to build up a full node (like in the node.TestCreateProposalBlock test), but the complexity of such partial instantiation needs to be reduced. This PR aims to eventually make this easier/simpler. See also this gist https://gist.github.com/thanethomson/56e1640d057a26186e38ad678a1d114c for some background work done when starting to refactor here. ## Commits: * [WIP] Refactor node.NewNode to simplify The `node.NewNode` method is pretty complex at the moment, an in order to address issues like #3156, we need to simplify the interface for partial node instantiation. In some places, we don't need to build up a full node (like in the `node.TestCreateProposalBlock` test), but the complexity of such partial instantiation needs to be reduced. This PR aims to eventually make this easier/simpler. * Refactor state loading and genesis doc provider into state package * Refactor for clarity of return parameters * Fix incorrect capitalization of error messages * Simplify extracted functions' names * Document optionally-prefixed functions * Refactor optionallyFastSync for clarity of separation of concerns * Restructure function for early return * Restructure function for early return * Remove dependence on deprecated panic functions * refactor code a bit more plus, expose PEXReactor on node * align logger names * add a changelog entry * align logger names 2 * add a note about PEXReactor returning nil --- CHANGELOG_PENDING.md | 3 +- node/node.go | 458 +++++++++++++++++++++++++------------------ rpc/test/helpers.go | 3 +- state/store.go | 65 ++++++ 4 files changed, 331 insertions(+), 198 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 6065ee9b8..1dc44e686 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -9,7 +9,8 @@ * Apps * Go API -- [libs/common] Removed PanicSanity, PanicCrisis, PanicConsensus and PanicQ +- [libs/common] Removed `PanicSanity`, `PanicCrisis`, `PanicConsensus` and `PanicQ` +- [node] Moved `GenesisDocProvider` and `DefaultGenesisDocProviderFunc` to state package * Blockchain Protocol diff --git a/node/node.go b/node/node.go index 9ad7172fb..36bdb9d57 100644 --- a/node/node.go +++ b/node/node.go @@ -18,8 +18,10 @@ import ( amino "github.com/tendermint/go-amino" abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/consensus" cs "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/evidence" @@ -63,19 +65,6 @@ func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir()), nil } -// GenesisDocProvider returns a GenesisDoc. -// It allows the GenesisDoc to be pulled from sources other than the -// filesystem, for instance from a distributed key-value store cluster. -type GenesisDocProvider func() (*types.GenesisDoc, error) - -// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads -// the GenesisDoc from the config.GenesisFile() on the filesystem. -func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { - return func() (*types.GenesisDoc, error) { - return types.GenesisDocFromFile(config.GenesisFile()) - } -} - // NodeProvider takes a config and a logger and returns a ready to go Node. type NodeProvider func(*cfg.Config, log.Logger) (*Node, error) @@ -96,7 +85,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { if _, err := os.Stat(oldPrivVal); !os.IsNotExist(err) { oldPV, err := privval.LoadOldFilePV(oldPrivVal) if err != nil { - return nil, fmt.Errorf("Error reading OldPrivValidator from %v: %v\n", oldPrivVal, err) + return nil, fmt.Errorf("error reading OldPrivValidator from %v: %v\n", oldPrivVal, err) } logger.Info("Upgrading PrivValidator file", "old", oldPrivVal, @@ -110,7 +99,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) { privval.LoadOrGenFilePV(newPrivValKey, newPrivValState), nodeKey, proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), - DefaultGenesisDocProviderFunc(config), + sm.DefaultGenesisDocProviderFunc(config), DefaultDBProvider, DefaultMetricsProvider(config.Instrumentation), logger, @@ -162,6 +151,7 @@ type Node struct { mempoolReactor *mempl.MempoolReactor // for gossipping transactions consensusState *cs.ConsensusState // latest consensus state consensusReactor *cs.ConsensusReactor // for participating in the consensus + pexReactor *pex.PEXReactor // for exchanging peer addresses evidencePool *evidence.EvidencePool // tracking evidence proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers @@ -170,73 +160,49 @@ type Node struct { prometheusSrv *http.Server } -// NewNode returns a new, ready to go, Tendermint Node. -func NewNode(config *cfg.Config, - privValidator types.PrivValidator, - nodeKey *p2p.NodeKey, - clientCreator proxy.ClientCreator, - genesisDocProvider GenesisDocProvider, - dbProvider DBProvider, - metricsProvider MetricsProvider, - logger log.Logger) (*Node, error) { - - // Get BlockStore - blockStoreDB, err := dbProvider(&DBContext{"blockstore", config}) +func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *bc.BlockStore, stateDB dbm.DB, err error) { + var blockStoreDB dbm.DB + blockStoreDB, err = dbProvider(&DBContext{"blockstore", config}) if err != nil { - return nil, err + return } - blockStore := bc.NewBlockStore(blockStoreDB) + blockStore = bc.NewBlockStore(blockStoreDB) - // Get State - stateDB, err := dbProvider(&DBContext{"state", config}) + stateDB, err = dbProvider(&DBContext{"state", config}) if err != nil { - return nil, err + return } - // Get genesis doc - // TODO: move to state package? - genDoc, err := loadGenesisDoc(stateDB) - if err != nil { - genDoc, err = genesisDocProvider() - if err != nil { - return nil, err - } - // save genesis doc to prevent a certain class of user errors (e.g. when it - // was changed, accidentally or not). Also good for audit trail. - saveGenesisDoc(stateDB, genDoc) - } + return +} - state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) - if err != nil { - return nil, err - } - - // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). +func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) { proxyApp := proxy.NewAppConns(clientCreator) proxyApp.SetLogger(logger.With("module", "proxy")) if err := proxyApp.Start(); err != nil { - return nil, fmt.Errorf("Error starting proxy app connections: %v", err) + return nil, fmt.Errorf("error starting proxy app connections: %v", err) } + return proxyApp, nil +} - // EventBus and IndexerService must be started before the handshake because - // we might need to index the txs of the replayed block as this might not have happened - // when the node stopped last time (i.e. the node stopped after it saved the block - // but before it indexed the txs, or, endblocker panicked) +func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { eventBus := types.NewEventBus() eventBus.SetLogger(logger.With("module", "events")) - - err = eventBus.Start() - if err != nil { + if err := eventBus.Start(); err != nil { return nil, err } + return eventBus, nil +} + +func createAndStartIndexerService(config *cfg.Config, dbProvider DBProvider, + eventBus *types.EventBus, logger log.Logger) (*txindex.IndexerService, txindex.TxIndexer, error) { - // Transaction indexing var txIndexer txindex.TxIndexer switch config.TxIndex.Indexer { case "kv": store, err := dbProvider(&DBContext{"tx_index", config}) if err != nil { - return nil, err + return nil, nil, err } if config.TxIndex.IndexTags != "" { txIndexer = kv.NewTxIndex(store, kv.IndexTags(splitAndTrimEmpty(config.TxIndex.IndexTags, ",", " "))) @@ -251,26 +217,26 @@ func NewNode(config *cfg.Config, indexerService := txindex.NewIndexerService(txIndexer, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) - - err = indexerService.Start() - if err != nil { - return nil, err + if err := indexerService.Start(); err != nil { + return nil, nil, err } + return indexerService, txIndexer, nil +} + +func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore, + genDoc *types.GenesisDoc, eventBus *types.EventBus, proxyApp proxy.AppConns, consensusLogger log.Logger) error { - // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, - // and replays any blocks as necessary to sync tendermint with the app. - consensusLogger := logger.With("module", "consensus") handshaker := cs.NewHandshaker(stateDB, state, blockStore, genDoc) handshaker.SetLogger(consensusLogger) handshaker.SetEventBus(eventBus) if err := handshaker.Handshake(proxyApp); err != nil { - return nil, fmt.Errorf("Error during handshake: %v", err) + return fmt.Errorf("error during handshake: %v", err) } + return nil +} - // Reload the state. It will have the Version.Consensus.App set by the - // Handshake, and may have other modifications as well (ie. depending on - // what happened during block replay). - state = sm.LoadState(stateDB) +func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logger, + consensusLogger log.Logger) { // Log the version info. logger.Info("Version info", @@ -287,27 +253,6 @@ func NewNode(config *cfg.Config, ) } - if config.PrivValidatorListenAddr != "" { - // If an address is provided, listen on the socket for a connection from an - // external signing process. - // FIXME: we should start services inside OnStart - privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger) - if err != nil { - return nil, errors.Wrap(err, "Error with private validator socket client") - } - } - - // Decide whether to fast-sync or not - // We don't fast-sync when the only validator is us. - fastSync := config.FastSync - if state.Validators.Size() == 1 { - addr, _ := state.Validators.GetByIndex(0) - privValAddr := privValidator.GetPubKey().Address() - if bytes.Equal(privValAddr, addr) { - fastSync = false - } - } - pubKey := privValidator.GetPubKey() addr := pubKey.Address() // Log whether this node is a validator or an observer @@ -316,10 +261,19 @@ func NewNode(config *cfg.Config, } else { consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey) } +} - csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) +func onlyValidatorIsUs(state sm.State, privVal types.PrivValidator) bool { + if state.Validators.Size() > 1 { + return false + } + addr, _ := state.Validators.GetByIndex(0) + return bytes.Equal(privVal.GetPubKey().Address(), addr) +} + +func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, + state sm.State, memplMetrics *mempl.Metrics, logger log.Logger) (*mempl.MempoolReactor, *mempl.Mempool) { - // Make MempoolReactor mempool := mempl.NewMempool( config.Mempool, proxyApp.Mempool(), @@ -339,34 +293,36 @@ func NewNode(config *cfg.Config, if config.Consensus.WaitForTxs() { mempool.EnableTxsAvailable() } + return mempoolReactor, mempool +} + +func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider, + stateDB dbm.DB, logger log.Logger) (*evidence.EvidenceReactor, *evidence.EvidencePool, error) { - // Make Evidence Reactor evidenceDB, err := dbProvider(&DBContext{"evidence", config}) if err != nil { - return nil, err + return nil, nil, err } evidenceLogger := logger.With("module", "evidence") evidencePool := evidence.NewEvidencePool(stateDB, evidenceDB) evidencePool.SetLogger(evidenceLogger) evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor.SetLogger(evidenceLogger) + return evidenceReactor, evidencePool, nil +} - blockExecLogger := logger.With("module", "state") - // make block executor for consensus and blockchain reactors to execute blocks - blockExec := sm.NewBlockExecutor( - stateDB, - blockExecLogger, - proxyApp.Consensus(), - mempool, - evidencePool, - sm.BlockExecutorWithMetrics(smMetrics), - ) +func createConsensusReactor(config *cfg.Config, + state sm.State, + blockExec *sm.BlockExecutor, + blockStore sm.BlockStore, + mempool *mempl.Mempool, + evidencePool *evidence.EvidencePool, + privValidator types.PrivValidator, + csMetrics *cs.Metrics, + fastSync bool, + eventBus *types.EventBus, + consensusLogger log.Logger) (*consensus.ConsensusReactor, *consensus.ConsensusState) { - // Make BlockchainReactor - bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) - - // Make ConsensusReactor consensusState := cs.NewConsensusState( config.Consensus, state.Copy(), @@ -382,28 +338,13 @@ func NewNode(config *cfg.Config, } consensusReactor := cs.NewConsensusReactor(consensusState, fastSync, cs.ReactorMetrics(csMetrics)) consensusReactor.SetLogger(consensusLogger) - // services which will be publishing and/or subscribing for messages (events) // consensusReactor will set it on consensusState and blockExecutor consensusReactor.SetEventBus(eventBus) + return consensusReactor, consensusState +} - p2pLogger := logger.With("module", "p2p") - nodeInfo, err := makeNodeInfo( - config, - nodeKey.ID(), - txIndexer, - genDoc.ChainID, - p2p.NewProtocolVersion( - version.P2PProtocol, // global - state.Version.Consensus.Block, - state.Version.Consensus.App, - ), - ) - if err != nil { - return nil, err - } - - // Setup Transport. +func createTransport(config *cfg.Config, nodeInfo p2p.NodeInfo, nodeKey *p2p.NodeKey, proxyApp proxy.AppConns) (*p2p.MultiplexTransport, []p2p.PeerFilterFunc) { var ( mConnConfig = p2p.MConnConfig(config.P2P) transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig) @@ -429,7 +370,7 @@ func NewNode(config *cfg.Config, return err } if res.IsErr() { - return fmt.Errorf("Error querying abci app: %v", res) + return fmt.Errorf("error querying abci app: %v", res) } return nil @@ -447,7 +388,7 @@ func NewNode(config *cfg.Config, return err } if res.IsErr() { - return fmt.Errorf("Error querying abci app: %v", res) + return fmt.Errorf("error querying abci app: %v", res) } return nil @@ -456,8 +397,21 @@ func NewNode(config *cfg.Config, } p2p.MultiplexTransportConnFilters(connFilters...)(transport) + return transport, peerFilters +} + +func createSwitch(config *cfg.Config, + transport *p2p.MultiplexTransport, + p2pMetrics *p2p.Metrics, + peerFilters []p2p.PeerFilterFunc, + mempoolReactor *mempl.MempoolReactor, + bcReactor *blockchain.BlockchainReactor, + consensusReactor *consensus.ConsensusReactor, + evidenceReactor *evidence.EvidenceReactor, + nodeInfo p2p.NodeInfo, + nodeKey *p2p.NodeKey, + p2pLogger log.Logger) *p2p.Switch { - // Setup Switch. sw := p2p.NewSwitch( config.P2P, transport, @@ -473,6 +427,159 @@ func NewNode(config *cfg.Config, sw.SetNodeKey(nodeKey) p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile()) + return sw +} + +func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch, + p2pLogger log.Logger) pex.AddrBook { + + addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) + addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) + + // Add ourselves to addrbook to prevent dialing ourselves + addrBook.AddOurAddress(sw.NetAddress()) + + sw.SetAddrBook(addrBook) + + return addrBook +} + +func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config, + sw *p2p.Switch, logger log.Logger) *pex.PEXReactor { + + // TODO persistent peers ? so we can have their DNS addrs saved + pexReactor := pex.NewPEXReactor(addrBook, + &pex.PEXReactorConfig{ + Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), + SeedMode: config.P2P.SeedMode, + // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 + // blocks assuming 10s blocks ~ 28 hours. + // TODO (melekes): make it dynamic based on the actual block latencies + // from the live network. + // https://github.com/tendermint/tendermint/issues/3523 + SeedDisconnectWaitPeriod: 28 * time.Hour, + }) + pexReactor.SetLogger(logger.With("module", "pex")) + sw.AddReactor("PEX", pexReactor) + return pexReactor +} + +// NewNode returns a new, ready to go, Tendermint Node. +func NewNode(config *cfg.Config, + privValidator types.PrivValidator, + nodeKey *p2p.NodeKey, + clientCreator proxy.ClientCreator, + genesisDocProvider sm.GenesisDocProvider, + dbProvider DBProvider, + metricsProvider MetricsProvider, + logger log.Logger) (*Node, error) { + + blockStore, stateDB, err := initDBs(config, dbProvider) + if err != nil { + return nil, err + } + + state, genDoc, err := sm.LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider) + if err != nil { + return nil, err + } + + // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query). + proxyApp, err := createAndStartProxyAppConns(clientCreator, logger) + if err != nil { + return nil, err + } + + // EventBus and IndexerService must be started before the handshake because + // we might need to index the txs of the replayed block as this might not have happened + // when the node stopped last time (i.e. the node stopped after it saved the block + // but before it indexed the txs, or, endblocker panicked) + eventBus, err := createAndStartEventBus(logger) + if err != nil { + return nil, err + } + + // Transaction indexing + indexerService, txIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger) + if err != nil { + return nil, err + } + + // Create the handshaker, which calls RequestInfo, sets the AppVersion on the state, + // and replays any blocks as necessary to sync tendermint with the app. + consensusLogger := logger.With("module", "consensus") + if err := doHandshake(stateDB, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil { + return nil, err + } + + // Reload the state. It will have the Version.Consensus.App set by the + // Handshake, and may have other modifications as well (ie. depending on + // what happened during block replay). + state = sm.LoadState(stateDB) + + // If an address is provided, listen on the socket for a connection from an + // external signing process. + if config.PrivValidatorListenAddr != "" { + // FIXME: we should start services inside OnStart + privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger) + if err != nil { + return nil, errors.Wrap(err, "error with private validator socket client") + } + } + + logNodeStartupInfo(state, privValidator, logger, consensusLogger) + + // Decide whether to fast-sync or not + // We don't fast-sync when the only validator is us. + fastSync := config.FastSync && !onlyValidatorIsUs(state, privValidator) + + csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) + + // Make MempoolReactor + mempoolReactor, mempool := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger) + + // Make Evidence Reactor + evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, logger) + if err != nil { + return nil, err + } + + // make block executor for consensus and blockchain reactors to execute blocks + blockExec := sm.NewBlockExecutor( + stateDB, + logger.With("module", "state"), + proxyApp.Consensus(), + mempool, + evidencePool, + sm.BlockExecutorWithMetrics(smMetrics), + ) + + // Make BlockchainReactor + bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + bcReactor.SetLogger(logger.With("module", "blockchain")) + + // Make ConsensusReactor + consensusReactor, consensusState := createConsensusReactor( + config, state, blockExec, blockStore, mempool, evidencePool, + privValidator, csMetrics, fastSync, eventBus, consensusLogger, + ) + + nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state) + if err != nil { + return nil, err + } + + // Setup Transport. + transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp) + + // Setup Switch. + p2pLogger := logger.With("module", "p2p") + sw := createSwitch( + config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor, + consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, + ) + + addrBook := createAddrBookAndSetOnSwitch(config, sw, p2pLogger) // Optionally, start the pex reactor // @@ -486,37 +593,13 @@ func NewNode(config *cfg.Config, // // If PEX is on, it should handle dialing the seeds. Otherwise the switch does it. // Note we currently use the addrBook regardless at least for AddOurAddress - addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict) - - // Add ourselves to addrbook to prevent dialing ourselves - addrBook.AddOurAddress(sw.NetAddress()) - - addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile())) + var pexReactor *pex.PEXReactor if config.P2P.PexReactor { - // TODO persistent peers ? so we can have their DNS addrs saved - pexReactor := pex.NewPEXReactor(addrBook, - &pex.PEXReactorConfig{ - Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), - SeedMode: config.P2P.SeedMode, - // See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000 - // blocks assuming 10s blocks ~ 28 hours. - // TODO (melekes): make it dynamic based on the actual block latencies - // from the live network. - // https://github.com/tendermint/tendermint/issues/3523 - SeedDisconnectWaitPeriod: 28 * time.Hour, - }) - pexReactor.SetLogger(logger.With("module", "pex")) - sw.AddReactor("PEX", pexReactor) + pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger) } - sw.SetAddrBook(addrBook) - - // run the profile server - profileHost := config.ProfListenAddress - if profileHost != "" { - go func() { - logger.Error("Profile server", "err", http.ListenAndServe(profileHost, nil)) - }() + if config.ProfListenAddress != "" { + go logger.Error("Profile server", "err", http.ListenAndServe(config.ProfListenAddress, nil)) } node := &Node{ @@ -536,6 +619,7 @@ func NewNode(config *cfg.Config, mempoolReactor: mempoolReactor, consensusState: consensusState, consensusReactor: consensusReactor, + pexReactor: pexReactor, evidencePool: evidencePool, proxyApp: proxyApp, txIndexer: txIndexer, @@ -804,6 +888,11 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled. +func (n *Node) PEXReactor() *pex.PEXReactor { + return n.pexReactor +} + // EvidencePool returns the Node's EvidencePool. func (n *Node) EvidencePool() *evidence.EvidencePool { return n.evidencePool @@ -854,20 +943,24 @@ func (n *Node) NodeInfo() p2p.NodeInfo { func makeNodeInfo( config *cfg.Config, - nodeID p2p.ID, + nodeKey *p2p.NodeKey, txIndexer txindex.TxIndexer, - chainID string, - protocolVersion p2p.ProtocolVersion, + genDoc *types.GenesisDoc, + state sm.State, ) (p2p.NodeInfo, error) { txIndexerStatus := "on" if _, ok := txIndexer.(*null.TxIndex); ok { txIndexerStatus = "off" } nodeInfo := p2p.DefaultNodeInfo{ - ProtocolVersion: protocolVersion, - ID_: nodeID, - Network: chainID, - Version: version.TMCoreSemVer, + ProtocolVersion: p2p.NewProtocolVersion( + version.P2PProtocol, // global + state.Version.Consensus.Block, + state.Version.Consensus.App, + ), + ID_: nodeKey.ID(), + Network: genDoc.ChainID, + Version: version.TMCoreSemVer, Channels: []byte{ bc.BlockchainChannel, cs.StateChannel, cs.DataChannel, cs.VoteChannel, cs.VoteSetBitsChannel, @@ -899,33 +992,6 @@ func makeNodeInfo( //------------------------------------------------------------------------------ -var ( - genesisDocKey = []byte("genesisDoc") -) - -// panics if failed to unmarshal bytes -func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) { - bytes := db.Get(genesisDocKey) - if len(bytes) == 0 { - return nil, errors.New("Genesis doc not found") - } - var genDoc *types.GenesisDoc - err := cdc.UnmarshalJSON(bytes, &genDoc) - if err != nil { - panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes)) - } - return genDoc, nil -} - -// panics if failed to marshal the given genesis document -func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) { - bytes, err := cdc.MarshalJSON(genDoc) - if err != nil { - panic(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err)) - } - db.SetSync(genesisDocKey, bytes) -} - func createAndStartPrivValidatorSocketClient( listenAddr string, logger log.Logger, @@ -946,7 +1012,7 @@ func createAndStartPrivValidatorSocketClient( listener = privval.NewTCPListener(ln, ed25519.GenPrivKey()) default: return nil, fmt.Errorf( - "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", + "wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", protocol, ) } diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index cddc80b8e..f5c923bf4 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/state" abci "github.com/tendermint/tendermint/abci/types" @@ -166,7 +167,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node { panic(err) } node, err := nm.NewNode(config, pv, nodeKey, papp, - nm.DefaultGenesisDocProviderFunc(config), + state.DefaultGenesisDocProviderFunc(config), nm.DefaultDBProvider, nm.DefaultMetricsProvider(config.Instrumentation), logger) diff --git a/state/store.go b/state/store.go index 0301bc7c3..e1fb5ba60 100644 --- a/state/store.go +++ b/state/store.go @@ -1,9 +1,11 @@ package state import ( + "errors" "fmt" abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/types" @@ -17,6 +19,23 @@ const ( valSetCheckpointInterval = 100000 ) +var ( + genesisDocKey = []byte("genesisDoc") +) + +// GenesisDocProvider returns a GenesisDoc. +// It allows the GenesisDoc to be pulled from sources other than the +// filesystem, for instance from a distributed key-value store cluster. +type GenesisDocProvider func() (*types.GenesisDoc, error) + +// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads +// the GenesisDoc from the config.GenesisFile() on the filesystem. +func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider { + return func() (*types.GenesisDoc, error) { + return types.GenesisDocFromFile(config.GenesisFile()) + } +} + //------------------------------------------------------------------------ func calcValidatorsKey(height int64) []byte { @@ -65,6 +84,52 @@ func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) ( return state, nil } +// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the +// database, or creates one using the given genesisDocProvider and persists the +// result to the database. On success this also returns the genesis doc loaded +// through the given provider. +func LoadStateFromDBOrGenesisDocProvider(stateDB dbm.DB, genesisDocProvider GenesisDocProvider) (State, *types.GenesisDoc, error) { + // Get genesis doc + genDoc, err := loadGenesisDoc(stateDB) + if err != nil { + genDoc, err = genesisDocProvider() + if err != nil { + return State{}, nil, err + } + // save genesis doc to prevent a certain class of user errors (e.g. when it + // was changed, accidentally or not). Also good for audit trail. + saveGenesisDoc(stateDB, genDoc) + } + state, err := LoadStateFromDBOrGenesisDoc(stateDB, genDoc) + if err != nil { + return State{}, nil, err + } + return state, genDoc, nil +} + +// panics if failed to unmarshal bytes +func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) { + bytes := db.Get(genesisDocKey) + if len(bytes) == 0 { + return nil, errors.New("Genesis doc not found") + } + var genDoc *types.GenesisDoc + err := cdc.UnmarshalJSON(bytes, &genDoc) + if err != nil { + panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, bytes)) + } + return genDoc, nil +} + +// panics if failed to marshal the given genesis document +func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) { + bytes, err := cdc.MarshalJSON(genDoc) + if err != nil { + panic(fmt.Sprintf("Failed to save genesis doc due to marshaling error: %v", err)) + } + db.SetSync(genesisDocKey, bytes) +} + // LoadState loads the State from the database. func LoadState(db dbm.DB) State { return loadState(db, stateKey)