From 1f2ba5a87d3fe67204bbca7e087b7228ede11c6a Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 18 Aug 2021 17:47:30 -0400 Subject: [PATCH] cr feedback --- cmd/tendermint/commands/inspect.go | 4 +- inspect/inspect.go | 39 +++++++--------- inspect/inspect_test.go | 8 ++-- inspect/rpc/rpc.go | 16 +++++-- node/node.go | 24 ++++------ node/setup.go | 8 +++- rpc/core/env.go | 20 ++++---- rpc/core/status.go | 2 +- state/indexer/indexer_service.go | 75 ++++++++++++++++-------------- 9 files changed, 103 insertions(+), 93 deletions(-) diff --git a/cmd/tendermint/commands/inspect.go b/cmd/tendermint/commands/inspect.go index 73d0280ca..635478699 100644 --- a/cmd/tendermint/commands/inspect.go +++ b/cmd/tendermint/commands/inspect.go @@ -35,6 +35,8 @@ var InspectCmd = &cobra.Command{ func init() { InspectCmd.Flags().String("rpc.laddr", config.RPC.ListenAddress, "RPC listenener address. Port required") + InspectCmd.Flags().String("db-backend", config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb") + InspectCmd.Flags().String("db-dir", config.DBPath, "database directory") } func runInspect(cmd *cobra.Command, args []string) error { @@ -53,7 +55,7 @@ func runInspect(cmd *cobra.Command, args []string) error { return err } blockStore := store.NewBlockStore(blockStoreDB) - stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "statestore", Config: config}) + stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config}) if err != nil { if err := blockStoreDB.Close(); err != nil { logger.Error("error closing block store db", "error", err) diff --git a/inspect/inspect.go b/inspect/inspect.go index 713f72720..d3d0bc376 100644 --- a/inspect/inspect.go +++ b/inspect/inspect.go @@ -3,12 +3,12 @@ package inspect import ( "context" "errors" + "fmt" "net" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/inspect/rpc" "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/pubsub" tmstrings "github.com/tendermint/tendermint/libs/strings" rpccore "github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/state" @@ -36,11 +36,14 @@ type Inspect struct { logger log.Logger } -// New returns an Inspect that serves RPC on the given BlockStore and StateStore. +// New returns an Inspect that serves RPC on the specified BlockStore and StateStore. +// The Inspect type does not modify the state or block stores. +// The sinks are used to enable block and transaction querying via the RPC server. +// The caller is responsible for starting and stopping the Inspect service. /// //nolint:lll func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspect { - routes := rpc.Routes(ss, bs, es) + routes := rpc.Routes(*cfg, ss, bs, es) eb := types.NewEventBus() eb.SetLogger(logger.With("module", "events")) is := indexer.NewIndexerService(es, eb) @@ -61,7 +64,7 @@ func NewFromConfig(cfg *config.Config) (*Inspect, error) { return nil, err } bs := store.NewBlockStore(bsDB) - sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "statestore", Config: cfg}) + sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg}) if err != nil { return nil, err } @@ -83,7 +86,7 @@ func NewFromConfig(cfg *config.Config) (*Inspect, error) { func (ins *Inspect) Run(ctx context.Context) error { err := ins.eventBus.Start() if err != nil { - return err + return fmt.Errorf("error starting event bus: %s", err) } defer func() { err := ins.eventBus.Stop() @@ -91,23 +94,17 @@ func (ins *Inspect) Run(ctx context.Context) error { ins.logger.Error("event bus stopped with error", "err", err) } }() - g, tctx := errgroup.WithContext(ctx) - g.Go(func() error { - if !errors.Is(ins.indexerService.Start(), pubsub.ErrUnsubscribed) { - return err - } - return nil - }) - g.Go(func() error { - return startRPCServers(tctx, ins.config, ins.logger, ins.routes) - }) - - <-tctx.Done() - err = ins.indexerService.Stop() + err = ins.indexerService.Start() if err != nil { - ins.logger.Error("event bus stopped with error", "err", err) + return fmt.Errorf("error starting indexer service: %s", err) } - return g.Wait() + defer func() { + err := ins.indexerService.Stop() + if err != nil { + ins.logger.Error("indexer service stopped with error", "err", err) + } + }() + return startRPCServers(ctx, ins.config, ins.logger, ins.routes) } func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error { @@ -129,7 +126,6 @@ func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logg "certfile", certFile, "keyfile", keyFile) err := server.ListenAndServeTLS(tctx, certFile, keyFile) if !errors.Is(err, net.ErrClosed) { - logger.Error("RPC HTTPS server stopped with error", "address", listenerAddr, "err", err) return err } logger.Info("RPC HTTPS server stopped", "address", listenerAddr) @@ -140,7 +136,6 @@ func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logg logger.Info("RPC HTTP server starting", "address", listenerAddr) err := server.ListenAndServe(tctx) if !errors.Is(err, net.ErrClosed) { - logger.Error("RPC HTTP server stopped with error", "address", listenerAddr, "err", err) return err } logger.Info("RPC HTTP server stopped", "address", listenerAddr) diff --git a/inspect/inspect_test.go b/inspect/inspect_test.go index 592ff2560..153172d7e 100644 --- a/inspect/inspect_test.go +++ b/inspect/inspect_test.go @@ -45,14 +45,14 @@ func TestInspectRun(t *testing.T) { d, err := inspect.NewFromConfig(cfg) require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - wg := &sync.WaitGroup{} - wg.Add(1) + stoppedWG := &sync.WaitGroup{} + stoppedWG.Add(1) go func() { - defer wg.Done() require.NoError(t, d.Run(ctx)) + stoppedWG.Done() }() cancel() - wg.Wait() + stoppedWG.Wait() }) } diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go index 2066f8bcd..c33213838 100644 --- a/inspect/rpc/rpc.go +++ b/inspect/rpc/rpc.go @@ -26,11 +26,13 @@ type Server struct { } // Routes returns the set of routes used by the Inspect server. -func Routes(s state.Store, bs state.BlockStore, es []indexer.EventSink) core.RoutesMap { +func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink) core.RoutesMap { env := &core.Environment{ - EventSinks: es, - StateStore: s, - BlockStore: bs, + Config: cfg, + EventSinks: es, + StateStore: s, + BlockStore: bs, + WaitSyncChecker: waitSyncCheckerImpl{}, } return core.RoutesMap{ "blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), @@ -85,6 +87,12 @@ func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { return h } +type waitSyncCheckerImpl struct{} + +func (_ waitSyncCheckerImpl) WaitSync() bool { + return false +} + // ListenAndServe listens on the address specified in srv.Addr and handles any // incoming requests over HTTP using the Inspect rpc handler specified on the server. func (srv *Server) ListenAndServe(ctx context.Context) error { diff --git a/node/node.go b/node/node.go index cec117510..60a22fad0 100644 --- a/node/node.go +++ b/node/node.go @@ -24,7 +24,6 @@ import ( "github.com/tendermint/tendermint/internal/statesync" "github.com/tendermint/tendermint/libs/log" tmnet "github.com/tendermint/tendermint/libs/net" - "github.com/tendermint/tendermint/libs/pubsub" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/libs/strings" @@ -162,18 +161,11 @@ func makeNode(config *cfg.Config, return nil, err } - indexerService, eventSinks, err := createIndexerService(config, dbProvider, eventBus, logger, genDoc.ChainID) + indexerService, eventSinks, err := createAndStartIndexerService(config, dbProvider, eventBus, logger, genDoc.ChainID) if err != nil { return nil, err } - go func() { - err := indexerService.Start() - if !errors.Is(err, pubsub.ErrUnsubscribed) { - logger.Error("error starting indexer service", "error", err) - } - }() - // If an address is provided, listen on the socket for a connection from an // external signing process. if config.PrivValidator.ListenAddr != "" { @@ -454,15 +446,15 @@ func makeNode(config *cfg.Config, EvidencePool: evPool, ConsensusState: csState, P2PPeers: sw, + WaitSyncChecker: csReactor, BlockSyncReactor: bcReactor.(cs.BlockSyncReactor), - GenDoc: genDoc, - EventSinks: eventSinks, - ConsensusReactor: csReactor, - EventBus: eventBus, - Mempool: mp, - Logger: logger.With("module", "rpc"), - Config: *config.RPC, + GenDoc: genDoc, + EventSinks: eventSinks, + EventBus: eventBus, + Mempool: mp, + Logger: logger.With("module", "rpc"), + Config: *config.RPC, }, } diff --git a/node/setup.go b/node/setup.go index 8f9d11894..541763ff7 100644 --- a/node/setup.go +++ b/node/setup.go @@ -68,7 +68,7 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) { return eventBus, nil } -func createIndexerService( +func createAndStartIndexerService( config *cfg.Config, dbProvider cfg.DBProvider, eventBus *types.EventBus, @@ -79,8 +79,14 @@ func createIndexerService( if err != nil { return nil, nil, err } + indexerService := indexer.NewIndexerService(eventSinks, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) + + if err := indexerService.Start(); err != nil { + return nil, nil, err + } + return indexerService, eventSinks, nil } diff --git a/rpc/core/env.go b/rpc/core/env.go index eb7232c01..093d52fbd 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -58,6 +58,10 @@ type peers interface { Peers() p2p.IPeerSet } +type WaitSyncChecker interface { + WaitSync() bool +} + //---------------------------------------------- // Environment contains objects and interfaces used by the RPC. It is expected // to be setup once during startup. @@ -67,18 +71,18 @@ type Environment struct { ProxyAppMempool proxy.AppConnMempool // interfaces defined in types and above - StateStore sm.Store - BlockStore sm.BlockStore - EvidencePool sm.EvidencePool - ConsensusState Consensus - P2PPeers peers - P2PTransport transport + StateStore sm.Store + BlockStore sm.BlockStore + EvidencePool sm.EvidencePool + ConsensusState Consensus + WaitSyncChecker WaitSyncChecker + P2PPeers peers + P2PTransport transport // objects PubKey crypto.PubKey GenDoc *types.GenesisDoc // cache the genesis structure EventSinks []indexer.EventSink - ConsensusReactor *consensus.Reactor EventBus *types.EventBus // thread safe Mempool mempl.Mempool BlockSyncReactor consensus.BlockSyncReactor @@ -190,7 +194,7 @@ func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64, } func (env *Environment) latestUncommittedHeight() int64 { - nodeIsSyncing := env.ConsensusReactor.WaitSync() + nodeIsSyncing := env.WaitSyncChecker.WaitSync() if nodeIsSyncing { return env.BlockStore.Height() } diff --git a/rpc/core/status.go b/rpc/core/status.go index 815ab37f5..a21b15859 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -70,7 +70,7 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err EarliestBlockHeight: earliestBlockHeight, EarliestBlockTime: time.Unix(0, earliestBlockTimeNano), MaxPeerBlockHeight: env.BlockSyncReactor.GetMaxPeerBlockHeight(), - CatchingUp: env.ConsensusReactor.WaitSync(), + CatchingUp: env.WaitSyncChecker.WaitSync(), TotalSyncedTime: env.BlockSyncReactor.GetTotalSyncedTime(), RemainingTime: env.BlockSyncReactor.GetRemainingSyncTime(), }, diff --git a/state/indexer/indexer_service.go b/state/indexer/indexer_service.go index 1acfc9931..39a1847f8 100644 --- a/state/indexer/indexer_service.go +++ b/state/indexer/indexer_service.go @@ -49,52 +49,55 @@ func (is *Service) OnStart() error { return err } - for { - select { - case <-blockHeadersSub.Canceled(): - return blockHeadersSub.Err() - case msg := <-blockHeadersSub.Out(): + go func() { + for { + select { + case <-blockHeadersSub.Canceled(): + return + case msg := <-blockHeadersSub.Out(): - eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) - height := eventDataHeader.Header.Height - batch := NewBatch(eventDataHeader.NumTxs) + eventDataHeader := msg.Data().(types.EventDataNewBlockHeader) + height := eventDataHeader.Header.Height + batch := NewBatch(eventDataHeader.NumTxs) - for i := int64(0); i < eventDataHeader.NumTxs; i++ { - msg2 := <-txsSub.Out() - txResult := msg2.Data().(types.EventDataTx).TxResult + for i := int64(0); i < eventDataHeader.NumTxs; i++ { + msg2 := <-txsSub.Out() + txResult := msg2.Data().(types.EventDataTx).TxResult - if err = batch.Add(&txResult); err != nil { - is.Logger.Error( - "failed to add tx to batch", - "height", height, - "index", txResult.Index, - "err", err, - ) - } - } - - if !IndexingEnabled(is.eventSinks) { - continue - } - - for _, sink := range is.eventSinks { - if err := sink.IndexBlockEvents(eventDataHeader); err != nil { - is.Logger.Error("failed to index block", "height", height, "err", err) - } else { - is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + if err = batch.Add(&txResult); err != nil { + is.Logger.Error( + "failed to add tx to batch", + "height", height, + "index", txResult.Index, + "err", err, + ) + } } - if len(batch.Ops) > 0 { - err := sink.IndexTxEvents(batch.Ops) - if err != nil { - is.Logger.Error("failed to index block txs", "height", height, "err", err) + if !IndexingEnabled(is.eventSinks) { + continue + } + + for _, sink := range is.eventSinks { + if err := sink.IndexBlockEvents(eventDataHeader); err != nil { + is.Logger.Error("failed to index block", "height", height, "err", err) } else { - is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + is.Logger.Debug("indexed block", "height", height, "sink", sink.Type()) + } + + if len(batch.Ops) > 0 { + err := sink.IndexTxEvents(batch.Ops) + if err != nil { + is.Logger.Error("failed to index block txs", "height", height, "err", err) + } else { + is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type()) + } } } } } - } + }() + return nil } // OnStop implements service.Service by unsubscribing from all transactions and