refactor debug code into separate package and rename to inspect

This commit is contained in:
William Banfield
2021-08-09 11:33:00 -04:00
parent c3561f1c03
commit c37e4a43a2
9 changed files with 434 additions and 200 deletions

View File

@@ -0,0 +1,68 @@
package commands
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// InspectCmd represents the base command when called without any subcommands
var InspectCmd = &cobra.Command{
Use: "inspect",
Short: "Run an inspect server",
RunE: runInspect,
}
func init() {
InspectCmd.Flags().String("rpc.laddr", config.RPC.ListenAddress, "RPC listenener address. Port required")
}
func runInspect(cmd *cobra.Command, args []string) error {
ctx, cancelFunc := context.WithCancel(context.Background())
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-c
cancelFunc()
}()
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return err
}
blockStore := store.NewBlockStore(blockStoreDB)
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "statestore", Config: config})
if err != nil {
return err
}
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return err
}
sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return err
}
stateStore := state.NewStore(stateDB)
d := inspect.New(config.RPC, blockStore, stateStore, sinks, logger)
logger.Info("starting inspect server")
if err := d.Run(ctx); err != nil {
logger.Error("error encountered while running inspect server", "err", err)
return err
}
return nil
}

View File

@@ -28,6 +28,7 @@ func main() {
cmd.ShowNodeIDCmd,
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
cmd.InspectCmd,
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)

139
inspect/inspect.go Normal file
View File

@@ -0,0 +1,139 @@
package inspect
import (
"context"
"errors"
"net"
"sync"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect/rpc"
inspect_rpc "github.com/tendermint/tendermint/inspect/rpc"
"github.com/tendermint/tendermint/libs/log"
tmstrings "github.com/tendermint/tendermint/libs/strings"
rpccore "github.com/tendermint/tendermint/rpc/core"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// Inspect manages an RPC service that exports methods to debug a failed node.
// After a node shuts down due to a consensus failure,, it will no longer start
// up and cannot easily be inspected. A Inspect value provides a similar interface
// to the node, using the underlying Tendermint data stores, without bringing up
// any other components. A caller can query the Inspect service to inspect the
// persisted state and debug the failure.
type Inspect struct {
routes rpccore.RoutesMap
rpcConfig *cfg.RPCConfig
logger log.Logger
}
func NewFromConfig(config *cfg.Config) (*Inspect, error) {
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return nil, err
}
blockStore := store.NewBlockStore(blockStoreDB)
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "statestore", Config: config})
if err != nil {
return nil, err
}
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return nil, err
}
sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return nil, err
}
l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
stateStore := sm.NewStore(stateDB)
return New(config.RPC, blockStore, stateStore, sinks, l), nil
}
func New(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink, logger log.Logger) *Inspect {
routes := inspect_rpc.Routes(stateStore, blockStore, eventSinks)
return &Inspect{
routes: routes,
rpcConfig: rpcConfig,
logger: logger,
}
}
func NewDefault() (*Inspect, error) {
config := cfg.Config{
BaseConfig: cfg.DefaultBaseConfig(),
RPC: cfg.DefaultRPCConfig(),
TxIndex: cfg.DefaultTxIndexConfig(),
}
return NewFromConfig(&config)
}
func (inspect *Inspect) Run(ctx context.Context) error {
return startRPCServers(ctx, inspect.rpcConfig, inspect.logger, inspect.routes)
}
func startRPCServers(ctx context.Context, rpcConfig *cfg.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error {
wg := &sync.WaitGroup{}
listenAddrs := tmstrings.SplitAndTrimEmpty(rpcConfig.ListenAddress, ",", " ")
rootHandler := inspect_rpc.Handler(rpcConfig, routes, logger)
errChan := make(chan error)
for _, listenerAddr := range listenAddrs {
server := rpc.Server{
Logger: logger,
Config: rpcConfig,
Handler: rootHandler,
Addr: listenerAddr,
}
if rpcConfig.IsTLSEnabled() {
keyFile := rpcConfig.KeyFile()
certFile := rpcConfig.CertFile()
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("RPC HTTPS server starting", "address", listenerAddr,
"certfile", certFile, "keyfile", keyFile)
err := server.ListenAndServeTLS(ctx, certFile, keyFile)
if !errors.Is(err, net.ErrClosed) {
logger.Error("RPC HTTPS server stopped with error", "address", listenerAddr, "err", err)
errChan <- err
return
}
logger.Info("RPC HTTPS server stopped", "address", listenerAddr)
}()
} else {
wg.Add(1)
go func() {
defer wg.Done()
logger.Info("RPC HTTP server starting", "address", listenerAddr)
err := server.ListenAndServe(ctx)
if !errors.Is(err, net.ErrClosed) {
logger.Error("RPC HTTP server stopped with error", "address", listenerAddr, "err", err)
errChan <- err
return
}
logger.Info("RPC HTTP server stopped", "address", listenerAddr)
}()
}
}
select {
case <-chanFromWG(wg):
return nil
case err := <-errChan:
return err
}
}
func chanFromWG(wg *sync.WaitGroup) chan struct{} {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
return ch
}

