add indexer service to inspect node

This commit is contained in:
William Banfield
2021-08-12 10:47:07 -04:00
parent 6c7dbec283
commit 45eedbc087
4 changed files with 30 additions and 62 deletions

View File

@@ -30,16 +30,24 @@ type Inspect struct {
rpcConfig *cfg.RPCConfig
logger log.Logger
indexerService *indexer.Service
eventBus *types.EventBus
logger log.Logger
}
// New constructs a new Inspect from the passed in parameters.
func New(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink, logger log.Logger) *Inspect {
routes := inspect_rpc.Routes(stateStore, blockStore, eventSinks)
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
return &Inspect{
routes: routes,
rpcConfig: rpcConfig,
logger: logger,
routes: routes,
rpcConfig: rpcConfig,
logger: logger,
eventBus: eventBus,
indexerService: indexerService,
}
}
@@ -80,6 +88,17 @@ func NewDefault() (*Inspect, error) {
// Run starts the Inspect servers and blocks until the servers shut down. The passed
// in context is used to control the lifecycle of the servers.
func (inspect *Inspect) Run(ctx context.Context) error {
err := inspect.eventBus.Start()
if err != nil {
return err
}
defer inspect.eventBus.Stop()
err = inspect.indexerService.Start()
if err != nil {
return err
}
defer inspect.indexerService.Stop()
return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes)
}

View File

@@ -97,7 +97,7 @@ func (srv *Server) ListenAndServe(ctx context.Context) error {
<-ctx.Done()
listener.Close()
}()
return rpcservesrv.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config))
return rpcserver.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config))
}
// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles

View File

@@ -849,7 +849,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
listener, err := rpcserver.Listen(
listenAddr,
config,
config.MaxOpenConnections,
)
if err != nil {
return nil, err
@@ -907,7 +907,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(grpcListenAddr, config)
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
if err != nil {
return nil, err
}

View File

@@ -8,7 +8,6 @@ import (
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
@@ -33,9 +32,7 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@@ -78,58 +75,10 @@ func createAndStartIndexerService(
logger log.Logger,
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
eventSinks := []indexer.EventSink{}
// check for duplicated sinks
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = true
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
if err != nil {
return nil, nil, err
}
loop:
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
eventSinks = []indexer.EventSink{null.NewEventSink()}
break loop
case string(indexer.KV):
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, nil, errors.New("unsupported event sink type")
}
}
if len(eventSinks) == 0 {
eventSinks = []indexer.EventSink{null.NewEventSink()}
}
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))