mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-06 13:26:23 +00:00
@@ -18,14 +18,16 @@ var _ indexer.EventSink = (*EventSink)(nil)
|
||||
// The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer.
|
||||
// For the implementation details please see the kv.go in the indexer/block and indexer/tx folder.
|
||||
type EventSink struct {
|
||||
txi *kvt.TxIndex
|
||||
bi *kvb.BlockerIndexer
|
||||
txi *kvt.TxIndex
|
||||
bi *kvb.BlockerIndexer
|
||||
store dbm.DB
|
||||
}
|
||||
|
||||
func NewEventSink(store dbm.DB) indexer.EventSink {
|
||||
return &EventSink{
|
||||
txi: kvt.NewTxIndex(store),
|
||||
bi: kvb.New(store),
|
||||
txi: kvt.NewTxIndex(store),
|
||||
bi: kvb.New(store),
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,5 +60,5 @@ func (kves *EventSink) HasBlock(h int64) (bool, error) {
|
||||
}
|
||||
|
||||
func (kves *EventSink) Stop() error {
|
||||
return nil
|
||||
return kves.store.Close()
|
||||
}
|
||||
|
||||
125
node/node.go
125
node/node.go
@@ -65,6 +65,7 @@ type nodeImpl struct {
|
||||
|
||||
// services
|
||||
eventBus *types.EventBus // pub/sub for services
|
||||
eventSinks []indexer.EventSink
|
||||
stateStore sm.Store
|
||||
blockStore *store.BlockStore // store the blockchain to disk
|
||||
bcReactor service.Service // for block-syncing
|
||||
@@ -76,6 +77,7 @@ type nodeImpl struct {
|
||||
pexReactor service.Service // for exchanging peer addresses
|
||||
evidenceReactor service.Service
|
||||
rpcListeners []net.Listener // rpc servers
|
||||
shutdownOps closer
|
||||
indexerService service.Service
|
||||
rpcEnv *rpccore.Environment
|
||||
prometheusSrv *http.Server
|
||||
@@ -109,6 +111,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
|
||||
}
|
||||
|
||||
appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
|
||||
|
||||
return makeNode(cfg,
|
||||
pval,
|
||||
nodeKey,
|
||||
@@ -126,27 +129,34 @@ func makeNode(cfg *config.Config,
|
||||
clientCreator abciclient.Creator,
|
||||
genesisDocProvider genesisDocProvider,
|
||||
dbProvider config.DBProvider,
|
||||
logger log.Logger) (service.Service, error) {
|
||||
logger log.Logger,
|
||||
) (service.Service, error) {
|
||||
closers := []closer{}
|
||||
|
||||
blockStore, stateDB, err := initDBs(cfg, dbProvider)
|
||||
blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, dbCloser)
|
||||
}
|
||||
closers = append(closers, dbCloser)
|
||||
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
|
||||
genDoc, err := genesisDocProvider()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
err = genDoc.ValidateAndComplete()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error in genesis doc: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("error in genesis doc: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
|
||||
@@ -154,7 +164,8 @@ func makeNode(cfg *config.Config,
|
||||
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
|
||||
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
// EventBus and IndexerService must be started before the handshake because
|
||||
@@ -163,13 +174,14 @@ func makeNode(cfg *config.Config,
|
||||
// but before it indexed the txs, or, endblocker panicked)
|
||||
eventBus, err := createAndStartEventBus(logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider,
|
||||
eventBus, logger, genDoc.ChainID, nodeMetrics.indexer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
// If an address is provided, listen on the socket for a connection from an
|
||||
@@ -181,12 +193,16 @@ func makeNode(cfg *config.Config,
|
||||
case "grpc":
|
||||
privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator grpc client: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("error with private validator grpc client: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
default:
|
||||
privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error with private validator socket client: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("error with private validator socket client: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -194,10 +210,14 @@ func makeNode(cfg *config.Config,
|
||||
if cfg.Mode == config.ModeValidator {
|
||||
pubKey, err = privValidator.GetPubKey(context.TODO())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't get pubkey: %w", err)
|
||||
return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err),
|
||||
makeCloser(closers))
|
||||
|
||||
}
|
||||
if pubKey == nil {
|
||||
return nil, errors.New("could not retrieve public key from private validator")
|
||||
return nil, combineCloseError(
|
||||
errors.New("could not retrieve public key from private validator"),
|
||||
makeCloser(closers))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -213,7 +233,8 @@ func makeNode(cfg *config.Config,
|
||||
consensusLogger := logger.With("module", "consensus")
|
||||
if !stateSync {
|
||||
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
// Reload the state. It will have the Version.Consensus.App set by the
|
||||
@@ -221,7 +242,9 @@ func makeNode(cfg *config.Config,
|
||||
// what happened during block replay).
|
||||
state, err = stateStore.Load()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot load state: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("cannot load state: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -235,35 +258,43 @@ func makeNode(cfg *config.Config,
|
||||
// TODO: Use a persistent peer database.
|
||||
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport := createTransport(p2pLogger, cfg)
|
||||
|
||||
peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
closers = append(closers, peerCloser)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("failed to create peer manager: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
|
||||
peerManager, transport, getRouterConfig(cfg, proxyApp))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create router: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("failed to create router: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
mpReactorShim, mpReactor, mp, err := createMempoolReactor(
|
||||
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
evReactorShim, evReactor, evPool, err := createEvidenceReactor(
|
||||
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
|
||||
}
|
||||
|
||||
// make block executor for consensus and blockchain reactors to execute blocks
|
||||
@@ -290,7 +321,9 @@ func makeNode(cfg *config.Config,
|
||||
peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("could not create blockchain reactor: %w", err),
|
||||
makeCloser(closers))
|
||||
}
|
||||
|
||||
// TODO: Remove this once the switch is removed.
|
||||
@@ -390,17 +423,20 @@ func makeNode(cfg *config.Config,
|
||||
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
|
||||
err = fmt.Errorf("could not add peers from persistent-peers field: %w", err)
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
err = fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create addrbook: %w", err)
|
||||
err = fmt.Errorf("could not create addrbook: %w", err)
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
|
||||
if cfg.P2P.PexReactor {
|
||||
@@ -411,7 +447,7 @@ func makeNode(cfg *config.Config,
|
||||
if cfg.P2P.PexReactor {
|
||||
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, makeCloser(closers))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -447,6 +483,9 @@ func makeNode(cfg *config.Config,
|
||||
evidenceReactor: evReactor,
|
||||
indexerService: indexerService,
|
||||
eventBus: eventBus,
|
||||
eventSinks: eventSinks,
|
||||
|
||||
shutdownOps: makeCloser(closers),
|
||||
|
||||
rpcEnv: &rpccore.Environment{
|
||||
ProxyAppQuery: proxyApp.Query(),
|
||||
@@ -509,6 +548,7 @@ func makeSeedNode(cfg *config.Config,
|
||||
state, err := sm.MakeGenesisState(genDoc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
}
|
||||
|
||||
nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
|
||||
@@ -521,15 +561,19 @@ func makeSeedNode(cfg *config.Config,
|
||||
p2pLogger := logger.With("module", "p2p")
|
||||
transport := createTransport(p2pLogger, cfg)
|
||||
|
||||
peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
peerManager, closer, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("failed to create peer manager: %w", err),
|
||||
closer)
|
||||
}
|
||||
|
||||
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
|
||||
peerManager, transport, getRouterConfig(cfg, nil))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create router: %w", err)
|
||||
return nil, combineCloseError(
|
||||
fmt.Errorf("failed to create router: %w", err),
|
||||
closer)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -553,17 +597,20 @@ func makeSeedNode(cfg *config.Config,
|
||||
|
||||
err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
|
||||
err = fmt.Errorf("could not add peers from persistent_peers field: %w", err)
|
||||
return nil, combineCloseError(err, closer)
|
||||
}
|
||||
|
||||
err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
err = fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
|
||||
return nil, combineCloseError(err, closer)
|
||||
}
|
||||
|
||||
addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create addrbook: %w", err)
|
||||
err = fmt.Errorf("could not create addrbook: %w", err)
|
||||
return nil, combineCloseError(err, closer)
|
||||
}
|
||||
|
||||
if cfg.P2P.PexReactor {
|
||||
@@ -573,7 +620,7 @@ func makeSeedNode(cfg *config.Config,
|
||||
if cfg.P2P.PexReactor {
|
||||
pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, combineCloseError(err, closer)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -597,6 +644,8 @@ func makeSeedNode(cfg *config.Config,
|
||||
peerManager: peerManager,
|
||||
router: router,
|
||||
|
||||
shutdownOps: closer,
|
||||
|
||||
pexReactor: pexReactor,
|
||||
}
|
||||
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
|
||||
@@ -773,6 +822,12 @@ func (n *nodeImpl) OnStop() {
|
||||
n.Logger.Error("Error closing indexerService", "err", err)
|
||||
}
|
||||
|
||||
for _, es := range n.eventSinks {
|
||||
if err := es.Stop(); err != nil {
|
||||
n.Logger.Error("failed to stop event sink", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if n.config.Mode != config.ModeSeed {
|
||||
// now stop the reactors
|
||||
if n.config.BlockSync.Version == config.BlockSyncV0 {
|
||||
@@ -842,6 +897,10 @@ func (n *nodeImpl) OnStop() {
|
||||
// Error from closing listeners, or context timeout:
|
||||
n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
|
||||
}
|
||||
|
||||
}
|
||||
if err := n.shutdownOps(); err != nil {
|
||||
n.Logger.Error("problem shutting down additional services", "err", err)
|
||||
}
|
||||
if n.blockStore != nil {
|
||||
if err := n.blockStore.Close(); err != nil {
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/tendermint/tendermint/internal/test/factory"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
tmtime "github.com/tendermint/tendermint/libs/time"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -535,6 +536,22 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
|
||||
return eventSinks
|
||||
}
|
||||
cleanup := func(ns service.Service) func() {
|
||||
return func() {
|
||||
n, ok := ns.(*nodeImpl)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if n == nil {
|
||||
return
|
||||
}
|
||||
if !n.IsRunning() {
|
||||
return
|
||||
}
|
||||
assert.NoError(t, n.Stop())
|
||||
n.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
eventSinks := setupTest(t, cfg)
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
@@ -555,7 +572,8 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
cfg.TxIndex.Indexer = []string{"kvv"}
|
||||
ns, err := newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("unsupported event sink type"), err)
|
||||
assert.Contains(t, err.Error(), "unsupported event sink type")
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
cfg.TxIndex.Indexer = []string{}
|
||||
eventSinks = setupTest(t, cfg)
|
||||
@@ -566,7 +584,8 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
cfg.TxIndex.Indexer = []string{"psql"}
|
||||
ns, err = newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
|
||||
assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
// N.B. We can't create a PSQL event sink without starting a postgres
|
||||
// instance for it to talk to. The indexer service tests exercise that case.
|
||||
@@ -575,12 +594,14 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
cfg.TxIndex.Indexer = []string{"null", "kv", "Kv"}
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
assert.Contains(t, err.Error(), e.Error())
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
cfg.TxIndex.Indexer = []string{"Null", "kV", "kv", "nUlL"}
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
assert.Contains(t, err.Error(), e.Error())
|
||||
t.Cleanup(cleanup(ns))
|
||||
}
|
||||
|
||||
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
@@ -40,20 +41,57 @@ import (
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
)
|
||||
|
||||
func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll
|
||||
var blockStoreDB dbm.DB
|
||||
blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to initialize blockstore: %w", err)
|
||||
}
|
||||
blockStore = store.NewBlockStore(blockStoreDB)
|
||||
type closer func() error
|
||||
|
||||
stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("unable to initialize statestore: %w", err)
|
||||
func makeCloser(cs []closer) closer {
|
||||
return func() error {
|
||||
errs := make([]string, 0, len(cs))
|
||||
for _, cl := range cs {
|
||||
if err := cl(); err != nil {
|
||||
errs = append(errs, err.Error())
|
||||
}
|
||||
}
|
||||
if len(errs) >= 0 {
|
||||
return errors.New(strings.Join(errs, "; "))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func combineCloseError(err error, cl closer) error {
|
||||
if err == nil {
|
||||
return cl()
|
||||
}
|
||||
|
||||
return blockStore, stateDB, nil
|
||||
clerr := cl()
|
||||
if clerr == nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
|
||||
}
|
||||
|
||||
func initDBs(
|
||||
cfg *config.Config,
|
||||
dbProvider config.DBProvider,
|
||||
) (*store.BlockStore, dbm.DB, closer, error) {
|
||||
|
||||
blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, func() error { return nil }, fmt.Errorf("unable to initialize blockstore: %w", err)
|
||||
}
|
||||
closers := []closer{}
|
||||
blockStore := store.NewBlockStore(blockStoreDB)
|
||||
closers = append(closers, blockStoreDB.Close)
|
||||
|
||||
stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, nil, makeCloser(closers), fmt.Errorf("unable to initialize statestore: %w", err)
|
||||
}
|
||||
|
||||
closers = append(closers, stateDB.Close)
|
||||
|
||||
return blockStore, stateDB, makeCloser(closers), nil
|
||||
}
|
||||
|
||||
// nolint:lll
|
||||
@@ -421,11 +459,11 @@ func createPeerManager(
|
||||
dbProvider config.DBProvider,
|
||||
p2pLogger log.Logger,
|
||||
nodeID types.NodeID,
|
||||
) (*p2p.PeerManager, error) {
|
||||
) (*p2p.PeerManager, closer, error) {
|
||||
|
||||
selfAddr, err := p2p.ParseNodeAddress(nodeID.AddressString(cfg.P2P.ExternalAddress))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't parse ExternalAddress %q: %w", cfg.P2P.ExternalAddress, err)
|
||||
return nil, func() error { return nil }, fmt.Errorf("couldn't parse ExternalAddress %q: %w", cfg.P2P.ExternalAddress, err)
|
||||
}
|
||||
|
||||
var maxConns uint16
|
||||
@@ -437,7 +475,7 @@ func createPeerManager(
|
||||
case cfg.P2P.MaxNumInboundPeers > 0 && cfg.P2P.MaxNumOutboundPeers > 0:
|
||||
x := cfg.P2P.MaxNumInboundPeers + cfg.P2P.MaxNumOutboundPeers
|
||||
if x > math.MaxUint16 {
|
||||
return nil, fmt.Errorf(
|
||||
return nil, func() error { return nil }, fmt.Errorf(
|
||||
"max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
|
||||
cfg.P2P.MaxNumInboundPeers,
|
||||
cfg.P2P.MaxNumOutboundPeers,
|
||||
@@ -472,7 +510,7 @@ func createPeerManager(
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
|
||||
address, err := p2p.ParseNodeAddress(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
}
|
||||
|
||||
peers = append(peers, address)
|
||||
@@ -482,28 +520,28 @@ func createPeerManager(
|
||||
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
|
||||
address, err := p2p.ParseNodeAddress(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
|
||||
}
|
||||
peers = append(peers, address)
|
||||
}
|
||||
|
||||
peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to initialize peer store: %w", err)
|
||||
return nil, func() error { return nil }, fmt.Errorf("unable to initialize peer store: %w", err)
|
||||
}
|
||||
|
||||
peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
|
||||
}
|
||||
|
||||
for _, peer := range peers {
|
||||
if _, err := peerManager.Add(peer); err != nil {
|
||||
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
|
||||
return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
|
||||
}
|
||||
}
|
||||
|
||||
return peerManager, nil
|
||||
return peerManager, peerDB.Close, nil
|
||||
}
|
||||
|
||||
func createRouter(
|
||||
|
||||
Reference in New Issue
Block a user