diff --git a/inspect/inspect.go b/inspect/inspect.go index d8db2081e..c43244633 100644 --- a/inspect/inspect.go +++ b/inspect/inspect.go @@ -30,16 +30,24 @@ type Inspect struct { rpcConfig *cfg.RPCConfig - logger log.Logger + indexerService *indexer.Service + eventBus *types.EventBus + logger log.Logger } // New constructs a new Inspect from the passed in parameters. func New(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink, logger log.Logger) *Inspect { routes := inspect_rpc.Routes(stateStore, blockStore, eventSinks) + eventBus := types.NewEventBus() + eventBus.SetLogger(logger.With("module", "events")) + indexerService := indexer.NewIndexerService(eventSinks, eventBus) + indexerService.SetLogger(logger.With("module", "txindex")) return &Inspect{ - routes: routes, - rpcConfig: rpcConfig, - logger: logger, + routes: routes, + rpcConfig: rpcConfig, + logger: logger, + eventBus: eventBus, + indexerService: indexerService, } } @@ -80,6 +88,17 @@ func NewDefault() (*Inspect, error) { // Run starts the Inspect servers and blocks until the servers shut down. The passed // in context is used to control the lifecycle of the servers. func (inspect *Inspect) Run(ctx context.Context) error { + err := inspect.eventBus.Start() + if err != nil { + return err + } + defer inspect.eventBus.Stop() + + err = inspect.indexerService.Start() + if err != nil { + return err + } + defer inspect.indexerService.Stop() return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes) } diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go index 51559b410..891155896 100644 --- a/inspect/rpc/rpc.go +++ b/inspect/rpc/rpc.go @@ -97,7 +97,7 @@ func (srv *Server) ListenAndServe(ctx context.Context) error { <-ctx.Done() listener.Close() }() - return rpcservesrv.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config)) + return rpcserver.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config)) } // ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles diff --git a/node/node.go b/node/node.go index ced8af729..2dca7b82c 100644 --- a/node/node.go +++ b/node/node.go @@ -849,7 +849,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger) listener, err := rpcserver.Listen( listenAddr, - config, + config.MaxOpenConnections, ) if err != nil { return nil, err @@ -907,7 +907,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) { if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second } - listener, err := rpcserver.Listen(grpcListenAddr, config) + listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections) if err != nil { return nil, err } diff --git a/node/setup.go b/node/setup.go index af48fb382..fb88846f5 100644 --- a/node/setup.go +++ b/node/setup.go @@ -8,7 +8,6 @@ import ( "math" "net" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port - "strings" "time" dbm "github.com/tendermint/tm-db" @@ -33,9 +32,7 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - kv "github.com/tendermint/tendermint/state/indexer/sink/kv" - null "github.com/tendermint/tendermint/state/indexer/sink/null" - psql "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/indexer/sink" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -78,58 +75,10 @@ func createAndStartIndexerService( logger log.Logger, chainID string, ) (*indexer.Service, []indexer.EventSink, error) { - - eventSinks := []indexer.EventSink{} - - // check for duplicated sinks - sinks := map[string]bool{} - for _, s := range config.TxIndex.Indexer { - sl := strings.ToLower(s) - if sinks[sl] { - return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") - } - - sinks[sl] = true + eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID) + if err != nil { + return nil, nil, err } - -loop: - for k := range sinks { - switch k { - case string(indexer.NULL): - // When we see null in the config, the eventsinks will be reset with the - // nullEventSink. - eventSinks = []indexer.EventSink{null.NewEventSink()} - break loop - - case string(indexer.KV): - store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) - if err != nil { - return nil, nil, err - } - - eventSinks = append(eventSinks, kv.NewEventSink(store)) - - case string(indexer.PSQL): - conn := config.TxIndex.PsqlConn - if conn == "" { - return nil, nil, errors.New("the psql connection settings cannot be empty") - } - - es, _, err := psql.NewEventSink(conn, chainID) - if err != nil { - return nil, nil, err - } - eventSinks = append(eventSinks, es) - - default: - return nil, nil, errors.New("unsupported event sink type") - } - } - - if len(eventSinks) == 0 { - eventSinks = []indexer.EventSink{null.NewEventSink()} - } - indexerService := indexer.NewIndexerService(eventSinks, eventBus) indexerService.SetLogger(logger.With("module", "txindex"))