node: implement tendermint modes (#6241)

Co-authored-by: dongsam <dongsamb@gmail.com>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
Callum Waters
2021-03-18 11:17:53 +01:00
committed by GitHub
parent 5c547137f6
commit 9f7051d38a
18 changed files with 428 additions and 150 deletions

View File

@@ -100,12 +100,23 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
if err != nil {
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
}
pval, err := privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
if err != nil {
return nil, err
if config.Mode == cfg.ModeSeed {
return NewSeedNode(config,
nodeKey,
DefaultGenesisDocProviderFunc(config),
logger,
)
}
var pval *privval.FilePV
if config.Mode == cfg.ModeValidator {
pval, err = privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile())
if err != nil {
return nil, err
}
} else {
pval = nil
}
return NewNode(config,
pval,
nodeKey,
@@ -298,12 +309,13 @@ func doHandshake(
return nil
}
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger, mode string) {
// Log the version info.
logger.Info("Version info",
"software", version.TMCoreSemVer,
"block", version.BlockProtocol,
"p2p", version.P2PProtocol,
"mode", mode,
)
// If the state and software differ in block version, at least log it.
@@ -313,13 +325,18 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
"state", state.Version.Consensus.Block,
)
}
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
} else {
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
switch {
case mode == cfg.ModeFull:
consensusLogger.Info("This node is a fullnode")
case mode == cfg.ModeValidator:
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey.Bytes())
} else {
consensusLogger.Info("This node is a validator (NOT in the active validator set)",
"addr", addr, "pubKey", pubKey.Bytes())
}
}
}
@@ -328,7 +345,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
return false
}
addr, _ := state.Validators.GetByIndex(0)
return bytes.Equal(pubKey.Address(), addr)
return pubKey != nil && bytes.Equal(pubKey.Address(), addr)
}
func createMempoolReactor(
@@ -445,7 +462,7 @@ func createBlockchainReactor(
logger = logger.With("module", "blockchain")
switch config.FastSync.Version {
case "v0":
case cfg.BlockchainV0:
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
var (
@@ -471,7 +488,7 @@ func createBlockchainReactor(
return reactorShim, reactor, nil
case "v2":
case cfg.BlockchainV2:
reactor := bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
reactor.SetLogger(logger)
@@ -507,10 +524,8 @@ func createConsensusReactor(
evidencePool,
cs.StateMetrics(csMetrics),
)
consensusState.SetLogger(logger)
if privValidator != nil {
if privValidator != nil && config.Mode == cfg.ModeValidator {
consensusState.SetPrivValidator(privValidator)
}
@@ -677,11 +692,13 @@ func createSwitch(config *cfg.Config,
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
if config.Mode != cfg.ModeSeed {
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)
}
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
@@ -723,19 +740,19 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
reactorConfig := &pex.ReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.Mode == cfg.ModeSeed,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
}
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook,
&pex.ReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
})
pexReactor := pex.NewReactor(addrBook, reactorConfig)
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
@@ -813,6 +830,100 @@ func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reacto
return nil
}
// NewSeedNode returns a new seed node, containing only p2p, pex reactor
func NewSeedNode(config *cfg.Config,
nodeKey p2p.NodeKey,
genesisDocProvider GenesisDocProvider,
logger log.Logger,
options ...Option) (*Node, error) {
genDoc, err := genesisDocProvider()
if err != nil {
return nil, err
}
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return nil, err
}
nodeInfo, err := makeSeedNodeInfo(config, nodeKey, genDoc, state)
if err != nil {
return nil, err
}
// Setup Transport and Switch.
p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
peerManager, err := createPeerManager(config, p2pLogger, nodeKey.ID)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
}
router, err := createRouter(p2pLogger, nodeInfo, nodeKey.PrivKey, peerManager, transport)
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
}
// start the pex reactor
pexReactor := createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactorV2, err := createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
if config.RPC.PprofListenAddress != "" {
go func() {
logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
}()
}
node := &Node{
config: config,
genesisDoc: genDoc,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
for _, option := range options {
option(node)
}
return node, nil
}
// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
privValidator types.PrivValidator,
@@ -875,10 +986,15 @@ func NewNode(config *cfg.Config,
}
}
}
pubKey, err := privValidator.GetPubKey(context.TODO())
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
var pubKey crypto.PubKey
if config.Mode == cfg.ModeValidator {
pubKey, err = privValidator.GetPubKey(context.TODO())
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
if pubKey == nil {
return nil, errors.New("could not retrieve public key from private validator")
}
}
// Determine whether we should attempt state sync.
@@ -909,7 +1025,7 @@ func NewNode(config *cfg.Config,
// app may modify the validator set, specifying ourself as the only validator.
fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, pubKey)
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
logNodeStartupInfo(state, pubKey, logger, consensusLogger, config.Mode)
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
@@ -989,13 +1105,16 @@ func NewNode(config *cfg.Config,
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactorShim := p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
var (
stateSyncReactor *statesync.Reactor
stateSyncReactorShim *p2p.ReactorShim
channels map[p2p.ChannelID]*p2p.Channel
peerUpdates *p2p.PeerUpdates
)
stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
if useLegacyP2P {
channels = getChannelsFromShim(stateSyncReactorShim)
peerUpdates = stateSyncReactorShim.PeerUpdates
@@ -1004,7 +1123,7 @@ func NewNode(config *cfg.Config,
peerUpdates = peerManager.Subscribe()
}
stateSyncReactor := statesync.NewReactor(
stateSyncReactor = statesync.NewReactor(
stateSyncReactorShim.Logger,
proxyApp.Snapshot(),
proxyApp.Query(),
@@ -1121,7 +1240,7 @@ func (n *Node) OnStart() error {
// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" {
if n.config.RPC.ListenAddress != "" && n.config.Mode != cfg.ModeSeed {
listeners, err := n.startRPC()
if err != nil {
return err
@@ -1164,31 +1283,33 @@ func (n *Node) OnStart() error {
return err
}
if n.config.FastSync.Version == "v0" {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
if n.config.Mode != cfg.ModeSeed {
if n.config.FastSync.Version == cfg.BlockchainV0 {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
}
// Start the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Start(); err != nil {
return err
}
}
// Start the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Start(); err != nil {
return err
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
}
// Start the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Start(); err != nil {
return err
}
// Start the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Start(); err != nil {
return err
}
// Start the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Start(); err != nil {
return err
}
// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(); err != nil {
return err
// Start the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Start(); err != nil {
return err
}
}
if !useLegacyP2P && n.pexReactorV2 != nil {
@@ -1233,32 +1354,35 @@ func (n *Node) OnStop() {
n.Logger.Error("Error closing indexerService", "err", err)
}
// now stop the reactors
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
if n.config.Mode != cfg.ModeSeed {
// now stop the reactors
if n.config.FastSync.Version == "v0" {
// Stop the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
}
}
}
// Stop the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the consensus reactor", "err", err)
}
// Stop the real consensus reactor separately since the switch uses the shim.
if err := n.consensusReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the consensus reactor", "err", err)
}
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the state sync reactor", "err", err)
}
// Stop the real state sync reactor separately since the switch uses the shim.
if err := n.stateSyncReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the state sync reactor", "err", err)
}
// Stop the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the mempool reactor", "err", err)
}
// Stop the real mempool reactor separately since the switch uses the shim.
if err := n.mempoolReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the mempool reactor", "err", err)
}
// Stop the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the evidence reactor", "err", err)
// Stop the real evidence reactor separately since the switch uses the shim.
if err := n.evidenceReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the evidence reactor", "err", err)
}
}
if !useLegacyP2P && n.pexReactorV2 != nil {
@@ -1312,11 +1436,7 @@ func (n *Node) OnStop() {
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *Node) ConfigureRPC() error {
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
rpccore.SetEnvironment(&rpccore.Environment{
rpcCoreEnv := rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),
@@ -1327,7 +1447,6 @@ func (n *Node) ConfigureRPC() error {
P2PPeers: n.sw,
P2PTransport: n,
PubKey: pubKey,
GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
ConsensusReactor: n.consensusReactor,
@@ -1337,7 +1456,15 @@ func (n *Node) ConfigureRPC() error {
Logger: n.Logger.With("module", "rpc"),
Config: *n.config.RPC,
})
}
if n.config.Mode == cfg.ModeValidator {
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if pubKey == nil || err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
rpcCoreEnv.PubKey = pubKey
}
rpccore.SetEnvironment(&rpcCoreEnv)
return nil
}
@@ -1577,10 +1704,10 @@ func makeNodeInfo(
var bcChannel byte
switch config.FastSync.Version {
case "v0":
case cfg.BlockchainV0:
bcChannel = byte(bcv0.BlockchainChannel)
case "v2":
case cfg.BlockchainV2:
bcChannel = bcv2.BlockchainChannel
default:
@@ -1630,6 +1757,45 @@ func makeNodeInfo(
return nodeInfo, err
}
func makeSeedNodeInfo(
config *cfg.Config,
nodeKey p2p.NodeKey,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
nodeInfo := p2p.NodeInfo{
ProtocolVersion: p2p.NewProtocolVersion(
version.P2PProtocol, // global
state.Version.Consensus.Block,
state.Version.Consensus.App,
),
NodeID: nodeKey.ID,
Network: genDoc.ChainID,
Version: version.TMCoreSemVer,
Channels: []byte{},
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{
TxIndex: "off",
RPCAddress: config.RPC.ListenAddress,
},
}
if config.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
lAddr := config.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
}
nodeInfo.ListenAddr = lAddr
err := nodeInfo.Validate()
return nodeInfo, err
}
//------------------------------------------------------------------------------
var (

View File

@@ -522,6 +522,27 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
}
func TestNodeNewSeedNode(t *testing.T) {
config := cfg.ResetTestRoot("node_new_node_custom_reactors_test")
config.Mode = cfg.ModeSeed
defer os.RemoveAll(config.RootDir)
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)
n, err := NewSeedNode(config,
nodeKey,
DefaultGenesisDocProviderFunc(config),
log.TestingLogger(),
)
require.NoError(t, err)
err = n.Start()
require.NoError(t, err)
assert.True(t, n.pexReactor.IsRunning())
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)