mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-04 02:52:07 +00:00
add tx indexer routes
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
rpccore "github.com/tendermint/tendermint/rpc/core"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -21,8 +22,7 @@ import (
|
||||
type Debug struct {
|
||||
service.BaseService
|
||||
|
||||
blockStore sm.BlockStore
|
||||
stateStore sm.Store
|
||||
routes rpccore.RoutesMap
|
||||
|
||||
rpcConfig *cfg.RPCConfig
|
||||
listeners []net.Listener
|
||||
@@ -38,16 +38,24 @@ func NewDebugFromConfig(config *cfg.Config) (*Debug, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
genDocFunc := defaultGenesisDocProviderFunc(config)
|
||||
genDoc, err := genDocFunc()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sinks, err := indexerSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
|
||||
return NewDebug(config.RPC, blockStore, stateStore), nil
|
||||
return NewDebug(config.RPC, blockStore, stateStore, sinks), nil
|
||||
}
|
||||
|
||||
func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store) *Debug {
|
||||
func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink) *Debug {
|
||||
routes := rpccore.DebugRoutes(stateStore, blockStore, eventSinks)
|
||||
return &Debug{
|
||||
blockStore: blockStore,
|
||||
stateStore: stateStore,
|
||||
rpcConfig: rpcConfig,
|
||||
routes: routes,
|
||||
rpcConfig: rpcConfig,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -55,18 +63,14 @@ func NewDefaultDebug() (*Debug, error) {
|
||||
config := cfg.Config{
|
||||
BaseConfig: cfg.DefaultBaseConfig(),
|
||||
RPC: cfg.DefaultRPCConfig(),
|
||||
TxIndex: cfg.DefaultTxIndexConfig(),
|
||||
}
|
||||
return NewDebugFromConfig(&config)
|
||||
}
|
||||
|
||||
func (debug *Debug) OnStart() error {
|
||||
rpcCoreEnv := rpccore.Environment{
|
||||
StateStore: debug.stateStore,
|
||||
BlockStore: debug.blockStore,
|
||||
}
|
||||
routes := rpcCoreEnv.InfoRoutes()
|
||||
l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
|
||||
listeners, err := startRPCServers(debug.rpcConfig, l, routes, types.NopEventBus{})
|
||||
listeners, err := startRPCServers(debug.rpcConfig, l, debug.routes, &types.NopEventBus{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2,24 +2,31 @@ package node_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
abci_types "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/node"
|
||||
http_client "github.com/tendermint/tendermint/rpc/client/http"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
indexer_mocks "github.com/tendermint/tendermint/state/indexer/mocks"
|
||||
state_mocks "github.com/tendermint/tendermint/state/mocks"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestDebugConstructor(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("test")
|
||||
t.Cleanup(leaktest.Check(t))
|
||||
defer func() { _ = os.RemoveAll(config.RootDir) }()
|
||||
t.Run("from config", func(t *testing.T) {
|
||||
d, err := node.NewDebugFromConfig(&config.Config{
|
||||
BaseConfig: config.TestBaseConfig(),
|
||||
RPC: config.TestRPCConfig(),
|
||||
})
|
||||
d, err := node.NewDebugFromConfig(config)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, d)
|
||||
|
||||
@@ -29,12 +36,11 @@ func TestDebugConstructor(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDebugRun(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("test")
|
||||
t.Cleanup(leaktest.Check(t))
|
||||
defer func() { _ = os.RemoveAll(config.RootDir) }()
|
||||
t.Run("from config", func(t *testing.T) {
|
||||
d, err := node.NewDebugFromConfig(&config.Config{
|
||||
BaseConfig: config.TestBaseConfig(),
|
||||
RPC: config.TestRPCConfig(),
|
||||
})
|
||||
d, err := node.NewDebugFromConfig(config)
|
||||
require.NoError(t, err)
|
||||
err = d.OnStart()
|
||||
require.NoError(t, err)
|
||||
@@ -55,9 +61,10 @@ func TestDebugServeInfoRPC(t *testing.T) {
|
||||
blockStoreMock.On("Base").Return(int64(0))
|
||||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{})
|
||||
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock)
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
|
||||
rpcConfig := config.TestRPCConfig()
|
||||
d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock)
|
||||
d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock})
|
||||
require.NoError(t, d.OnStart())
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
@@ -71,3 +78,37 @@ func TestDebugServeInfoRPC(t *testing.T) {
|
||||
blockStoreMock.AssertExpectations(t)
|
||||
stateStoreMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestDebugTxSearch(t *testing.T) {
|
||||
testHash := []byte("test")
|
||||
testTx := []byte("tx")
|
||||
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
|
||||
testTxResult := &abci_types.TxResult{
|
||||
Height: 1,
|
||||
Index: 100,
|
||||
Tx: testTx,
|
||||
}
|
||||
|
||||
stateStoreMock := &state_mocks.Store{}
|
||||
blockStoreMock := &state_mocks.BlockStore{}
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
eventSinkMock.On("Type").Return(indexer.KV)
|
||||
eventSinkMock.On("SearchTxEvents", mock.Anything, mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
|
||||
Return([]*abci_types.TxResult{testTxResult}, nil)
|
||||
|
||||
rpcConfig := config.TestRPCConfig()
|
||||
d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock})
|
||||
require.NoError(t, d.OnStart())
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
|
||||
var page int = 1
|
||||
resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, resultTxSearch.Txs, 1)
|
||||
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
|
||||
|
||||
d.OnStop()
|
||||
|
||||
eventSinkMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
@@ -97,44 +97,10 @@ func createAndStartIndexerService(
|
||||
sinks[sl] = true
|
||||
}
|
||||
|
||||
loop:
|
||||
for k := range sinks {
|
||||
switch k {
|
||||
case string(indexer.NULL):
|
||||
// When we see null in the config, the eventsinks will be reset with the
|
||||
// nullEventSink.
|
||||
eventSinks = []indexer.EventSink{null.NewEventSink()}
|
||||
break loop
|
||||
|
||||
case string(indexer.KV):
|
||||
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
eventSinks = append(eventSinks, kv.NewEventSink(store))
|
||||
|
||||
case string(indexer.PSQL):
|
||||
conn := config.TxIndex.PsqlConn
|
||||
if conn == "" {
|
||||
return nil, nil, errors.New("the psql connection settings cannot be empty")
|
||||
}
|
||||
|
||||
es, _, err := psql.NewEventSink(conn, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eventSinks = append(eventSinks, es)
|
||||
|
||||
default:
|
||||
return nil, nil, errors.New("unsupported event sink type")
|
||||
}
|
||||
eventSinks, err := indexerSinksFromConfig(config, dbProvider, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if len(eventSinks) == 0 {
|
||||
eventSinks = []indexer.EventSink{null.NewEventSink()}
|
||||
}
|
||||
|
||||
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
|
||||
indexerService.SetLogger(logger.With("module", "txindex"))
|
||||
|
||||
@@ -145,6 +111,57 @@ loop:
|
||||
return indexerService, eventSinks, nil
|
||||
}
|
||||
|
||||
func indexerSinksFromConfig(config *cfg.Config, dbProvider cfg.DBProvider, chainID string) ([]indexer.EventSink, error) {
|
||||
if len(config.TxIndex.Indexer) == 0 {
|
||||
return []indexer.EventSink{null.NewEventSink()}, nil
|
||||
}
|
||||
|
||||
// check for duplicated sinks
|
||||
sinks := map[string]struct{}{}
|
||||
for _, s := range config.TxIndex.Indexer {
|
||||
sl := strings.ToLower(s)
|
||||
if _, ok := sinks[sl]; ok {
|
||||
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
|
||||
}
|
||||
sinks[sl] = struct{}{}
|
||||
}
|
||||
|
||||
eventSinks := []indexer.EventSink{}
|
||||
for k := range sinks {
|
||||
switch k {
|
||||
case string(indexer.NULL):
|
||||
// When we see null in the config, the eventsinks will be reset with the
|
||||
// nullEventSink.
|
||||
return []indexer.EventSink{null.NewEventSink()}, nil
|
||||
|
||||
case string(indexer.KV):
|
||||
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eventSinks = append(eventSinks, kv.NewEventSink(store))
|
||||
|
||||
case string(indexer.PSQL):
|
||||
conn := config.TxIndex.PsqlConn
|
||||
if conn == "" {
|
||||
return nil, errors.New("the psql connection settings cannot be empty")
|
||||
}
|
||||
|
||||
es, _, err := psql.NewEventSink(conn, chainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eventSinks = append(eventSinks, es)
|
||||
|
||||
default:
|
||||
return nil, errors.New("unsupported event sink type")
|
||||
}
|
||||
}
|
||||
return eventSinks, nil
|
||||
}
|
||||
|
||||
func doHandshake(
|
||||
stateStore sm.Store,
|
||||
state sm.State,
|
||||
|
||||
Reference in New Issue
Block a user