View File

@@ -1,9 +1,11 @@
package node_test
package inspect_test
import (
"context"
"fmt"
"net"
"os"
"strings"
"testing"
"github.com/fortytw2/leaktest"
@@ -12,8 +14,9 @@ import (
abci_types "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/node"
http_client "github.com/tendermint/tendermint/rpc/client/http"
"github.com/tendermint/tendermint/state/indexer"
indexer_mocks "github.com/tendermint/tendermint/state/indexer/mocks"
@@ -21,35 +24,33 @@ import (
"github.com/tendermint/tendermint/types"
)
func TestDebugConstructor(t *testing.T) {
func TestInspectConstructor(t *testing.T) {
config := cfg.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(config.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := node.NewDebugFromConfig(config)
d, err := inspect.NewFromConfig(config)
require.NoError(t, err)
require.NotNil(t, d)
d.OnStop()
})
}
func TestDebugRun(t *testing.T) {
func TestInspectRun(t *testing.T) {
config := cfg.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(config.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := node.NewDebugFromConfig(config)
d, err := inspect.NewFromConfig(config)
require.NoError(t, err)
err = d.OnStart()
require.NoError(t, err)
d.OnStop()
ctx, cancel := context.WithCancel(context.Background())
go d.Run(ctx)
cancel()
})
}
func TestDebugServeInfoRPC(t *testing.T) {
func TestInspectServeInfoRPC(t *testing.T) {
testHeight := int64(1)
testBlock := new(types.Block)
testBlock.Header.Height = testHeight
@@ -64,22 +65,24 @@ func TestDebugServeInfoRPC(t *testing.T) {
eventSinkMock := &indexer_mocks.EventSink{}
rpcConfig := config.TestRPCConfig()
d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock})
require.NoError(t, d.OnStart())
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
go d.Run(ctx)
requireConnect(t, rpcConfig.ListenAddress, 15)
cli, err := http_client.New(rpcConfig.ListenAddress)
require.NoError(t, err)
resultBlock, err := cli.Block(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, testBlock.Height, resultBlock.Block.Height)
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash)
d.OnStop()
cancel()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestDebugTxSearch(t *testing.T) {
func TestInspectTxSearch(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
@@ -97,8 +100,11 @@ func TestDebugTxSearch(t *testing.T) {
Return([]*abci_types.TxResult{testTxResult}, nil)
rpcConfig := config.TestRPCConfig()
d := node.NewDebug(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock})
require.NoError(t, d.OnStart())
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
ctx, cancel := context.WithCancel(context.Background())
go d.Run(ctx)
requireConnect(t, rpcConfig.ListenAddress, 15)
cli, err := http_client.New(rpcConfig.ListenAddress)
require.NoError(t, err)
@@ -108,7 +114,24 @@ func TestDebugTxSearch(t *testing.T) {
require.Len(t, resultTxSearch.Txs, 1)
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
d.OnStop()
cancel()
eventSinkMock.AssertExpectations(t)
}
func requireConnect(t testing.TB, addr string, retries int) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
t.Fatalf("malformed address to dial: %s", addr)
}
var err error
for i := 0; i < retries; i++ {
var conn net.Conn
conn, err = net.Dial(parts[0], parts[1])
if err == nil {
conn.Close()
return
}
}
t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err)
}

119
inspect/rpc/rpc.go Normal file
View File

@@ -0,0 +1,119 @@
package rpc
import (
"context"
"net/http"
"time"
"github.com/rs/cors"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/rpc/core"
rpccore "github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/rpc/jsonrpc/server"
rpcserver "github.com/tendermint/tendermint/rpc/jsonrpc/server"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/types"
)
type Server struct {
Addr string // TCP address to listen on, ":http" if empty
Handler http.Handler
Logger log.Logger
Config *config.RPCConfig
}
func Routes(store state.Store, blockStore state.BlockStore, eventSinks []indexer.EventSink) rpccore.RoutesMap {
env := &core.Environment{
EventSinks: eventSinks,
StateStore: store,
BlockStore: blockStore,
}
return rpccore.RoutesMap{
"blockchain": rpcserver.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
"consensus_params": rpcserver.NewRPCFunc(env.ConsensusParams, "height", true),
"block": rpcserver.NewRPCFunc(env.Block, "height", true),
"block_by_hash": rpcserver.NewRPCFunc(env.BlockByHash, "hash", true),
"block_results": rpcserver.NewRPCFunc(env.BlockResults, "height", true),
"commit": rpcserver.NewRPCFunc(env.Commit, "height", true),
"validators": rpcserver.NewRPCFunc(env.Validators, "height,page,per_page", true),
"tx": rpcserver.NewRPCFunc(env.Tx, "hash,prove", true),
"tx_search": rpcserver.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
"block_search": rpcserver.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
}
}
func Handler(rpcConfig *config.RPCConfig, routes rpccore.RoutesMap, logger log.Logger) http.Handler {
mux := http.NewServeMux()
wmLogger := logger.With("protocol", "websocket")
var eventBus types.EventBusSubscriber
websocketDisconnectFn := func(remoteAddr string) {
err := eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}
wm := rpcserver.NewWebsocketManager(routes,
rpcserver.OnDisconnect(websocketDisconnectFn),
rpcserver.ReadLimit(rpcConfig.MaxBodyBytes))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, routes, logger)
var rootHandler http.Handler = mux
if rpcConfig.IsCorsEnabled() {
rootHandler = addCORSHandler(rpcConfig, mux)
}
return rootHandler
}
func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler {
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: rpcConfig.CORSAllowedOrigins,
AllowedMethods: rpcConfig.CORSAllowedMethods,
AllowedHeaders: rpcConfig.CORSAllowedHeaders,
})
h = corsMiddleware.Handler(h)
return h
}
func (r *Server) ListenAndServe(ctx context.Context) error {
listener, err := rpcserver.Listen(r.Addr, r.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return rpcserver.Serve(listener, r.Handler, r.Logger, serverRPCConfig(r.Config))
}
func (r *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error {
listener, err := rpcserver.Listen(r.Addr, r.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return rpcserver.ServeTLS(listener, r.Handler, certFile, keyFile, r.Logger, serverRPCConfig(r.Config))
}
func serverRPCConfig(r *config.RPCConfig) *server.Config {
cfg := rpcserver.DefaultConfig()
cfg.MaxBodyBytes = r.MaxBodyBytes
cfg.MaxHeaderBytes = r.MaxHeaderBytes
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit {
cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second
}
return cfg
}

View File

@@ -1,88 +0,0 @@
package node
import (
"net"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
rpccore "github.com/tendermint/tendermint/rpc/core"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// Debug manages an RPC service that exports methods to debug a failed node.
// After a node shuts down due to a consensus failure,, it will no longer start
// up and cannot easily be inspected. A Debug value provides a similar interface
// to the node, using the underlying Tendermint data stores, without bringing up
// any other components. A caller can query the Debug service to inspect the
// persisted state and debug the failure.
type Debug struct {
service.BaseService
routes rpccore.RoutesMap
rpcConfig *cfg.RPCConfig
listeners []net.Listener
}
func NewDebugFromConfig(config *cfg.Config) (*Debug, error) {
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: _blockStoreID, Config: config})
if err != nil {
return nil, err
}
blockStore := store.NewBlockStore(blockStoreDB)
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: _stateStoreID, Config: config})
if err != nil {
return nil, err
}
genDocFunc := defaultGenesisDocProviderFunc(config)
genDoc, err := genDocFunc()
if err != nil {
return nil, err
}
sinks, err := indexerSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return nil, err
}
stateStore := sm.NewStore(stateDB)
return NewDebug(config.RPC, blockStore, stateStore, sinks), nil
}
func NewDebug(rpcConfig *cfg.RPCConfig, blockStore sm.BlockStore, stateStore sm.Store, eventSinks []indexer.EventSink) *Debug {
routes := rpccore.DebugRoutes(stateStore, blockStore, eventSinks)
return &Debug{
routes: routes,
rpcConfig: rpcConfig,
}
}
func NewDefaultDebug() (*Debug, error) {
config := cfg.Config{
BaseConfig: cfg.DefaultBaseConfig(),
RPC: cfg.DefaultRPCConfig(),
TxIndex: cfg.DefaultTxIndexConfig(),
}
return NewDebugFromConfig(&config)
}
func (debug *Debug) OnStart() error {
l := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
listeners, err := startRPCServers(debug.rpcConfig, l, debug.routes, &types.NopEventBus{})
if err != nil {
return err
}
debug.listeners = listeners
return nil
}
func (debug *Debug) OnStop() {
for i := len(debug.listeners) - 1; i >= 0; i-- {
err := debug.listeners[i].Close()
if err != nil {
debug.Logger.Error("Error stopping debug rpc listener", "err", err)
}
}
}

