mirror of
https://github.com/tendermint/tendermint.git
synced 2026-04-28 03:16:58 +00:00
cleanup: Reduce and normalize import path aliasing. (#6975)
The code in the Tendermint repository makes heavy use of import aliasing. This is made necessary by our extensive reuse of common base package names, and by repetition of similar names across different subdirectories. Unfortunately we have not been very consistent about which packages we alias in various circumstances, and the aliases we use vary. In the spirit of the advice in the style guide and https://github.com/golang/go/wiki/CodeReviewComments#imports, his change makes an effort to clean up and normalize import aliasing. This change makes no API or behavioral changes. It is a pure cleanup intended o help make the code more readable to developers (including myself) trying to understand what is being imported where. Only unexported names have been modified, and the changes were generated and applied mechanically with gofmt -r and comby, respecting the lexical and syntactic rules of Go. Even so, I did not fix every inconsistency. Where the changes would be too disruptive, I left it alone. The principles I followed in this cleanup are: - Remove aliases that restate the package name. - Remove aliases where the base package name is unambiguous. - Move overly-terse abbreviations from the import to the usage site. - Fix lexical issues (remove underscores, remove capitalization). - Fix import groupings to more closely match the style guide. - Group blank (side-effecting) imports and ensure they are commented. - Add aliases to multiple imports with the same base package name.
This commit is contained in:
238
node/node.go
238
node/node.go
@@ -6,19 +6,17 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
_ "github.com/lib/pq" // provide the psql db driver
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/rs/cors"
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cs "github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
"github.com/tendermint/tendermint/internal/p2p"
|
||||
"github.com/tendermint/tendermint/internal/p2p/pex"
|
||||
@@ -38,6 +36,10 @@ import (
|
||||
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
|
||||
_ "github.com/lib/pq" // provide the psql db driver
|
||||
)
|
||||
|
||||
// nodeImpl is the highest level interface to a full Tendermint node.
|
||||
@@ -46,7 +48,7 @@ type nodeImpl struct {
|
||||
service.BaseService
|
||||
|
||||
// config
|
||||
config *cfg.Config
|
||||
config *config.Config
|
||||
genesisDoc *types.GenesisDoc // initial validator set
|
||||
privValidator types.PrivValidator // local node's validator key
|
||||
|
||||
@@ -69,7 +71,7 @@ type nodeImpl struct {
|
||||
mempool mempool.Mempool
|
||||
stateSync bool // whether the node should state sync on startup
|
||||
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
|
||||
consensusReactor *cs.Reactor // for participating in the consensus
|
||||
consensusReactor *consensus.Reactor // for participating in the consensus
|
||||
pexReactor service.Service // for exchanging peer addresses
|
||||
evidenceReactor service.Service
|
||||
rpcListeners []net.Listener // rpc servers
|
||||
@@ -81,23 +83,23 @@ type nodeImpl struct {
|
||||
// newDefaultNode returns a Tendermint node with default settings for the
|
||||
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
|
||||
// It implements NodeProvider.
|
||||
func newDefaultNode(config *cfg.Config, logger log.Logger) (service.Service, error) {
|
||||
nodeKey, err := types.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||
func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, error) {
|
||||
nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
|
||||
return nil, fmt.Errorf("failed to load or gen node key %s: %w", cfg.NodeKeyFile(), err)
|
||||
}
|
||||
if config.Mode == cfg.ModeSeed {
|
||||
return makeSeedNode(config,
|
||||
cfg.DefaultDBProvider,
|
||||
if cfg.Mode == config.ModeSeed {
|
||||
return makeSeedNode(cfg,
|
||||
config.DefaultDBProvider,
|
||||
nodeKey,
|
||||
defaultGenesisDocProviderFunc(config),
|
||||
defaultGenesisDocProviderFunc(cfg),
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
var pval *privval.FilePV
|
||||
if config.Mode == cfg.ModeValidator {
|
||||
pval, err = privval.LoadOrGenFilePV(config.PrivValidator.KeyFile(), config.PrivValidator.StateFile())
|
||||
if cfg.Mode == config.ModeValidator {
|
||||
pval, err = privval.LoadOrGenFilePV(cfg.PrivValidator.KeyFile(), cfg.PrivValidator.StateFile())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -105,27 +107,27 @@ func newDefaultNode(config *cfg.Config, logger log.Logger) (service.Service, err
|
||||
pval = nil
|
||||
}
|
||||
|
||||
appClient, _ := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
|
||||
return makeNode(config,
|
||||
appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
|
||||
return makeNode(cfg,
|
||||
pval,
|
||||
nodeKey,
|
||||
appClient,
|
||||
defaultGenesisDocProviderFunc(config),
|
||||
cfg.DefaultDBProvider,
|
||||
defaultGenesisDocProviderFunc(cfg),
|
||||
config.DefaultDBProvider,
|
||||
logger,
|
||||
)
|
||||
}
|
||||
|
||||
// makeNode returns a new, ready to go, Tendermint Node.
|
||||
func makeNode(config *cfg.Config,
|
||||
func makeNode(cfg *config.Config,
|
||||
privValidator types.PrivValidator,
|
||||
nodeKey types.NodeKey,
|
||||
clientCreator abciclient.Creator,
|
||||
genesisDocProvider genesisDocProvider,
|
||||
dbProvider cfg.DBProvider,
|
||||
dbProvider config.DBProvider,
|
||||
logger log.Logger) (service.Service, error) {
|
||||
|
||||
blockStore, stateDB, err := initDBs(config, dbProvider)
|
||||
blockStore, stateDB, err := initDBs(cfg, dbProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -161,31 +163,31 @@ func makeNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
indexerService, eventSinks, err := createAndStartIndexerService(config, dbProvider, eventBus, logger, genDoc.ChainID)
|
||||
indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If an address is provided, listen on the socket for a connection from an
|
||||
// external signing process.
|
||||
if config.PrivValidator.ListenAddr != "" {
|
||||
protocol, _ := tmnet.ProtocolAndAddress(config.PrivValidator.ListenAddr)
|
||||
if cfg.PrivValidator.ListenAddr != "" {
|
||||
protocol, _ := tmnet.ProtocolAndAddress(cfg.PrivValidator.ListenAddr)
|
||||
// FIXME: we should start services inside OnStart
|
||||
switch protocol {
|
||||
case "grpc":
|
||||
privValidator, err = createAndStartPrivValidatorGRPCClient(config, genDoc.ChainID, logger)
|
||||
privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator grpc client: %w", err)
|
||||
}
|
||||
default:
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidator.ListenAddr, genDoc.ChainID, logger)
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator socket client: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
var pubKey crypto.PubKey
|
||||
if config.Mode == cfg.ModeValidator {
|
||||
if cfg.Mode == config.ModeValidator {
|
||||
pubKey, err = privValidator.GetPubKey(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
@@ -196,7 +198,7 @@ func makeNode(config *cfg.Config,
|
||||
}
|
||||
|
||||
// Determine whether we should attempt state sync.
|
||||
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
if stateSync && state.LastBlockHeight > 0 {
|
||||
logger.Info("Found local state with non-zero height, skipping state sync")
|
||||
stateSync = false
|
||||
@@ -221,43 +223,43 @@ func makeNode(config *cfg.Config,
|
||||
|
||||
// Determine whether we should do block sync. This must happen after the handshake, since the
|
||||
// app may modify the validator set, specifying ourself as the only validator.
|
||||
blockSync := config.BlockSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
blockSync := cfg.BlockSync.Enable && !onlyValidatorIsUs(state, pubKey)
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, consensusLogger, config.Mode)
|
||||
logNodeStartupInfo(state, pubKey, logger, consensusLogger, cfg.Mode)
|
||||
|
||||
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
|
||||
// TODO: Use a persistent peer database.
|
||||
nodeInfo, err := makeNodeInfo(config, nodeKey, eventSinks, genDoc, state)
|
||||
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport := createTransport(p2pLogger, config)
|
||||
transport := createTransport(p2pLogger, cfg)
|
||||
|
||||
peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID)
|
||||
peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
}
|
||||
|
||||
nodeMetrics :=
|
||||
defaultMetricsProvider(config.Instrumentation)(genDoc.ChainID)
|
||||
defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
|
||||
|
||||
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
|
||||
peerManager, transport, getRouterConfig(config, proxyApp))
|
||||
peerManager, transport, getRouterConfig(cfg, proxyApp))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create router: %w", err)
|
||||
}
|
||||
|
||||
mpReactorShim, mpReactor, mp, err := createMempoolReactor(
|
||||
config, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
evReactorShim, evReactor, evPool, err := createEvidenceReactor(
|
||||
config, dbProvider, stateDB, blockStore, peerManager, router, logger,
|
||||
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -275,16 +277,16 @@ func makeNode(config *cfg.Config,
|
||||
)
|
||||
|
||||
csReactorShim, csReactor, csState := createConsensusReactor(
|
||||
config, state, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.cs, stateSync || blockSync, eventBus,
|
||||
cfg, state, blockExec, blockStore, mp, evPool,
|
||||
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
|
||||
peerManager, router, consensusLogger,
|
||||
)
|
||||
|
||||
// Create the blockchain reactor. Note, we do not start block sync if we're
|
||||
// doing a state sync first.
|
||||
bcReactorShim, bcReactor, err := createBlockchainReactor(
|
||||
logger, config, state, blockExec, blockStore, csReactor,
|
||||
peerManager, router, blockSync && !stateSync, nodeMetrics.cs,
|
||||
logger, cfg, state, blockExec, blockStore, csReactor,
|
||||
peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
|
||||
@@ -301,9 +303,9 @@ func makeNode(config *cfg.Config,
|
||||
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
|
||||
// FIXME We need to update metrics here, since other reactors don't have access to them.
|
||||
if stateSync {
|
||||
nodeMetrics.cs.StateSyncing.Set(1)
|
||||
nodeMetrics.consensus.StateSyncing.Set(1)
|
||||
} else if blockSync {
|
||||
nodeMetrics.cs.BlockSyncing.Set(1)
|
||||
nodeMetrics.consensus.BlockSyncing.Set(1)
|
||||
}
|
||||
|
||||
// Set up state sync reactor, and schedule a sync if requested.
|
||||
@@ -320,7 +322,7 @@ func makeNode(config *cfg.Config,
|
||||
|
||||
stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
channels = getChannelsFromShim(stateSyncReactorShim)
|
||||
peerUpdates = stateSyncReactorShim.PeerUpdates
|
||||
} else {
|
||||
@@ -331,7 +333,7 @@ func makeNode(config *cfg.Config,
|
||||
stateSyncReactor = statesync.NewReactor(
|
||||
genDoc.ChainID,
|
||||
genDoc.InitialHeight,
|
||||
*config.StateSync,
|
||||
*cfg.StateSync,
|
||||
stateSyncReactorShim.Logger,
|
||||
proxyApp.Snapshot(),
|
||||
proxyApp.Query(),
|
||||
@@ -342,7 +344,7 @@ func makeNode(config *cfg.Config,
|
||||
peerUpdates,
|
||||
stateStore,
|
||||
blockStore,
|
||||
config.StateSync.TempDir,
|
||||
cfg.StateSync.TempDir,
|
||||
nodeMetrics.statesync,
|
||||
)
|
||||
|
||||
@@ -378,46 +380,46 @@ func makeNode(config *cfg.Config,
|
||||
pexCh := pex.ChannelDescriptor()
|
||||
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
// setup Transport and Switch
|
||||
sw = createSwitch(
|
||||
config, transport, nodeMetrics.p2p, mpReactorShim, bcReactorForSwitch,
|
||||
cfg, transport, nodeMetrics.p2p, mpReactorShim, bcReactorForSwitch,
|
||||
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
|
||||
)
|
||||
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
|
||||
}
|
||||
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
}
|
||||
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create addrbook: %w", err)
|
||||
}
|
||||
|
||||
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
|
||||
pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger)
|
||||
} else {
|
||||
addrBook = nil
|
||||
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
|
||||
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if config.RPC.PprofListenAddress != "" {
|
||||
if cfg.RPC.PprofListenAddress != "" {
|
||||
go func() {
|
||||
logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
|
||||
logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
|
||||
logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress)
|
||||
logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil))
|
||||
}()
|
||||
}
|
||||
|
||||
node := &nodeImpl{
|
||||
config: config,
|
||||
config: cfg,
|
||||
genesisDoc: genDoc,
|
||||
privValidator: privValidator,
|
||||
|
||||
@@ -452,7 +454,7 @@ func makeNode(config *cfg.Config,
|
||||
ConsensusState: csState,
|
||||
|
||||
ConsensusReactor: csReactor,
|
||||
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),
|
||||
BlockSyncReactor: bcReactor.(consensus.BlockSyncReactor),
|
||||
|
||||
P2PPeers: sw,
|
||||
PeerManager: peerManager,
|
||||
@@ -462,7 +464,7 @@ func makeNode(config *cfg.Config,
|
||||
EventBus: eventBus,
|
||||
Mempool: mp,
|
||||
Logger: logger.With("module", "rpc"),
|
||||
Config: *config.RPC,
|
||||
Config: *cfg.RPC,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -485,8 +487,8 @@ func makeNode(config *cfg.Config,
|
||||
}
|
||||
|
||||
// makeSeedNode returns a new seed node, containing only p2p, pex reactor
|
||||
func makeSeedNode(config *cfg.Config,
|
||||
dbProvider cfg.DBProvider,
|
||||
func makeSeedNode(cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
nodeKey types.NodeKey,
|
||||
genesisDocProvider genesisDocProvider,
|
||||
logger log.Logger,
|
||||
@@ -502,23 +504,23 @@ func makeSeedNode(config *cfg.Config,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeInfo, err := makeSeedNodeInfo(config, nodeKey, genDoc, state)
|
||||
nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Setup Transport and Switch.
|
||||
p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
|
||||
p2pMetrics := p2p.PrometheusMetrics(cfg.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport := createTransport(p2pLogger, config)
|
||||
transport := createTransport(p2pLogger, cfg)
|
||||
|
||||
peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID)
|
||||
peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
}
|
||||
|
||||
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
|
||||
peerManager, transport, getRouterConfig(config, nil))
|
||||
peerManager, transport, getRouterConfig(cfg, nil))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create router: %w", err)
|
||||
}
|
||||
@@ -536,44 +538,44 @@ func makeSeedNode(config *cfg.Config,
|
||||
pexCh := pex.ChannelDescriptor()
|
||||
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
sw = createSwitch(
|
||||
config, transport, p2pMetrics, nil, nil,
|
||||
cfg, transport, p2pMetrics, nil, nil,
|
||||
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
|
||||
)
|
||||
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
|
||||
}
|
||||
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
}
|
||||
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create addrbook: %w", err)
|
||||
}
|
||||
|
||||
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
|
||||
pexReactor = createPEXReactorAndAddToSwitch(addrBook, cfg, sw, logger)
|
||||
} else {
|
||||
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
|
||||
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if config.RPC.PprofListenAddress != "" {
|
||||
if cfg.RPC.PprofListenAddress != "" {
|
||||
go func() {
|
||||
logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
|
||||
logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
|
||||
logger.Info("Starting pprof server", "laddr", cfg.RPC.PprofListenAddress)
|
||||
logger.Error("pprof server error", "err", http.ListenAndServe(cfg.RPC.PprofListenAddress, nil))
|
||||
}()
|
||||
}
|
||||
|
||||
node := &nodeImpl{
|
||||
config: config,
|
||||
config: cfg,
|
||||
genesisDoc: genDoc,
|
||||
|
||||
transport: transport,
|
||||
@@ -602,7 +604,7 @@ func (n *nodeImpl) OnStart() error {
|
||||
|
||||
// Start the RPC server before the P2P server
|
||||
// so we can eg. receive txs for the first block
|
||||
if n.config.RPC.ListenAddress != "" && n.config.Mode != cfg.ModeSeed {
|
||||
if n.config.RPC.ListenAddress != "" && n.config.Mode != config.ModeSeed {
|
||||
listeners, err := n.startRPC()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -637,8 +639,8 @@ func (n *nodeImpl) OnStart() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if n.config.Mode != cfg.ModeSeed {
|
||||
if n.config.BlockSync.Version == cfg.BlockSyncV0 {
|
||||
if n.config.Mode != config.ModeSeed {
|
||||
if n.config.BlockSync.Version == config.BlockSyncV0 {
|
||||
if err := n.bcReactor.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -679,7 +681,7 @@ func (n *nodeImpl) OnStart() error {
|
||||
// TODO: We shouldn't run state sync if we already have state that has a
|
||||
// LastBlockHeight that is not InitialHeight
|
||||
if n.stateSync {
|
||||
bcR, ok := n.bcReactor.(cs.BlockSyncReactor)
|
||||
bcR, ok := n.bcReactor.(consensus.BlockSyncReactor)
|
||||
if !ok {
|
||||
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
|
||||
}
|
||||
@@ -758,9 +760,9 @@ func (n *nodeImpl) OnStop() {
|
||||
n.Logger.Error("Error closing indexerService", "err", err)
|
||||
}
|
||||
|
||||
if n.config.Mode != cfg.ModeSeed {
|
||||
if n.config.Mode != config.ModeSeed {
|
||||
// now stop the reactors
|
||||
if n.config.BlockSync.Version == cfg.BlockSyncV0 {
|
||||
if n.config.BlockSync.Version == config.BlockSyncV0 {
|
||||
// Stop the real blockchain reactor separately since the switch uses the shim.
|
||||
if err := n.bcReactor.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop the blockchain reactor", "err", err)
|
||||
@@ -831,7 +833,7 @@ func (n *nodeImpl) OnStop() {
|
||||
}
|
||||
|
||||
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
if n.config.Mode == cfg.ModeValidator {
|
||||
if n.config.Mode == config.ModeValidator {
|
||||
pubKey, err := n.privValidator.GetPubKey(context.TODO())
|
||||
if pubKey == nil || err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
@@ -849,15 +851,15 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
n.rpcEnv.AddUnsafe(routes)
|
||||
}
|
||||
|
||||
config := rpcserver.DefaultConfig()
|
||||
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
|
||||
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
|
||||
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
|
||||
cfg := rpcserver.DefaultConfig()
|
||||
cfg.MaxBodyBytes = n.config.RPC.MaxBodyBytes
|
||||
cfg.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
|
||||
cfg.MaxOpenConnections = n.config.RPC.MaxOpenConnections
|
||||
// If necessary adjust global WriteTimeout to ensure it's greater than
|
||||
// TimeoutBroadcastTxCommit.
|
||||
// See https://github.com/tendermint/tendermint/issues/3435
|
||||
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
|
||||
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
if cfg.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
|
||||
cfg.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
|
||||
// we may expose the rpc over both a unix and tcp socket
|
||||
@@ -873,14 +875,14 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
|
||||
}
|
||||
}),
|
||||
rpcserver.ReadLimit(config.MaxBodyBytes),
|
||||
rpcserver.ReadLimit(cfg.MaxBodyBytes),
|
||||
)
|
||||
wm.SetLogger(wmLogger)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
|
||||
listener, err := rpcserver.Listen(
|
||||
listenAddr,
|
||||
config.MaxOpenConnections,
|
||||
cfg.MaxOpenConnections,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -903,7 +905,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
n.config.RPC.CertFile(),
|
||||
n.config.RPC.KeyFile(),
|
||||
rpcLogger,
|
||||
config,
|
||||
cfg,
|
||||
); err != nil {
|
||||
n.Logger.Error("Error serving server with TLS", "err", err)
|
||||
}
|
||||
@@ -914,7 +916,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
listener,
|
||||
rootHandler,
|
||||
rpcLogger,
|
||||
config,
|
||||
cfg,
|
||||
); err != nil {
|
||||
n.Logger.Error("Error serving server", "err", err)
|
||||
}
|
||||
@@ -927,18 +929,18 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
// we expose a simplified api over grpc for convenience to app devs
|
||||
grpcListenAddr := n.config.RPC.GRPCListenAddress
|
||||
if grpcListenAddr != "" {
|
||||
config := rpcserver.DefaultConfig()
|
||||
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
|
||||
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
|
||||
cfg := rpcserver.DefaultConfig()
|
||||
cfg.MaxBodyBytes = n.config.RPC.MaxBodyBytes
|
||||
cfg.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
|
||||
// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
|
||||
config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
|
||||
cfg.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
|
||||
// If necessary adjust global WriteTimeout to ensure it's greater than
|
||||
// TimeoutBroadcastTxCommit.
|
||||
// See https://github.com/tendermint/tendermint/issues/3435
|
||||
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
|
||||
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
if cfg.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
|
||||
cfg.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
|
||||
listener, err := rpcserver.Listen(grpcListenAddr, cfg.MaxOpenConnections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -977,7 +979,7 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server {
|
||||
}
|
||||
|
||||
// ConsensusReactor returns the Node's ConsensusReactor.
|
||||
func (n *nodeImpl) ConsensusReactor() *cs.Reactor {
|
||||
func (n *nodeImpl) ConsensusReactor() *consensus.Reactor {
|
||||
return n.consensusReactor
|
||||
}
|
||||
|
||||
@@ -1031,14 +1033,14 @@ type genesisDocProvider func() (*types.GenesisDoc, error)
|
||||
|
||||
// defaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
|
||||
// the GenesisDoc from the config.GenesisFile() on the filesystem.
|
||||
func defaultGenesisDocProviderFunc(config *cfg.Config) genesisDocProvider {
|
||||
func defaultGenesisDocProviderFunc(cfg *config.Config) genesisDocProvider {
|
||||
return func() (*types.GenesisDoc, error) {
|
||||
return types.GenesisDocFromFile(config.GenesisFile())
|
||||
return types.GenesisDocFromFile(cfg.GenesisFile())
|
||||
}
|
||||
}
|
||||
|
||||
type nodeMetrics struct {
|
||||
cs *cs.Metrics
|
||||
consensus *consensus.Metrics
|
||||
p2p *p2p.Metrics
|
||||
mempool *mempool.Metrics
|
||||
state *sm.Metrics
|
||||
@@ -1050,19 +1052,19 @@ type metricsProvider func(chainID string) *nodeMetrics
|
||||
|
||||
// defaultMetricsProvider returns Metrics build using Prometheus client library
|
||||
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
|
||||
func defaultMetricsProvider(config *cfg.InstrumentationConfig) metricsProvider {
|
||||
func defaultMetricsProvider(cfg *config.InstrumentationConfig) metricsProvider {
|
||||
return func(chainID string) *nodeMetrics {
|
||||
if config.Prometheus {
|
||||
if cfg.Prometheus {
|
||||
return &nodeMetrics{
|
||||
cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
mempool.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
statesync.PrometheusMetrics(config.Namespace, "chain_id", chainID),
|
||||
consensus.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
|
||||
p2p.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
|
||||
mempool.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
|
||||
sm.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
|
||||
statesync.PrometheusMetrics(cfg.Namespace, "chain_id", chainID),
|
||||
}
|
||||
}
|
||||
return &nodeMetrics{
|
||||
cs.NopMetrics(),
|
||||
consensus.NopMetrics(),
|
||||
p2p.NopMetrics(),
|
||||
mempool.NopMetrics(),
|
||||
sm.NopMetrics(),
|
||||
@@ -1130,15 +1132,15 @@ func createAndStartPrivValidatorSocketClient(
|
||||
}
|
||||
|
||||
func createAndStartPrivValidatorGRPCClient(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
chainID string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
pvsc, err := tmgrpc.DialRemoteSigner(
|
||||
config.PrivValidator,
|
||||
cfg.PrivValidator,
|
||||
chainID,
|
||||
logger,
|
||||
config.Instrumentation.Prometheus,
|
||||
cfg.Instrumentation.Prometheus,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to start private validator: %w", err)
|
||||
@@ -1153,7 +1155,7 @@ func createAndStartPrivValidatorGRPCClient(
|
||||
return pvsc, nil
|
||||
}
|
||||
|
||||
func getRouterConfig(conf *cfg.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
|
||||
func getRouterConfig(conf *config.Config, proxyApp proxy.AppConns) p2p.RouterOptions {
|
||||
opts := p2p.RouterOptions{
|
||||
QueueType: conf.P2P.QueueType,
|
||||
}
|
||||
|
||||
@@ -13,12 +13,11 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/crypto/tmhash"
|
||||
@@ -38,12 +37,12 @@ import (
|
||||
)
|
||||
|
||||
func TestNodeStartStop(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_node_test")
|
||||
cfg := config.ResetTestRoot("node_node_test")
|
||||
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
|
||||
// create & start node
|
||||
ns, err := newDefaultNode(config, log.TestingLogger())
|
||||
ns, err := newDefaultNode(cfg, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, ns.Start())
|
||||
|
||||
@@ -81,7 +80,7 @@ func TestNodeStartStop(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func getTestNode(t *testing.T, conf *cfg.Config, logger log.Logger) *nodeImpl {
|
||||
func getTestNode(t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl {
|
||||
t.Helper()
|
||||
ns, err := newDefaultNode(conf, logger)
|
||||
require.NoError(t, err)
|
||||
@@ -92,12 +91,12 @@ func getTestNode(t *testing.T, conf *cfg.Config, logger log.Logger) *nodeImpl {
|
||||
}
|
||||
|
||||
func TestNodeDelayedStart(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_delayed_start_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_delayed_start_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
now := tmtime.Now()
|
||||
|
||||
// create & start node
|
||||
n := getTestNode(t, config, log.TestingLogger())
|
||||
n := getTestNode(t, cfg, log.TestingLogger())
|
||||
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
|
||||
|
||||
require.NoError(t, n.Start())
|
||||
@@ -108,11 +107,11 @@ func TestNodeDelayedStart(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeSetAppVersion(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_app_version_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_app_version_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
|
||||
// create node
|
||||
n := getTestNode(t, config, log.TestingLogger())
|
||||
n := getTestNode(t, cfg, log.TestingLogger())
|
||||
|
||||
// default config uses the kvstore app
|
||||
var appVersion uint64 = kvstore.ProtocolVersion
|
||||
@@ -129,9 +128,9 @@ func TestNodeSetAppVersion(t *testing.T) {
|
||||
func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
addr := "tcp://" + testFreeAddr(t)
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
config.PrivValidator.ListenAddr = addr
|
||||
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cfg.PrivValidator.ListenAddr = addr
|
||||
|
||||
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
@@ -142,7 +141,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
|
||||
signerServer := privval.NewSignerServer(
|
||||
dialerEndpoint,
|
||||
config.ChainID(),
|
||||
cfg.ChainID(),
|
||||
types.NewMockPV(),
|
||||
)
|
||||
|
||||
@@ -154,7 +153,7 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
}()
|
||||
defer signerServer.Stop() //nolint:errcheck // ignore for tests
|
||||
|
||||
n := getTestNode(t, config, log.TestingLogger())
|
||||
n := getTestNode(t, cfg, log.TestingLogger())
|
||||
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
@@ -162,11 +161,11 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
|
||||
addrNoPrefix := testFreeAddr(t)
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
config.PrivValidator.ListenAddr = addrNoPrefix
|
||||
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cfg.PrivValidator.ListenAddr = addrNoPrefix
|
||||
|
||||
_, err := newDefaultNode(config, log.TestingLogger())
|
||||
_, err := newDefaultNode(cfg, log.TestingLogger())
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
@@ -174,9 +173,9 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
tmpfile := "/tmp/kms." + tmrand.Str(6) + ".sock"
|
||||
defer os.Remove(tmpfile) // clean up
|
||||
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
config.PrivValidator.ListenAddr = "unix://" + tmpfile
|
||||
cfg := config.ResetTestRoot("node_priv_val_tcp_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cfg.PrivValidator.ListenAddr = "unix://" + tmpfile
|
||||
|
||||
dialer := privval.DialUnixFn(tmpfile)
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
@@ -187,7 +186,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
|
||||
pvsc := privval.NewSignerServer(
|
||||
dialerEndpoint,
|
||||
config.ChainID(),
|
||||
cfg.ChainID(),
|
||||
types.NewMockPV(),
|
||||
)
|
||||
|
||||
@@ -196,7 +195,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
defer pvsc.Stop() //nolint:errcheck // ignore for tests
|
||||
n := getTestNode(t, config, log.TestingLogger())
|
||||
n := getTestNode(t, cfg, log.TestingLogger())
|
||||
assert.IsType(t, &privval.RetrySignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
@@ -212,8 +211,8 @@ func testFreeAddr(t *testing.T) string {
|
||||
// create a proposal block using real and full
|
||||
// mempool and evidence pool and validate it.
|
||||
func TestCreateProposalBlock(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
@@ -233,7 +232,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
proposerAddr, _ := state.Validators.GetByIndex(0)
|
||||
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(mempool.NopMetrics()),
|
||||
@@ -304,8 +303,8 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
@@ -325,7 +324,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
|
||||
// Make Mempool
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(mempool.NopMetrics()),
|
||||
@@ -366,8 +365,8 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMaxProposalBlockSize(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
@@ -385,7 +384,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
|
||||
// Make Mempool
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(mempool.NopMetrics()),
|
||||
@@ -481,17 +480,17 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeNewSeedNode(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_new_node_custom_reactors_test")
|
||||
config.Mode = cfg.ModeSeed
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_new_node_custom_reactors_test")
|
||||
cfg.Mode = config.ModeSeed
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
|
||||
nodeKey, err := types.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||
nodeKey, err := types.LoadOrGenNodeKey(cfg.NodeKeyFile())
|
||||
require.NoError(t, err)
|
||||
|
||||
ns, err := makeSeedNode(config,
|
||||
cfg.DefaultDBProvider,
|
||||
ns, err := makeSeedNode(cfg,
|
||||
config.DefaultDBProvider,
|
||||
nodeKey,
|
||||
defaultGenesisDocProviderFunc(config),
|
||||
defaultGenesisDocProviderFunc(cfg),
|
||||
log.TestingLogger(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
@@ -505,68 +504,68 @@ func TestNodeNewSeedNode(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNodeSetEventSink(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_app_version_test")
|
||||
defer os.RemoveAll(config.RootDir)
|
||||
cfg := config.ResetTestRoot("node_app_version_test")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
|
||||
logger := log.TestingLogger()
|
||||
setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink {
|
||||
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
|
||||
eventBus, err := createAndStartEventBus(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
|
||||
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
|
||||
indexService, eventSinks, err := createAndStartIndexerService(config,
|
||||
cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
|
||||
indexService, eventSinks, err := createAndStartIndexerService(cfg,
|
||||
config.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
|
||||
return eventSinks
|
||||
}
|
||||
|
||||
eventSinks := setupTest(t, config)
|
||||
eventSinks := setupTest(t, cfg)
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
assert.Equal(t, indexer.KV, eventSinks[0].Type())
|
||||
|
||||
config.TxIndex.Indexer = []string{"null"}
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{"null"}
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
|
||||
|
||||
config.TxIndex.Indexer = []string{"null", "kv"}
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{"null", "kv"}
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
|
||||
|
||||
config.TxIndex.Indexer = []string{"kvv"}
|
||||
ns, err := newDefaultNode(config, logger)
|
||||
cfg.TxIndex.Indexer = []string{"kvv"}
|
||||
ns, err := newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("unsupported event sink type"), err)
|
||||
|
||||
config.TxIndex.Indexer = []string{}
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{}
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
assert.Equal(t, indexer.NULL, eventSinks[0].Type())
|
||||
|
||||
config.TxIndex.Indexer = []string{"psql"}
|
||||
ns, err = newDefaultNode(config, logger)
|
||||
cfg.TxIndex.Indexer = []string{"psql"}
|
||||
ns, err = newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
|
||||
|
||||
var psqlConn = "test"
|
||||
|
||||
config.TxIndex.Indexer = []string{"psql"}
|
||||
config.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{"psql"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
|
||||
|
||||
config.TxIndex.Indexer = []string{"psql", "kv"}
|
||||
config.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{"psql", "kv"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 2, len(eventSinks))
|
||||
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
|
||||
@@ -577,9 +576,9 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
assert.Equal(t, indexer.KV, eventSinks[1].Type())
|
||||
}
|
||||
|
||||
config.TxIndex.Indexer = []string{"kv", "psql"}
|
||||
config.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, config)
|
||||
cfg.TxIndex.Indexer = []string{"kv", "psql"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
eventSinks = setupTest(t, cfg)
|
||||
|
||||
assert.Equal(t, 2, len(eventSinks))
|
||||
if eventSinks[0].Type() == indexer.KV {
|
||||
@@ -590,15 +589,15 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
}
|
||||
|
||||
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
|
||||
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
|
||||
config.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(config, logger)
|
||||
cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
|
||||
config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
|
||||
config.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(config, logger)
|
||||
cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
}
|
||||
@@ -648,13 +647,13 @@ func loadStatefromGenesis(t *testing.T) sm.State {
|
||||
|
||||
stateDB := dbm.NewMemDB()
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
config := cfg.ResetTestRoot("load_state_from_genesis")
|
||||
cfg := config.ResetTestRoot("load_state_from_genesis")
|
||||
|
||||
loadedState, err := stateStore.Load()
|
||||
require.NoError(t, err)
|
||||
require.True(t, loadedState.IsEmpty())
|
||||
|
||||
genDoc, _ := factory.RandGenesisDoc(config, 0, false, 10)
|
||||
genDoc, _ := factory.RandGenesisDoc(cfg, 0, false, 10)
|
||||
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(
|
||||
stateStore,
|
||||
|
||||
217
node/setup.go
217
node/setup.go
@@ -7,18 +7,17 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
"time"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
|
||||
abciclient "github.com/tendermint/tendermint/abci/client"
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
bcv0 "github.com/tendermint/tendermint/internal/blocksync/v0"
|
||||
bcv2 "github.com/tendermint/tendermint/internal/blocksync/v2"
|
||||
cs "github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/consensus"
|
||||
"github.com/tendermint/tendermint/internal/evidence"
|
||||
"github.com/tendermint/tendermint/internal/mempool"
|
||||
mempoolv0 "github.com/tendermint/tendermint/internal/mempool/v0"
|
||||
@@ -37,17 +36,19 @@ import (
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
)
|
||||
|
||||
func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
|
||||
func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll
|
||||
var blockStoreDB dbm.DB
|
||||
blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
|
||||
blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
blockStore = store.NewBlockStore(blockStoreDB)
|
||||
|
||||
stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
|
||||
stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -70,13 +71,13 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
|
||||
}
|
||||
|
||||
func createAndStartIndexerService(
|
||||
config *cfg.Config,
|
||||
dbProvider cfg.DBProvider,
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
eventBus *types.EventBus,
|
||||
logger log.Logger,
|
||||
chainID string,
|
||||
) (*indexer.Service, []indexer.EventSink, error) {
|
||||
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
|
||||
eventSinks, err := sink.EventSinksFromConfig(cfg, dbProvider, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -100,7 +101,7 @@ func doHandshake(
|
||||
proxyApp proxy.AppConns,
|
||||
consensusLogger log.Logger) error {
|
||||
|
||||
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
|
||||
handshaker := consensus.NewHandshaker(stateStore, state, blockStore, genDoc)
|
||||
handshaker.SetLogger(consensusLogger)
|
||||
handshaker.SetEventBus(eventBus)
|
||||
if err := handshaker.Handshake(proxyApp); err != nil {
|
||||
@@ -126,9 +127,9 @@ func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusL
|
||||
)
|
||||
}
|
||||
switch {
|
||||
case mode == cfg.ModeFull:
|
||||
case mode == config.ModeFull:
|
||||
consensusLogger.Info("This node is a fullnode")
|
||||
case mode == cfg.ModeValidator:
|
||||
case mode == config.ModeValidator:
|
||||
addr := pubKey.Address()
|
||||
// Log whether this node is a validator or an observer
|
||||
if state.Validators.HasAddress(addr) {
|
||||
@@ -149,7 +150,7 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
|
||||
}
|
||||
|
||||
func createMempoolReactor(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
proxyApp proxy.AppConns,
|
||||
state sm.State,
|
||||
memplMetrics *mempool.Metrics,
|
||||
@@ -158,8 +159,8 @@ func createMempoolReactor(
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, service.Service, mempool.Mempool, error) {
|
||||
|
||||
logger = logger.With("module", "mempool", "version", config.Mempool.Version)
|
||||
channelShims := mempoolv0.GetChannelShims(config.Mempool)
|
||||
logger = logger.With("module", "mempool", "version", cfg.Mempool.Version)
|
||||
channelShims := mempoolv0.GetChannelShims(cfg.Mempool)
|
||||
reactorShim := p2p.NewReactorShim(logger, "MempoolShim", channelShims)
|
||||
|
||||
var (
|
||||
@@ -167,7 +168,7 @@ func createMempoolReactor(
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
)
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
channels = getChannelsFromShim(reactorShim)
|
||||
peerUpdates = reactorShim.PeerUpdates
|
||||
} else {
|
||||
@@ -175,10 +176,10 @@ func createMempoolReactor(
|
||||
peerUpdates = peerManager.Subscribe()
|
||||
}
|
||||
|
||||
switch config.Mempool.Version {
|
||||
case cfg.MempoolV0:
|
||||
switch cfg.Mempool.Version {
|
||||
case config.MempoolV0:
|
||||
mp := mempoolv0.NewCListMempool(
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv0.WithMetrics(memplMetrics),
|
||||
@@ -190,23 +191,23 @@ func createMempoolReactor(
|
||||
|
||||
reactor := mempoolv0.NewReactor(
|
||||
logger,
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
peerManager,
|
||||
mp,
|
||||
channels[mempool.MempoolChannel],
|
||||
peerUpdates,
|
||||
)
|
||||
|
||||
if config.Consensus.WaitForTxs() {
|
||||
if cfg.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return reactorShim, reactor, mp, nil
|
||||
|
||||
case cfg.MempoolV1:
|
||||
case config.MempoolV1:
|
||||
mp := mempoolv1.NewTxMempool(
|
||||
logger,
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
proxyApp.Mempool(),
|
||||
state.LastBlockHeight,
|
||||
mempoolv1.WithMetrics(memplMetrics),
|
||||
@@ -216,34 +217,34 @@ func createMempoolReactor(
|
||||
|
||||
reactor := mempoolv1.NewReactor(
|
||||
logger,
|
||||
config.Mempool,
|
||||
cfg.Mempool,
|
||||
peerManager,
|
||||
mp,
|
||||
channels[mempool.MempoolChannel],
|
||||
peerUpdates,
|
||||
)
|
||||
|
||||
if config.Consensus.WaitForTxs() {
|
||||
if cfg.Consensus.WaitForTxs() {
|
||||
mp.EnableTxsAvailable()
|
||||
}
|
||||
|
||||
return reactorShim, reactor, mp, nil
|
||||
|
||||
default:
|
||||
return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", config.Mempool.Version)
|
||||
return nil, nil, nil, fmt.Errorf("unknown mempool version: %s", cfg.Mempool.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func createEvidenceReactor(
|
||||
config *cfg.Config,
|
||||
dbProvider cfg.DBProvider,
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
stateDB dbm.DB,
|
||||
blockStore *store.BlockStore,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, *evidence.Reactor, *evidence.Pool, error) {
|
||||
evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
|
||||
evidenceDB, err := dbProvider(&config.DBContext{ID: "evidence", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
@@ -261,7 +262,7 @@ func createEvidenceReactor(
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
)
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
channels = getChannelsFromShim(reactorShim)
|
||||
peerUpdates = reactorShim.PeerUpdates
|
||||
} else {
|
||||
@@ -281,21 +282,21 @@ func createEvidenceReactor(
|
||||
|
||||
func createBlockchainReactor(
|
||||
logger log.Logger,
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore *store.BlockStore,
|
||||
csReactor *cs.Reactor,
|
||||
csReactor *consensus.Reactor,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
blockSync bool,
|
||||
metrics *cs.Metrics,
|
||||
metrics *consensus.Metrics,
|
||||
) (*p2p.ReactorShim, service.Service, error) {
|
||||
|
||||
logger = logger.With("module", "blockchain")
|
||||
|
||||
switch config.BlockSync.Version {
|
||||
case cfg.BlockSyncV0:
|
||||
switch cfg.BlockSync.Version {
|
||||
case config.BlockSyncV0:
|
||||
reactorShim := p2p.NewReactorShim(logger, "BlockchainShim", bcv0.ChannelShims)
|
||||
|
||||
var (
|
||||
@@ -303,7 +304,7 @@ func createBlockchainReactor(
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
)
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
channels = getChannelsFromShim(reactorShim)
|
||||
peerUpdates = reactorShim.PeerUpdates
|
||||
} else {
|
||||
@@ -322,69 +323,69 @@ func createBlockchainReactor(
|
||||
|
||||
return reactorShim, reactor, nil
|
||||
|
||||
case cfg.BlockSyncV2:
|
||||
case config.BlockSyncV2:
|
||||
return nil, nil, errors.New("block sync version v2 is no longer supported. Please use v0")
|
||||
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unknown block sync version %s", config.BlockSync.Version)
|
||||
return nil, nil, fmt.Errorf("unknown block sync version %s", cfg.BlockSync.Version)
|
||||
}
|
||||
}
|
||||
|
||||
func createConsensusReactor(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
state sm.State,
|
||||
blockExec *sm.BlockExecutor,
|
||||
blockStore sm.BlockStore,
|
||||
mp mempool.Mempool,
|
||||
evidencePool *evidence.Pool,
|
||||
privValidator types.PrivValidator,
|
||||
csMetrics *cs.Metrics,
|
||||
csMetrics *consensus.Metrics,
|
||||
waitSync bool,
|
||||
eventBus *types.EventBus,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
logger log.Logger,
|
||||
) (*p2p.ReactorShim, *cs.Reactor, *cs.State) {
|
||||
) (*p2p.ReactorShim, *consensus.Reactor, *consensus.State) {
|
||||
|
||||
consensusState := cs.NewState(
|
||||
config.Consensus,
|
||||
consensusState := consensus.NewState(
|
||||
cfg.Consensus,
|
||||
state.Copy(),
|
||||
blockExec,
|
||||
blockStore,
|
||||
mp,
|
||||
evidencePool,
|
||||
cs.StateMetrics(csMetrics),
|
||||
consensus.StateMetrics(csMetrics),
|
||||
)
|
||||
consensusState.SetLogger(logger)
|
||||
if privValidator != nil && config.Mode == cfg.ModeValidator {
|
||||
if privValidator != nil && cfg.Mode == config.ModeValidator {
|
||||
consensusState.SetPrivValidator(privValidator)
|
||||
}
|
||||
|
||||
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", cs.ChannelShims)
|
||||
reactorShim := p2p.NewReactorShim(logger, "ConsensusShim", consensus.ChannelShims)
|
||||
|
||||
var (
|
||||
channels map[p2p.ChannelID]*p2p.Channel
|
||||
peerUpdates *p2p.PeerUpdates
|
||||
)
|
||||
|
||||
if config.P2P.UseLegacy {
|
||||
if cfg.P2P.UseLegacy {
|
||||
channels = getChannelsFromShim(reactorShim)
|
||||
peerUpdates = reactorShim.PeerUpdates
|
||||
} else {
|
||||
channels = makeChannelsFromShims(router, cs.ChannelShims)
|
||||
channels = makeChannelsFromShims(router, consensus.ChannelShims)
|
||||
peerUpdates = peerManager.Subscribe()
|
||||
}
|
||||
|
||||
reactor := cs.NewReactor(
|
||||
reactor := consensus.NewReactor(
|
||||
logger,
|
||||
consensusState,
|
||||
channels[cs.StateChannel],
|
||||
channels[cs.DataChannel],
|
||||
channels[cs.VoteChannel],
|
||||
channels[cs.VoteSetBitsChannel],
|
||||
channels[consensus.StateChannel],
|
||||
channels[consensus.DataChannel],
|
||||
channels[consensus.VoteChannel],
|
||||
channels[consensus.VoteSetBitsChannel],
|
||||
peerUpdates,
|
||||
waitSync,
|
||||
cs.ReactorMetrics(csMetrics),
|
||||
consensus.ReactorMetrics(csMetrics),
|
||||
)
|
||||
|
||||
// Services which will be publishing and/or subscribing for messages (events)
|
||||
@@ -394,20 +395,20 @@ func createConsensusReactor(
|
||||
return reactorShim, reactor, consensusState
|
||||
}
|
||||
|
||||
func createTransport(logger log.Logger, config *cfg.Config) *p2p.MConnTransport {
|
||||
func createTransport(logger log.Logger, cfg *config.Config) *p2p.MConnTransport {
|
||||
return p2p.NewMConnTransport(
|
||||
logger, p2p.MConnConfig(config.P2P), []*p2p.ChannelDescriptor{},
|
||||
logger, p2p.MConnConfig(cfg.P2P), []*p2p.ChannelDescriptor{},
|
||||
p2p.MConnTransportOptions{
|
||||
MaxAcceptedConnections: uint32(config.P2P.MaxNumInboundPeers +
|
||||
len(tmstrings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " ")),
|
||||
MaxAcceptedConnections: uint32(cfg.P2P.MaxNumInboundPeers +
|
||||
len(tmstrings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " ")),
|
||||
),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func createPeerManager(
|
||||
config *cfg.Config,
|
||||
dbProvider cfg.DBProvider,
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
p2pLogger log.Logger,
|
||||
nodeID types.NodeID,
|
||||
) (*p2p.PeerManager, error) {
|
||||
@@ -415,16 +416,16 @@ func createPeerManager(
|
||||
var maxConns uint16
|
||||
|
||||
switch {
|
||||
case config.P2P.MaxConnections > 0:
|
||||
maxConns = config.P2P.MaxConnections
|
||||
case cfg.P2P.MaxConnections > 0:
|
||||
maxConns = cfg.P2P.MaxConnections
|
||||
|
||||
case config.P2P.MaxNumInboundPeers > 0 && config.P2P.MaxNumOutboundPeers > 0:
|
||||
x := config.P2P.MaxNumInboundPeers + config.P2P.MaxNumOutboundPeers
|
||||
case cfg.P2P.MaxNumInboundPeers > 0 && cfg.P2P.MaxNumOutboundPeers > 0:
|
||||
x := cfg.P2P.MaxNumInboundPeers + cfg.P2P.MaxNumOutboundPeers
|
||||
if x > math.MaxUint16 {
|
||||
return nil, fmt.Errorf(
|
||||
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
|
||||
config.P2P.MaxNumInboundPeers,
|
||||
config.P2P.MaxNumOutboundPeers,
|
||||
cfg.P2P.MaxNumInboundPeers,
|
||||
cfg.P2P.MaxNumOutboundPeers,
|
||||
math.MaxUint16,
|
||||
)
|
||||
}
|
||||
@@ -436,7 +437,7 @@ func createPeerManager(
|
||||
}
|
||||
|
||||
privatePeerIDs := make(map[types.NodeID]struct{})
|
||||
for _, id := range tmstrings.SplitAndTrimEmpty(config.P2P.PrivatePeerIDs, ",", " ") {
|
||||
for _, id := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PrivatePeerIDs, ",", " ") {
|
||||
privatePeerIDs[types.NodeID(id)] = struct{}{}
|
||||
}
|
||||
|
||||
@@ -452,7 +453,7 @@ func createPeerManager(
|
||||
}
|
||||
|
||||
peers := []p2p.NodeAddress{}
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " ") {
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
|
||||
address, err := p2p.ParseNodeAddress(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
@@ -462,7 +463,7 @@ func createPeerManager(
|
||||
options.PersistentPeers = append(options.PersistentPeers, address.NodeID)
|
||||
}
|
||||
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(config.P2P.BootstrapPeers, ",", " ") {
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
|
||||
address, err := p2p.ParseNodeAddress(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
@@ -470,7 +471,7 @@ func createPeerManager(
|
||||
peers = append(peers, address)
|
||||
}
|
||||
|
||||
peerDB, err := dbProvider(&cfg.DBContext{ID: "peerstore", Config: config})
|
||||
peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -511,7 +512,7 @@ func createRouter(
|
||||
}
|
||||
|
||||
func createSwitch(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
transport p2p.Transport,
|
||||
p2pMetrics *p2p.Metrics,
|
||||
mempoolReactor *p2p.ReactorShim,
|
||||
@@ -530,13 +531,13 @@ func createSwitch(
|
||||
peerFilters = []p2p.PeerFilterFunc{}
|
||||
)
|
||||
|
||||
if !config.P2P.AllowDuplicateIP {
|
||||
if !cfg.P2P.AllowDuplicateIP {
|
||||
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter)
|
||||
}
|
||||
|
||||
// Filter peers by addr or pubkey with an ABCI query.
|
||||
// If the query return code is OK, add peer.
|
||||
if config.FilterPeers {
|
||||
if cfg.FilterPeers {
|
||||
connFilters = append(
|
||||
connFilters,
|
||||
// ABCI query for address filtering.
|
||||
@@ -575,7 +576,7 @@ func createSwitch(
|
||||
}
|
||||
|
||||
sw := p2p.NewSwitch(
|
||||
config.P2P,
|
||||
cfg.P2P,
|
||||
transport,
|
||||
p2p.WithMetrics(p2pMetrics),
|
||||
p2p.SwitchPeerFilters(peerFilters...),
|
||||
@@ -583,7 +584,7 @@ func createSwitch(
|
||||
)
|
||||
|
||||
sw.SetLogger(p2pLogger)
|
||||
if config.Mode != cfg.ModeSeed {
|
||||
if cfg.Mode != config.ModeSeed {
|
||||
sw.AddReactor("MEMPOOL", mempoolReactor)
|
||||
sw.AddReactor("BLOCKCHAIN", bcReactor)
|
||||
sw.AddReactor("CONSENSUS", consensusReactor)
|
||||
@@ -594,26 +595,26 @@ func createSwitch(
|
||||
sw.SetNodeInfo(nodeInfo)
|
||||
sw.SetNodeKey(nodeKey)
|
||||
|
||||
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", config.NodeKeyFile())
|
||||
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID, "file", cfg.NodeKeyFile())
|
||||
return sw
|
||||
}
|
||||
|
||||
func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
|
||||
func createAddrBookAndSetOnSwitch(cfg *config.Config, sw *p2p.Switch,
|
||||
p2pLogger log.Logger, nodeKey types.NodeKey) (pex.AddrBook, error) {
|
||||
|
||||
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
|
||||
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
|
||||
addrBook := pex.NewAddrBook(cfg.P2P.AddrBookFile(), cfg.P2P.AddrBookStrict)
|
||||
addrBook.SetLogger(p2pLogger.With("book", cfg.P2P.AddrBookFile()))
|
||||
|
||||
// Add ourselves to addrbook to prevent dialing ourselves
|
||||
if config.P2P.ExternalAddress != "" {
|
||||
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ExternalAddress))
|
||||
if cfg.P2P.ExternalAddress != "" {
|
||||
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(cfg.P2P.ExternalAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
|
||||
}
|
||||
addrBook.AddOurAddress(addr)
|
||||
}
|
||||
if config.P2P.ListenAddress != "" {
|
||||
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(config.P2P.ListenAddress))
|
||||
if cfg.P2P.ListenAddress != "" {
|
||||
addr, err := types.NewNetAddressString(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
|
||||
}
|
||||
@@ -625,19 +626,19 @@ func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
|
||||
return addrBook, nil
|
||||
}
|
||||
|
||||
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
|
||||
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, cfg *config.Config,
|
||||
sw *p2p.Switch, logger log.Logger) *pex.Reactor {
|
||||
|
||||
reactorConfig := &pex.ReactorConfig{
|
||||
Seeds: tmstrings.SplitAndTrimEmpty(config.P2P.Seeds, ",", " "),
|
||||
SeedMode: config.Mode == cfg.ModeSeed,
|
||||
Seeds: tmstrings.SplitAndTrimEmpty(cfg.P2P.Seeds, ",", " "),
|
||||
SeedMode: cfg.Mode == config.ModeSeed,
|
||||
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
|
||||
// blocks assuming 10s blocks ~ 28 hours.
|
||||
// TODO (melekes): make it dynamic based on the actual block latencies
|
||||
// from the live network.
|
||||
// https://github.com/tendermint/tendermint/issues/3523
|
||||
SeedDisconnectWaitPeriod: 28 * time.Hour,
|
||||
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
|
||||
PersistentPeersMaxDialPeriod: cfg.P2P.PersistentPeersMaxDialPeriod,
|
||||
}
|
||||
// TODO persistent peers ? so we can have their DNS addrs saved
|
||||
pexReactor := pex.NewReactor(addrBook, reactorConfig)
|
||||
@@ -647,7 +648,7 @@ func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
|
||||
}
|
||||
|
||||
func createPEXReactorV2(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
logger log.Logger,
|
||||
peerManager *p2p.PeerManager,
|
||||
router *p2p.Router,
|
||||
@@ -663,7 +664,7 @@ func createPEXReactorV2(
|
||||
}
|
||||
|
||||
func makeNodeInfo(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
nodeKey types.NodeKey,
|
||||
eventSinks []indexer.EventSink,
|
||||
genDoc *types.GenesisDoc,
|
||||
@@ -676,15 +677,15 @@ func makeNodeInfo(
|
||||
}
|
||||
|
||||
var bcChannel byte
|
||||
switch config.BlockSync.Version {
|
||||
case cfg.BlockSyncV0:
|
||||
switch cfg.BlockSync.Version {
|
||||
case config.BlockSyncV0:
|
||||
bcChannel = byte(bcv0.BlockSyncChannel)
|
||||
|
||||
case cfg.BlockSyncV2:
|
||||
case config.BlockSyncV2:
|
||||
bcChannel = bcv2.BlockchainChannel
|
||||
|
||||
default:
|
||||
return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", config.BlockSync.Version)
|
||||
return types.NodeInfo{}, fmt.Errorf("unknown blocksync version %s", cfg.BlockSync.Version)
|
||||
}
|
||||
|
||||
nodeInfo := types.NodeInfo{
|
||||
@@ -698,10 +699,10 @@ func makeNodeInfo(
|
||||
Version: version.TMVersion,
|
||||
Channels: []byte{
|
||||
bcChannel,
|
||||
byte(cs.StateChannel),
|
||||
byte(cs.DataChannel),
|
||||
byte(cs.VoteChannel),
|
||||
byte(cs.VoteSetBitsChannel),
|
||||
byte(consensus.StateChannel),
|
||||
byte(consensus.DataChannel),
|
||||
byte(consensus.VoteChannel),
|
||||
byte(consensus.VoteSetBitsChannel),
|
||||
byte(mempool.MempoolChannel),
|
||||
byte(evidence.EvidenceChannel),
|
||||
byte(statesync.SnapshotChannel),
|
||||
@@ -709,21 +710,21 @@ func makeNodeInfo(
|
||||
byte(statesync.LightBlockChannel),
|
||||
byte(statesync.ParamsChannel),
|
||||
},
|
||||
Moniker: config.Moniker,
|
||||
Moniker: cfg.Moniker,
|
||||
Other: types.NodeInfoOther{
|
||||
TxIndex: txIndexerStatus,
|
||||
RPCAddress: config.RPC.ListenAddress,
|
||||
RPCAddress: cfg.RPC.ListenAddress,
|
||||
},
|
||||
}
|
||||
|
||||
if config.P2P.PexReactor {
|
||||
if cfg.P2P.PexReactor {
|
||||
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
|
||||
}
|
||||
|
||||
lAddr := config.P2P.ExternalAddress
|
||||
lAddr := cfg.P2P.ExternalAddress
|
||||
|
||||
if lAddr == "" {
|
||||
lAddr = config.P2P.ListenAddress
|
||||
lAddr = cfg.P2P.ListenAddress
|
||||
}
|
||||
|
||||
nodeInfo.ListenAddr = lAddr
|
||||
@@ -733,7 +734,7 @@ func makeNodeInfo(
|
||||
}
|
||||
|
||||
func makeSeedNodeInfo(
|
||||
config *cfg.Config,
|
||||
cfg *config.Config,
|
||||
nodeKey types.NodeKey,
|
||||
genDoc *types.GenesisDoc,
|
||||
state sm.State,
|
||||
@@ -748,21 +749,21 @@ func makeSeedNodeInfo(
|
||||
Network: genDoc.ChainID,
|
||||
Version: version.TMVersion,
|
||||
Channels: []byte{},
|
||||
Moniker: config.Moniker,
|
||||
Moniker: cfg.Moniker,
|
||||
Other: types.NodeInfoOther{
|
||||
TxIndex: "off",
|
||||
RPCAddress: config.RPC.ListenAddress,
|
||||
RPCAddress: cfg.RPC.ListenAddress,
|
||||
},
|
||||
}
|
||||
|
||||
if config.P2P.PexReactor {
|
||||
if cfg.P2P.PexReactor {
|
||||
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
|
||||
}
|
||||
|
||||
lAddr := config.P2P.ExternalAddress
|
||||
lAddr := cfg.P2P.ExternalAddress
|
||||
|
||||
if lAddr == "" {
|
||||
lAddr = config.P2P.ListenAddress
|
||||
lAddr = cfg.P2P.ListenAddress
|
||||
}
|
||||
|
||||
nodeInfo.ListenAddr = lAddr
|
||||
|
||||
Reference in New Issue
Block a user