From c37e4a43a2a91af519f4331e5b6870cc71f24297 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Mon, 9 Aug 2021 11:33:00 -0400 Subject: [PATCH] refactor debug code into separate package and rename to inspect --- cmd/tendermint/commands/inspect.go | 68 +++++++++ cmd/tendermint/main.go | 1 + inspect/inspect.go | 139 ++++++++++++++++++ node/debug_test.go => inspect/inspect_test.go | 63 +++++--- inspect/rpc/rpc.go | 119 +++++++++++++++ node/debug.go | 88 ----------- node/setup.go | 72 +-------- rpc/core/routes.go | 23 +-- state/indexer/sink/sink.go | 61 ++++++++ 9 files changed, 434 insertions(+), 200 deletions(-) create mode 100644 cmd/tendermint/commands/inspect.go create mode 100644 inspect/inspect.go rename node/debug_test.go => inspect/inspect_test.go (67%) create mode 100644 inspect/rpc/rpc.go delete mode 100644 node/debug.go create mode 100644 state/indexer/sink/sink.go diff --git a/cmd/tendermint/commands/inspect.go b/cmd/tendermint/commands/inspect.go new file mode 100644 index 000000000..fb43e501d --- /dev/null +++ b/cmd/tendermint/commands/inspect.go @@ -0,0 +1,68 @@ +package commands + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer/sink" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +// InspectCmd represents the base command when called without any subcommands +var InspectCmd = &cobra.Command{ + Use: "inspect", + Short: "Run an inspect server", + + RunE: runInspect, +} + +func init() { + InspectCmd.Flags().String("rpc.laddr", config.RPC.ListenAddress, "RPC listenener address. Port required") +} + +func runInspect(cmd *cobra.Command, args []string) error { + ctx, cancelFunc := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + go func() { + <-c + cancelFunc() + }() + + blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config}) + if err != nil { + return err + } + blockStore := store.NewBlockStore(blockStoreDB) + stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "statestore", Config: config}) + if err != nil { + return err + } + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + if err != nil { + return err + } + sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return err + } + stateStore := state.NewStore(stateDB) + + d := inspect.New(config.RPC, blockStore, stateStore, sinks, logger) + + logger.Info("starting inspect server") + if err := d.Run(ctx); err != nil { + logger.Error("error encountered while running inspect server", "err", err) + return err + } + return nil +} diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index b40624cc3..810e35327 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -28,6 +28,7 @@ func main() { cmd.ShowNodeIDCmd, cmd.GenNodeKeyCmd, cmd.VersionCmd, + cmd.InspectCmd, debug.DebugCmd, cli.NewCompletionCmd(rootCmd, true), ) diff --git a/inspect/inspect.go b/inspect/inspect.go new file mode 100644 index 000000000..a84884083 --- /dev/null +++ b/inspect/inspect.go @@ -0,0 +1,139 @@ +package inspect + +import ( + "context" + "errors" + "net" + "sync" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect/rpc" + inspect_rpc "github.com/tendermint/tendermint/inspect/rpc" + "github.com/tendermint/tendermint/libs/log" + tmstrings "github.com/tendermint/tendermint/libs/strings" + rpccore "github.com/tendermint/tendermint/rpc/core" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/indexer/sink" + "github.com/tendermint/tendermint/store" + "github.com/tendermint/tendermint/types" +) + +// Inspect manages an RPC service that exports methods to debug a failed node. +// After a node shuts down due to a consensus failure,, it will no longer start +// up and cannot easily be inspected. A Inspect value provides a similar interface +// to the node, using the underlying Tendermint data stores, without bringing up +// any other components. A caller can query the Inspect service to inspect the +// persisted state and debug the failure. +type Inspect struct { + routes rpccore.RoutesMap + + rpcConfig *cfg.RPCConfig + + logger log.Logger +} + +func NewFromConfig(config *cfg.Config) (*Inspect, error) { + blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config}) + if err != nil { + return nil, err + } + blockStore := store.NewBlockStore(blockStoreDB) + stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "statestore", Config: config}) + if err != nil { + return nil, err + } + genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) + if err != nil { + return nil, err + } + sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return nil, err + } + l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false) + stateStore := sm.NewStore(stateDB) + return New(config.RPC, blockStore, stateStore, sinks, l), nil +} + +func New(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink, logger log.Logger) *Inspect { + routes := inspect_rpc.Routes(stateStore, blockStore, eventSinks) + return &Inspect{ + routes: routes, + rpcConfig: rpcConfig, + logger: logger, + } +} + +func NewDefault() (*Inspect, error) { + config := cfg.Config{ + BaseConfig: cfg.DefaultBaseConfig(), + RPC: cfg.DefaultRPCConfig(), + TxIndex: cfg.DefaultTxIndexConfig(), + } + return NewFromConfig(&config) +} + +func (inspect *Inspect) Run(ctx context.Context) error { + return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes) +} + +func startRPCServers(ctx context.Context, rpcConfig *cfg.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error { + wg := &sync.WaitGroup{} + listenAddrs := tmstrings.SplitAndTrimEmpty(rpcConfig.ListenAddress, ",", " ") + rootHandler := inspect_rpc.Handler(rpcConfig, routes, logger) + errChan := make(chan error) + for _, listenerAddr := range listenAddrs { + server := rpc.Server{ + Logger: logger, + Config: rpcConfig, + Handler: rootHandler, + Addr: listenerAddr, + } + if rpcConfig.IsTLSEnabled() { + keyFile := rpcConfig.KeyFile() + certFile := rpcConfig.CertFile() + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("RPC HTTPS server starting", "address", listenerAddr, + "certfile", certFile, "keyfile", keyFile) + err := server.ListenAndServeTLS(ctx, certFile, keyFile) + if !errors.Is(err, net.ErrClosed) { + logger.Error("RPC HTTPS server stopped with error", "address", listenerAddr, "err", err) + errChan <- err + return + } + logger.Info("RPC HTTPS server stopped", "address", listenerAddr) + }() + } else { + wg.Add(1) + go func() { + defer wg.Done() + logger.Info("RPC HTTP server starting", "address", listenerAddr) + err := server.ListenAndServe(ctx) + if !errors.Is(err, net.ErrClosed) { + logger.Error("RPC HTTP server stopped with error", "address", listenerAddr, "err", err) + errChan <- err + return + } + logger.Info("RPC HTTP server stopped", "address", listenerAddr) + }() + } + } + select { + case <-chanFromWG(wg): + return nil + case err := <-errChan: + return err + } +} + +func chanFromWG(wg *sync.WaitGroup) chan struct{} { + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + return ch +} diff --git a/node/debug_test.go b/inspect/inspect_test.go similarity index 67% rename from node/debug_test.go rename to inspect/inspect_test.go index 35077b91f..377fb7f24 100644 --- a/node/debug_test.go +++ b/inspect/inspect_test.go @@ -1,9 +1,11 @@ -package node_test +package inspect_test import ( "context" "fmt" + "net" "os" + "strings" "testing" "github.com/fortytw2/leaktest" @@ -12,8 +14,9 @@ import ( abci_types "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/inspect" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/pubsub/query" - "github.com/tendermint/tendermint/node" http_client "github.com/tendermint/tendermint/rpc/client/http" "github.com/tendermint/tendermint/state/indexer" indexer_mocks "github.com/tendermint/tendermint/state/indexer/mocks" @@ -21,35 +24,33 @@ import ( "github.com/tendermint/tendermint/types" ) -func TestDebugConstructor(t *testing.T) { +func TestInspectConstructor(t *testing.T) { config := cfg.ResetTestRoot("test") t.Cleanup(leaktest.Check(t)) defer func() { _ = os.RemoveAll(config.RootDir) }() t.Run("from config", func(t *testing.T) { - d, err := node.NewDebugFromConfig(config) + d, err := inspect.NewFromConfig(config) require.NoError(t, err) require.NotNil(t, d) - - d.OnStop() }) } -func TestDebugRun(t *testing.T) { +func TestInspectRun(t *testing.T) { config := cfg.ResetTestRoot("test") t.Cleanup(leaktest.Check(t)) defer func() { _ = os.RemoveAll(config.RootDir) }() t.Run("from config", func(t *testing.T) { - d, err := node.NewDebugFromConfig(config) + d, err := inspect.NewFromConfig(config) require.NoError(t, err) - err = d.OnStart() - require.NoError(t, err) - d.OnStop() + ctx, cancel := context.WithCancel(context.Background()) + go d.Run(ctx) + cancel() }) } -func TestDebugServeInfoRPC(t *testing.T) { +func TestInspectServeInfoRPC(t *testing.T) { testHeight := int64(1) testBlock := new(types.Block) testBlock.Header.Height = testHeight @@ -64,22 +65,24 @@ func TestDebugServeInfoRPC(t *testing.T) { eventSinkMock := &indexer_mocks.EventSink{} rpcConfig := config.TestRPCConfig() - d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}) - require.NoError(t, d.OnStart()) + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + ctx, cancel := context.WithCancel(context.Background()) + go d.Run(ctx) + requireConnect(t, rpcConfig.ListenAddress, 15) cli, err := http_client.New(rpcConfig.ListenAddress) require.NoError(t, err) resultBlock, err := cli.Block(context.Background(), &testHeight) require.NoError(t, err) require.Equal(t, testBlock.Height, resultBlock.Block.Height) require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash) - - d.OnStop() + cancel() blockStoreMock.AssertExpectations(t) stateStoreMock.AssertExpectations(t) } -func TestDebugTxSearch(t *testing.T) { +func TestInspectTxSearch(t *testing.T) { testHash := []byte("test") testTx := []byte("tx") testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) @@ -97,8 +100,11 @@ func TestDebugTxSearch(t *testing.T) { Return([]*abci_types.TxResult{testTxResult}, nil) rpcConfig := config.TestRPCConfig() - d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}) - require.NoError(t, d.OnStart()) + l := log.TestingLogger() + d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l) + ctx, cancel := context.WithCancel(context.Background()) + go d.Run(ctx) + requireConnect(t, rpcConfig.ListenAddress, 15) cli, err := http_client.New(rpcConfig.ListenAddress) require.NoError(t, err) @@ -108,7 +114,24 @@ func TestDebugTxSearch(t *testing.T) { require.Len(t, resultTxSearch.Txs, 1) require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) - d.OnStop() + cancel() eventSinkMock.AssertExpectations(t) } + +func requireConnect(t testing.TB, addr string, retries int) { + parts := strings.SplitN(addr, "://", 2) + if len(parts) != 2 { + t.Fatalf("malformed address to dial: %s", addr) + } + var err error + for i := 0; i < retries; i++ { + var conn net.Conn + conn, err = net.Dial(parts[0], parts[1]) + if err == nil { + conn.Close() + return + } + } + t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err) +} diff --git a/inspect/rpc/rpc.go b/inspect/rpc/rpc.go new file mode 100644 index 000000000..fb7dc36d1 --- /dev/null +++ b/inspect/rpc/rpc.go @@ -0,0 +1,119 @@ +package rpc + +import ( + "context" + "net/http" + "time" + + "github.com/rs/cors" + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + "github.com/tendermint/tendermint/rpc/core" + rpccore "github.com/tendermint/tendermint/rpc/core" + "github.com/tendermint/tendermint/rpc/jsonrpc/server" + rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server" + "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/types" +) + +type Server struct { + Addr string // TCP address to listen on, ":http" if empty + Handler http.Handler + Logger log.Logger + Config *config.RPCConfig +} + +func Routes(store state.Store, blockStore state.BlockStore, eventSinks []indexer.EventSink) rpccore.RoutesMap { + env := &core.Environment{ + EventSinks: eventSinks, + StateStore: store, + BlockStore: blockStore, + } + return rpccore.RoutesMap{ + "blockchain": rpcserver.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), + "consensus_params": rpcserver.NewRPCFunc(env.ConsensusParams, "height", true), + "block": rpcserver.NewRPCFunc(env.Block, "height", true), + "block_by_hash": rpcserver.NewRPCFunc(env.BlockByHash, "hash", true), + "block_results": rpcserver.NewRPCFunc(env.BlockResults, "height", true), + "commit": rpcserver.NewRPCFunc(env.Commit, "height", true), + "validators": rpcserver.NewRPCFunc(env.Validators, "height,page,per_page", true), + "tx": rpcserver.NewRPCFunc(env.Tx, "hash,prove", true), + "tx_search": rpcserver.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), + "block_search": rpcserver.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), + } +} + +func Handler(rpcConfig *config.RPCConfig, routes rpccore.RoutesMap, logger log.Logger) http.Handler { + mux := http.NewServeMux() + wmLogger := logger.With("protocol", "websocket") + + var eventBus types.EventBusSubscriber + + websocketDisconnectFn := func(remoteAddr string) { + err := eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != tmpubsub.ErrSubscriptionNotFound { + wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + } + wm := rpcserver.NewWebsocketManager(routes, + rpcserver.OnDisconnect(websocketDisconnectFn), + rpcserver.ReadLimit(rpcConfig.MaxBodyBytes)) + wm.SetLogger(wmLogger) + mux.HandleFunc("/websocket", wm.WebsocketHandler) + + rpcserver.RegisterRPCFuncs(mux, routes, logger) + var rootHandler http.Handler = mux + if rpcConfig.IsCorsEnabled() { + rootHandler = addCORSHandler(rpcConfig, mux) + } + return rootHandler +} + +func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler { + corsMiddleware := cors.New(cors.Options{ + AllowedOrigins: rpcConfig.CORSAllowedOrigins, + AllowedMethods: rpcConfig.CORSAllowedMethods, + AllowedHeaders: rpcConfig.CORSAllowedHeaders, + }) + h = corsMiddleware.Handler(h) + return h +} + +func (r *Server) ListenAndServe(ctx context.Context) error { + listener, err := rpcserver.Listen(r.Addr, r.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return rpcserver.Serve(listener, r.Handler, r.Logger, serverRPCConfig(r.Config)) +} + +func (r *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error { + listener, err := rpcserver.Listen(r.Addr, r.Config.MaxOpenConnections) + if err != nil { + return err + } + go func() { + <-ctx.Done() + listener.Close() + }() + return rpcserver.ServeTLS(listener, r.Handler, certFile, keyFile, r.Logger, serverRPCConfig(r.Config)) +} + +func serverRPCConfig(r *config.RPCConfig) *server.Config { + cfg := rpcserver.DefaultConfig() + cfg.MaxBodyBytes = r.MaxBodyBytes + cfg.MaxHeaderBytes = r.MaxHeaderBytes + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit { + cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second + } + return cfg +} diff --git a/node/debug.go b/node/debug.go deleted file mode 100644 index f926dc799..000000000 --- a/node/debug.go +++ /dev/null @@ -1,88 +0,0 @@ -package node - -import ( - "net" - - cfg "github.com/tendermint/tendermint/config" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/libs/service" - rpccore "github.com/tendermint/tendermint/rpc/core" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/state/indexer" - "github.com/tendermint/tendermint/store" - "github.com/tendermint/tendermint/types" -) - -// Debug manages an RPC service that exports methods to debug a failed node. -// After a node shuts down due to a consensus failure,, it will no longer start -// up and cannot easily be inspected. A Debug value provides a similar interface -// to the node, using the underlying Tendermint data stores, without bringing up -// any other components. A caller can query the Debug service to inspect the -// persisted state and debug the failure. -type Debug struct { - service.BaseService - - routes rpccore.RoutesMap - - rpcConfig *cfg.RPCConfig - listeners []net.Listener -} - -func NewDebugFromConfig(config *cfg.Config) (*Debug, error) { - blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: _blockStoreID, Config: config}) - if err != nil { - return nil, err - } - blockStore := store.NewBlockStore(blockStoreDB) - stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: _stateStoreID, Config: config}) - if err != nil { - return nil, err - } - genDocFunc := defaultGenesisDocProviderFunc(config) - genDoc, err := genDocFunc() - if err != nil { - return nil, err - } - sinks, err := indexerSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) - if err != nil { - return nil, err - } - stateStore := sm.NewStore(stateDB) - return NewDebug(config.RPC, blockStore, stateStore, sinks), nil -} - -func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink) *Debug { - routes := rpccore.DebugRoutes(stateStore, blockStore, eventSinks) - return &Debug{ - routes: routes, - rpcConfig: rpcConfig, - } -} - -func NewDefaultDebug() (*Debug, error) { - config := cfg.Config{ - BaseConfig: cfg.DefaultBaseConfig(), - RPC: cfg.DefaultRPCConfig(), - TxIndex: cfg.DefaultTxIndexConfig(), - } - return NewDebugFromConfig(&config) -} - -func (debug *Debug) OnStart() error { - l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false) - listeners, err := startRPCServers(debug.rpcConfig, l, debug.routes, &types.NopEventBus{}) - if err != nil { - return err - } - debug.listeners = listeners - return nil -} - -func (debug *Debug) OnStop() { - for i := len(debug.listeners) - 1; i >= 0; i-- { - err := debug.listeners[i].Close() - if err != nil { - debug.Logger.Error("Error stopping debug rpc listener", "err", err) - } - } -} diff --git a/node/setup.go b/node/setup.go index 60824cd03..bbc92a4ab 100644 --- a/node/setup.go +++ b/node/setup.go @@ -8,7 +8,6 @@ import ( "math" "net" _ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port - "strings" "time" dbm "github.com/tendermint/tm-db" @@ -33,9 +32,7 @@ import ( "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/indexer" - kv "github.com/tendermint/tendermint/state/indexer/sink/kv" - null "github.com/tendermint/tendermint/state/indexer/sink/null" - psql "github.com/tendermint/tendermint/state/indexer/sink/psql" + "github.com/tendermint/tendermint/state/indexer/sink" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/version" @@ -83,21 +80,7 @@ func createAndStartIndexerService( logger log.Logger, chainID string, ) (*indexer.Service, []indexer.EventSink, error) { - - eventSinks := []indexer.EventSink{} - - // check for duplicated sinks - sinks := map[string]bool{} - for _, s := range config.TxIndex.Indexer { - sl := strings.ToLower(s) - if sinks[sl] { - return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") - } - - sinks[sl] = true - } - - eventSinks, err := indexerSinksFromConfig(config, dbProvider, chainID) + eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID) if err != nil { return nil, nil, err } @@ -111,57 +94,6 @@ func createAndStartIndexerService( return indexerService, eventSinks, nil } -func indexerSinksFromConfig(config *cfg.Config, dbProvider cfg.DBProvider, chainID string) ([]indexer.EventSink, error) { - if len(config.TxIndex.Indexer) == 0 { - return []indexer.EventSink{null.NewEventSink()}, nil - } - - // check for duplicated sinks - sinks := map[string]struct{}{} - for _, s := range config.TxIndex.Indexer { - sl := strings.ToLower(s) - if _, ok := sinks[sl]; ok { - return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") - } - sinks[sl] = struct{}{} - } - - eventSinks := []indexer.EventSink{} - for k := range sinks { - switch k { - case string(indexer.NULL): - // When we see null in the config, the eventsinks will be reset with the - // nullEventSink. - return []indexer.EventSink{null.NewEventSink()}, nil - - case string(indexer.KV): - store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) - if err != nil { - return nil, err - } - - eventSinks = append(eventSinks, kv.NewEventSink(store)) - - case string(indexer.PSQL): - conn := config.TxIndex.PsqlConn - if conn == "" { - return nil, errors.New("the psql connection settings cannot be empty") - } - - es, _, err := psql.NewEventSink(conn, chainID) - if err != nil { - return nil, err - } - - eventSinks = append(eventSinks, es) - - default: - return nil, errors.New("unsupported event sink type") - } - } - return eventSinks, nil -} - func doHandshake( stateStore sm.Store, state sm.State, diff --git a/rpc/core/routes.go b/rpc/core/routes.go index 11c0edcfd..18afb7bf6 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -2,8 +2,6 @@ package core import ( rpc "github.com/tendermint/tendermint/rpc/jsonrpc/server" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/state/indexer" ) // TODO: better system than "unsafe" prefix @@ -88,6 +86,7 @@ func (env *Environment) UnsafeRoutes() RoutesMap { } } +// CombineRoutes takes a list of RoutesMaps and combines them into a single RoutesMap. func CombineRoutes(routesMaps ...RoutesMap) RoutesMap { res := RoutesMap{} for _, routesMap := range routesMaps { @@ -97,23 +96,3 @@ func CombineRoutes(routesMaps ...RoutesMap) RoutesMap { } return res } - -func DebugRoutes(store sm.Store, blockStore sm.BlockStore, eventSinks []indexer.EventSink) RoutesMap { - env := &Environment{ - EventSinks: eventSinks, - StateStore: store, - BlockStore: blockStore, - } - return RoutesMap{ - "blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), - "consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", true), - "block": rpc.NewRPCFunc(env.Block, "height", true), - "block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true), - "block_results": rpc.NewRPCFunc(env.BlockResults, "height", true), - "commit": rpc.NewRPCFunc(env.Commit, "height", true), - "validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", true), - "tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true), - "tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), - "block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), - } -} diff --git a/state/indexer/sink/sink.go b/state/indexer/sink/sink.go new file mode 100644 index 000000000..96485f52a --- /dev/null +++ b/state/indexer/sink/sink.go @@ -0,0 +1,61 @@ +package sink + +import ( + "errors" + "strings" + + "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/state/indexer" + "github.com/tendermint/tendermint/state/indexer/sink/kv" + "github.com/tendermint/tendermint/state/indexer/sink/null" + "github.com/tendermint/tendermint/state/indexer/sink/psql" +) + +func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) { + if len(cfg.TxIndex.Indexer) == 0 { + return []indexer.EventSink{null.NewEventSink()}, nil + } + + // check for duplicated sinks + sinks := map[string]struct{}{} + for _, s := range cfg.TxIndex.Indexer { + sl := strings.ToLower(s) + if _, ok := sinks[sl]; ok { + return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + } + sinks[sl] = struct{}{} + } + eventSinks := []indexer.EventSink{} + for k := range sinks { + switch k { + case string(indexer.NULL): + // When we see null in the config, the eventsinks will be reset with the + // nullEventSink. + return []indexer.EventSink{null.NewEventSink()}, nil + + case string(indexer.KV): + store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg}) + if err != nil { + return nil, err + } + + eventSinks = append(eventSinks, kv.NewEventSink(store)) + + case string(indexer.PSQL): + conn := cfg.TxIndex.PsqlConn + if conn == "" { + return nil, errors.New("the psql connection settings cannot be empty") + } + + es, _, err := psql.NewEventSink(conn, chainID) + if err != nil { + return nil, err + } + eventSinks = append(eventSinks, es) + default: + return nil, errors.New("unsupported event sink type") + } + } + return eventSinks, nil + +}