diff --git a/internal/state/indexer/sink/kv/kv.go b/internal/state/indexer/sink/kv/kv.go
index 4c471b4d3..fe7068a1b 100644
--- a/internal/state/indexer/sink/kv/kv.go
+++ b/internal/state/indexer/sink/kv/kv.go
@@ -18,14 +18,16 @@ var _ indexer.EventSink = (*EventSink)(nil)
 // The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer.
 // For the implementation details please see the kv.go in the indexer/block and indexer/tx folder.
 type EventSink struct {
-    txi *kvt.TxIndex
-    bi  *kvb.BlockerIndexer
+    txi   *kvt.TxIndex
+    bi    *kvb.BlockerIndexer
+    store dbm.DB
 }
 
 func NewEventSink(store dbm.DB) indexer.EventSink {
     return &EventSink{
-        txi: kvt.NewTxIndex(store),
-        bi:  kvb.New(store),
+        txi:   kvt.NewTxIndex(store),
+        bi:    kvb.New(store),
+        store: store,
     }
 }
 
@@ -58,5 +60,5 @@ func (kves *EventSink) HasBlock(h int64) (bool, error) {
 }
 
 func (kves *EventSink) Stop() error {
-    return nil
+    return kves.store.Close()
 }
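Note on the change above: the kv sink now keeps the database handle it was constructed with, so stopping the sink releases the store. A minimal sketch of the same ownership pattern (not part of the patch; it uses tm-db's in-memory backend, and the sink type here is invented for illustration):

    package main

    import (
        "fmt"

        dbm "github.com/tendermint/tm-db"
    )

    // sink owns its store, mirroring EventSink after this change:
    // whoever constructs the DB hands over responsibility for closing it.
    type sink struct {
        store dbm.DB
    }

    // Stop releases the underlying store, like EventSink.Stop above.
    func (s *sink) Stop() error {
        return s.store.Close()
    }

    func main() {
        s := &sink{store: dbm.NewMemDB()}
        fmt.Println(s.Stop()) // <nil>; disk-backed stores may error on a second Close
    }
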
diff --git a/node/node.go b/node/node.go
index 17e37bbcc..01a89c784 100644
--- a/node/node.go
+++ b/node/node.go
@@ -65,6 +65,7 @@ type nodeImpl struct {
 
     // services
     eventBus         *types.EventBus // pub/sub for services
+    eventSinks       []indexer.EventSink
     stateStore       sm.Store
     blockStore       *store.BlockStore // store the blockchain to disk
     bcReactor        service.Service   // for block-syncing
@@ -76,6 +77,7 @@ type nodeImpl struct {
     pexReactor       service.Service // for exchanging peer addresses
     evidenceReactor  service.Service
     rpcListeners     []net.Listener // rpc servers
+    shutdownOps      closer
     indexerService   service.Service
     rpcEnv           *rpccore.Environment
     prometheusSrv    *http.Server
@@ -109,6 +111,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
     }
 
     appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
+
     return makeNode(cfg,
         pval,
         nodeKey,
@@ -126,27 +129,34 @@ func makeNode(cfg *config.Config,
     clientCreator abciclient.Creator,
     genesisDocProvider genesisDocProvider,
     dbProvider config.DBProvider,
-    logger log.Logger) (service.Service, error) {
+    logger log.Logger,
+) (service.Service, error) {
+    closers := []closer{}
 
-    blockStore, stateDB, err := initDBs(cfg, dbProvider)
+    blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, dbCloser)
     }
+    closers = append(closers, dbCloser)
+
     stateStore := sm.NewStore(stateDB)
 
     genDoc, err := genesisDocProvider()
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
     }
 
     err = genDoc.ValidateAndComplete()
     if err != nil {
-        return nil, fmt.Errorf("error in genesis doc: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("error in genesis doc: %w", err),
+            makeCloser(closers))
     }
 
     state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
@@ -154,7 +164,8 @@ func makeNode(cfg *config.Config,
     // Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
     proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     // EventBus and IndexerService must be started before the handshake because
@@ -163,13 +174,14 @@ func makeNode(cfg *config.Config,
     // but before it indexed the txs, or, endblocker panicked)
     eventBus, err := createAndStartEventBus(logger)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     indexerService, eventSinks, err := createAndStartIndexerService(cfg,
         dbProvider, eventBus, logger, genDoc.ChainID, nodeMetrics.indexer)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
     }
 
     // If an address is provided, listen on the socket for a connection from an
@@ -181,12 +193,16 @@ func makeNode(cfg *config.Config,
         case "grpc":
             privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
             if err != nil {
-                return nil, fmt.Errorf("error with private validator grpc client: %w", err)
+                return nil, combineCloseError(
+                    fmt.Errorf("error with private validator grpc client: %w", err),
+                    makeCloser(closers))
             }
         default:
             privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
             if err != nil {
-                return nil, fmt.Errorf("error with private validator socket client: %w", err)
+                return nil, combineCloseError(
+                    fmt.Errorf("error with private validator socket client: %w", err),
+                    makeCloser(closers))
             }
         }
     }
@@ -194,10 +210,14 @@ func makeNode(cfg *config.Config,
     if cfg.Mode == config.ModeValidator {
         pubKey, err = privValidator.GetPubKey(context.TODO())
         if err != nil {
-            return nil, fmt.Errorf("can't get pubkey: %w", err)
+            return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err),
+                makeCloser(closers))
+
         }
         if pubKey == nil {
-            return nil, errors.New("could not retrieve public key from private validator")
+            return nil, combineCloseError(
+                errors.New("could not retrieve public key from private validator"),
+                makeCloser(closers))
         }
     }
 
@@ -213,7 +233,8 @@ func makeNode(cfg *config.Config,
     consensusLogger := logger.With("module", "consensus")
     if !stateSync {
         if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
-            return nil, err
+            return nil, combineCloseError(err, makeCloser(closers))
+
         }
 
         // Reload the state. It will have the Version.Consensus.App set by the
@@ -221,7 +242,9 @@ func makeNode(cfg *config.Config,
         // what happened during block replay).
         state, err = stateStore.Load()
         if err != nil {
-            return nil, fmt.Errorf("cannot load state: %w", err)
+            return nil, combineCloseError(
+                fmt.Errorf("cannot load state: %w", err),
+                makeCloser(closers))
         }
     }
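The hunks above all follow one shape: each resource that opens successfully appends its closer to closers, and every early return funnels through combineCloseError and makeCloser (both added to node/setup.go later in this patch), so a failed startup cannot leak the databases already opened. The shape in miniature, with invented resources (a sketch, not the actual makeNode code):

    package main

    import (
        "errors"
        "fmt"
    )

    type closer func() error

    func main() {
        closers := []closer{}

        // Step 1 succeeds: remember how to undo it.
        closers = append(closers, func() error { fmt.Println("db closed"); return nil })

        // Step 2 fails: unwind everything recorded so far before returning.
        if err := errors.New("open event bus: boom"); err != nil {
            for _, cl := range closers {
                _ = cl() // best-effort cleanup; prints "db closed"
            }
            fmt.Println("startup failed:", err)
        }
    }

In the patch itself, the unwind loop and the error folding live in makeCloser and combineCloseError rather than inline.
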
@@ -235,35 +258,43 @@ func makeNode(cfg *config.Config,
     // TODO: Use a persistent peer database.
     nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     p2pLogger := logger.With("module", "p2p")
     transport := createTransport(p2pLogger, cfg)
 
-    peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
+    peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
+    closers = append(closers, peerCloser)
     if err != nil {
-        return nil, fmt.Errorf("failed to create peer manager: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("failed to create peer manager: %w", err),
+            makeCloser(closers))
     }
 
     router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
         peerManager, transport, getRouterConfig(cfg, proxyApp))
     if err != nil {
-        return nil, fmt.Errorf("failed to create router: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("failed to create router: %w", err),
+            makeCloser(closers))
     }
 
     mpReactorShim, mpReactor, mp, err := createMempoolReactor(
         cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
     )
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     evReactorShim, evReactor, evPool, err := createEvidenceReactor(
         cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
     )
     if err != nil {
-        return nil, err
+        return nil, combineCloseError(err, makeCloser(closers))
+
     }
 
     // make block executor for consensus and blockchain reactors to execute blocks
@@ -290,7 +321,9 @@ func makeNode(cfg *config.Config,
         peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
     )
     if err != nil {
-        return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("could not create blockchain reactor: %w", err),
+            makeCloser(closers))
     }
 
     // TODO: Remove this once the switch is removed.
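One detail worth noticing above: peerCloser is appended to closers before err is checked. createPeerManager (see node/setup.go below) returns a callable closer in every branch, so recording it first guarantees the peer database is released even when the constructor itself fails. Why that ordering matters, sketched with an invented constructor:

    package main

    import (
        "errors"
        "fmt"
    )

    type closer func() error

    // openPeerStore stands in for createPeerManager: it opens a resource
    // and then fails, but still hands back the closer for what it opened.
    func openPeerStore() (closer, error) {
        release := func() error { fmt.Println("peerstore closed"); return nil }
        return release, errors.New("failed to create peer manager")
    }

    func main() {
        closers := []closer{}

        cl, err := openPeerStore()
        closers = append(closers, cl) // recorded before the err check, as above
        if err != nil {
            for _, c := range closers {
                _ = c() // prints "peerstore closed": the failure path leaks nothing
            }
            fmt.Println("startup failed:", err)
        }
    }
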
@@ -390,17 +423,20 @@ func makeNode(cfg *config.Config,
 
         err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
         if err != nil {
-            return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
+            err = fmt.Errorf("could not add peers from persistent-peers field: %w", err)
+            return nil, combineCloseError(err, makeCloser(closers))
         }
 
         err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
         if err != nil {
-            return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
+            err = fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
+            return nil, combineCloseError(err, makeCloser(closers))
         }
 
         addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
         if err != nil {
-            return nil, fmt.Errorf("could not create addrbook: %w", err)
+            err = fmt.Errorf("could not create addrbook: %w", err)
+            return nil, combineCloseError(err, makeCloser(closers))
         }
 
         if cfg.P2P.PexReactor {
@@ -411,7 +447,7 @@ func makeNode(cfg *config.Config,
         if cfg.P2P.PexReactor {
             pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
             if err != nil {
-                return nil, err
+                return nil, combineCloseError(err, makeCloser(closers))
             }
         }
     }
@@ -447,6 +483,9 @@ func makeNode(cfg *config.Config,
         evidenceReactor: evReactor,
         indexerService:  indexerService,
         eventBus:        eventBus,
+        eventSinks:      eventSinks,
+
+        shutdownOps: makeCloser(closers),
 
         rpcEnv: &rpccore.Environment{
             ProxyAppQuery: proxyApp.Query(),
@@ -509,6 +548,7 @@ func makeSeedNode(cfg *config.Config,
     state, err := sm.MakeGenesisState(genDoc)
     if err != nil {
         return nil, err
+
     }
 
     nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
@@ -521,15 +561,19 @@ func makeSeedNode(cfg *config.Config,
     p2pLogger := logger.With("module", "p2p")
     transport := createTransport(p2pLogger, cfg)
 
-    peerManager, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
+    peerManager, closer, err := createPeerManager(cfg, dbProvider, p2pLogger, nodeKey.ID)
     if err != nil {
-        return nil, fmt.Errorf("failed to create peer manager: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("failed to create peer manager: %w", err),
+            closer)
     }
 
     router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
         peerManager, transport, getRouterConfig(cfg, nil))
     if err != nil {
-        return nil, fmt.Errorf("failed to create router: %w", err)
+        return nil, combineCloseError(
+            fmt.Errorf("failed to create router: %w", err),
+            closer)
     }
 
     var (
@@ -553,17 +597,20 @@ func makeSeedNode(cfg *config.Config,
 
         err = sw.AddPersistentPeers(strings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " "))
         if err != nil {
-            return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
+            err = fmt.Errorf("could not add peers from persistent_peers field: %w", err)
+            return nil, combineCloseError(err, closer)
         }
 
         err = sw.AddUnconditionalPeerIDs(strings.SplitAndTrimEmpty(cfg.P2P.UnconditionalPeerIDs, ",", " "))
         if err != nil {
-            return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
+            err = fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
+            return nil, combineCloseError(err, closer)
        }
 
         addrBook, err = createAddrBookAndSetOnSwitch(cfg, sw, p2pLogger, nodeKey)
         if err != nil {
-            return nil, fmt.Errorf("could not create addrbook: %w", err)
+            err = fmt.Errorf("could not create addrbook: %w", err)
+            return nil, combineCloseError(err, closer)
         }
 
         if cfg.P2P.PexReactor {
@@ -573,7 +620,7 @@ func makeSeedNode(cfg *config.Config,
         if cfg.P2P.PexReactor {
             pexReactor, err = createPEXReactorV2(cfg, logger, peerManager, router)
             if err != nil {
-                return nil, err
+                return nil, combineCloseError(err, closer)
             }
         }
     }
@@ -597,6 +644,8 @@ func makeSeedNode(cfg *config.Config,
         peerManager: peerManager,
         router:      router,
 
+        shutdownOps: closer,
+
         pexReactor: pexReactor,
     }
     node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
@@ -773,6 +822,12 @@ func (n *nodeImpl) OnStop() {
         n.Logger.Error("Error closing indexerService", "err", err)
     }
 
+    for _, es := range n.eventSinks {
+        if err := es.Stop(); err != nil {
+            n.Logger.Error("failed to stop event sink", "err", err)
+        }
+    }
+
     if n.config.Mode != config.ModeSeed {
         // now stop the reactors
         if n.config.BlockSync.Version == config.BlockSyncV0 {
@@ -842,6 +897,10 @@ func (n *nodeImpl) OnStop() {
             // Error from closing listeners, or context timeout:
             n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
         }
+
+    }
+    if err := n.shutdownOps(); err != nil {
+        n.Logger.Error("problem shutting down additional services", "err", err)
     }
     if n.blockStore != nil {
         if err := n.blockStore.Close(); err != nil {
diff --git a/node/node_test.go b/node/node_test.go
index b3c651cba..305f9b873 100644
--- a/node/node_test.go
+++ b/node/node_test.go
@@ -31,6 +31,7 @@ import (
     "github.com/tendermint/tendermint/internal/test/factory"
     "github.com/tendermint/tendermint/libs/log"
     tmrand "github.com/tendermint/tendermint/libs/rand"
+    "github.com/tendermint/tendermint/libs/service"
     tmtime "github.com/tendermint/tendermint/libs/time"
     "github.com/tendermint/tendermint/privval"
     "github.com/tendermint/tendermint/types"
@@ -535,6 +536,22 @@ func TestNodeSetEventSink(t *testing.T) {
         t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
         return eventSinks
     }
+    cleanup := func(ns service.Service) func() {
+        return func() {
+            n, ok := ns.(*nodeImpl)
+            if !ok {
+                return
+            }
+            if n == nil {
+                return
+            }
+            if !n.IsRunning() {
+                return
+            }
+            assert.NoError(t, n.Stop())
+            n.Wait()
+        }
+    }
 
     eventSinks := setupTest(t, cfg)
     assert.Equal(t, 1, len(eventSinks))
@@ -555,7 +572,8 @@ func TestNodeSetEventSink(t *testing.T) {
     cfg.TxIndex.Indexer = []string{"kvv"}
     ns, err := newDefaultNode(cfg, logger)
     assert.Nil(t, ns)
-    assert.Equal(t, errors.New("unsupported event sink type"), err)
+    assert.Contains(t, err.Error(), "unsupported event sink type")
+    t.Cleanup(cleanup(ns))
 
     cfg.TxIndex.Indexer = []string{}
     eventSinks = setupTest(t, cfg)
@@ -566,7 +584,8 @@ func TestNodeSetEventSink(t *testing.T) {
     cfg.TxIndex.Indexer = []string{"psql"}
     ns, err = newDefaultNode(cfg, logger)
     assert.Nil(t, ns)
-    assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
+    assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
+    t.Cleanup(cleanup(ns))
 
     // N.B. We can't create a PSQL event sink without starting a postgres
     // instance for it to talk to. The indexer service tests exercise that case.
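The cleanup helper above leans on two properties of t.Cleanup: registered functions run after the test finishes, in last-in-first-out order, and the closure itself tolerates a nil or never-started node, so the test can register it unconditionally right after each newDefaultNode call. A self-contained illustration of the ordering (a sketch, not from the patch; put it in any *_test.go file):

    package demo

    import "testing"

    func TestCleanupOrder(t *testing.T) {
        t.Cleanup(func() { t.Log("runs third: registered first") })
        t.Cleanup(func() { t.Log("runs second") })
        t.Cleanup(func() { t.Log("runs first: registered last") })
        t.Log("the test body finishes before any cleanup runs")
    }
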
@@ -575,12 +594,14 @@ func TestNodeSetEventSink(t *testing.T) {
     cfg.TxIndex.Indexer = []string{"null", "kv", "Kv"}
     _, err = newDefaultNode(cfg, logger)
     require.Error(t, err)
-    assert.Equal(t, e, err)
+    assert.Contains(t, err.Error(), e.Error())
+    t.Cleanup(cleanup(ns))
 
     cfg.TxIndex.Indexer = []string{"Null", "kV", "kv", "nUlL"}
     _, err = newDefaultNode(cfg, logger)
     require.Error(t, err)
-    assert.Equal(t, e, err)
+    assert.Contains(t, err.Error(), e.Error())
+    t.Cleanup(cleanup(ns))
 }
 
 func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
diff --git a/node/setup.go b/node/setup.go
index 7e319db1e..dd0949b93 100644
--- a/node/setup.go
+++ b/node/setup.go
@@ -7,6 +7,7 @@ import (
     "fmt"
     "math"
     "net"
+    "strings"
     "time"
 
     dbm "github.com/tendermint/tm-db"
@@ -40,20 +41,57 @@ import (
     _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
 )
 
-func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll
-    var blockStoreDB dbm.DB
-    blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
-    if err != nil {
-        return nil, nil, fmt.Errorf("unable to initialize blockstore: %w", err)
-    }
-    blockStore = store.NewBlockStore(blockStoreDB)
+type closer func() error
 
-    stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg})
-    if err != nil {
-        return nil, nil, fmt.Errorf("unable to initialize statestore: %w", err)
+func makeCloser(cs []closer) closer {
+    return func() error {
+        errs := make([]string, 0, len(cs))
+        for _, cl := range cs {
+            if err := cl(); err != nil {
+                errs = append(errs, err.Error())
+            }
+        }
+        if len(errs) > 0 {
+            return errors.New(strings.Join(errs, "; "))
+        }
+        return nil
+    }
+}
+
+func combineCloseError(err error, cl closer) error {
+    if err == nil {
+        return cl()
     }
-    return blockStore, stateDB, nil
+    clerr := cl()
+    if clerr == nil {
+        return err
+    }
+
+    return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
+}
+
+func initDBs(
+    cfg *config.Config,
+    dbProvider config.DBProvider,
+) (*store.BlockStore, dbm.DB, closer, error) {
+
+    blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
+    if err != nil {
+        return nil, nil, func() error { return nil }, fmt.Errorf("unable to initialize blockstore: %w", err)
+    }
+    closers := []closer{}
+    blockStore := store.NewBlockStore(blockStoreDB)
+    closers = append(closers, blockStoreDB.Close)
+
+    stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
+    if err != nil {
+        return nil, nil, makeCloser(closers), fmt.Errorf("unable to initialize statestore: %w", err)
+    }
+
+    closers = append(closers, stateDB.Close)
+
+    return blockStore, stateDB, makeCloser(closers), nil
 }
 
 // nolint:lll
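Behavior of the helpers above, made concrete: makeCloser runs every closer and reports an error only when at least one of them actually failed, joining the messages with "; ". A runnable sketch exercising the same logic (the guard must be len(errs) > 0; with >= 0 the closer would report an empty error even when every close succeeded):

    package main

    import (
        "errors"
        "fmt"
        "strings"
    )

    type closer func() error

    // makeCloser as in node/setup.go above.
    func makeCloser(cs []closer) closer {
        return func() error {
            errs := make([]string, 0, len(cs))
            for _, cl := range cs {
                if err := cl(); err != nil {
                    errs = append(errs, err.Error())
                }
            }
            if len(errs) > 0 {
                return errors.New(strings.Join(errs, "; "))
            }
            return nil
        }
    }

    func main() {
        ok := func() error { return nil }
        bad := func(msg string) closer {
            return func() error { return errors.New(msg) }
        }

        fmt.Println(makeCloser([]closer{ok, ok})())
        // <nil>: no closer failed, so no error is reported

        fmt.Println(makeCloser([]closer{ok, bad("close state db"), bad("close peer db")})())
        // close state db; close peer db
    }
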
@@ -421,11 +459,11 @@ func createPeerManager(
     dbProvider config.DBProvider,
     p2pLogger log.Logger,
     nodeID types.NodeID,
-) (*p2p.PeerManager, error) {
+) (*p2p.PeerManager, closer, error) {
 
     selfAddr, err := p2p.ParseNodeAddress(nodeID.AddressString(cfg.P2P.ExternalAddress))
     if err != nil {
-        return nil, fmt.Errorf("couldn't parse ExternalAddress %q: %w", cfg.P2P.ExternalAddress, err)
+        return nil, func() error { return nil }, fmt.Errorf("couldn't parse ExternalAddress %q: %w", cfg.P2P.ExternalAddress, err)
     }
 
     var maxConns uint16
@@ -437,7 +475,7 @@ func createPeerManager(
     case cfg.P2P.MaxNumInboundPeers > 0 && cfg.P2P.MaxNumOutboundPeers > 0:
         x := cfg.P2P.MaxNumInboundPeers + cfg.P2P.MaxNumOutboundPeers
         if x > math.MaxUint16 {
-            return nil, fmt.Errorf(
+            return nil, func() error { return nil }, fmt.Errorf(
                 "max inbound peers (%d) + max outbound peers (%d) exceeds maximum (%d)",
                 cfg.P2P.MaxNumInboundPeers,
                 cfg.P2P.MaxNumOutboundPeers,
@@ -472,7 +510,7 @@ func createPeerManager(
     for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
         address, err := p2p.ParseNodeAddress(p)
         if err != nil {
-            return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
+            return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
         }
 
         peers = append(peers, address)
@@ -482,28 +520,28 @@ func createPeerManager(
     for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
         address, err := p2p.ParseNodeAddress(p)
         if err != nil {
-            return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
+            return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
         }
         peers = append(peers, address)
     }
 
     peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
     if err != nil {
-        return nil, fmt.Errorf("unable to initialize peer store: %w", err)
+        return nil, func() error { return nil }, fmt.Errorf("unable to initialize peer store: %w", err)
     }
 
     peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
     if err != nil {
-        return nil, fmt.Errorf("failed to create peer manager: %w", err)
+        return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
     }
 
     for _, peer := range peers {
         if _, err := peerManager.Add(peer); err != nil {
-            return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
+            return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
         }
     }
 
-    return peerManager, nil
+    return peerManager, peerDB.Close, nil
 }
 
 func createRouter(
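Note the invariant createPeerManager now maintains: every branch returns a callable closer, a no-op func() error { return nil } before the peer database opens and peerDB.Close afterwards, never nil. Callers fold that closer into their error path with combineCloseError (defined earlier in node/setup.go), which keeps both failures distinguishable when cleanup during an error return also goes wrong. A runnable sketch using the same function:

    package main

    import (
        "errors"
        "fmt"
    )

    type closer func() error

    // combineCloseError as in node/setup.go above.
    func combineCloseError(err error, cl closer) error {
        if err == nil {
            return cl()
        }
        clerr := cl()
        if clerr == nil {
            return err
        }
        return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
    }

    func main() {
        err := combineCloseError(
            errors.New("failed to create router"),
            func() error { return errors.New("peerstore: already closed") },
        )
        fmt.Println(err)
        // error="failed to create router" closerError="peerstore: already closed"
    }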