mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-03 11:45:18 +00:00
split out node into a setup file
This commit is contained in:
@@ -423,6 +423,7 @@ type RPCConfig struct {
|
||||
TLSKeyFile string `mapstructure:"tls_key_file"`
|
||||
|
||||
// pprof listen address (https://golang.org/pkg/net/http/pprof)
|
||||
// FIXME: This should be moved under the instrumentation section
|
||||
PprofListenAddress string `mapstructure:"pprof_laddr"`
|
||||
}
|
||||
|
||||
@@ -506,6 +507,10 @@ func (cfg *RPCConfig) IsCorsEnabled() bool {
|
||||
return len(cfg.CORSAllowedOrigins) != 0
|
||||
}
|
||||
|
||||
func (cfg *RPCConfig) IsPprofEnabled() bool {
|
||||
return len(cfg.PprofListenAddress) != 0
|
||||
}
|
||||
|
||||
func (cfg RPCConfig) KeyFile() string {
|
||||
path := cfg.TLSKeyFile
|
||||
if filepath.IsAbs(path) {
|
||||
@@ -1201,6 +1206,10 @@ func (cfg *InstrumentationConfig) ValidateBasic() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cfg *InstrumentationConfig) IsPrometheusEnabled() bool {
|
||||
return cfg.Prometheus && cfg.PrometheusListenAddr != ""
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Utils
|
||||
|
||||
|
||||
807
node/node.go
807
node/node.go
@@ -1,49 +1,34 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/cors"
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
bc "github.com/tendermint/tendermint/blocksync"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cs "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/evidence"
|
||||
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/p2p/pex"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
rpccore "github.com/tendermint/tendermint/rpc/core"
|
||||
grpccore "github.com/tendermint/tendermint/rpc/grpc"
|
||||
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
|
||||
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/psql"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/state/txindex/kv"
|
||||
"github.com/tendermint/tendermint/state/txindex/null"
|
||||
"github.com/tendermint/tendermint/statesync"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
@@ -56,89 +41,51 @@ import (
|
||||
_ "github.com/lib/pq" // provide the psql db driver
|
||||
)
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
// Node is the highest level interface to a full Tendermint node.
|
||||
// It includes all configuration information and running services.
|
||||
type Node struct {
|
||||
service.BaseService
|
||||
|
||||
// DBContext specifies config information for loading a new DB.
|
||||
type DBContext struct {
|
||||
ID string
|
||||
Config *cfg.Config
|
||||
}
|
||||
// config
|
||||
config *cfg.Config
|
||||
genesisDoc *types.GenesisDoc // initial validator set
|
||||
privValidator types.PrivValidator // local node's validator key
|
||||
|
||||
// DBProvider takes a DBContext and returns an instantiated DB.
|
||||
type DBProvider func(*DBContext) (dbm.DB, error)
|
||||
// network
|
||||
transport *p2p.MultiplexTransport
|
||||
sw *p2p.Switch // p2p connections
|
||||
addrBook pex.AddrBook // known peers
|
||||
nodeInfo p2p.NodeInfo
|
||||
nodeKey *p2p.NodeKey // our node privkey
|
||||
isListening bool
|
||||
|
||||
const readHeaderTimeout = 10 * time.Second
|
||||
|
||||
// DefaultDBProvider returns a database using the DBBackend and DBDir
|
||||
// specified in the ctx.Config.
|
||||
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
|
||||
dbType := dbm.BackendType(ctx.Config.DBBackend)
|
||||
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
|
||||
}
|
||||
|
||||
// GenesisDocProvider returns a GenesisDoc.
|
||||
// It allows the GenesisDoc to be pulled from sources other than the
|
||||
// filesystem, for instance from a distributed key-value store cluster.
|
||||
type GenesisDocProvider func() (*types.GenesisDoc, error)
|
||||
|
||||
// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
|
||||
// the GenesisDoc from the config.GenesisFile() on the filesystem.
|
||||
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
|
||||
return func() (*types.GenesisDoc, error) {
|
||||
return types.GenesisDocFromFile(config.GenesisFile())
|
||||
}
|
||||
}
|
||||
|
||||
// Provider takes a config and a logger and returns a ready to go Node.
|
||||
type Provider func(*cfg.Config, log.Logger) (*Node, error)
|
||||
|
||||
// DefaultNewNode returns a Tendermint node with default settings for the
|
||||
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
|
||||
// It implements NodeProvider.
|
||||
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
|
||||
}
|
||||
|
||||
return NewNode(config,
|
||||
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
|
||||
nodeKey,
|
||||
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
|
||||
DefaultGenesisDocProviderFunc(config),
|
||||
DefaultDBProvider,
|
||||
DefaultMetricsProvider(config.Instrumentation),
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
// MetricsProvider returns a consensus, p2p and mempool Metrics.
|
||||
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
|
||||
|
||||
// DefaultMetricsProvider returns Metrics build using Prometheus client library
|
||||
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
||||
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
||||
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
|
||||
if config.Prometheus {
|
||||
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
|
||||
}
|
||||
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
|
||||
}
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for block-syncing
|
||||
mempoolReactor p2p.Reactor // for gossipping transactions
|
||||
mempool mempl.Mempool
|
||||
stateSync bool // whether the node should state sync on startup
|
||||
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
|
||||
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
|
||||
stateSyncGenesis sm.State // provides the genesis state for state sync
|
||||
consensusState *cs.State // latest consensus state
|
||||
consensusReactor *cs.Reactor // for participating in the consensus
|
||||
pexReactor *pex.Reactor // for exchanging peer addresses
|
||||
evidencePool *evidence.Pool // tracking evidence
|
||||
proxyApp proxy.AppConns // connection to the application
|
||||
rpcListeners []net.Listener // rpc servers
|
||||
txIndexer txindex.TxIndexer
|
||||
blockIndexer indexer.BlockIndexer
|
||||
indexerService *txindex.IndexerService
|
||||
prometheusSrv *http.Server
|
||||
pprofSrv *http.Server
|
||||
}
|
||||
|
||||
// Option sets a parameter for the node.
|
||||
type Option func(*Node)
|
||||
|
||||
// Temporary interface for switching to block sync, we should get rid of v0 and v1 reactors.
|
||||
// See: https://github.com/tendermint/tendermint/issues/4595
|
||||
type blockSyncReactor interface {
|
||||
SwitchToBlockSync(sm.State) error
|
||||
}
|
||||
|
||||
// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
|
||||
// the node's Switch.
|
||||
//
|
||||
@@ -190,517 +137,6 @@ func StateProvider(stateProvider statesync.StateProvider) Option {
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// Node is the highest level interface to a full Tendermint node.
|
||||
// It includes all configuration information and running services.
|
||||
type Node struct {
|
||||
service.BaseService
|
||||
|
||||
// config
|
||||
config *cfg.Config
|
||||
genesisDoc *types.GenesisDoc // initial validator set
|
||||
privValidator types.PrivValidator // local node's validator key
|
||||
|
||||
// network
|
||||
transport *p2p.MultiplexTransport
|
||||
sw *p2p.Switch // p2p connections
|
||||
addrBook pex.AddrBook // known peers
|
||||
nodeInfo p2p.NodeInfo
|
||||
nodeKey *p2p.NodeKey // our node privkey
|
||||
isListening bool
|
||||
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor p2p.Reactor // for block-syncing
|
||||
mempoolReactor p2p.Reactor // for gossipping transactions
|
||||
mempool mempl.Mempool
|
||||
stateSync bool // whether the node should state sync on startup
|
||||
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
|
||||
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
|
||||
stateSyncGenesis sm.State // provides the genesis state for state sync
|
||||
consensusState *cs.State // latest consensus state
|
||||
consensusReactor *cs.Reactor // for participating in the consensus
|
||||
pexReactor *pex.Reactor // for exchanging peer addresses
|
||||
evidencePool *evidence.Pool // tracking evidence
|
||||
proxyApp proxy.AppConns // connection to the application
|
||||
rpcListeners []net.Listener // rpc servers
|
||||
txIndexer txindex.TxIndexer
|
||||
blockIndexer indexer.BlockIndexer
|
||||
indexerService *txindex.IndexerService
|
||||
prometheusSrv *http.Server
|
||||
}
|
||||
|
||||
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
|
||||
var blockStoreDB dbm.DB
|
||||
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
blockStore = store.NewBlockStore(blockStoreDB)
|
||||
|
||||
stateDB, err = dbProvider(&DBContext{"state", config})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
|
||||
proxyApp := proxy.NewAppConns(clientCreator, metrics)
|
||||
proxyApp.SetLogger(logger.With("module", "proxy"))
|
||||
if err := proxyApp.Start(); err != nil {
|
||||
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
|
||||
}
|
||||
return proxyApp, nil
|
||||
}
|
||||
|
||||
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
|
||||
eventBus := types.NewEventBus()
|
||||
eventBus.SetLogger(logger.With("module", "events"))
|
||||
if err := eventBus.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eventBus, nil
|
||||
}
|
||||
|
||||
func createAndStartIndexerService(
|
||||
config *cfg.Config,
|
||||
chainID string,
|
||||
dbProvider DBProvider,
|
||||
eventBus *types.EventBus,
|
||||
logger log.Logger,
|
||||
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
|
||||
var (
|
||||
txIndexer txindex.TxIndexer
|
||||
blockIndexer indexer.BlockIndexer
|
||||
)
|
||||
|
||||
switch config.TxIndex.Indexer {
|
||||
case "kv":
|
||||
store, err := dbProvider(&DBContext{"tx_index", config})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
txIndexer = kv.NewTxIndex(store)
|
||||
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
|
||||
|
||||
case "psql":
|
||||
if config.TxIndex.PsqlConn == "" {
|
||||
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
|
||||
}
|
||||
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
|
||||
}
|
||||
txIndexer = es.TxIndexer()
|
||||
blockIndexer = es.BlockIndexer()
|
||||
|
||||
default:
|
||||
txIndexer = &null.TxIndex{}
|
||||
blockIndexer = &blockidxnull.BlockerIndexer{}
|
||||
}
|
||||
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
indexerService.SetLogger(logger.With("module", "txindex"))
|
||||
|
||||
if err := indexerService.Start(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return indexerService, txIndexer, blockIndexer, nil
|
||||
}
|
||||
|
||||
func doHandshake(
|
||||
stateStore sm.Store,
|
||||
state sm.State,
|
||||
blockStore sm.BlockStore,
|
||||
genDoc *types.GenesisDoc,
|
||||
eventBus types.BlockEventPublisher,
|
||||
proxyApp proxy.AppConns,
|
||||
consensusLogger log.Logger,
|
||||
) error {
|
||||
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
|
||||
handshaker.SetLogger(consensusLogger)
|
||||
handshaker.SetEventBus(eventBus)
|
||||
if err := handshaker.Handshake(proxyApp); err != nil {
|
||||
return fmt.Errorf("error during handshake: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
|
||||
// Log the version info.
|
||||
logger.Info("Version info",
|
||||
"tendermint_version", version.TMCoreSemVer,
|
||||
"abci", version.ABCISemVer,
|
||||
"block", version.BlockProtocol,
|
||||
"p2p", version.P2PProtocol,
|
||||
"commit_hash", version.TMGitCommitHash,
|
||||
)
|
||||
|
||||
// If the state and software differ in block version, at least log it.
|
||||
if state.Version.Consensus.Block != version.BlockProtocol {
|
||||
logger.Info("Software and state have different block protocols",
|
||||
"software", version.BlockProtocol,
|
||||
"state", state.Version.Consensus.Block,
|
||||
)
|
||||
}
|
||||
|
||||
addr := pubKey.Address()
|
||||
// Log whether this node is a validator or an observer
|
||||
if state.Validators.HasAddress(addr) {
|
||||
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
|
||||
} else {
|
||||
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
|
||||
}
|
||||
}
|
||||
|
||||
func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
|
||||
if state.Validators.Size() > 1 {
|
||||
return false
|
||||
}
|
||||
addr, _ := state.Validators.GetByIndex(0)
|
||||
return bytes.Equal(pubKey.Address(), addr)
|
||||
}
|
||||
|
||||
func createMempoolAndMempoolReactor(
|
||||
config *cfg.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
state sm.State,
|
||||
memplMetrics *mempl.Metrics,
|
||||
logger log.Logger,
|
||||
) (mempl.Mempool, p2p.Reactor) {
|
||||
switch config.Mempool.Version {
|
||||
case cfg.MempoolV1:
|
||||
mp := mempoolv1.NewTxMempool(
|
||||
logger,
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv1.WithMetrics(memplMetrics),
|
||||
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
|
||||
)
|
||||
|
||||
reactor := mempoolv1.NewReactor(
|
||||
config.Mempool,
|
||||
mp,
|
||||
)
|
||||
if config.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return mp, reactor
|
||||
|
||||
case cfg.MempoolV0:
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(memplMetrics),
|
||||
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
|
||||
)
|
||||
|
||||
mp.SetLogger(logger)
|
||||
|
||||
reactor := mempoolv0.NewReactor(
|
||||
config.Mempool,
|
||||
mp,
|
||||
)
|
||||
if config.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return mp, reactor
|
||||
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
|
||||
stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger,
|
||||
) (*evidence.Reactor, *evidence.Pool, error) {
|
||||
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
evidenceLogger := logger.With("module", "evidence")
|
||||
evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB, sm.StoreOptions{
|
||||
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
|
||||
}), blockStore)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
evidenceReactor := evidence.NewReactor(evidencePool)
|
||||
evidenceReactor.SetLogger(evidenceLogger)
|
||||
return evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlocksyncReactor(config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
blockSync bool,
|
||||
logger log.Logger,
|
||||
) (bcReactor p2p.Reactor, err error) {
|
||||
switch config.BlockSync.Version {
|
||||
case "v0":
|
||||
bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
|
||||
case "v1", "v2":
|
||||
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version)
|
||||
}
|
||||
|
||||
bcReactor.SetLogger(logger.With("module", "blocksync"))
|
||||
return bcReactor, nil
|
||||
}
|
||||
|
||||
func createConsensusReactor(config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
mempool mempl.Mempool,
|
||||
evidencePool *evidence.Pool,
|
||||
privValidator types.PrivValidator,
|
||||
csMetrics *cs.Metrics,
|
||||
waitSync bool,
|
||||
eventBus *types.EventBus,
|
||||
consensusLogger log.Logger,
|
||||
) (*cs.Reactor, *cs.State) {
|
||||
consensusState := cs.NewState(
|
||||
config.Consensus,
|
||||
state.Copy(),
|
||||
blockExec,
|
||||
blockStore,
|
||||
mempool,
|
||||
evidencePool,
|
||||
cs.StateMetrics(csMetrics),
|
||||
)
|
||||
consensusState.SetLogger(consensusLogger)
|
||||
if privValidator != nil {
|
||||
consensusState.SetPrivValidator(privValidator)
|
||||
}
|
||||
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
|
||||
consensusReactor.SetLogger(consensusLogger)
|
||||
// services which will be publishing and/or subscribing for messages (events)
|
||||
// consensusReactor will set it on consensusState and blockExecutor
|
||||
consensusReactor.SetEventBus(eventBus)
|
||||
return consensusReactor, consensusState
|
||||
}
|
||||
|
||||
func createTransport(
|
||||
config *cfg.Config,
|
||||
nodeInfo p2p.NodeInfo,
|
||||
nodeKey *p2p.NodeKey,
|
||||
proxyApp proxy.AppConns,
|
||||
) (
|
||||
*p2p.MultiplexTransport,
|
||||
[]p2p.PeerFilterFunc,
|
||||
) {
|
||||
var (
|
||||
mConnConfig = p2p.MConnConfig(config.P2P)
|
||||
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
|
||||
connFilters = []p2p.ConnFilterFunc{}
|
||||
peerFilters = []p2p.PeerFilterFunc{}
|
||||
)
|
||||
|
||||
if !config.P2P.AllowDuplicateIP {
|
||||
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
|
||||
}
|
||||
|
||||
// Filter peers by addr or pubkey with an ABCI query.
|
||||
// If the query return code is OK, add peer.
|
||||
if config.FilterPeers {
|
||||
connFilters = append(
|
||||
connFilters,
|
||||
// ABCI query for address filtering.
|
||||
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
|
||||
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsErr() {
|
||||
return fmt.Errorf("error querying abci app: %v", res)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
peerFilters = append(
|
||||
peerFilters,
|
||||
// ABCI query for ID filtering.
|
||||
func(_ p2p.IPeerSet, p p2p.Peer) error {
|
||||
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsErr() {
|
||||
return fmt.Errorf("error querying abci app: %v", res)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
|
||||
|
||||
// Limit the number of incoming connections.
|
||||
max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
|
||||
|
||||
return transport, peerFilters
|
||||
}
|
||||
|
||||
func createSwitch(config *cfg.Config,
|
||||
transport p2p.Transport,
|
||||
p2pMetrics *p2p.Metrics,
|
||||
peerFilters []p2p.PeerFilterFunc,
|
||||
mempoolReactor p2p.Reactor,
|
||||
bcReactor p2p.Reactor,
|
||||
stateSyncReactor *statesync.Reactor,
|
||||
consensusReactor *cs.Reactor,
|
||||
evidenceReactor *evidence.Reactor,
|
||||
nodeInfo p2p.NodeInfo,
|
||||
nodeKey *p2p.NodeKey,
|
||||
p2pLogger log.Logger,
|
||||
) *p2p.Switch {
|
||||
sw := p2p.NewSwitch(
|
||||
config.P2P,
|
||||
transport,
|
||||
p2p.WithMetrics(p2pMetrics),
|
||||
p2p.SwitchPeerFilters(peerFilters...),
|
||||
)
|
||||
sw.SetLogger(p2pLogger)
|
||||
sw.AddReactor("MEMPOOL", mempoolReactor)
|
||||
sw.AddReactor("BLOCKSYNC", bcReactor)
|
||||
sw.AddReactor("CONSENSUS", consensusReactor)
|
||||
sw.AddReactor("EVIDENCE", evidenceReactor)
|
||||
sw.AddReactor("STATESYNC", stateSyncReactor)
|
||||
|
||||
sw.SetNodeInfo(nodeInfo)
|
||||
sw.SetNodeKey(nodeKey)
|
||||
|
||||
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
|
||||
return sw
|
||||
}
|
||||
|
||||
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
|
||||
p2pLogger log.Logger, nodeKey *p2p.NodeKey,
|
||||
) (pex.AddrBook, error) {
|
||||
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
|
||||
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
|
||||
|
||||
// Add ourselves to addrbook to prevent dialing ourselves
|
||||
if config.P2P.ExternalAddress != "" {
|
||||
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
|
||||
}
|
||||
addrBook.AddOurAddress(addr)
|
||||
}
|
||||
if config.P2P.ListenAddress != "" {
|
||||
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
|
||||
}
|
||||
addrBook.AddOurAddress(addr)
|
||||
}
|
||||
|
||||
sw.SetAddrBook(addrBook)
|
||||
|
||||
return addrBook, nil
|
||||
}
|
||||
|
||||
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
|
||||
sw *p2p.Switch, logger log.Logger,
|
||||
) *pex.Reactor {
|
||||
// TODO persistent peers ? so we can have their DNS addrs saved
|
||||
pexReactor := pex.NewReactor(addrBook,
|
||||
&pex.ReactorConfig{
|
||||
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
|
||||
SeedMode: config.P2P.SeedMode,
|
||||
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
|
||||
// blocks assuming 10s blocks ~ 28 hours.
|
||||
// TODO (melekes): make it dynamic based on the actual block latencies
|
||||
// from the live network.
|
||||
// https://github.com/tendermint/tendermint/issues/3523
|
||||
SeedDisconnectWaitPeriod: 28 * time.Hour,
|
||||
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
|
||||
})
|
||||
pexReactor.SetLogger(logger.With("module", "pex"))
|
||||
sw.AddReactor("PEX", pexReactor)
|
||||
return pexReactor
|
||||
}
|
||||
|
||||
// startStateSync starts an asynchronous state sync process, then switches to block sync mode.
|
||||
func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.Reactor,
|
||||
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, blockSync bool,
|
||||
stateStore sm.Store, blockStore *store.BlockStore, state sm.State,
|
||||
) error {
|
||||
ssR.Logger.Info("Starting state sync")
|
||||
|
||||
if stateProvider == nil {
|
||||
var err error
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
stateProvider, err = statesync.NewLightClientStateProvider(
|
||||
ctx,
|
||||
state.ChainID, state.Version, state.InitialHeight,
|
||||
config.RPCServers, light.TrustOptions{
|
||||
Period: config.TrustPeriod,
|
||||
Height: config.TrustHeight,
|
||||
Hash: config.TrustHashBytes(),
|
||||
}, ssR.Logger.With("module", "light"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set up light client state provider: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("State sync failed", "err", err)
|
||||
return
|
||||
}
|
||||
err = stateStore.Bootstrap(state)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
|
||||
return
|
||||
}
|
||||
err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to store last seen commit", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if blockSync {
|
||||
// FIXME Very ugly to have these metrics bleed through here.
|
||||
conR.Metrics.StateSyncing.Set(0)
|
||||
conR.Metrics.BlockSyncing.Set(1)
|
||||
err = bcR.SwitchToBlockSync(state)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to switch to block sync", "err", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
conR.SwitchToConsensus(state, true)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewNode returns a new, ready to go, Tendermint Node.
|
||||
func NewNode(config *cfg.Config,
|
||||
privValidator types.PrivValidator,
|
||||
@@ -798,7 +234,7 @@ func NewNode(config *cfg.Config,
|
||||
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)
|
||||
|
||||
// Make Evidence Reactor
|
||||
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
|
||||
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateStore, blockStore, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -891,12 +327,8 @@ func NewNode(config *cfg.Config,
|
||||
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
|
||||
}
|
||||
|
||||
if config.RPC.PprofListenAddress != "" {
|
||||
go func() {
|
||||
logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
|
||||
logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
|
||||
}()
|
||||
}
|
||||
// Add private IDs to addrbook to block those peers being added
|
||||
addrBook.AddPrivateIDs(splitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " "))
|
||||
|
||||
node := &Node{
|
||||
config: config,
|
||||
@@ -945,8 +377,15 @@ func (n *Node) OnStart() error {
|
||||
time.Sleep(genTime.Sub(now))
|
||||
}
|
||||
|
||||
// Add private IDs to addrbook to block those peers being added
|
||||
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
|
||||
// run pprof server if it is enabled
|
||||
if n.config.RPC.IsPprofEnabled() {
|
||||
n.pprofSrv = n.startPprofServer()
|
||||
}
|
||||
|
||||
// begin prometheus metrics gathering if it is enabled
|
||||
if n.config.Instrumentation.IsPrometheusEnabled() {
|
||||
n.prometheusSrv = n.startPrometheusServer()
|
||||
}
|
||||
|
||||
// Start the RPC server before the P2P server
|
||||
// so we can eg. receive txs for the first block
|
||||
@@ -958,10 +397,6 @@ func (n *Node) OnStart() error {
|
||||
n.rpcListeners = listeners
|
||||
}
|
||||
|
||||
if n.config.Instrumentation.Prometheus &&
|
||||
n.config.Instrumentation.PrometheusListenAddr != "" {
|
||||
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
|
||||
}
|
||||
|
||||
// Start the transport.
|
||||
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
|
||||
@@ -1047,6 +482,11 @@ func (n *Node) OnStop() {
|
||||
n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
|
||||
}
|
||||
}
|
||||
if n.pprofSrv != nil {
|
||||
if err := n.pprofSrv.Shutdown(context.Background()); err != nil {
|
||||
n.Logger.Error("Pprof HTTP server Shutdown", "err", err)
|
||||
}
|
||||
}
|
||||
if n.blockStore != nil {
|
||||
if err := n.blockStore.Close(); err != nil {
|
||||
n.Logger.Error("problem closing blockstore", "err", err)
|
||||
@@ -1215,9 +655,9 @@ func (n *Node) startRPC() ([]net.Listener, error) {
|
||||
|
||||
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
|
||||
// collectors on addr.
|
||||
func (n *Node) startPrometheusServer(addr string) *http.Server {
|
||||
func (n *Node) startPrometheusServer() *http.Server {
|
||||
srv := &http.Server{
|
||||
Addr: addr,
|
||||
Addr: n.config.Instrumentation.PrometheusListenAddr,
|
||||
Handler: promhttp.InstrumentMetricHandler(
|
||||
prometheus.DefaultRegisterer, promhttp.HandlerFor(
|
||||
prometheus.DefaultGatherer,
|
||||
@@ -1235,6 +675,22 @@ func (n *Node) startPrometheusServer(addr string) *http.Server {
|
||||
return srv
|
||||
}
|
||||
|
||||
// starts a ppro
|
||||
func (n *Node) startPprofServer() *http.Server {
|
||||
srv := &http.Server{
|
||||
Addr: n.config.RPC.PprofListenAddress,
|
||||
Handler: nil,
|
||||
ReadHeaderTimeout: readHeaderTimeout,
|
||||
}
|
||||
go func() {
|
||||
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
|
||||
// Error starting or closing listener:
|
||||
n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
|
||||
}
|
||||
}()
|
||||
return srv
|
||||
}
|
||||
|
||||
// Switch returns the Node's Switch.
|
||||
func (n *Node) Switch() *p2p.Switch {
|
||||
return n.sw
|
||||
@@ -1369,122 +825,3 @@ func makeNodeInfo(
|
||||
return nodeInfo, err
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
var genesisDocKey = []byte("genesisDoc")
|
||||
|
||||
// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
|
||||
// database, or creates one using the given genesisDocProvider. On success this also
|
||||
// returns the genesis doc loaded through the given provider.
|
||||
func LoadStateFromDBOrGenesisDocProvider(
|
||||
stateDB dbm.DB,
|
||||
genesisDocProvider GenesisDocProvider,
|
||||
) (sm.State, *types.GenesisDoc, error) {
|
||||
// Get genesis doc
|
||||
genDoc, err := loadGenesisDoc(stateDB)
|
||||
if err != nil {
|
||||
genDoc, err = genesisDocProvider()
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
|
||||
err = genDoc.ValidateAndComplete()
|
||||
if err != nil {
|
||||
return sm.State{}, nil, fmt.Errorf("error in genesis doc: %w", err)
|
||||
}
|
||||
// save genesis doc to prevent a certain class of user errors (e.g. when it
|
||||
// was changed, accidentally or not). Also good for audit trail.
|
||||
if err := saveGenesisDoc(stateDB, genDoc); err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
}
|
||||
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
|
||||
DiscardABCIResponses: false,
|
||||
})
|
||||
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
return state, genDoc, nil
|
||||
}
|
||||
|
||||
// panics if failed to unmarshal bytes
|
||||
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
|
||||
b, err := db.Get(genesisDocKey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return nil, errors.New("genesis doc not found")
|
||||
}
|
||||
var genDoc *types.GenesisDoc
|
||||
err = tmjson.Unmarshal(b, &genDoc)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
|
||||
}
|
||||
return genDoc, nil
|
||||
}
|
||||
|
||||
// panics if failed to marshal the given genesis document
|
||||
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
|
||||
b, err := tmjson.Marshal(genDoc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err)
|
||||
}
|
||||
if err := db.SetSync(genesisDocKey, b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr,
|
||||
chainID string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
pve, err := privval.NewSignerListener(listenAddr, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
|
||||
pvsc, err := privval.NewSignerClient(pve, chainID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
|
||||
// try to get a pubkey from private validate first time
|
||||
_, err = pvsc.GetPubKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
}
|
||||
|
||||
const (
|
||||
retries = 50 // 50 * 100ms = 5s total
|
||||
timeout = 100 * time.Millisecond
|
||||
)
|
||||
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)
|
||||
|
||||
return pvscWithRetries, nil
|
||||
}
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
// slice of the string s with all leading and trailing Unicode code points
|
||||
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
||||
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
|
||||
// -1. also filter out empty strings, only return non-empty strings.
|
||||
func splitAndTrimEmpty(s, sep, cutset string) []string {
|
||||
if s == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
spl := strings.Split(s, sep)
|
||||
nonEmptyStrings := make([]string, 0, len(spl))
|
||||
for i := 0; i < len(spl); i++ {
|
||||
element := strings.Trim(spl[i], cutset)
|
||||
if element != "" {
|
||||
nonEmptyStrings = append(nonEmptyStrings, element)
|
||||
}
|
||||
}
|
||||
return nonEmptyStrings
|
||||
}
|
||||
|
||||
716
node/setup.go
Normal file
716
node/setup.go
Normal file
@@ -0,0 +1,716 @@
|
||||
package node
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
bc "github.com/tendermint/tendermint/blocksync"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cs "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/evidence"
|
||||
|
||||
tmjson "github.com/tendermint/tendermint/libs/json"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/light"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
|
||||
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/p2p/pex"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
|
||||
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/psql"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/state/txindex/kv"
|
||||
"github.com/tendermint/tendermint/state/txindex/null"
|
||||
"github.com/tendermint/tendermint/statesync"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
|
||||
_ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port
|
||||
|
||||
_ "github.com/lib/pq" // provide the psql db driver
|
||||
)
|
||||
|
||||
|
||||
// DBContext specifies config information for loading a new DB.
|
||||
type DBContext struct {
|
||||
ID string
|
||||
Config *cfg.Config
|
||||
}
|
||||
|
||||
// DBProvider takes a DBContext and returns an instantiated DB.
|
||||
type DBProvider func(*DBContext) (dbm.DB, error)
|
||||
|
||||
const readHeaderTimeout = 10 * time.Second
|
||||
|
||||
// DefaultDBProvider returns a database using the DBBackend and DBDir
|
||||
// specified in the ctx.Config.
|
||||
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
|
||||
dbType := dbm.BackendType(ctx.Config.DBBackend)
|
||||
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
|
||||
}
|
||||
|
||||
// GenesisDocProvider returns a GenesisDoc.
|
||||
// It allows the GenesisDoc to be pulled from sources other than the
|
||||
// filesystem, for instance from a distributed key-value store cluster.
|
||||
type GenesisDocProvider func() (*types.GenesisDoc, error)
|
||||
|
||||
// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
|
||||
// the GenesisDoc from the config.GenesisFile() on the filesystem.
|
||||
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
|
||||
return func() (*types.GenesisDoc, error) {
|
||||
return types.GenesisDocFromFile(config.GenesisFile())
|
||||
}
|
||||
}
|
||||
|
||||
// Provider takes a config and a logger and returns a ready to go Node.
|
||||
type Provider func(*cfg.Config, log.Logger) (*Node, error)
|
||||
|
||||
// DefaultNewNode returns a Tendermint node with default settings for the
|
||||
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
|
||||
// It implements NodeProvider.
|
||||
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
|
||||
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
|
||||
}
|
||||
|
||||
return NewNode(config,
|
||||
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
|
||||
nodeKey,
|
||||
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
|
||||
DefaultGenesisDocProviderFunc(config),
|
||||
DefaultDBProvider,
|
||||
DefaultMetricsProvider(config.Instrumentation),
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
// MetricsProvider returns a consensus, p2p and mempool Metrics.
|
||||
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics)
|
||||
|
||||
// DefaultMetricsProvider returns Metrics build using Prometheus client library
|
||||
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
||||
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
||||
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics, *proxy.Metrics) {
|
||||
if config.Prometheus {
|
||||
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
proxy.PrometheusMetrics(config.Namespace, "chain_id", chainID)
|
||||
}
|
||||
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics(), proxy.NopMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
type blockSyncReactor interface {
|
||||
SwitchToBlockSync(sm.State) error
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
|
||||
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
|
||||
var blockStoreDB dbm.DB
|
||||
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
blockStore = store.NewBlockStore(blockStoreDB)
|
||||
|
||||
stateDB, err = dbProvider(&DBContext{"state", config})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger, metrics *proxy.Metrics) (proxy.AppConns, error) {
|
||||
proxyApp := proxy.NewAppConns(clientCreator, metrics)
|
||||
proxyApp.SetLogger(logger.With("module", "proxy"))
|
||||
if err := proxyApp.Start(); err != nil {
|
||||
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
|
||||
}
|
||||
return proxyApp, nil
|
||||
}
|
||||
|
||||
func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
|
||||
eventBus := types.NewEventBus()
|
||||
eventBus.SetLogger(logger.With("module", "events"))
|
||||
if err := eventBus.Start(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eventBus, nil
|
||||
}
|
||||
|
||||
func createAndStartIndexerService(
|
||||
config *cfg.Config,
|
||||
chainID string,
|
||||
dbProvider DBProvider,
|
||||
eventBus *types.EventBus,
|
||||
logger log.Logger,
|
||||
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
|
||||
var (
|
||||
txIndexer txindex.TxIndexer
|
||||
blockIndexer indexer.BlockIndexer
|
||||
)
|
||||
|
||||
switch config.TxIndex.Indexer {
|
||||
case "kv":
|
||||
store, err := dbProvider(&DBContext{"tx_index", config})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
txIndexer = kv.NewTxIndex(store)
|
||||
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
|
||||
|
||||
case "psql":
|
||||
if config.TxIndex.PsqlConn == "" {
|
||||
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
|
||||
}
|
||||
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
|
||||
}
|
||||
txIndexer = es.TxIndexer()
|
||||
blockIndexer = es.BlockIndexer()
|
||||
|
||||
default:
|
||||
txIndexer = &null.TxIndex{}
|
||||
blockIndexer = &blockidxnull.BlockerIndexer{}
|
||||
}
|
||||
|
||||
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
|
||||
indexerService.SetLogger(logger.With("module", "txindex"))
|
||||
|
||||
if err := indexerService.Start(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
return indexerService, txIndexer, blockIndexer, nil
|
||||
}
|
||||
|
||||
func doHandshake(
|
||||
stateStore sm.Store,
|
||||
state sm.State,
|
||||
blockStore sm.BlockStore,
|
||||
genDoc *types.GenesisDoc,
|
||||
eventBus types.BlockEventPublisher,
|
||||
proxyApp proxy.AppConns,
|
||||
consensusLogger log.Logger,
|
||||
) error {
|
||||
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
|
||||
handshaker.SetLogger(consensusLogger)
|
||||
handshaker.SetEventBus(eventBus)
|
||||
if err := handshaker.Handshake(proxyApp); err != nil {
|
||||
return fmt.Errorf("error during handshake: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
|
||||
// Log the version info.
|
||||
logger.Info("Version info",
|
||||
"tendermint_version", version.TMCoreSemVer,
|
||||
"abci", version.ABCISemVer,
|
||||
"block", version.BlockProtocol,
|
||||
"p2p", version.P2PProtocol,
|
||||
"commit_hash", version.TMGitCommitHash,
|
||||
)
|
||||
|
||||
// If the state and software differ in block version, at least log it.
|
||||
if state.Version.Consensus.Block != version.BlockProtocol {
|
||||
logger.Info("Software and state have different block protocols",
|
||||
"software", version.BlockProtocol,
|
||||
"state", state.Version.Consensus.Block,
|
||||
)
|
||||
}
|
||||
|
||||
addr := pubKey.Address()
|
||||
// Log whether this node is a validator or an observer
|
||||
if state.Validators.HasAddress(addr) {
|
||||
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
|
||||
} else {
|
||||
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
|
||||
}
|
||||
}
|
||||
|
||||
func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
|
||||
if state.Validators.Size() > 1 {
|
||||
return false
|
||||
}
|
||||
addr, _ := state.Validators.GetByIndex(0)
|
||||
return bytes.Equal(pubKey.Address(), addr)
|
||||
}
|
||||
|
||||
func createMempoolAndMempoolReactor(
|
||||
config *cfg.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
state sm.State,
|
||||
memplMetrics *mempl.Metrics,
|
||||
logger log.Logger,
|
||||
) (mempl.Mempool, p2p.Reactor) {
|
||||
switch config.Mempool.Version {
|
||||
case cfg.MempoolV1:
|
||||
mp := mempoolv1.NewTxMempool(
|
||||
logger,
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv1.WithMetrics(memplMetrics),
|
||||
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
|
||||
)
|
||||
|
||||
reactor := mempoolv1.NewReactor(
|
||||
config.Mempool,
|
||||
mp,
|
||||
)
|
||||
if config.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return mp, reactor
|
||||
|
||||
case cfg.MempoolV0:
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(memplMetrics),
|
||||
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
|
||||
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
|
||||
)
|
||||
|
||||
mp.SetLogger(logger)
|
||||
|
||||
reactor := mempoolv0.NewReactor(
|
||||
config.Mempool,
|
||||
mp,
|
||||
)
|
||||
if config.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return mp, reactor
|
||||
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
|
||||
stateStore sm.Store, blockStore *store.BlockStore, logger log.Logger,
|
||||
) (*evidence.Reactor, *evidence.Pool, error) {
|
||||
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
evidenceLogger := logger.With("module", "evidence")
|
||||
evidencePool, err := evidence.NewPool(evidenceDB, stateStore, blockStore)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
evidenceReactor := evidence.NewReactor(evidencePool)
|
||||
evidenceReactor.SetLogger(evidenceLogger)
|
||||
return evidenceReactor, evidencePool, nil
|
||||
}
|
||||
|
||||
func createBlocksyncReactor(config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
blockSync bool,
|
||||
logger log.Logger,
|
||||
) (bcReactor p2p.Reactor, err error) {
|
||||
switch config.BlockSync.Version {
|
||||
case "v0":
|
||||
bcReactor = bc.NewReactor(state.Copy(), blockExec, blockStore, blockSync)
|
||||
case "v1", "v2":
|
||||
return nil, fmt.Errorf("block sync version %s has been deprecated. Please use v0", config.BlockSync.Version)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown fastsync version %s", config.BlockSync.Version)
|
||||
}
|
||||
|
||||
bcReactor.SetLogger(logger.With("module", "blocksync"))
|
||||
return bcReactor, nil
|
||||
}
|
||||
|
||||
func createConsensusReactor(config *cfg.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
mempool mempl.Mempool,
|
||||
evidencePool *evidence.Pool,
|
||||
privValidator types.PrivValidator,
|
||||
csMetrics *cs.Metrics,
|
||||
waitSync bool,
|
||||
eventBus *types.EventBus,
|
||||
consensusLogger log.Logger,
|
||||
) (*cs.Reactor, *cs.State) {
|
||||
consensusState := cs.NewState(
|
||||
config.Consensus,
|
||||
state.Copy(),
|
||||
blockExec,
|
||||
blockStore,
|
||||
mempool,
|
||||
evidencePool,
|
||||
cs.StateMetrics(csMetrics),
|
||||
)
|
||||
consensusState.SetLogger(consensusLogger)
|
||||
if privValidator != nil {
|
||||
consensusState.SetPrivValidator(privValidator)
|
||||
}
|
||||
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
|
||||
consensusReactor.SetLogger(consensusLogger)
|
||||
// services which will be publishing and/or subscribing for messages (events)
|
||||
// consensusReactor will set it on consensusState and blockExecutor
|
||||
consensusReactor.SetEventBus(eventBus)
|
||||
return consensusReactor, consensusState
|
||||
}
|
||||
|
||||
func createTransport(
|
||||
config *cfg.Config,
|
||||
nodeInfo p2p.NodeInfo,
|
||||
nodeKey *p2p.NodeKey,
|
||||
proxyApp proxy.AppConns,
|
||||
) (
|
||||
*p2p.MultiplexTransport,
|
||||
[]p2p.PeerFilterFunc,
|
||||
) {
|
||||
var (
|
||||
mConnConfig = p2p.MConnConfig(config.P2P)
|
||||
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
|
||||
connFilters = []p2p.ConnFilterFunc{}
|
||||
peerFilters = []p2p.PeerFilterFunc{}
|
||||
)
|
||||
|
||||
if !config.P2P.AllowDuplicateIP {
|
||||
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
|
||||
}
|
||||
|
||||
// Filter peers by addr or pubkey with an ABCI query.
|
||||
// If the query return code is OK, add peer.
|
||||
if config.FilterPeers {
|
||||
connFilters = append(
|
||||
connFilters,
|
||||
// ABCI query for address filtering.
|
||||
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
|
||||
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsErr() {
|
||||
return fmt.Errorf("error querying abci app: %v", res)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
peerFilters = append(
|
||||
peerFilters,
|
||||
// ABCI query for ID filtering.
|
||||
func(_ p2p.IPeerSet, p p2p.Peer) error {
|
||||
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
|
||||
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if res.IsErr() {
|
||||
return fmt.Errorf("error querying abci app: %v", res)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
|
||||
|
||||
// Limit the number of incoming connections.
|
||||
max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)
|
||||
|
||||
return transport, peerFilters
|
||||
}
|
||||
|
||||
func createSwitch(config *cfg.Config,
|
||||
transport p2p.Transport,
|
||||
p2pMetrics *p2p.Metrics,
|
||||
peerFilters []p2p.PeerFilterFunc,
|
||||
mempoolReactor p2p.Reactor,
|
||||
bcReactor p2p.Reactor,
|
||||
stateSyncReactor *statesync.Reactor,
|
||||
consensusReactor *cs.Reactor,
|
||||
evidenceReactor *evidence.Reactor,
|
||||
nodeInfo p2p.NodeInfo,
|
||||
nodeKey *p2p.NodeKey,
|
||||
p2pLogger log.Logger,
|
||||
) *p2p.Switch {
|
||||
sw := p2p.NewSwitch(
|
||||
config.P2P,
|
||||
transport,
|
||||
p2p.WithMetrics(p2pMetrics),
|
||||
p2p.SwitchPeerFilters(peerFilters...),
|
||||
)
|
||||
sw.SetLogger(p2pLogger)
|
||||
sw.AddReactor("MEMPOOL", mempoolReactor)
|
||||
sw.AddReactor("BLOCKSYNC", bcReactor)
|
||||
sw.AddReactor("CONSENSUS", consensusReactor)
|
||||
sw.AddReactor("EVIDENCE", evidenceReactor)
|
||||
sw.AddReactor("STATESYNC", stateSyncReactor)
|
||||
|
||||
sw.SetNodeInfo(nodeInfo)
|
||||
sw.SetNodeKey(nodeKey)
|
||||
|
||||
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
|
||||
return sw
|
||||
}
|
||||
|
||||
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
|
||||
p2pLogger log.Logger, nodeKey *p2p.NodeKey,
|
||||
) (pex.AddrBook, error) {
|
||||
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
|
||||
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
|
||||
|
||||
// Add ourselves to addrbook to prevent dialing ourselves
|
||||
if config.P2P.ExternalAddress != "" {
|
||||
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
|
||||
}
|
||||
addrBook.AddOurAddress(addr)
|
||||
}
|
||||
if config.P2P.ListenAddress != "" {
|
||||
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
|
||||
}
|
||||
addrBook.AddOurAddress(addr)
|
||||
}
|
||||
|
||||
sw.SetAddrBook(addrBook)
|
||||
|
||||
return addrBook, nil
|
||||
}
|
||||
|
||||
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
|
||||
sw *p2p.Switch, logger log.Logger,
|
||||
) *pex.Reactor {
|
||||
// TODO persistent peers ? so we can have their DNS addrs saved
|
||||
pexReactor := pex.NewReactor(addrBook,
|
||||
&pex.ReactorConfig{
|
||||
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
|
||||
SeedMode: config.P2P.SeedMode,
|
||||
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
|
||||
// blocks assuming 10s blocks ~ 28 hours.
|
||||
// TODO (melekes): make it dynamic based on the actual block latencies
|
||||
// from the live network.
|
||||
// https://github.com/tendermint/tendermint/issues/3523
|
||||
SeedDisconnectWaitPeriod: 28 * time.Hour,
|
||||
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
|
||||
})
|
||||
pexReactor.SetLogger(logger.With("module", "pex"))
|
||||
sw.AddReactor("PEX", pexReactor)
|
||||
return pexReactor
|
||||
}
|
||||
|
||||
// startStateSync starts an asynchronous state sync process, then switches to block sync mode.
|
||||
func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.Reactor,
|
||||
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, blockSync bool,
|
||||
stateStore sm.Store, blockStore *store.BlockStore, state sm.State,
|
||||
) error {
|
||||
ssR.Logger.Info("Starting state sync")
|
||||
|
||||
if stateProvider == nil {
|
||||
var err error
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
stateProvider, err = statesync.NewLightClientStateProvider(
|
||||
ctx,
|
||||
state.ChainID, state.Version, state.InitialHeight,
|
||||
config.RPCServers, light.TrustOptions{
|
||||
Period: config.TrustPeriod,
|
||||
Height: config.TrustHeight,
|
||||
Hash: config.TrustHashBytes(),
|
||||
}, ssR.Logger.With("module", "light"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set up light client state provider: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("State sync failed", "err", err)
|
||||
return
|
||||
}
|
||||
err = stateStore.Bootstrap(state)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
|
||||
return
|
||||
}
|
||||
err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to store last seen commit", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
if blockSync {
|
||||
// FIXME Very ugly to have these metrics bleed through here.
|
||||
conR.Metrics.StateSyncing.Set(0)
|
||||
conR.Metrics.BlockSyncing.Set(1)
|
||||
err = bcR.SwitchToBlockSync(state)
|
||||
if err != nil {
|
||||
ssR.Logger.Error("Failed to switch to block sync", "err", err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
conR.SwitchToConsensus(state, true)
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
var genesisDocKey = []byte("genesisDoc")
|
||||
|
||||
// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
|
||||
// database, or creates one using the given genesisDocProvider. On success this also
|
||||
// returns the genesis doc loaded through the given provider.
|
||||
func LoadStateFromDBOrGenesisDocProvider(
|
||||
stateDB dbm.DB,
|
||||
genesisDocProvider GenesisDocProvider,
|
||||
) (sm.State, *types.GenesisDoc, error) {
|
||||
// Get genesis doc
|
||||
genDoc, err := loadGenesisDoc(stateDB)
|
||||
if err != nil {
|
||||
genDoc, err = genesisDocProvider()
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
|
||||
err = genDoc.ValidateAndComplete()
|
||||
if err != nil {
|
||||
return sm.State{}, nil, fmt.Errorf("error in genesis doc: %w", err)
|
||||
}
|
||||
// save genesis doc to prevent a certain class of user errors (e.g. when it
|
||||
// was changed, accidentally or not). Also good for audit trail.
|
||||
if err := saveGenesisDoc(stateDB, genDoc); err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
}
|
||||
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
|
||||
DiscardABCIResponses: false,
|
||||
})
|
||||
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
|
||||
if err != nil {
|
||||
return sm.State{}, nil, err
|
||||
}
|
||||
return state, genDoc, nil
|
||||
}
|
||||
|
||||
// panics if failed to unmarshal bytes
|
||||
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
|
||||
b, err := db.Get(genesisDocKey)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(b) == 0 {
|
||||
return nil, errors.New("genesis doc not found")
|
||||
}
|
||||
var genDoc *types.GenesisDoc
|
||||
err = tmjson.Unmarshal(b, &genDoc)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
|
||||
}
|
||||
return genDoc, nil
|
||||
}
|
||||
|
||||
// panics if failed to marshal the given genesis document
|
||||
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
|
||||
b, err := tmjson.Marshal(genDoc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err)
|
||||
}
|
||||
if err := db.SetSync(genesisDocKey, b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr,
|
||||
chainID string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
pve, err := privval.NewSignerListener(listenAddr, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
|
||||
pvsc, err := privval.NewSignerClient(pve, chainID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
}
|
||||
|
||||
// try to get a pubkey from private validate first time
|
||||
_, err = pvsc.GetPubKey()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
}
|
||||
|
||||
const (
|
||||
retries = 50 // 50 * 100ms = 5s total
|
||||
timeout = 100 * time.Millisecond
|
||||
)
|
||||
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)
|
||||
|
||||
return pvscWithRetries, nil
|
||||
}
|
||||
|
||||
// splitAndTrimEmpty slices s into all subslices separated by sep and returns a
|
||||
// slice of the string s with all leading and trailing Unicode code points
|
||||
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
|
||||
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
|
||||
// -1. also filter out empty strings, only return non-empty strings.
|
||||
func splitAndTrimEmpty(s, sep, cutset string) []string {
|
||||
if s == "" {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
spl := strings.Split(s, sep)
|
||||
nonEmptyStrings := make([]string, 0, len(spl))
|
||||
for i := 0; i < len(spl); i++ {
|
||||
element := strings.Trim(spl[i], cutset)
|
||||
if element != "" {
|
||||
nonEmptyStrings = append(nonEmptyStrings, element)
|
||||
}
|
||||
}
|
||||
return nonEmptyStrings
|
||||
}
|
||||
Reference in New Issue
Block a user