mirror of https://github.com/tendermint/tendermint.git, synced 2026-02-10 14:00:33 +00:00

Merge branch 'master' into callum/p2p-provider

node/node.go (311)
@@ -18,7 +18,6 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
cs "github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/evidence"
"github.com/tendermint/tendermint/internal/mempool"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/pex"
@@ -36,7 +35,6 @@ import (
grpccore "github.com/tendermint/tendermint/rpc/grpc"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
@@ -70,16 +68,12 @@ type nodeImpl struct {
mempool mempool.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
pexReactorV2 *pex.ReactorV2 // for exchanging peer addresses
evidenceReactor *evidence.Reactor
evidencePool *evidence.Pool // tracking evidence
proxyApp proxy.AppConns // connection to the application
pexReactor service.Service // for exchanging peer addresses
evidenceReactor service.Service
rpcListeners []net.Listener // rpc servers
eventSinks []indexer.EventSink
indexerService *indexer.Service
indexerService service.Service
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
}
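The struct hunk above collapses the pexReactor/pexReactorV2 pair into a single pexReactor service.Service field, and likewise turns evidenceReactor and indexerService into service.Service. A minimal sketch of why that helps, using a stripped-down stand-in for the Service interface rather than the real tendermint/libs/service API:

package main

import "fmt"

// Service is a stand-in for the start/stop surface the node actually needs.
type Service interface {
	Start() error
	Stop() error
}

type legacyPEX struct{}

func (legacyPEX) Start() error { fmt.Println("legacy PEX started"); return nil }
func (legacyPEX) Stop() error  { fmt.Println("legacy PEX stopped"); return nil }

type pexV2 struct{}

func (pexV2) Start() error { fmt.Println("PEX v2 started"); return nil }
func (pexV2) Stop() error  { fmt.Println("PEX v2 stopped"); return nil }

type node struct {
	pexReactor Service // either implementation fits; callers never branch on the concrete type
}

func main() {
	useLegacy := false

	n := node{}
	if useLegacy {
		n.pexReactor = legacyPEX{}
	} else {
		n.pexReactor = pexV2{}
	}

	// Start/stop paths are identical regardless of which stack was chosen.
	if err := n.pexReactor.Start(); err != nil {
		panic(err)
	}
	if err := n.pexReactor.Stop(); err != nil {
		panic(err)
	}
}

With a single interface-typed field, OnStart and OnStop need one code path instead of one per reactor version, which is what the later hunks in this commit rely on.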
@@ -324,12 +318,12 @@ func makeNode(config *cfg.Config,

stateSyncReactorShim = p2p.NewReactorShim(logger.With("module", "statesync"), "StateSyncShim", statesync.ChannelShims)

if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(stateSyncReactorShim)
peerUpdates = stateSyncReactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates = peerManager.Subscribe()
}

stateSyncReactor = statesync.NewReactor(
@@ -373,45 +367,42 @@ func makeNode(config *cfg.Config,
// Note we currently use the addrBook regardless at least for AddOurAddress

var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
sw *p2p.Switch
addrBook pex.AddrBook
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)

pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})

if config.P2P.PexReactor {
if config.P2P.DisableLegacy {
addrBook = nil
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)
if config.P2P.UseLegacy {
// setup Transport and Switch
sw = createSwitch(
config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}
addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}

pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
} else {
addrBook = nil
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
}

@@ -440,19 +431,51 @@ func makeNode(config *cfg.Config,
bcReactor: bcReactor,
mempoolReactor: mpReactor,
mempool: mp,
consensusState: csState,
consensusReactor: csReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
evidenceReactor: evReactor,
evidencePool: evPool,
proxyApp: proxyApp,
indexerService: indexerService,
eventBus: eventBus,
eventSinks: eventSinks,

rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
ProxyAppMempool: proxyApp.Mempool(),

StateStore: stateStore,
BlockStore: blockStore,
EvidencePool: evPool,
ConsensusState: csState,

ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),

P2PPeers: sw,
PeerManager: peerManager,

GenDoc: genDoc,
EventSinks: eventSinks,
EventBus: eventBus,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *config.RPC,
},
}

// this is a terrible, because typed nil interfaces are not ==
// nil, so this is just cleanup to avoid having a non-nil
// value in the RPC environment that has the semantic
// properties of nil.
if sw == nil {
node.rpcEnv.P2PPeers = nil
} else if peerManager == nil {
node.rpcEnv.PeerManager = nil
}
// end hack

node.rpcEnv.P2PTransport = node

node.BaseService = *service.NewBaseService(logger, "Node", node)

return node, nil
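The "typed nil interfaces" cleanup above guards against a standard Go pitfall: an interface holding a typed nil pointer is not == nil. A standalone illustration (the names are stand-ins, not the tendermint types):

package main

import "fmt"

type Peers interface{ Count() int }

type Switch struct{}

func (s *Switch) Count() int { return 0 }

func main() {
	var sw *Switch // typed nil pointer

	var peers Peers = sw      // the interface now carries a non-nil type with a nil value
	fmt.Println(peers == nil) // false, even though sw is nil

	// The fix mirrors the diff: check the concrete pointer and only then
	// leave the interface-typed field truly nil.
	if sw == nil {
		peers = nil
	}
	fmt.Println(peers == nil) // true
}

Without the explicit check, the RPC environment would hold a non-nil P2PPeers (or PeerManager) value that downstream nil checks would not catch.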
@@ -485,25 +508,6 @@ func makeSeedNode(config *cfg.Config,
p2pMetrics := p2p.PrometheusMetrics(config.Instrumentation.Namespace, "chain_id", genDoc.ChainID)
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, config)
sw := createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}

peerManager, err := createPeerManager(config, dbProvider, p2pLogger, nodeKey.ID)
if err != nil {
@@ -517,8 +521,9 @@ func makeSeedNode(config *cfg.Config,
}

var (
pexReactor *pex.Reactor
pexReactorV2 *pex.ReactorV2
pexReactor service.Service
sw *p2p.Switch
addrBook pex.AddrBook
)

// add the pex reactor
@@ -527,13 +532,34 @@ func makeSeedNode(config *cfg.Config,
// p2p stack is removed.
pexCh := pex.ChannelDescriptor()
transport.AddChannelDescriptors([]*p2p.ChannelDescriptor{&pexCh})
if config.P2P.DisableLegacy {
pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)

if config.P2P.UseLegacy {
sw = createSwitch(
config, transport, p2pMetrics, nil, nil,
nil, nil, nil, nil, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

addrBook, err = createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}

pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
} else {
pexReactor, err = createPEXReactorV2(config, logger, peerManager, router)
if err != nil {
return nil, err
}
} else {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}

if config.RPC.PprofListenAddress != "" {
@@ -555,8 +581,7 @@ func makeSeedNode(config *cfg.Config,
peerManager: peerManager,
router: router,

pexReactor: pexReactor,
pexReactorV2: pexReactorV2,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)

@@ -597,23 +622,20 @@ func (n *nodeImpl) OnStart() error {
}

n.isListening = true
n.Logger.Info("p2p service", "legacy_enabled", n.config.P2P.UseLegacy)

n.Logger.Info("p2p service", "legacy_enabled", !n.config.P2P.DisableLegacy)

if n.config.P2P.DisableLegacy {
err = n.router.Start()
} else {
if n.config.P2P.UseLegacy {
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(strings.SplitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
err = n.sw.Start()
}
if err != nil {
if err = n.sw.Start(); err != nil {
return err
}
} else if err = n.router.Start(); err != nil {
return err
}

if n.config.Mode != cfg.ModeSeed {
if n.config.BlockSync.Version == cfg.BlockSyncV0 {
// Start the real blockchain reactor separately since the switch uses the shim.
if err := n.bcReactor.Start(); err != nil {
return err
}
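In the hunk above, the legacy_enabled log value moves from !DisableLegacy to UseLegacy and the start-up branch is rewritten around the new flag: the legacy switch only starts when UseLegacy is set, otherwise the router-based stack starts. A hedged sketch of that control flow (assuming UseLegacy defaults to false; all types below are stand-ins, not the real tendermint ones):

package main

import (
	"errors"
	"fmt"
)

type legacySwitch struct{}

func (legacySwitch) Start() error { return nil }

type router struct{}

func (router) Start() error { return errors.New("router failed to start") }

// startP2P mirrors the reworked branch in OnStart: the legacy stack is
// opt-in, the router is the default path.
func startP2P(useLegacy bool, sw legacySwitch, r router) error {
	if useLegacy {
		return sw.Start()
	}
	return r.Start()
}

func main() {
	fmt.Println(startP2P(true, legacySwitch{}, router{}))  // <nil>
	fmt.Println(startP2P(false, legacySwitch{}, router{})) // router failed to start
}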
@@ -640,17 +662,14 @@ func (n *nodeImpl) OnStart() error {
}
}

if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Start(); err != nil {
return err
}
} else {
if n.config.P2P.UseLegacy {
// Always connect to persistent peers
err = n.sw.DialPeersAsync(strings.SplitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent-peers field: %w", err)
}

} else if err := n.pexReactor.Start(); err != nil {
return err
}

// Run state sync
@@ -764,20 +783,18 @@ func (n *nodeImpl) OnStop() {
}
}

if n.config.P2P.DisableLegacy && n.pexReactorV2 != nil {
if err := n.pexReactorV2.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}
if err := n.pexReactor.Stop(); err != nil {
n.Logger.Error("failed to stop the PEX v2 reactor", "err", err)
}

if n.config.P2P.DisableLegacy {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
} else {
if n.config.P2P.UseLegacy {
if err := n.sw.Stop(); err != nil {
n.Logger.Error("failed to stop switch", "err", err)
}
} else {
if err := n.router.Stop(); err != nil {
n.Logger.Error("failed to stop router", "err", err)
}
}

if err := n.transport.Close(); err != nil {
@@ -808,55 +825,23 @@ func (n *nodeImpl) OnStop() {
}
}

// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) ConfigureRPC() (*rpccore.Environment, error) {
rpcCoreEnv := rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),

StateStore: n.stateStore,
BlockStore: n.blockStore,
EvidencePool: n.evidencePool,
ConsensusState: n.consensusState,
P2PPeers: n.sw,
P2PTransport: n,

GenDoc: n.genesisDoc,
EventSinks: n.eventSinks,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,

Logger: n.Logger.With("module", "rpc"),

Config: *n.config.RPC,
BlockSyncReactor: n.bcReactor.(cs.BlockSyncReactor),
}
func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if n.config.Mode == cfg.ModeValidator {
pubKey, err := n.privValidator.GetPubKey(context.TODO())
if pubKey == nil || err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
rpcCoreEnv.PubKey = pubKey
n.rpcEnv.PubKey = pubKey
}
if err := rpcCoreEnv.InitGenesisChunks(); err != nil {
return nil, err
}

return &rpcCoreEnv, nil
}

func (n *nodeImpl) startRPC() ([]net.Listener, error) {
env, err := n.ConfigureRPC()
if err != nil {
if err := n.rpcEnv.InitGenesisChunks(); err != nil {
return nil, err
}

listenAddrs := strings.SplitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
routes := env.GetRoutes()
routes := n.rpcEnv.GetRoutes()

if n.config.RPC.Unsafe {
env.AddUnsafe(routes)
n.rpcEnv.AddUnsafe(routes)
}

config := rpcserver.DefaultConfig()
@@ -890,7 +875,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
listener, err := rpcserver.Listen(
listenAddr,
config,
config.MaxOpenConnections,
)
if err != nil {
return nil, err
@@ -948,12 +933,12 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(grpcListenAddr, config)
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
if err != nil {
return nil, err
}
go func() {
if err := grpccore.StartGRPCServer(env, listener); err != nil {
if err := grpccore.StartGRPCServer(n.rpcEnv, listener); err != nil {
n.Logger.Error("Error starting gRPC server", "err", err)
}
}()
@@ -986,46 +971,16 @@ func (n *nodeImpl) startPrometheusServer(addr string) *http.Server {
return srv
}

// Switch returns the Node's Switch.
func (n *nodeImpl) Switch() *p2p.Switch {
return n.sw
}

// BlockStore returns the Node's BlockStore.
func (n *nodeImpl) BlockStore() *store.BlockStore {
return n.blockStore
}

// ConsensusState returns the Node's ConsensusState.
func (n *nodeImpl) ConsensusState() *cs.State {
return n.consensusState
}

// ConsensusReactor returns the Node's ConsensusReactor.
func (n *nodeImpl) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *nodeImpl) MempoolReactor() service.Service {
return n.mempoolReactor
}

// Mempool returns the Node's mempool.
func (n *nodeImpl) Mempool() mempool.Mempool {
return n.mempool
}

// PEXReactor returns the Node's PEXReactor. It returns nil if PEX is disabled.
func (n *nodeImpl) PEXReactor() *pex.Reactor {
return n.pexReactor
}

// EvidencePool returns the Node's EvidencePool.
func (n *nodeImpl) EvidencePool() *evidence.Pool {
return n.evidencePool
}

// EventBus returns the Node's EventBus.
func (n *nodeImpl) EventBus() *types.EventBus {
return n.eventBus
@@ -1042,19 +997,9 @@ func (n *nodeImpl) GenesisDoc() *types.GenesisDoc {
return n.genesisDoc
}

// ProxyApp returns the Node's AppConns, representing its connections to the ABCI application.
func (n *nodeImpl) ProxyApp() proxy.AppConns {
return n.proxyApp
}

// Config returns the Node's config.
func (n *nodeImpl) Config() *cfg.Config {
return n.config
}

// EventSinks returns the Node's event indexing sinks.
func (n *nodeImpl) EventSinks() []indexer.EventSink {
return n.eventSinks
// RPCEnvironment makes sure RPC has all the objects it needs to operate.
func (n *nodeImpl) RPCEnvironment() *rpccore.Environment {
return n.rpcEnv
}

//------------------------------------------------------------------------------

@@ -39,6 +39,7 @@ import (

func TestNodeStartStop(t *testing.T) {
config := cfg.ResetTestRoot("node_node_test")

defer os.RemoveAll(config.RootDir)

// create & start node
@@ -49,8 +50,6 @@ func TestNodeStartStop(t *testing.T) {
n, ok := ns.(*nodeImpl)
require.True(t, ok)

t.Logf("Started node %v", n.sw.NodeInfo())

// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
require.NoError(t, err)
@@ -509,36 +508,50 @@ func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)

n := getTestNode(t, config, log.TestingLogger())
logger := log.TestingLogger()
setupTest := func(t *testing.T, conf *cfg.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
require.NoError(t, err)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
require.NoError(t, err)

indexService, eventSinks, err := createAndStartIndexerService(config,
cfg.DefaultDBProvider, eventBus, logger, genDoc.ChainID)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks
}

eventSinks := setupTest(t, config)
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.KV, eventSinks[0].Type())

config.TxIndex.Indexer = []string{"null"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())

config.TxIndex.Indexer = []string{"null", "kv"}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())

config.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(config, log.TestingLogger())
ns, err := newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)

config.TxIndex.Indexer = []string{}
n = getTestNode(t, config, log.TestingLogger())
eventSinks = setupTest(t, config)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())
assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.NULL, eventSinks[0].Type())

config.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(config, log.TestingLogger())
ns, err = newDefaultNode(config, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)

@@ -546,46 +559,46 @@ func TestNodeSetEventSink(t *testing.T) {

config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()
eventSinks = setupTest(t, config)

assert.Equal(t, 1, len(eventSinks))
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())

config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
eventSinks = setupTest(t, config)

assert.Equal(t, 2, len(eventSinks))
// we use map to filter the duplicated sinks, so it's not guarantee the order when append sinks.
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()

config.TxIndex.Indexer = []string{"kv", "psql"}
config.TxIndex.PsqlConn = psqlConn
n = getTestNode(t, config, log.TestingLogger())
assert.Equal(t, 2, len(n.eventSinks))
if n.eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, n.eventSinks[1].Type())
eventSinks = setupTest(t, config)

assert.Equal(t, 2, len(eventSinks))
if eventSinks[0].Type() == indexer.KV {
assert.Equal(t, indexer.PSQL, eventSinks[1].Type())
} else {
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
assert.Equal(t, indexer.PSQL, eventSinks[0].Type())
assert.Equal(t, indexer.KV, eventSinks[1].Type())
}
n.OnStop()

var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
config.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)

config.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
config.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(config, log.TestingLogger())
_, err = newDefaultNode(config, logger)
require.Error(t, err)
assert.Equal(t, e, err)
}

@@ -8,7 +8,6 @@ import (
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"

dbm "github.com/tendermint/tm-db"
@@ -33,9 +32,7 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@@ -78,56 +75,9 @@ func createAndStartIndexerService(
logger log.Logger,
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {

eventSinks := []indexer.EventSink{}

// check for duplicated sinks
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}

sinks[sl] = true
}

loop:
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
eventSinks = []indexer.EventSink{null.NewEventSink()}
break loop

case string(indexer.KV):
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, nil, err
}

eventSinks = append(eventSinks, kv.NewEventSink(store))

case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}

es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, es)

default:
return nil, nil, errors.New("unsupported event sink type")
}
}

if len(eventSinks) == 0 {
eventSinks = []indexer.EventSink{null.NewEventSink()}
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
if err != nil {
return nil, nil, err
}

indexerService := indexer.NewIndexerService(eventSinks, eventBus)
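This hunk removes the inline sink parsing (case-insensitive dedup, kv/psql/null selection, validation) and delegates it to sink.EventSinksFromConfig, so the per-sink cases no longer live in node setup. A hedged sketch of the shape createAndStartIndexerService takes after the change; only the two calls visible in the diff are taken from the source, the parameter list follows the test call above, and the remaining wiring is illustrative:

func createAndStartIndexerService(
	config *cfg.Config,
	dbProvider cfg.DBProvider,
	eventBus *types.EventBus,
	logger log.Logger,
	chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
	// All sink construction and validation now lives behind one call.
	eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
	if err != nil {
		return nil, nil, err
	}

	// Wire the sinks into the indexer service and start it, as before.
	indexerService := indexer.NewIndexerService(eventSinks, eventBus)
	if err := indexerService.Start(); err != nil {
		return nil, nil, err
	}

	return indexerService, eventSinks, nil
}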
@@ -216,12 +166,12 @@ func createMempoolReactor(
peerUpdates *p2p.PeerUpdates
)

if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, channelShims)
peerUpdates = peerManager.Subscribe()
}

switch config.Mempool.Version {
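The same branch recurs in the statesync, mempool, evidence, block sync, and consensus constructors: under UseLegacy the reactor is fed by the shim's channels and peer updates, otherwise channels are opened on the new router and peer updates come from the peer manager. A compressed, annotated sketch of that repeated pattern (variable types abbreviated; not the exact signatures):

if config.P2P.UseLegacy {
	// Legacy path: the reactor shim owns the channels and pushes peer updates.
	channels = getChannelsFromShim(reactorShim)
	peerUpdates = reactorShim.PeerUpdates
} else {
	// New path: channels are created on the router, and peer updates come
	// from a subscription on the peer manager.
	channels = makeChannelsFromShims(router, channelShims)
	peerUpdates = peerManager.Subscribe()
}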
@@ -310,12 +260,12 @@ func createEvidenceReactor(
peerUpdates *p2p.PeerUpdates
)

if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, evidence.ChannelShims)
peerUpdates = peerManager.Subscribe()
}

evidenceReactor := evidence.NewReactor(
@@ -352,17 +302,17 @@ func createBlockchainReactor(
peerUpdates *p2p.PeerUpdates
)

if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, bcv0.ChannelShims)
peerUpdates = peerManager.Subscribe()
}

reactor, err := bcv0.NewReactor(
logger, state.Copy(), blockExec, blockStore, csReactor,
channels[bcv0.BlockchainChannel], peerUpdates, blockSync,
channels[bcv0.BlockSyncChannel], peerUpdates, blockSync,
metrics,
)
if err != nil {
@@ -416,12 +366,12 @@ func createConsensusReactor(
peerUpdates *p2p.PeerUpdates
)

if config.P2P.DisableLegacy {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
} else {
if config.P2P.UseLegacy {
channels = getChannelsFromShim(reactorShim)
peerUpdates = reactorShim.PeerUpdates
} else {
channels = makeChannelsFromShims(router, cs.ChannelShims)
peerUpdates = peerManager.Subscribe()
}

reactor := cs.NewReactor(
@@ -700,7 +650,7 @@ func createPEXReactorV2(
logger log.Logger,
peerManager *p2p.PeerManager,
router *p2p.Router,
) (*pex.ReactorV2, error) {
) (service.Service, error) {

channel, err := router.OpenChannel(pex.ChannelDescriptor(), &protop2p.PexMessage{}, 128)
if err != nil {
@@ -727,7 +677,7 @@ func makeNodeInfo(
var bcChannel byte
switch config.BlockSync.Version {
case cfg.BlockSyncV0:
bcChannel = byte(bcv0.BlockchainChannel)
bcChannel = byte(bcv0.BlockSyncChannel)

case cfg.BlockSyncV2:
bcChannel = bcv2.BlockchainChannel