View File

@@ -8,7 +8,6 @@ import (
"math"
"net"
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
"strings"
"time"
dbm "github.com/tendermint/tm-db"
@@ -33,9 +32,7 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
null "github.com/tendermint/tendermint/state/indexer/sink/null"
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/indexer/sink"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@@ -83,21 +80,7 @@ func createAndStartIndexerService(
logger log.Logger,
chainID string,
) (*indexer.Service, []indexer.EventSink, error) {
eventSinks := []indexer.EventSink{}
// check for duplicated sinks
sinks := map[string]bool{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if sinks[sl] {
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = true
}
eventSinks, err := indexerSinksFromConfig(config, dbProvider, chainID)
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
if err != nil {
return nil, nil, err
}
@@ -111,57 +94,6 @@ func createAndStartIndexerService(
return indexerService, eventSinks, nil
}
func indexerSinksFromConfig(config *cfg.Config, dbProvider cfg.DBProvider, chainID string) ([]indexer.EventSink, error) {
if len(config.TxIndex.Indexer) == 0 {
return []indexer.EventSink{null.NewEventSink()}, nil
}
// check for duplicated sinks
sinks := map[string]struct{}{}
for _, s := range config.TxIndex.Indexer {
sl := strings.ToLower(s)
if _, ok := sinks[sl]; ok {
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = struct{}{}
}
eventSinks := []indexer.EventSink{}
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
return []indexer.EventSink{null.NewEventSink()}, nil
case string(indexer.KV):
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
if conn == "" {
return nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, errors.New("unsupported event sink type")
}
}
return eventSinks, nil
}
func doHandshake(
stateStore sm.Store,
state sm.State,

View File

@@ -2,8 +2,6 @@ package core
import (
rpc "github.com/tendermint/tendermint/rpc/jsonrpc/server"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
)
// TODO: better system than "unsafe" prefix
@@ -88,6 +86,7 @@ func (env *Environment) UnsafeRoutes() RoutesMap {
}
}
// CombineRoutes takes a list of RoutesMaps and combines them into a single RoutesMap.
func CombineRoutes(routesMaps ...RoutesMap) RoutesMap {
res := RoutesMap{}
for _, routesMap := range routesMaps {
@@ -97,23 +96,3 @@ func CombineRoutes(routesMaps ...RoutesMap) RoutesMap {
}
return res
}
func DebugRoutes(store sm.Store, blockStore sm.BlockStore, eventSinks []indexer.EventSink) RoutesMap {
env := &Environment{
EventSinks: eventSinks,
StateStore: store,
BlockStore: blockStore,
}
return RoutesMap{
"blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
"consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", true),
"block": rpc.NewRPCFunc(env.Block, "height", true),
"block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true),
"block_results": rpc.NewRPCFunc(env.BlockResults, "height", true),
"commit": rpc.NewRPCFunc(env.Commit, "height", true),
"validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", true),
"tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true),
"tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
"block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
}
}

View File

@@ -0,0 +1,61 @@
package sink
import (
"errors"
"strings"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/sink/kv"
"github.com/tendermint/tendermint/state/indexer/sink/null"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
)
func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) {
if len(cfg.TxIndex.Indexer) == 0 {
return []indexer.EventSink{null.NewEventSink()}, nil
}
// check for duplicated sinks
sinks := map[string]struct{}{}
for _, s := range cfg.TxIndex.Indexer {
sl := strings.ToLower(s)
if _, ok := sinks[sl]; ok {
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
}
sinks[sl] = struct{}{}
}
eventSinks := []indexer.EventSink{}
for k := range sinks {
switch k {
case string(indexer.NULL):
// When we see null in the config, the eventsinks will be reset with the
// nullEventSink.
return []indexer.EventSink{null.NewEventSink()}, nil
case string(indexer.KV):
store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg})
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, kv.NewEventSink(store))
case string(indexer.PSQL):
conn := cfg.TxIndex.PsqlConn
if conn == "" {
return nil, errors.New("the psql connection settings cannot be empty")
}
es, _, err := psql.NewEventSink(conn, chainID)
if err != nil {
return nil, err
}
eventSinks = append(eventSinks, es)
default:
return nil, errors.New("unsupported event sink type")
}
}
return eventSinks, nil
}