mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-12 06:41:31 +00:00
Compare commits
29 Commits
wb/remove-
...
wb/routes-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
741f67cfa4 | ||
|
|
1f2ba5a87d | ||
|
|
885b78698c | ||
|
|
cecd5d8ab2 | ||
|
|
5542a5bacb | ||
|
|
03f3c49bb2 | ||
|
|
69239c55c9 | ||
|
|
677d87324d | ||
|
|
208bf0541b | ||
|
|
f154067401 | ||
|
|
6085c5f6ee | ||
|
|
820284a0f1 | ||
|
|
630dda2234 | ||
|
|
a484d0f3a4 | ||
|
|
af361ed219 | ||
|
|
0575bccb3e | ||
|
|
182e4162ad | ||
|
|
45eedbc087 | ||
|
|
6c7dbec283 | ||
|
|
75f549d408 | ||
|
|
45f92ee928 | ||
|
|
557326c2f4 | ||
|
|
c6077076d4 | ||
|
|
c37e4a43a2 | ||
|
|
c3561f1c03 | ||
|
|
1f7ecbdaf3 | ||
|
|
2be1b37c4e | ||
|
|
03ee71eac4 | ||
|
|
df669c7bed |
@@ -25,6 +25,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
|
||||
- [rpc/grpc] \#6725 Mark gRPC in the RPC layer as deprecated.
|
||||
- [blockchain/v2] \#6730 Fast Sync v2 is deprecated, please use v0
|
||||
- [rpc] Add genesis_chunked method to support paginated and parallel fetching of large genesis documents.
|
||||
- [rpc/jsonrpc/server] \#6785 `Listen` function updated to take an `int` argument, `maxOpenConnections`, instead of an entire config object. (@williambanfield)
|
||||
|
||||
- Apps
|
||||
- [ABCI] \#6408 Change the `key` and `value` fields from `[]byte` to `string` in the `EventAttribute` type. (@alexanderbez)
|
||||
@@ -105,6 +106,7 @@ Friendly reminder: We have a [bug bounty program](https://hackerone.com/tendermi
|
||||
- [config/indexer] \#6411 Introduce support for custom event indexing data sources, specifically PostgreSQL. (@JayT106)
|
||||
- [fastsync/event] \#6619 Emit fastsync status event when switching consensus/fastsync (@JayT106)
|
||||
- [statesync/event] \#6700 Emit statesync status start/end event (@JayT106)
|
||||
- [inspect] \#6785 Add a new `inspect` command for introspecting the state and block store of a crashed tendermint node. (@williambanfield)
|
||||
|
||||
### IMPROVEMENTS
|
||||
- [libs/log] Console log formatting changes as a result of \#6534 and \#6589. (@tychoish)
|
||||
|
||||
82
cmd/tendermint/commands/inspect.go
Normal file
82
cmd/tendermint/commands/inspect.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package commands
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/inspect"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// InspectCmd is the command for starting an inspect server.
|
||||
var InspectCmd = &cobra.Command{
|
||||
Use: "inspect",
|
||||
Short: "Run an inspect server for investigating Tendermint state.",
|
||||
Long: `
|
||||
inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging
|
||||
issues with Tendermint.
|
||||
|
||||
When the Tendermint consensus engine detects inconsistent state, it will crash the
|
||||
tendermint process. Tendermint will not start up while in this inconsistent state.
|
||||
The inspect command can be used to query the block and state store using Tendermint
|
||||
RPC calls to debug issues of inconsistent state.
|
||||
`,
|
||||
|
||||
RunE: runInspect,
|
||||
}
|
||||
|
||||
func init() {
|
||||
InspectCmd.Flags().String("rpc.laddr", config.RPC.ListenAddress, "RPC listenener address. Port required")
|
||||
InspectCmd.Flags().String("db-backend", config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb")
|
||||
InspectCmd.Flags().String("db-dir", config.DBPath, "database directory")
|
||||
}
|
||||
|
||||
func runInspect(cmd *cobra.Command, args []string) error {
|
||||
ctx, cancel := context.WithCancel(cmd.Context())
|
||||
defer cancel()
|
||||
|
||||
c := make(chan os.Signal, 1)
|
||||
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
|
||||
go func() {
|
||||
<-c
|
||||
cancel()
|
||||
}()
|
||||
|
||||
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blockStore := store.NewBlockStore(blockStoreDB)
|
||||
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config})
|
||||
if err != nil {
|
||||
if err := blockStoreDB.Close(); err != nil {
|
||||
logger.Error("error closing block store db", "error", err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sinks, err := sink.EventSinksFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stateStore := state.NewStore(stateDB)
|
||||
|
||||
ins := inspect.New(config.RPC, blockStore, stateStore, sinks, logger)
|
||||
|
||||
logger.Info("starting inspect server")
|
||||
if err := ins.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -28,6 +28,7 @@ func main() {
|
||||
cmd.ShowNodeIDCmd,
|
||||
cmd.GenNodeKeyCmd,
|
||||
cmd.VersionCmd,
|
||||
cmd.InspectCmd,
|
||||
cmd.MakeKeyMigrateCommand(),
|
||||
debug.DebugCmd,
|
||||
cli.NewCompletionCmd(rootCmd, true),
|
||||
|
||||
@@ -62,3 +62,30 @@ given destination directory. Each archive will contain:
|
||||
|
||||
Note: goroutine.out and heap.out will only be written if a profile address is
|
||||
provided and is operational. This command is blocking and will log any error.
|
||||
|
||||
## Tendermint Inspect
|
||||
|
||||
Tendermint includes an `inspect` command for querying Tendermint's state store and block
|
||||
store over Tendermint RPC.
|
||||
|
||||
When the Tendermint consensus engine detects inconsistent state, it will crash the
|
||||
entire Tendermint process.
|
||||
While in this inconsistent state, a node running Tendermint's consensus engine will not start up.
|
||||
The `inspect` command runs only a subset of Tendermint's RPC endpoints for querying the block store
|
||||
and state store.
|
||||
`inspect` allows operators to query a read-only view of the stage.
|
||||
`inspect` does not run the consensus engine at all and can therefore be used to debug
|
||||
processes that have crashed due to inconsistent state.
|
||||
|
||||
|
||||
To start the `inspect` process, run
|
||||
```bash
|
||||
tendermint inspect
|
||||
```
|
||||
|
||||
### RPC endpoints
|
||||
The list of available RPC endpoints can be found by making a request to the RPC port.
|
||||
For an `inspect` process running on `127.0.0.1:26657`, navigate your browser to
|
||||
`http://127.0.0.1:26657/` to retrieve the list of enabled RPC endpoints.
|
||||
|
||||
Additional information on the Tendermint RPC endpoints can be found in the [rpc documentation](https://docs.tendermint.com/master/rpc).
|
||||
|
||||
@@ -64,13 +64,42 @@ It won’t kill the node, but it will gather all of the above data and package i
|
||||
|
||||
At this point, depending on how severe the degradation is, you may want to restart the process.
|
||||
|
||||
## Tendermint Inspect
|
||||
|
||||
What if the Tendermint node will not start up due to inconsistent consensus state?
|
||||
|
||||
When a node running the Tendermint consensus engine detects an inconsistent state
|
||||
it will crash the entire Tendermint process.
|
||||
The Tendermint consensus engine cannot be run in this inconsistent state and the so node
|
||||
will fail to start up as a result.
|
||||
The Tendermint RPC server can provide valuable information for debugging in this situation.
|
||||
The Tendermint `inspect` command will run a subset of the Tendermint RPC server
|
||||
that is useful for debugging inconsistent state.
|
||||
|
||||
### Running inspect
|
||||
|
||||
Start up the `inspect` tool on the machine where Tendermint crashed using:
|
||||
```bash
|
||||
tendermint inspect --home=</path/to/app.d>
|
||||
```
|
||||
|
||||
`inspect` will use the data directory specified in your Tendermint configuration file.
|
||||
`inspect` will also run the RPC server at the address specified in your Tendermint configuration file.
|
||||
|
||||
### Using inspect
|
||||
|
||||
With the `inspect` server running, you can access RPC endpoints that are critically important
|
||||
for debugging.
|
||||
Calling the `/status`, `/consensus_state` and `/dump_consensus_state` RPC endpoint
|
||||
will return useful information about the Tendermint consensus state.
|
||||
|
||||
## Outro
|
||||
|
||||
We’re hoping that the `tendermint debug` subcommand will become de facto the first response to any accidents.
|
||||
We’re hoping that these Tendermint tools will become de facto the first response for any accidents.
|
||||
|
||||
Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` yet?
|
||||
Let us know what your experience has been so far! Have you had a chance to try `tendermint debug` or `tendermint inspect` yet?
|
||||
|
||||
Join our chat, where we discuss the current issues and future improvements.
|
||||
Join our [discord chat](https://discord.gg/vcExX9T), where we discuss the current issues and future improvements.
|
||||
|
||||
—
|
||||
|
||||
|
||||
1
go.mod
1
go.mod
@@ -37,6 +37,7 @@ require (
|
||||
github.com/vektra/mockery/v2 v2.9.0
|
||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
|
||||
google.golang.org/grpc v1.40.0
|
||||
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
|
||||
pgregory.net/rapid v0.4.7
|
||||
|
||||
1
go.sum
1
go.sum
@@ -1071,6 +1071,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
|
||||
36
inspect/doc.go
Normal file
36
inspect/doc.go
Normal file
@@ -0,0 +1,36 @@
|
||||
/*
|
||||
Package inspect provides a tool for investigating the state of a
|
||||
failed Tendermint node.
|
||||
|
||||
This package provides the Inspect type. The Inspect type runs a subset of the Tendermint
|
||||
RPC endpoints that are useful for debugging issues with Tendermint consensus.
|
||||
|
||||
When a node running the Tendermint consensus engine detects an inconsistent consensus state,
|
||||
the entire node will crash. The Tendermint consensus engine cannot run in this
|
||||
inconsistent state so the node will not be able to start up again.
|
||||
|
||||
The RPC endpoints provided by the Inspect type allow for a node operator to inspect
|
||||
the block store and state store to better understand what may have caused the inconsistent state.
|
||||
|
||||
|
||||
The Inspect type's lifecycle is controlled by a context.Context
|
||||
ins := inspect.NewFromConfig(rpcConfig)
|
||||
ctx, cancelFunc:= context.WithCancel(context.Background())
|
||||
|
||||
// Run blocks until the Inspect server is shut down.
|
||||
go ins.Run(ctx)
|
||||
...
|
||||
|
||||
// calling the cancel function will stop the running inspect server
|
||||
cancelFunc()
|
||||
|
||||
Inspect serves its RPC endpoints on the address configured in the RPC configuration
|
||||
|
||||
rpcConfig.ListenAddress = "tcp://127.0.0.1:26657"
|
||||
ins := inspect.NewFromConfig(rpcConfig)
|
||||
go ins.Run(ctx)
|
||||
|
||||
The list of available RPC endpoints can then be viewed by navigating to
|
||||
http://127.0.0.1:26657/ in the web browser.
|
||||
*/
|
||||
package inspect
|
||||
147
inspect/inspect.go
Normal file
147
inspect/inspect.go
Normal file
@@ -0,0 +1,147 @@
|
||||
package inspect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/inspect/rpc"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmstrings "github.com/tendermint/tendermint/libs/strings"
|
||||
rpccore "github.com/tendermint/tendermint/rpc/core"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// Inspect manages an RPC service that exports methods to debug a failed node.
|
||||
// After a node shuts down due to a consensus failure, it will no longer start
|
||||
// up its state cannot easily be inspected. An Inspect value provides a similar interface
|
||||
// to the node, using the underlying Tendermint data stores, without bringing up
|
||||
// any other components. A caller can query the Inspect service to inspect the
|
||||
// persisted state and debug the failure.
|
||||
type Inspect struct {
|
||||
routes rpccore.RoutesMap
|
||||
|
||||
config *config.RPCConfig
|
||||
|
||||
indexerService *indexer.Service
|
||||
eventBus *types.EventBus
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// New returns an Inspect that serves RPC on the specified BlockStore and StateStore.
|
||||
// The Inspect type does not modify the state or block stores.
|
||||
// The sinks are used to enable block and transaction querying via the RPC server.
|
||||
// The caller is responsible for starting and stopping the Inspect service.
|
||||
///
|
||||
//nolint:lll
|
||||
func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, es []indexer.EventSink, logger log.Logger) *Inspect {
|
||||
routes := rpc.Routes(*cfg, ss, bs, es)
|
||||
eb := types.NewEventBus()
|
||||
eb.SetLogger(logger.With("module", "events"))
|
||||
is := indexer.NewIndexerService(es, eb)
|
||||
is.SetLogger(logger.With("module", "txindex"))
|
||||
return &Inspect{
|
||||
routes: routes,
|
||||
config: cfg,
|
||||
logger: logger,
|
||||
eventBus: eb,
|
||||
indexerService: is,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFromConfig constructs an Inspect using the values defined in the passed in config.
|
||||
func NewFromConfig(cfg *config.Config) (*Inspect, error) {
|
||||
bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bs := store.NewBlockStore(bsDB)
|
||||
sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sinks, err := sink.EventSinksFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logger := log.MustNewDefaultLogger(log.LogFormatPlain, log.LogLevelInfo, false)
|
||||
ss := state.NewStore(sDB)
|
||||
return New(cfg.RPC, bs, ss, sinks, logger), nil
|
||||
}
|
||||
|
||||
// Run starts the Inspect servers and blocks until the servers shut down. The passed
|
||||
// in context is used to control the lifecycle of the servers.
|
||||
func (ins *Inspect) Run(ctx context.Context) error {
|
||||
err := ins.eventBus.Start()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error starting event bus: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
err := ins.eventBus.Stop()
|
||||
if err != nil {
|
||||
ins.logger.Error("event bus stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
err = ins.indexerService.Start()
|
||||
if err != nil {
|
||||
return fmt.Errorf("error starting indexer service: %s", err)
|
||||
}
|
||||
defer func() {
|
||||
err := ins.indexerService.Stop()
|
||||
if err != nil {
|
||||
ins.logger.Error("indexer service stopped with error", "err", err)
|
||||
}
|
||||
}()
|
||||
return startRPCServers(ctx, ins.config, ins.logger, ins.routes)
|
||||
}
|
||||
|
||||
func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error {
|
||||
g, tctx := errgroup.WithContext(ctx)
|
||||
listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ")
|
||||
rh := rpc.Handler(cfg, routes, logger)
|
||||
for _, listenerAddr := range listenAddrs {
|
||||
server := rpc.Server{
|
||||
Logger: logger,
|
||||
Config: cfg,
|
||||
Handler: rh,
|
||||
Addr: listenerAddr,
|
||||
}
|
||||
if cfg.IsTLSEnabled() {
|
||||
keyFile := cfg.KeyFile()
|
||||
certFile := cfg.CertFile()
|
||||
g.Go(func() error {
|
||||
logger.Info("RPC HTTPS server starting", "address", listenerAddr,
|
||||
"certfile", certFile, "keyfile", keyFile)
|
||||
err := server.ListenAndServeTLS(tctx, certFile, keyFile)
|
||||
if !errors.Is(err, net.ErrClosed) {
|
||||
return err
|
||||
}
|
||||
logger.Info("RPC HTTPS server stopped", "address", listenerAddr)
|
||||
return nil
|
||||
})
|
||||
} else {
|
||||
g.Go(func() error {
|
||||
logger.Info("RPC HTTP server starting", "address", listenerAddr)
|
||||
err := server.ListenAndServe(tctx)
|
||||
if !errors.Is(err, net.ErrClosed) {
|
||||
return err
|
||||
}
|
||||
logger.Info("RPC HTTP server stopped", "address", listenerAddr)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
return g.Wait()
|
||||
}
|
||||
163
inspect/inspect_test.go
Normal file
163
inspect/inspect_test.go
Normal file
@@ -0,0 +1,163 @@
|
||||
package inspect_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/fortytw2/leaktest"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
abci_types "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/inspect"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
http_client "github.com/tendermint/tendermint/rpc/client/http"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
indexer_mocks "github.com/tendermint/tendermint/state/indexer/mocks"
|
||||
state_mocks "github.com/tendermint/tendermint/state/mocks"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestInspectConstructor(t *testing.T) {
|
||||
cfg := config.ResetTestRoot("test")
|
||||
t.Cleanup(leaktest.Check(t))
|
||||
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
|
||||
t.Run("from config", func(t *testing.T) {
|
||||
d, err := inspect.NewFromConfig(cfg)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, d)
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestInspectRun(t *testing.T) {
|
||||
cfg := config.ResetTestRoot("test")
|
||||
t.Cleanup(leaktest.Check(t))
|
||||
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
|
||||
t.Run("from config", func(t *testing.T) {
|
||||
d, err := inspect.NewFromConfig(cfg)
|
||||
require.NoError(t, err)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
stoppedWG := &sync.WaitGroup{}
|
||||
stoppedWG.Add(1)
|
||||
go func() {
|
||||
require.NoError(t, d.Run(ctx))
|
||||
stoppedWG.Done()
|
||||
}()
|
||||
cancel()
|
||||
stoppedWG.Wait()
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestInspectServeInfoRPC(t *testing.T) {
|
||||
testHeight := int64(1)
|
||||
testBlock := new(types.Block)
|
||||
testBlock.Header.Height = testHeight
|
||||
testBlock.Header.LastCommitHash = []byte("test hash")
|
||||
stateStoreMock := &state_mocks.Store{}
|
||||
|
||||
blockStoreMock := &state_mocks.BlockStore{}
|
||||
blockStoreMock.On("Height").Return(testHeight)
|
||||
blockStoreMock.On("Base").Return(int64(0))
|
||||
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{})
|
||||
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock)
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
eventSinkMock.On("Stop").Return(nil)
|
||||
|
||||
rpcConfig := config.TestRPCConfig()
|
||||
l := log.TestingLogger()
|
||||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, d.Run(ctx))
|
||||
}()
|
||||
// force context switch
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
requireConnect(t, rpcConfig.ListenAddress, 15)
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
resultBlock, err := cli.Block(context.Background(), &testHeight)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, testBlock.Height, resultBlock.Block.Height)
|
||||
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash)
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
blockStoreMock.AssertExpectations(t)
|
||||
stateStoreMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestInspectTxSearch(t *testing.T) {
|
||||
testHash := []byte("test")
|
||||
testTx := []byte("tx")
|
||||
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
|
||||
testTxResult := &abci_types.TxResult{
|
||||
Height: 1,
|
||||
Index: 100,
|
||||
Tx: testTx,
|
||||
}
|
||||
|
||||
stateStoreMock := &state_mocks.Store{}
|
||||
blockStoreMock := &state_mocks.BlockStore{}
|
||||
eventSinkMock := &indexer_mocks.EventSink{}
|
||||
eventSinkMock.On("Stop").Return(nil)
|
||||
eventSinkMock.On("Type").Return(indexer.KV)
|
||||
eventSinkMock.On("SearchTxEvents", mock.Anything,
|
||||
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
|
||||
Return([]*abci_types.TxResult{testTxResult}, nil)
|
||||
|
||||
rpcConfig := config.TestRPCConfig()
|
||||
l := log.TestingLogger()
|
||||
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, []indexer.EventSink{eventSinkMock}, l)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
require.NoError(t, d.Run(ctx))
|
||||
}()
|
||||
requireConnect(t, rpcConfig.ListenAddress, 15)
|
||||
cli, err := http_client.New(rpcConfig.ListenAddress)
|
||||
require.NoError(t, err)
|
||||
|
||||
var page int = 1
|
||||
resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, resultTxSearch.Txs, 1)
|
||||
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
|
||||
|
||||
cancel()
|
||||
wg.Wait()
|
||||
|
||||
eventSinkMock.AssertExpectations(t)
|
||||
stateStoreMock.AssertExpectations(t)
|
||||
blockStoreMock.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func requireConnect(t testing.TB, addr string, retries int) {
|
||||
parts := strings.SplitN(addr, "://", 2)
|
||||
if len(parts) != 2 {
|
||||
t.Fatalf("malformed address to dial: %s", addr)
|
||||
}
|
||||
var err error
|
||||
for i := 0; i < retries; i++ {
|
||||
var conn net.Conn
|
||||
conn, err = net.Dial(parts[0], parts[1])
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err)
|
||||
}
|
||||
135
inspect/rpc/rpc.go
Normal file
135
inspect/rpc/rpc.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/rs/cors"
|
||||
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/rpc/core"
|
||||
"github.com/tendermint/tendermint/rpc/jsonrpc/server"
|
||||
"github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Server defines parameters for running an Inspect rpc server.
|
||||
type Server struct {
|
||||
Addr string // TCP address to listen on, ":http" if empty
|
||||
Handler http.Handler
|
||||
Logger log.Logger
|
||||
Config *config.RPCConfig
|
||||
}
|
||||
|
||||
// Routes returns the set of routes used by the Inspect server.
|
||||
func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, es []indexer.EventSink) core.RoutesMap {
|
||||
env := &core.Environment{
|
||||
Config: cfg,
|
||||
EventSinks: es,
|
||||
StateStore: s,
|
||||
BlockStore: bs,
|
||||
WaitSyncChecker: waitSyncCheckerImpl{},
|
||||
}
|
||||
return core.RoutesMap{
|
||||
"blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
|
||||
"consensus_params": server.NewRPCFunc(env.ConsensusParams, "height", true),
|
||||
"block": server.NewRPCFunc(env.Block, "height", true),
|
||||
"block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash", true),
|
||||
"block_results": server.NewRPCFunc(env.BlockResults, "height", true),
|
||||
"commit": server.NewRPCFunc(env.Commit, "height", true),
|
||||
"validators": server.NewRPCFunc(env.Validators, "height,page,per_page", true),
|
||||
"tx": server.NewRPCFunc(env.Tx, "hash,prove", true),
|
||||
"tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
|
||||
"block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
|
||||
}
|
||||
}
|
||||
|
||||
// Handler returns the http.Handler configured for use with an Inspect server. Handler
|
||||
// registers the routes on the http.Handler and also registers the websocket handler
|
||||
// and the CORS handler if specified by the configuration options.
|
||||
func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler {
|
||||
mux := http.NewServeMux()
|
||||
wmLogger := logger.With("protocol", "websocket")
|
||||
|
||||
var eventBus types.EventBusSubscriber
|
||||
|
||||
websocketDisconnectFn := func(remoteAddr string) {
|
||||
err := eventBus.UnsubscribeAll(context.Background(), remoteAddr)
|
||||
if err != nil && err != pubsub.ErrSubscriptionNotFound {
|
||||
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
|
||||
}
|
||||
}
|
||||
wm := server.NewWebsocketManager(routes,
|
||||
server.OnDisconnect(websocketDisconnectFn),
|
||||
server.ReadLimit(rpcConfig.MaxBodyBytes))
|
||||
wm.SetLogger(wmLogger)
|
||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||
|
||||
server.RegisterRPCFuncs(mux, routes, logger)
|
||||
var rootHandler http.Handler = mux
|
||||
if rpcConfig.IsCorsEnabled() {
|
||||
rootHandler = addCORSHandler(rpcConfig, mux)
|
||||
}
|
||||
return rootHandler
|
||||
}
|
||||
|
||||
func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler {
|
||||
corsMiddleware := cors.New(cors.Options{
|
||||
AllowedOrigins: rpcConfig.CORSAllowedOrigins,
|
||||
AllowedMethods: rpcConfig.CORSAllowedMethods,
|
||||
AllowedHeaders: rpcConfig.CORSAllowedHeaders,
|
||||
})
|
||||
h = corsMiddleware.Handler(h)
|
||||
return h
|
||||
}
|
||||
|
||||
type waitSyncCheckerImpl struct{}
|
||||
|
||||
func (_ waitSyncCheckerImpl) WaitSync() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// ListenAndServe listens on the address specified in srv.Addr and handles any
|
||||
// incoming requests over HTTP using the Inspect rpc handler specified on the server.
|
||||
func (srv *Server) ListenAndServe(ctx context.Context) error {
|
||||
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
listener.Close()
|
||||
}()
|
||||
return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config))
|
||||
}
|
||||
|
||||
// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles
|
||||
// incoming requests over HTTPS using the Inspect rpc handler specified on the server.
|
||||
func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error {
|
||||
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
listener.Close()
|
||||
}()
|
||||
return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config))
|
||||
}
|
||||
|
||||
func serverRPCConfig(r *config.RPCConfig) *server.Config {
|
||||
cfg := server.DefaultConfig()
|
||||
cfg.MaxBodyBytes = r.MaxBodyBytes
|
||||
cfg.MaxHeaderBytes = r.MaxHeaderBytes
|
||||
// If necessary adjust global WriteTimeout to ensure it's greater than
|
||||
// TimeoutBroadcastTxCommit.
|
||||
// See https://github.com/tendermint/tendermint/issues/3435
|
||||
if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit {
|
||||
cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
@@ -113,7 +113,7 @@ func (p *Proxy) listen() (net.Listener, *http.ServeMux, error) {
|
||||
}
|
||||
|
||||
// 4) Start listening for new connections.
|
||||
listener, err := rpcserver.Listen(p.Addr, p.Config)
|
||||
listener, err := rpcserver.Listen(p.Addr, p.Config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
return nil, mux, err
|
||||
}
|
||||
|
||||
18
node/node.go
18
node/node.go
@@ -446,15 +446,15 @@ func makeNode(config *cfg.Config,
|
||||
EvidencePool: evPool,
|
||||
ConsensusState: csState,
|
||||
P2PPeers: sw,
|
||||
WaitSyncChecker: csReactor,
|
||||
BlockSyncReactor: bcReactor.(cs.BlockSyncReactor),
|
||||
|
||||
GenDoc: genDoc,
|
||||
EventSinks: eventSinks,
|
||||
ConsensusReactor: csReactor,
|
||||
EventBus: eventBus,
|
||||
Mempool: mp,
|
||||
Logger: logger.With("module", "rpc"),
|
||||
Config: *config.RPC,
|
||||
GenDoc: genDoc,
|
||||
EventSinks: eventSinks,
|
||||
EventBus: eventBus,
|
||||
Mempool: mp,
|
||||
Logger: logger.With("module", "rpc"),
|
||||
Config: *config.RPC,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -823,7 +823,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
|
||||
listener, err := rpcserver.Listen(
|
||||
listenAddr,
|
||||
config,
|
||||
config.MaxOpenConnections,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -881,7 +881,7 @@ func (n *nodeImpl) startRPC() ([]net.Listener, error) {
|
||||
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
|
||||
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
|
||||
}
|
||||
listener, err := rpcserver.Listen(grpcListenAddr, config)
|
||||
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"math"
|
||||
"net"
|
||||
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
dbm "github.com/tendermint/tm-db"
|
||||
@@ -33,9 +32,7 @@ import (
|
||||
"github.com/tendermint/tendermint/proxy"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
kv "github.com/tendermint/tendermint/state/indexer/sink/kv"
|
||||
null "github.com/tendermint/tendermint/state/indexer/sink/null"
|
||||
psql "github.com/tendermint/tendermint/state/indexer/sink/psql"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink"
|
||||
"github.com/tendermint/tendermint/store"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
"github.com/tendermint/tendermint/version"
|
||||
@@ -78,56 +75,9 @@ func createAndStartIndexerService(
|
||||
logger log.Logger,
|
||||
chainID string,
|
||||
) (*indexer.Service, []indexer.EventSink, error) {
|
||||
|
||||
eventSinks := []indexer.EventSink{}
|
||||
|
||||
// check for duplicated sinks
|
||||
sinks := map[string]bool{}
|
||||
for _, s := range config.TxIndex.Indexer {
|
||||
sl := strings.ToLower(s)
|
||||
if sinks[sl] {
|
||||
return nil, nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
|
||||
}
|
||||
|
||||
sinks[sl] = true
|
||||
}
|
||||
|
||||
loop:
|
||||
for k := range sinks {
|
||||
switch k {
|
||||
case string(indexer.NULL):
|
||||
// When we see null in the config, the eventsinks will be reset with the
|
||||
// nullEventSink.
|
||||
eventSinks = []indexer.EventSink{null.NewEventSink()}
|
||||
break loop
|
||||
|
||||
case string(indexer.KV):
|
||||
store, err := dbProvider(&cfg.DBContext{ID: "tx_index", Config: config})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
eventSinks = append(eventSinks, kv.NewEventSink(store))
|
||||
|
||||
case string(indexer.PSQL):
|
||||
conn := config.TxIndex.PsqlConn
|
||||
if conn == "" {
|
||||
return nil, nil, errors.New("the psql connection settings cannot be empty")
|
||||
}
|
||||
|
||||
es, _, err := psql.NewEventSink(conn, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
eventSinks = append(eventSinks, es)
|
||||
|
||||
default:
|
||||
return nil, nil, errors.New("unsupported event sink type")
|
||||
}
|
||||
}
|
||||
|
||||
if len(eventSinks) == 0 {
|
||||
eventSinks = []indexer.EventSink{null.NewEventSink()}
|
||||
eventSinks, err := sink.EventSinksFromConfig(config, dbProvider, chainID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
indexerService := indexer.NewIndexerService(eventSinks, eventBus)
|
||||
|
||||
@@ -58,6 +58,10 @@ type peers interface {
|
||||
Peers() p2p.IPeerSet
|
||||
}
|
||||
|
||||
type WaitSyncChecker interface {
|
||||
WaitSync() bool
|
||||
}
|
||||
|
||||
//----------------------------------------------
|
||||
// Environment contains objects and interfaces used by the RPC. It is expected
|
||||
// to be setup once during startup.
|
||||
@@ -67,18 +71,18 @@ type Environment struct {
|
||||
ProxyAppMempool proxy.AppConnMempool
|
||||
|
||||
// interfaces defined in types and above
|
||||
StateStore sm.Store
|
||||
BlockStore sm.BlockStore
|
||||
EvidencePool sm.EvidencePool
|
||||
ConsensusState Consensus
|
||||
P2PPeers peers
|
||||
P2PTransport transport
|
||||
StateStore sm.Store
|
||||
BlockStore sm.BlockStore
|
||||
EvidencePool sm.EvidencePool
|
||||
ConsensusState Consensus
|
||||
WaitSyncChecker WaitSyncChecker
|
||||
P2PPeers peers
|
||||
P2PTransport transport
|
||||
|
||||
// objects
|
||||
PubKey crypto.PubKey
|
||||
GenDoc *types.GenesisDoc // cache the genesis structure
|
||||
EventSinks []indexer.EventSink
|
||||
ConsensusReactor *consensus.Reactor
|
||||
EventBus *types.EventBus // thread safe
|
||||
Mempool mempl.Mempool
|
||||
BlockSyncReactor consensus.BlockSyncReactor
|
||||
@@ -190,7 +194,7 @@ func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64,
|
||||
}
|
||||
|
||||
func (env *Environment) latestUncommittedHeight() int64 {
|
||||
nodeIsSyncing := env.ConsensusReactor.WaitSync()
|
||||
nodeIsSyncing := env.WaitSyncChecker.WaitSync()
|
||||
if nodeIsSyncing {
|
||||
return env.BlockStore.Height()
|
||||
}
|
||||
|
||||
@@ -10,48 +10,87 @@ type RoutesMap map[string]*rpc.RPCFunc
|
||||
|
||||
// Routes is a map of available routes.
|
||||
func (env *Environment) GetRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
// subscribe/unsubscribe are reserved for websocket events.
|
||||
"subscribe": rpc.NewWSRPCFunc(env.Subscribe, "query"),
|
||||
"unsubscribe": rpc.NewWSRPCFunc(env.Unsubscribe, "query"),
|
||||
"unsubscribe_all": rpc.NewWSRPCFunc(env.UnsubscribeAll, ""),
|
||||
|
||||
// info API
|
||||
"health": rpc.NewRPCFunc(env.Health, "", false),
|
||||
"status": rpc.NewRPCFunc(env.Status, "", false),
|
||||
"net_info": rpc.NewRPCFunc(env.NetInfo, "", false),
|
||||
"blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
|
||||
"genesis": rpc.NewRPCFunc(env.Genesis, "", true),
|
||||
"genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", true),
|
||||
"block": rpc.NewRPCFunc(env.Block, "height", true),
|
||||
"block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true),
|
||||
"block_results": rpc.NewRPCFunc(env.BlockResults, "height", true),
|
||||
"commit": rpc.NewRPCFunc(env.Commit, "height", true),
|
||||
"check_tx": rpc.NewRPCFunc(env.CheckTx, "tx", true),
|
||||
"tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true),
|
||||
"tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
|
||||
"block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
|
||||
"validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", true),
|
||||
}
|
||||
|
||||
func (env *Environment) GenesisRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"genesis": rpc.NewRPCFunc(env.Genesis, "", true),
|
||||
"genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", true),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) BlockchainLookupRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"block": rpc.NewRPCFunc(env.Block, "height", true),
|
||||
"blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", true),
|
||||
"block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", true),
|
||||
"block_results": rpc.NewRPCFunc(env.BlockResults, "height", true),
|
||||
"block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by", false),
|
||||
"tx": rpc.NewRPCFunc(env.Tx, "hash,prove", true),
|
||||
"tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by", false),
|
||||
"commit": rpc.NewRPCFunc(env.Commit, "height", true),
|
||||
"validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", true),
|
||||
"consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", true),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) ConsensusStateRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"dump_consensus_state": rpc.NewRPCFunc(env.DumpConsensusState, "", false),
|
||||
"consensus_state": rpc.NewRPCFunc(env.GetConsensusState, "", false),
|
||||
"consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", true),
|
||||
"unconfirmed_txs": rpc.NewRPCFunc(env.UnconfirmedTxs, "limit", false),
|
||||
"num_unconfirmed_txs": rpc.NewRPCFunc(env.NumUnconfirmedTxs, "", false),
|
||||
}
|
||||
}
|
||||
|
||||
// tx broadcast API
|
||||
func (env *Environment) UnconfirmedTxRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"unconfirmed_txs": rpc.NewRPCFunc(env.UnconfirmedTxs, "limit", false),
|
||||
"num_unconfirmed_txs": rpc.NewRPCFunc(env.NumUnconfirmedTxs, "", false),
|
||||
"check_tx": rpc.NewRPCFunc(env.CheckTx, "tx", true),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) StatusRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"health": rpc.NewRPCFunc(env.Health, "", false),
|
||||
"status": rpc.NewRPCFunc(env.Status, "", false),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) P2PInfoRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"net_info": rpc.NewRPCFunc(env.NetInfo, "", false),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) BroadcastTxRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"broadcast_tx_commit": rpc.NewRPCFunc(env.BroadcastTxCommit, "tx", false),
|
||||
"broadcast_tx_sync": rpc.NewRPCFunc(env.BroadcastTxSync, "tx", false),
|
||||
"broadcast_tx_async": rpc.NewRPCFunc(env.BroadcastTxAsync, "tx", false),
|
||||
|
||||
// abci API
|
||||
}
|
||||
}
|
||||
func (env *Environment) ABCIQueryRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"abci_query": rpc.NewRPCFunc(env.ABCIQuery, "path,data,height,prove", false),
|
||||
"abci_info": rpc.NewRPCFunc(env.ABCIInfo, "", true),
|
||||
}
|
||||
}
|
||||
|
||||
// evidence API
|
||||
func (env *Environment) EvidenceRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"broadcast_evidence": rpc.NewRPCFunc(env.BroadcastEvidence, "evidence", false),
|
||||
}
|
||||
}
|
||||
|
||||
func (env *Environment) SubscribeRoutes() RoutesMap {
|
||||
return RoutesMap{
|
||||
"subscribe": rpc.NewWSRPCFunc(env.Subscribe, "query"),
|
||||
"unsubscribe": rpc.NewWSRPCFunc(env.Unsubscribe, "query"),
|
||||
"unsubscribe_all": rpc.NewWSRPCFunc(env.UnsubscribeAll, ""),
|
||||
}
|
||||
}
|
||||
|
||||
// AddUnsafeRoutes adds unsafe routes.
|
||||
func (env *Environment) AddUnsafe(routes RoutesMap) {
|
||||
// control API
|
||||
@@ -59,3 +98,6 @@ func (env *Environment) AddUnsafe(routes RoutesMap) {
|
||||
routes["dial_peers"] = rpc.NewRPCFunc(env.UnsafeDialPeers, "peers,persistent,unconditional,private", false)
|
||||
routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(env.UnsafeFlushMempool, "", false)
|
||||
}
|
||||
func CombineRoutes(maps ...RoutesMap) RoutesMap {
|
||||
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, err
|
||||
EarliestBlockHeight: earliestBlockHeight,
|
||||
EarliestBlockTime: time.Unix(0, earliestBlockTimeNano),
|
||||
MaxPeerBlockHeight: env.BlockSyncReactor.GetMaxPeerBlockHeight(),
|
||||
CatchingUp: env.ConsensusReactor.WaitSync(),
|
||||
CatchingUp: env.WaitSyncChecker.WaitSync(),
|
||||
TotalSyncedTime: env.BlockSyncReactor.GetTotalSyncedTime(),
|
||||
RemainingTime: env.BlockSyncReactor.GetRemainingSyncTime(),
|
||||
},
|
||||
|
||||
@@ -110,7 +110,7 @@ func setup() {
|
||||
wm.SetLogger(tcpLogger)
|
||||
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
||||
config := server.DefaultConfig()
|
||||
listener1, err := server.Listen(tcpAddr, config)
|
||||
listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -126,7 +126,7 @@ func setup() {
|
||||
wm = server.NewWebsocketManager(Routes)
|
||||
wm.SetLogger(unixLogger)
|
||||
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
||||
listener2, err := server.Listen(unixAddr, config)
|
||||
listener2, err := server.Listen(unixAddr, config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -261,7 +261,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Listen starts a new net.Listener on the given address.
|
||||
// It returns an error if the address is invalid or the call to Listen() fails.
|
||||
func Listen(addr string, config *Config) (listener net.Listener, err error) {
|
||||
func Listen(addr string, maxOpenConnections int) (listener net.Listener, err error) {
|
||||
parts := strings.SplitN(addr, "://", 2)
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf(
|
||||
@@ -274,8 +274,8 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
|
||||
}
|
||||
if config.MaxOpenConnections > 0 {
|
||||
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
|
||||
if maxOpenConnections > 0 {
|
||||
listener = netutil.LimitListener(listener, maxOpenConnections)
|
||||
}
|
||||
|
||||
return listener, nil
|
||||
|
||||
@@ -39,8 +39,7 @@ func TestMaxOpenConnections(t *testing.T) {
|
||||
fmt.Fprint(w, "some body")
|
||||
})
|
||||
config := DefaultConfig()
|
||||
config.MaxOpenConnections = max
|
||||
l, err := Listen("tcp://127.0.0.1:0", config)
|
||||
l, err := Listen("tcp://127.0.0.1:0", max)
|
||||
require.NoError(t, err)
|
||||
defer l.Close()
|
||||
go Serve(l, mux, log.TestingLogger(), config) //nolint:errcheck // ignore for tests
|
||||
|
||||
@@ -33,7 +33,7 @@ func main() {
|
||||
|
||||
rpcserver.RegisterRPCFuncs(mux, routes, logger)
|
||||
config := rpcserver.DefaultConfig()
|
||||
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config)
|
||||
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config.MaxOpenConnections)
|
||||
if err != nil {
|
||||
tmos.Exit(err.Error())
|
||||
}
|
||||
|
||||
@@ -51,43 +51,47 @@ func (is *Service) OnStart() error {
|
||||
|
||||
go func() {
|
||||
for {
|
||||
msg := <-blockHeadersSub.Out()
|
||||
select {
|
||||
case <-blockHeadersSub.Canceled():
|
||||
return
|
||||
case msg := <-blockHeadersSub.Out():
|
||||
|
||||
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
|
||||
height := eventDataHeader.Header.Height
|
||||
batch := NewBatch(eventDataHeader.NumTxs)
|
||||
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
|
||||
height := eventDataHeader.Header.Height
|
||||
batch := NewBatch(eventDataHeader.NumTxs)
|
||||
|
||||
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
|
||||
msg2 := <-txsSub.Out()
|
||||
txResult := msg2.Data().(types.EventDataTx).TxResult
|
||||
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
|
||||
msg2 := <-txsSub.Out()
|
||||
txResult := msg2.Data().(types.EventDataTx).TxResult
|
||||
|
||||
if err = batch.Add(&txResult); err != nil {
|
||||
is.Logger.Error(
|
||||
"failed to add tx to batch",
|
||||
"height", height,
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if !IndexingEnabled(is.eventSinks) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sink := range is.eventSinks {
|
||||
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
|
||||
if err = batch.Add(&txResult); err != nil {
|
||||
is.Logger.Error(
|
||||
"failed to add tx to batch",
|
||||
"height", height,
|
||||
"index", txResult.Index,
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if len(batch.Ops) > 0 {
|
||||
err := sink.IndexTxEvents(batch.Ops)
|
||||
if err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
if !IndexingEnabled(is.eventSinks) {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sink := range is.eventSinks {
|
||||
if err := sink.IndexBlockEvents(eventDataHeader); err != nil {
|
||||
is.Logger.Error("failed to index block", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
|
||||
is.Logger.Debug("indexed block", "height", height, "sink", sink.Type())
|
||||
}
|
||||
|
||||
if len(batch.Ops) > 0 {
|
||||
err := sink.IndexTxEvents(batch.Ops)
|
||||
if err != nil {
|
||||
is.Logger.Error("failed to index block txs", "height", height, "err", err)
|
||||
} else {
|
||||
is.Logger.Debug("indexed txs", "height", height, "sink", sink.Type())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
65
state/indexer/sink/sink.go
Normal file
65
state/indexer/sink/sink.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package sink
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/state/indexer"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/kv"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/null"
|
||||
"github.com/tendermint/tendermint/state/indexer/sink/psql"
|
||||
)
|
||||
|
||||
// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided
|
||||
// configuration.
|
||||
//
|
||||
//nolint:lll
|
||||
func EventSinksFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) ([]indexer.EventSink, error) {
|
||||
if len(cfg.TxIndex.Indexer) == 0 {
|
||||
return []indexer.EventSink{null.NewEventSink()}, nil
|
||||
}
|
||||
|
||||
// check for duplicated sinks
|
||||
sinks := map[string]struct{}{}
|
||||
for _, s := range cfg.TxIndex.Indexer {
|
||||
sl := strings.ToLower(s)
|
||||
if _, ok := sinks[sl]; ok {
|
||||
return nil, errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
|
||||
}
|
||||
sinks[sl] = struct{}{}
|
||||
}
|
||||
eventSinks := []indexer.EventSink{}
|
||||
for k := range sinks {
|
||||
switch k {
|
||||
case string(indexer.NULL):
|
||||
// When we see null in the config, the eventsinks will be reset with the
|
||||
// nullEventSink.
|
||||
return []indexer.EventSink{null.NewEventSink()}, nil
|
||||
|
||||
case string(indexer.KV):
|
||||
store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
eventSinks = append(eventSinks, kv.NewEventSink(store))
|
||||
|
||||
case string(indexer.PSQL):
|
||||
conn := cfg.TxIndex.PsqlConn
|
||||
if conn == "" {
|
||||
return nil, errors.New("the psql connection settings cannot be empty")
|
||||
}
|
||||
|
||||
es, _, err := psql.NewEventSink(conn, chainID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventSinks = append(eventSinks, es)
|
||||
default:
|
||||
return nil, errors.New("unsupported event sink type")
|
||||
}
|
||||
}
|
||||
return eventSinks, nil
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user