diff --git a/node/debug.go b/node/debug.go index 91c6305c8..f926dc799 100644 --- a/node/debug.go +++ b/node/debug.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/libs/service" rpccore "github.com/tendermint/tendermint/rpc/core" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" ) @@ -21,8 +22,7 @@ import ( type Debug struct { service.BaseService - blockStore sm.BlockStore - stateStore sm.Store + routes rpccore.RoutesMap rpcConfig *cfg.RPCConfig listeners []net.Listener @@ -38,16 +38,24 @@ func NewDebugFromConfig(config *cfg.Config) (*Debug, error) { if err != nil { return nil, err } + genDocFunc := defaultGenesisDocProviderFunc(config) + genDoc, err := genDocFunc() + if err != nil { + return nil, err + } + sinks, err := indexerSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID) + if err != nil { + return nil, err + } stateStore := sm.NewStore(stateDB) - - return NewDebug(config.RPC, blockStore, stateStore), nil + return NewDebug(config.RPC, blockStore, stateStore, sinks), nil } -func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store) *Debug { +func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink) *Debug { + routes := rpccore.DebugRoutes(stateStore, blockStore, eventSinks) return &Debug{ - blockStore: blockStore, - stateStore: stateStore, - rpcConfig: rpcConfig, + routes: routes, + rpcConfig: rpcConfig, } } @@ -55,18 +63,14 @@ func NewDefaultDebug() (*Debug, error) { config := cfg.Config{ BaseConfig: cfg.DefaultBaseConfig(), RPC: cfg.DefaultRPCConfig(), + TxIndex: cfg.DefaultTxIndexConfig(), } return NewDebugFromConfig(&config) } func (debug *Debug) OnStart() error { - rpcCoreEnv := rpccore.Environment{ - StateStore: debug.stateStore, - BlockStore: debug.blockStore, - } - routes := rpcCoreEnv.InfoRoutes() l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false) - listeners, err := startRPCServers(debug.rpcConfig, l, routes, types.NopEventBus{}) + listeners, err := startRPCServers(debug.rpcConfig, l, debug.routes, &types.NopEventBus{}) if err != nil { return err } diff --git a/node/debug_test.go b/node/debug_test.go index 11a810fc7..35077b91f 100644 --- a/node/debug_test.go +++ b/node/debug_test.go @@ -2,24 +2,31 @@ package node_test import ( "context" + "fmt" + "os" "testing" "github.com/fortytw2/leaktest" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + abci_types "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/node" http_client "github.com/tendermint/tendermint/rpc/client/http" + "github.com/tendermint/tendermint/state/indexer" + indexer_mocks "github.com/tendermint/tendermint/state/indexer/mocks" state_mocks "github.com/tendermint/tendermint/state/mocks" "github.com/tendermint/tendermint/types" ) func TestDebugConstructor(t *testing.T) { + config := cfg.ResetTestRoot("test") t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(config.RootDir) }() t.Run("from config", func(t *testing.T) { - d, err := node.NewDebugFromConfig(&config.Config{ - BaseConfig: config.TestBaseConfig(), - RPC: config.TestRPCConfig(), - }) + d, err := node.NewDebugFromConfig(config) require.NoError(t, err) require.NotNil(t, d) @@ -29,12 +36,11 @@ func TestDebugConstructor(t *testing.T) { } func TestDebugRun(t *testing.T) { + config := cfg.ResetTestRoot("test") t.Cleanup(leaktest.Check(t)) + defer func() { _ = os.RemoveAll(config.RootDir) }() t.Run("from config", func(t *testing.T) { - d, err := node.NewDebugFromConfig(&config.Config{ - BaseConfig: config.TestBaseConfig(), - RPC: config.TestRPCConfig(), - }) + d, err := node.NewDebugFromConfig(config) require.NoError(t, err) err = d.OnStart() require.NoError(t, err) @@ -55,9 +61,10 @@ func TestDebugServeInfoRPC(t *testing.T) { blockStoreMock.On("Base").Return(int64(0)) blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}) blockStoreMock.On("LoadBlock", testHeight).Return(testBlock) + eventSinkMock := &indexer_mocks.EventSink{} rpcConfig := config.TestRPCConfig() - d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock) + d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}) require.NoError(t, d.OnStart()) cli, err := http_client.New(rpcConfig.ListenAddress) require.NoError(t, err) @@ -71,3 +78,37 @@ func TestDebugServeInfoRPC(t *testing.T) { blockStoreMock.AssertExpectations(t) stateStoreMock.AssertExpectations(t) } + +func TestDebugTxSearch(t *testing.T) { + testHash := []byte("test") + testTx := []byte("tx") + testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash)) + testTxResult := &abci_types.TxResult{ + Height: 1, + Index: 100, + Tx: testTx, + } + + stateStoreMock := &state_mocks.Store{} + blockStoreMock := &state_mocks.BlockStore{} + eventSinkMock := &indexer_mocks.EventSink{} + eventSinkMock.On("Type").Return(indexer.KV) + eventSinkMock.On("SearchTxEvents", mock.Anything, mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })). + Return([]*abci_types.TxResult{testTxResult}, nil) + + rpcConfig := config.TestRPCConfig() + d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}) + require.NoError(t, d.OnStart()) + cli, err := http_client.New(rpcConfig.ListenAddress) + require.NoError(t, err) + + var page int = 1 + resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "") + require.NoError(t, err) + require.Len(t, resultTxSearch.Txs, 1) + require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx) + + d.OnStop() + + eventSinkMock.AssertExpectations(t) +} diff --git a/node/setup.go b/node/setup.go index 0739282c6..60824cd03 100644 --- a/node/setup.go +++ b/node/setup.go @@ -97,44 +97,10 @@ func createAndStartIndexerService( sinks[sl] = true } -loop: - for k := range sinks { - switch k { - case string(indexer.NULL): - // When we see null in the config, the eventsinks will be reset with the - // nullEventSink. - eventSinks = []indexer.EventSink{null.NewEventSink()} - break loop - - case string(indexer.KV): - store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) - if err != nil { - return nil, nil, err - } - - eventSinks = append(eventSinks, kv.NewEventSink(store)) - - case string(indexer.PSQL): - conn := config.TxIndex.PsqlConn - if conn == "" { - return nil, nil, errors.New("the psql connection settings cannot be empty") - } - - es, _, err := psql.NewEventSink(conn, chainID) - if err != nil { - return nil, nil, err - } - eventSinks = append(eventSinks, es) - - default: - return nil, nil, errors.New("unsupported event sink type") - } + eventSinks, err := indexerSinksFromConfig(config, dbProvider, chainID) + if err != nil { + return nil, nil, err } - - if len(eventSinks) == 0 { - eventSinks = []indexer.EventSink{null.NewEventSink()} - } - indexerService := indexer.NewIndexerService(eventSinks, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) @@ -145,6 +111,57 @@ loop: return indexerService, eventSinks, nil } +func indexerSinksFromConfig(config *cfg.Config, dbProvider cfg.DBProvider, chainID string) ([]indexer.EventSink, error) { + if len(config.TxIndex.Indexer) == 0 { + return []indexer.EventSink{null.NewEventSink()}, nil + } + + // check for duplicated sinks + sinks := map[string]struct{}{} + for _, s := range config.TxIndex.Indexer { + sl := strings.ToLower(s) + if _, ok := sinks[sl]; ok { + return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml") + } + sinks[sl] = struct{}{} + } + + eventSinks := []indexer.EventSink{} + for k := range sinks { + switch k { + case string(indexer.NULL): + // When we see null in the config, the eventsinks will be reset with the + // nullEventSink. + return []indexer.EventSink{null.NewEventSink()}, nil + + case string(indexer.KV): + store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config}) + if err != nil { + return nil, err + } + + eventSinks = append(eventSinks, kv.NewEventSink(store)) + + case string(indexer.PSQL): + conn := config.TxIndex.PsqlConn + if conn == "" { + return nil, errors.New("the psql connection settings cannot be empty") + } + + es, _, err := psql.NewEventSink(conn, chainID) + if err != nil { + return nil, err + } + + eventSinks = append(eventSinks, es) + + default: + return nil, errors.New("unsupported event sink type") + } + } + return eventSinks, nil +} + func doHandshake( stateStore sm.Store, state sm.State, diff --git a/rpc/core/routes.go b/rpc/core/routes.go index d150b949f..11c0edcfd 100644 --- a/rpc/core/routes.go +++ b/rpc/core/routes.go @@ -2,6 +2,8 @@ package core import ( rpc "github.com/tendermint/tendermint/rpc/jsonrpc/server" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" ) // TODO: better system than "unsafe" prefix @@ -95,3 +97,23 @@ func CombineRoutes(routesMaps ...RoutesMap) RoutesMap { } return res } + +func DebugRoutes(store sm.Store, blockStore sm.BlockStore, eventSinks []indexer.EventSink) RoutesMap { + env := &Environment{ + EventSinks: eventSinks, + StateStore: store, + BlockStore: blockStore, + } + return RoutesMap{ + "blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true), + "consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", true), + "block": rpc.NewRPCFunc(env.Block, "height", true), + "block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true), + "block_results": rpc.NewRPCFunc(env.BlockResults, "height", true), + "commit": rpc.NewRPCFunc(env.Commit, "height", true), + "validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", true), + "tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true), + "tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false), + "block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false), + } +}