inspect: resurrect the inspect command (#9655)

resurrect the inspect command from #6785

Co-authored-by: Sam Kleinman <garen@tychoish.com>
Co-authored-by: Thane Thomson <connect@thanethomson.com>
Co-authored-by: Callum Waters <cmwaters19@gmail.com>
This commit is contained in:
William Banfield
2022-12-07 12:15:55 -05:00
committed by GitHub
parent ac48630fdb
commit 3324f49fb7
53 changed files with 1500 additions and 356 deletions

View File

@@ -12,6 +12,8 @@
- Go API
- [p2p] \#9625 Remove unused p2p/trust package (@cmwaters)
- [rpc] \#9655 Remove global environment and replace with constructor. (@williambanfield,@tychoish)
- [node] \#9655 Move DBContext and DBProvider from the node package to the config package. (@williambanfield,@tychoish)
- Blockchain Protocol
@@ -22,6 +24,7 @@
- [tools/tm-signer-harness] \#6498 Set OS home dir to instead of the hardcoded PATH. (@JayT106)
- [metrics] \#9682 move state-syncing and block-syncing metrics to their respective packages (@cmwaters)
labels have moved from block_syncing -> blocksync_syncing and state_syncing -> statesync_syncing
- [inspect] \#9655 Add a new `inspect` command for introspecting the state and block store of a crashed tendermint node. (@williambanfield)
### FEATURES

View File

@@ -0,0 +1,87 @@
package commands
import (
"context"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer/block"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
)
// InspectCmd is the command for starting an inspect server.
var InspectCmd = &cobra.Command{
Use: "inspect",
Short: "Run an inspect server for investigating Tendermint state",
Long: `
inspect runs a subset of Tendermint's RPC endpoints that are useful for debugging
issues with Tendermint.
When the Tendermint consensus engine detects inconsistent state, it will crash the
Tendermint process. Tendermint will not start up while in this inconsistent state.
The inspect command can be used to query the block and state store using Tendermint
RPC calls to debug issues of inconsistent state.
`,
RunE: runInspect,
}
func init() {
InspectCmd.Flags().
String("rpc.laddr",
config.RPC.ListenAddress, "RPC listenener address. Port required")
InspectCmd.Flags().
String("db-backend",
config.DBBackend, "database backend: goleveldb | cleveldb | boltdb | rocksdb | badgerdb")
InspectCmd.Flags().
String("db-dir", config.DBPath, "database directory")
}
func runInspect(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
go func() {
<-c
cancel()
}()
blockStoreDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return err
}
blockStore := store.NewBlockStore(blockStoreDB)
defer blockStore.Close()
stateDB, err := cfg.DefaultDBProvider(&cfg.DBContext{ID: "state", Config: config})
if err != nil {
return err
}
stateStore := state.NewStore(stateDB, state.StoreOptions{DiscardABCIResponses: false})
defer stateStore.Close()
genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil {
return err
}
txIndexer, blockIndexer, err := block.IndexerFromConfig(config, cfg.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return err
}
ins := inspect.New(config.RPC, blockStore, stateStore, txIndexer, blockIndexer, logger)
logger.Info("starting inspect server")
if err := ins.Run(ctx); err != nil {
return err
}
return nil
}

View File

@@ -30,6 +30,7 @@ func main() {
cmd.VersionCmd,
cmd.RollbackStateCmd,
cmd.CompactGoLevelDBCmd,
cmd.InspectCmd,
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)

30
config/db.go Normal file
View File

@@ -0,0 +1,30 @@
package config
import (
"context"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
// ServiceProvider takes a config and a logger and returns a ready to go Node.
type ServiceProvider func(context.Context, *Config, log.Logger) (service.Service, error)
// DBContext specifies config information for loading a new DB.
type DBContext struct {
ID string
Config *Config
}
// DBProvider takes a DBContext and returns an instantiated DB.
type DBProvider func(*DBContext) (dbm.DB, error)
// DefaultDBProvider returns a database using the DBBackend and DBDir
// specified in the Config.
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
dbType := dbm.BackendType(ctx.Config.DBBackend)
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
}

View File

@@ -54,7 +54,7 @@ func (cs *State) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscr
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("roundState mismatch. Got %v; Expected %v", m2, m)
}
case <-newStepSub.Cancelled():
case <-newStepSub.Canceled():
return fmt.Errorf("failed to read off newStepSub.Out(). newStepSub was canceled")
case <-ticker:
return fmt.Errorf("failed to read off newStepSub.Out()")

View File

@@ -97,7 +97,7 @@ func startNewStateAndWaitForBlock(t *testing.T, consensusReplayConfig *cfg.Confi
require.NoError(t, err)
select {
case <-newBlockSub.Out():
case <-newBlockSub.Cancelled():
case <-newBlockSub.Canceled():
t.Fatal("newBlockSub was canceled")
case <-time.After(120 * time.Second):
t.Fatal("Timed out waiting for new block (see trace above)")
@@ -1198,6 +1198,7 @@ func (bs *mockBlockStore) PruneBlocks(height int64, state sm.State) (uint64, int
}
func (bs *mockBlockStore) DeleteLatestBlock() error { return nil }
func (bs *mockBlockStore) Close() error { return nil }
//---------------------------------------
// Test handshake/init chain

View File

@@ -55,3 +55,47 @@ given destination directory. Each archive will contain:
Note: goroutine.out and heap.out will only be written if a profile address is
provided and is operational. This command is blocking and will log any error.
## Tendermint Inspect
Tendermint includes an `inspect` command for querying Tendermint's state store and block
store over Tendermint RPC.
When the Tendermint consensus engine detects inconsistent state, it will crash the
entire Tendermint process.
While in this inconsistent state, a node running Tendermint's consensus engine will not start up.
The `inspect` command runs only a subset of Tendermint's RPC endpoints for querying the block store
and state store.
`inspect` allows operators to query a read-only view of the stage.
`inspect` does not run the consensus engine at all and can therefore be used to debug
processes that have crashed due to inconsistent state.
### Running inspect
Start up the `inspect` tool on the machine where Tendermint crashed using:
```bash
tendermint inspect --home=</path/to/app.d>
```
`inspect` will use the data directory specified in your Tendermint configuration file.
`inspect` will also run the RPC server at the address specified in your Tendermint configuration file.
### Using inspect
With the `inspect` server running, you can access RPC endpoints that are critically important
for debugging.
Calling the `/status`, `/consensus_state` and `/dump_consensus_state` RPC endpoint
will return useful information about the Tendermint consensus state.
To start the `inspect` process, run
```bash
tendermint inspect
```
### RPC endpoints
The list of available RPC endpoints can be found by making a request to the RPC port.
For an `inspect` process running on `127.0.0.1:26657`, navigate your browser to
`http://127.0.0.1:26657/` to retrieve the list of enabled RPC endpoints.
Additional information on the Tendermint RPC endpoints can be found in the [rpc documentation](https://docs.tendermint.com/master/rpc).

1
go.sum
View File

@@ -1361,6 +1361,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=

36
inspect/doc.go Normal file
View File

@@ -0,0 +1,36 @@
/*
Package inspect provides a tool for investigating the state of a
failed Tendermint node.
This package provides the Inspector type. The Inspector type runs a subset of the Tendermint
RPC endpoints that are useful for debugging issues with Tendermint consensus.
When a node running the Tendermint consensus engine detects an inconsistent consensus state,
the entire node will crash. The Tendermint consensus engine cannot run in this
inconsistent state so the node will not be able to start up again.
The RPC endpoints provided by the Inspector type allow for a node operator to inspect
the block store and state store to better understand what may have caused the inconsistent state.
The Inspector type's lifecycle is controlled by a context.Context
ins := inspect.NewFromConfig(rpcConfig)
ctx, cancelFunc:= context.WithCancel(context.Background())
// Run blocks until the Inspector server is shut down.
go ins.Run(ctx)
...
// calling the cancel function will stop the running inspect server
cancelFunc()
Inspector serves its RPC endpoints on the address configured in the RPC configuration
rpcConfig.ListenAddress = "tcp://127.0.0.1:26657"
ins := inspect.NewFromConfig(rpcConfig)
go ins.Run(ctx)
The list of available RPC endpoints can then be viewed by navigating to
http://127.0.0.1:26657/ in the web browser.
*/
package inspect

138
inspect/inspect.go Normal file
View File

@@ -0,0 +1,138 @@
package inspect
import (
"context"
"errors"
"net"
"os"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect/rpc"
"github.com/tendermint/tendermint/libs/log"
tmstrings "github.com/tendermint/tendermint/libs/strings"
rpccore "github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/indexer/block"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"golang.org/x/sync/errgroup"
)
var (
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
)
// Inspector manages an RPC service that exports methods to debug a failed node.
// After a node shuts down due to a consensus failure, it will no longer start
// up its state cannot easily be inspected. An Inspector value provides a similar interface
// to the node, using the underlying Tendermint data stores, without bringing up
// any other components. A caller can query the Inspector service to inspect the
// persisted state and debug the failure.
type Inspector struct {
routes rpccore.RoutesMap
config *config.RPCConfig
logger log.Logger
// References to the state store and block store are maintained to enable
// the Inspector to safely close them on shutdown.
ss state.Store
bs state.BlockStore
}
// New returns an Inspector that serves RPC on the specified BlockStore and StateStore.
// The Inspector type does not modify the state or block stores.
// The sinks are used to enable block and transaction querying via the RPC server.
// The caller is responsible for starting and stopping the Inspector service.
//
//nolint:lll
func New(cfg *config.RPCConfig, bs state.BlockStore, ss state.Store, txidx txindex.TxIndexer, blkidx indexer.BlockIndexer, lg log.Logger) *Inspector {
routes := rpc.Routes(*cfg, ss, bs, txidx, blkidx, logger)
eb := types.NewEventBus()
eb.SetLogger(logger.With("module", "events"))
return &Inspector{
routes: routes,
config: cfg,
logger: logger,
ss: ss,
bs: bs,
}
}
// NewFromConfig constructs an Inspector using the values defined in the passed in config.
func NewFromConfig(cfg *config.Config) (*Inspector, error) {
bsDB, err := config.DefaultDBProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, err
}
bs := store.NewBlockStore(bsDB)
sDB, err := config.DefaultDBProvider(&config.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, err
}
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
if err != nil {
return nil, err
}
txidx, blkidx, err := block.IndexerFromConfig(cfg, config.DefaultDBProvider, genDoc.ChainID)
if err != nil {
return nil, err
}
lg := logger.With("module", "inspect")
ss := state.NewStore(sDB, state.StoreOptions{})
return New(cfg.RPC, bs, ss, txidx, blkidx, lg), nil
}
// Run starts the Inspector servers and blocks until the servers shut down. The passed
// in context is used to control the lifecycle of the servers.
func (ins *Inspector) Run(ctx context.Context) error {
defer ins.bs.Close()
defer ins.ss.Close()
return startRPCServers(ctx, ins.config, ins.logger, ins.routes)
}
func startRPCServers(ctx context.Context, cfg *config.RPCConfig, logger log.Logger, routes rpccore.RoutesMap) error {
g, tctx := errgroup.WithContext(ctx)
listenAddrs := tmstrings.SplitAndTrimEmpty(cfg.ListenAddress, ",", " ")
rh := rpc.Handler(cfg, routes, logger)
for _, listenerAddr := range listenAddrs {
server := rpc.Server{
Logger: logger,
Config: cfg,
Handler: rh,
Addr: listenerAddr,
}
if cfg.IsTLSEnabled() {
keyFile := cfg.KeyFile()
certFile := cfg.CertFile()
listenerAddr := listenerAddr
g.Go(func() error {
logger.Info("RPC HTTPS server starting", "address", listenerAddr,
"certfile", certFile, "keyfile", keyFile)
err := server.ListenAndServeTLS(tctx, certFile, keyFile)
if !errors.Is(err, net.ErrClosed) {
return err
}
logger.Info("RPC HTTPS server stopped", "address", listenerAddr)
return nil
})
} else {
listenerAddr := listenerAddr
g.Go(func() error {
logger.Info("RPC HTTP server starting", "address", listenerAddr)
err := server.ListenAndServe(tctx)
if !errors.Is(err, net.ErrClosed) {
return err
}
logger.Info("RPC HTTP server stopped", "address", listenerAddr)
return nil
})
}
}
return g.Wait()
}

605
inspect/inspect_test.go Normal file
View File

@@ -0,0 +1,605 @@
package inspect_test
import (
"context"
"fmt"
"net"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
abcitypes "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/inspect"
"github.com/tendermint/tendermint/internal/test"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/proto/tendermint/state"
httpclient "github.com/tendermint/tendermint/rpc/client/http"
indexermocks "github.com/tendermint/tendermint/state/indexer/mocks"
statemocks "github.com/tendermint/tendermint/state/mocks"
txindexmocks "github.com/tendermint/tendermint/state/txindex/mocks"
"github.com/tendermint/tendermint/types"
)
func TestInspectConstructor(t *testing.T) {
cfg := test.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := inspect.NewFromConfig(cfg)
require.NoError(t, err)
require.NotNil(t, d)
})
}
func TestInspectRun(t *testing.T) {
cfg := test.ResetTestRoot("test")
t.Cleanup(leaktest.Check(t))
defer func() { _ = os.RemoveAll(cfg.RootDir) }()
t.Run("from config", func(t *testing.T) {
d, err := inspect.NewFromConfig(cfg)
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
stoppedWG := &sync.WaitGroup{}
stoppedWG.Add(1)
go func() {
require.NoError(t, d.Run(ctx))
stoppedWG.Done()
}()
cancel()
stoppedWG.Wait()
})
}
func TestBlock(t *testing.T) {
testHeight := int64(1)
testBlock := new(types.Block)
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = []byte("test hash")
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{})
blockStoreMock.On("LoadBlock", testHeight).Return(testBlock)
blockStoreMock.On("Close").Return(nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
resultBlock, err := cli.Block(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, testBlock.Height, resultBlock.Block.Height)
require.Equal(t, testBlock.LastCommitHash, resultBlock.Block.LastCommitHash)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestTxSearch(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
testQuery := fmt.Sprintf("tx.hash='%s'", string(testHash))
testTxResult := &abcitypes.TxResult{
Height: 1,
Index: 100,
Tx: testTx,
}
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
txIndexerMock.On("Search", mock.Anything,
mock.MatchedBy(func(q *query.Query) bool {
return testQuery == strings.ReplaceAll(q.String(), " ", "")
})).
Return([]*abcitypes.TxResult{testTxResult}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
var page = 1
resultTxSearch, err := cli.TxSearch(context.Background(), testQuery, false, &page, &page, "")
require.NoError(t, err)
require.Len(t, resultTxSearch.Txs, 1)
require.Equal(t, types.Tx(testTx), resultTxSearch.Txs[0].Tx)
cancel()
wg.Wait()
txIndexerMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
blockStoreMock.AssertExpectations(t)
}
func TestTx(t *testing.T) {
testHash := []byte("test")
testTx := []byte("tx")
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blkIdxMock := &indexermocks.BlockIndexer{}
txIndexerMock := &txindexmocks.TxIndexer{}
txIndexerMock.On("Get", testHash).Return(&abcitypes.TxResult{
Tx: testTx,
}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
res, err := cli.Tx(context.Background(), testHash, false)
require.NoError(t, err)
require.Equal(t, types.Tx(testTx), res.Tx)
cancel()
wg.Wait()
txIndexerMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
blockStoreMock.AssertExpectations(t)
}
func TestConsensusParams(t *testing.T) {
testHeight := int64(1)
testMaxGas := int64(55)
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
stateStoreMock.On("LoadConsensusParams", testHeight).Return(types.ConsensusParams{
Block: types.BlockParams{
MaxGas: testMaxGas,
},
}, nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
params, err := cli.ConsensusParams(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, params.ConsensusParams.Block.MaxGas, testMaxGas)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockResults(t *testing.T) {
testHeight := int64(1)
testGasUsed := int64(100)
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
// tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{
DeliverTxs: []*abcitypes.ResponseDeliverTx{
{
GasUsed: testGasUsed,
},
},
EndBlock: &abcitypes.ResponseEndBlock{},
BeginBlock: &abcitypes.ResponseBeginBlock{},
}, nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("Height").Return(testHeight)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
res, err := cli.BlockResults(context.Background(), &testHeight)
require.NoError(t, err)
require.Equal(t, res.TxsResults[0].GasUsed, testGasUsed)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestCommit(t *testing.T) {
testHeight := int64(1)
testRound := int32(101)
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{}, nil)
blockStoreMock.On("LoadSeenCommit", testHeight).Return(&types.Commit{
Height: testHeight,
Round: testRound,
}, nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
res, err := cli.Commit(context.Background(), &testHeight)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, res.SignedHeader.Commit.Round, testRound)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockByHash(t *testing.T) {
testHeight := int64(1)
testHash := []byte("test hash")
testBlock := new(types.Block)
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = testHash
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testHash,
},
Header: types.Header{
Height: testHeight,
},
}, nil)
blockStoreMock.On("LoadBlockByHash", testHash).Return(testBlock, nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
res, err := cli.BlockByHash(context.Background(), testHash)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, []byte(res.BlockID.Hash), testHash)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockchain(t *testing.T) {
testHeight := int64(1)
testBlock := new(types.Block)
testBlockHash := []byte("test hash")
testBlock.Header.Height = testHeight
testBlock.Header.LastCommitHash = testBlockHash
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testBlockHash,
},
})
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
res, err := cli.BlockchainInfo(context.Background(), 0, 100)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testBlockHash, []byte(res.BlockMetas[0].BlockID.Hash))
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestValidators(t *testing.T) {
testHeight := int64(1)
testVotingPower := int64(100)
testValidators := types.ValidatorSet{
Validators: []*types.Validator{
{
VotingPower: testVotingPower,
},
},
}
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
stateStoreMock.On("LoadValidators", testHeight).Return(&testValidators, nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
blockStoreMock.On("Height").Return(testHeight)
blockStoreMock.On("Base").Return(int64(0))
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
testPage := 1
testPerPage := 100
res, err := cli.Validators(context.Background(), &testHeight, &testPage, &testPerPage)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testVotingPower, res.Validators[0].VotingPower)
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func TestBlockSearch(t *testing.T) {
testHeight := int64(1)
testBlockHash := []byte("test hash")
testQuery := "block.height = 1"
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("Close").Return(nil)
blockStoreMock := &statemocks.BlockStore{}
blockStoreMock.On("Close").Return(nil)
txIndexerMock := &txindexmocks.TxIndexer{}
blkIdxMock := &indexermocks.BlockIndexer{}
blockStoreMock.On("LoadBlock", testHeight).Return(&types.Block{
Header: types.Header{
Height: testHeight,
},
}, nil)
blockStoreMock.On("LoadBlockMeta", testHeight).Return(&types.BlockMeta{
BlockID: types.BlockID{
Hash: testBlockHash,
},
})
blkIdxMock.On("Search", mock.Anything,
mock.MatchedBy(func(q *query.Query) bool { return testQuery == q.String() })).
Return([]int64{testHeight}, nil)
rpcConfig := config.TestRPCConfig()
l := log.TestingLogger()
d := inspect.New(rpcConfig, blockStoreMock, stateStoreMock, txIndexerMock, blkIdxMock, l)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
startedWG := &sync.WaitGroup{}
startedWG.Add(1)
go func() {
startedWG.Done()
defer wg.Done()
require.NoError(t, d.Run(ctx))
}()
// FIXME: used to induce context switch.
// Determine more deterministic method for prompting a context switch
startedWG.Wait()
requireConnect(t, rpcConfig.ListenAddress, 20)
cli, err := httpclient.New(rpcConfig.ListenAddress, "/websocket")
require.NoError(t, err)
testPage := 1
testPerPage := 100
testOrderBy := "desc"
res, err := cli.BlockSearch(context.Background(), testQuery, &testPage, &testPerPage, testOrderBy)
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, testBlockHash, []byte(res.Blocks[0].BlockID.Hash))
cancel()
wg.Wait()
blockStoreMock.AssertExpectations(t)
stateStoreMock.AssertExpectations(t)
}
func requireConnect(t testing.TB, addr string, retries int) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
t.Fatalf("malformed address to dial: %s", addr)
}
var err error
for i := 0; i < retries; i++ {
var conn net.Conn
conn, err = net.Dial(parts[0], parts[1])
if err == nil {
conn.Close()
return
}
// FIXME attempt to yield and let the other goroutine continue execution.
time.Sleep(time.Microsecond * 100)
}
t.Fatalf("unable to connect to server %s after %d tries: %s", addr, retries, err)
}

128
inspect/rpc/rpc.go Normal file
View File

@@ -0,0 +1,128 @@
package rpc
import (
"context"
"net/http"
"time"
"github.com/rs/cors"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/rpc/core"
"github.com/tendermint/tendermint/rpc/jsonrpc/server"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/state/txindex"
)
// Server defines parameters for running an Inspector rpc server.
type Server struct {
Addr string // TCP address to listen on, ":http" if empty
Handler http.Handler
Logger log.Logger
Config *config.RPCConfig
}
// Routes returns the set of routes used by the Inspector server.
func Routes(cfg config.RPCConfig, s state.Store, bs state.BlockStore, txidx txindex.TxIndexer, blkidx indexer.BlockIndexer, logger log.Logger) core.RoutesMap { //nolint: lll
env := &core.Environment{
Config: cfg,
BlockIndexer: blkidx,
TxIndexer: txidx,
StateStore: s,
BlockStore: bs,
ConsensusReactor: waitSyncCheckerImpl{},
Logger: logger,
}
return core.RoutesMap{
"blockchain": server.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight"),
"consensus_params": server.NewRPCFunc(env.ConsensusParams, "height"),
"block": server.NewRPCFunc(env.Block, "height"),
"block_by_hash": server.NewRPCFunc(env.BlockByHash, "hash"),
"block_results": server.NewRPCFunc(env.BlockResults, "height"),
"commit": server.NewRPCFunc(env.Commit, "height"),
"header": server.NewRPCFunc(env.Header, "height"),
"header_by_hash": server.NewRPCFunc(env.HeaderByHash, "hash"),
"validators": server.NewRPCFunc(env.Validators, "height,page,per_page"),
"tx": server.NewRPCFunc(env.Tx, "hash,prove"),
"tx_search": server.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by"),
"block_search": server.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by"),
}
}
// Handler returns the http.Handler configured for use with an Inspector server. Handler
// registers the routes on the http.Handler and also registers the websocket handler
// and the CORS handler if specified by the configuration options.
func Handler(rpcConfig *config.RPCConfig, routes core.RoutesMap, logger log.Logger) http.Handler {
mux := http.NewServeMux()
wmLogger := logger.With("protocol", "websocket")
wm := server.NewWebsocketManager(routes,
server.ReadLimit(rpcConfig.MaxBodyBytes))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
server.RegisterRPCFuncs(mux, routes, logger)
var rootHandler http.Handler = mux
if rpcConfig.IsCorsEnabled() {
rootHandler = addCORSHandler(rpcConfig, mux)
}
return rootHandler
}
func addCORSHandler(rpcConfig *config.RPCConfig, h http.Handler) http.Handler {
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: rpcConfig.CORSAllowedOrigins,
AllowedMethods: rpcConfig.CORSAllowedMethods,
AllowedHeaders: rpcConfig.CORSAllowedHeaders,
})
h = corsMiddleware.Handler(h)
return h
}
type waitSyncCheckerImpl struct{}
func (waitSyncCheckerImpl) WaitSync() bool {
return false
}
// ListenAndServe listens on the address specified in srv.Addr and handles any
// incoming requests over HTTP using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServe(ctx context.Context) error {
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return server.Serve(listener, srv.Handler, srv.Logger, serverRPCConfig(srv.Config))
}
// ListenAndServeTLS listens on the address specified in srv.Addr. ListenAndServeTLS handles
// incoming requests over HTTPS using the Inspector rpc handler specified on the server.
func (srv *Server) ListenAndServeTLS(ctx context.Context, certFile, keyFile string) error {
listener, err := server.Listen(srv.Addr, srv.Config.MaxOpenConnections)
if err != nil {
return err
}
go func() {
<-ctx.Done()
listener.Close()
}()
return server.ServeTLS(listener, srv.Handler, certFile, keyFile, srv.Logger, serverRPCConfig(srv.Config))
}
func serverRPCConfig(r *config.RPCConfig) *server.Config {
cfg := server.DefaultConfig()
cfg.MaxBodyBytes = r.MaxBodyBytes
cfg.MaxHeaderBytes = r.MaxHeaderBytes
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if cfg.WriteTimeout <= r.TimeoutBroadcastTxCommit {
cfg.WriteTimeout = r.TimeoutBroadcastTxCommit + 1*time.Second
}
return cfg
}

View File

@@ -27,7 +27,7 @@
// select {
// case msg <- subscription.Out():
// // handle msg.Data() and msg.Events()
// case <-subscription.Cancelled():
// case <-subscription.Canceled():
// return subscription.Err()
// }
// }

View File

@@ -431,7 +431,7 @@ func benchmarkNClients(n int, b *testing.B) {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
case <-subscription.Canceled():
return
}
}
@@ -472,7 +472,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
case <-subscription.Canceled():
return
}
}
@@ -501,7 +501,7 @@ func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message,
}
func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) {
_, ok := <-subscription.Cancelled()
_, ok := <-subscription.Canceled()
assert.False(t, ok)
assert.Equal(t, err, subscription.Err())
}

View File

@@ -43,11 +43,9 @@ func (s *Subscription) Out() <-chan Message {
return s.out
}
// Cancelled returns a channel that's closed when the subscription is
// Canceled returns a channel that's closed when the subscription is
// terminated and supposed to be used in a select statement.
//
//nolint:misspell
func (s *Subscription) Cancelled() <-chan struct{} {
func (s *Subscription) Canceled() <-chan struct{} {
return s.canceled
}

View File

@@ -32,6 +32,27 @@ func SplitAndTrim(s, sep, cutset string) []string {
return spl
}
// SplitAndTrimEmpty slices s into all subslices separated by sep and returns a
// slice of the string s with all leading and trailing Unicode code points
// contained in cutset removed. If sep is empty, SplitAndTrim splits after each
// UTF-8 sequence. First part is equivalent to strings.SplitN with a count of
// -1. also filter out empty strings, only return non-empty strings.
func SplitAndTrimEmpty(s, sep, cutset string) []string {
if s == "" {
return []string{}
}
spl := strings.Split(s, sep)
nonEmptyStrings := make([]string, 0, len(spl))
for i := 0; i < len(spl); i++ {
element := strings.Trim(spl[i], cutset)
if element != "" {
nonEmptyStrings = append(nonEmptyStrings, element)
}
}
return nonEmptyStrings
}
// Returns true if s is a non-empty printable non-tab ascii character.
func IsASCIIText(s string) bool {
if len(s) == 0 {

View File

@@ -113,7 +113,7 @@ func (p *Proxy) listen() (net.Listener, *http.ServeMux, error) {
}
// 4) Start listening for new connections.
listener, err := rpcserver.Listen(p.Addr, p.Config)
listener, err := rpcserver.Listen(p.Addr, p.Config.MaxOpenConnections)
if err != nil {
return nil, mux, err
}

View File

@@ -141,7 +141,7 @@ func NewNode(config *cfg.Config,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider GenesisDocProvider,
dbProvider DBProvider,
dbProvider cfg.DBProvider,
metricsProvider MetricsProvider,
logger log.Logger,
options ...Option,
@@ -503,12 +503,12 @@ func (n *Node) OnStop() {
}
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *Node) ConfigureRPC() error {
func (n *Node) ConfigureRPC() (*rpccore.Environment, error) {
pubKey, err := n.privValidator.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
if pubKey == nil || err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}
rpccore.SetEnvironment(&rpccore.Environment{
rpcCoreEnv := rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),
@@ -518,8 +518,8 @@ func (n *Node) ConfigureRPC() error {
ConsensusState: n.consensusState,
P2PPeers: n.sw,
P2PTransport: n,
PubKey: pubKey,
PubKey: pubKey,
GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
@@ -530,24 +530,24 @@ func (n *Node) ConfigureRPC() error {
Logger: n.Logger.With("module", "rpc"),
Config: *n.config.RPC,
})
if err := rpccore.InitGenesisChunks(); err != nil {
return err
}
return nil
if err := rpcCoreEnv.InitGenesisChunks(); err != nil {
return nil, err
}
return &rpcCoreEnv, nil
}
func (n *Node) startRPC() ([]net.Listener, error) {
err := n.ConfigureRPC()
env, err := n.ConfigureRPC()
if err != nil {
return nil, err
}
listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")
routes := env.GetRoutes()
if n.config.RPC.Unsafe {
rpccore.AddUnsafeRoutes()
env.AddUnsafeRoutes(routes)
}
config := rpcserver.DefaultConfig()
@@ -567,7 +567,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
wm := rpcserver.NewWebsocketManager(routes,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
@@ -579,10 +579,10 @@ func (n *Node) startRPC() ([]net.Listener, error) {
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
rpcserver.RegisterRPCFuncs(mux, routes, rpcLogger)
listener, err := rpcserver.Listen(
listenAddr,
config,
config.MaxOpenConnections,
)
if err != nil {
return nil, err
@@ -640,12 +640,12 @@ func (n *Node) startRPC() ([]net.Listener, error) {
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(grpcListenAddr, config)
listener, err := rpcserver.Listen(grpcListenAddr, config.MaxOpenConnections)
if err != nil {
return nil, err
}
go func() {
if err := grpccore.StartGRPCServer(listener); err != nil {
if err := grpccore.StartGRPCServer(env, listener); err != nil {
n.Logger.Error("Error starting gRPC server", "err", err)
}
}()

View File

@@ -53,7 +53,7 @@ func TestNodeStartStop(t *testing.T) {
require.NoError(t, err)
select {
case <-blocksSub.Out():
case <-blocksSub.Cancelled():
case <-blocksSub.Canceled():
t.Fatal("blocksSub was canceled")
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for the node to produce a block")
@@ -461,7 +461,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config),
DefaultDBProvider,
cfg.DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
log.TestingLogger(),
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKSYNC": customBlocksyncReactor}),

View File

@@ -7,6 +7,8 @@ import (
"fmt"
"net"
"strings"
_ "net/http/pprof" //nolint: gosec // securely exposed on separate, optional port
"time"
dbm "github.com/tendermint/tm-db"
@@ -17,6 +19,7 @@ import (
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
"github.com/tendermint/tendermint/statesync"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
@@ -30,13 +33,8 @@ import (
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/indexer/block"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/statesync"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
@@ -44,24 +42,8 @@ import (
_ "github.com/lib/pq" // provide the psql db driver
)
// DBContext specifies config information for loading a new DB.
type DBContext struct {
ID string
Config *cfg.Config
}
// DBProvider takes a DBContext and returns an instantiated DB.
type DBProvider func(*DBContext) (dbm.DB, error)
const readHeaderTimeout = 10 * time.Second
// DefaultDBProvider returns a database using the DBBackend and DBDir
// specified in the ctx.Config.
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
dbType := dbm.BackendType(ctx.Config.DBBackend)
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
}
// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
@@ -92,7 +74,7 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
DefaultGenesisDocProviderFunc(config),
DefaultDBProvider,
cfg.DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
logger,
)
@@ -124,15 +106,15 @@ type blockSyncReactor interface {
//------------------------------------------------------------------------------
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
func initDBs(config *cfg.Config, dbProvider cfg.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
blockStoreDB, err = dbProvider(&cfg.DBContext{ID: "blockstore", Config: config})
if err != nil {
return
}
blockStore = store.NewBlockStore(blockStoreDB)
stateDB, err = dbProvider(&DBContext{"state", config})
stateDB, err = dbProvider(&cfg.DBContext{ID: "state", Config: config})
if err != nil {
return
}
@@ -161,7 +143,7 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
func createAndStartIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
dbProvider cfg.DBProvider,
eventBus *types.EventBus,
logger log.Logger,
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
@@ -169,31 +151,9 @@ func createAndStartIndexerService(
txIndexer txindex.TxIndexer
blockIndexer indexer.BlockIndexer
)
switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, nil, err
}
txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
case "psql":
if config.TxIndex.PsqlConn == "" {
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
}
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
}
txIndexer = es.TxIndexer()
blockIndexer = es.BlockIndexer()
default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
txIndexer, blockIndexer, err := block.IndexerFromConfig(config, dbProvider, chainID)
if err != nil {
return nil, nil, nil, err
}
indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
@@ -315,10 +275,10 @@ func createMempoolAndMempoolReactor(
}
}
func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
func createEvidenceReactor(config *cfg.Config, dbProvider cfg.DBProvider,
stateStore sm.Store, blockStore *store.BlockStore, logger log.Logger,
) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
evidenceDB, err := dbProvider(&cfg.DBContext{ID: "evidence", Config: config})
if err != nil {
return nil, nil, err
}

View File

@@ -41,22 +41,20 @@ type Local struct {
*types.EventBus
Logger log.Logger
ctx *rpctypes.Context
env *core.Environment
}
// NewLocal configures a client that calls the Node directly.
//
// Note that given how rpc/core works with package singletons, that
// you can only have one node per process. So make sure test cases
// don't run in parallel, or try to simulate an entire network in
// one process...
func New(node *nm.Node) *Local {
if err := node.ConfigureRPC(); err != nil {
env, err := node.ConfigureRPC()
if err != nil {
node.Logger.Error("Error configuring RPC", "err", err)
}
return &Local{
EventBus: node.EventBus(),
Logger: log.NewNopLogger(),
ctx: &rpctypes.Context{},
env: env,
}
}
@@ -68,11 +66,11 @@ func (c *Local) SetLogger(l log.Logger) {
}
func (c *Local) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
return core.Status(c.ctx)
return c.env.Status(c.ctx)
}
func (c *Local) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo(c.ctx)
return c.env.ABCIInfo(c.ctx)
}
func (c *Local) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*ctypes.ResultABCIQuery, error) {
@@ -84,55 +82,55 @@ func (c *Local) ABCIQueryWithOptions(
path string,
data bytes.HexBytes,
opts rpcclient.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove)
return c.env.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove)
}
func (c *Local) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(c.ctx, tx)
return c.env.BroadcastTxCommit(c.ctx, tx)
}
func (c *Local) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(c.ctx, tx)
return c.env.BroadcastTxAsync(c.ctx, tx)
}
func (c *Local) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(c.ctx, tx)
return c.env.BroadcastTxSync(c.ctx, tx)
}
func (c *Local) UnconfirmedTxs(ctx context.Context, limit *int) (*ctypes.ResultUnconfirmedTxs, error) {
return core.UnconfirmedTxs(c.ctx, limit)
return c.env.UnconfirmedTxs(c.ctx, limit)
}
func (c *Local) NumUnconfirmedTxs(ctx context.Context) (*ctypes.ResultUnconfirmedTxs, error) {
return core.NumUnconfirmedTxs(c.ctx)
return c.env.NumUnconfirmedTxs(c.ctx)
}
func (c *Local) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
return core.CheckTx(c.ctx, tx)
return c.env.CheckTx(c.ctx, tx)
}
func (c *Local) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) {
return core.NetInfo(c.ctx)
return c.env.NetInfo(c.ctx)
}
func (c *Local) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState(c.ctx)
return c.env.DumpConsensusState(c.ctx)
}
func (c *Local) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) {
return core.ConsensusState(c.ctx)
return c.env.GetConsensusState(c.ctx)
}
func (c *Local) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) {
return core.ConsensusParams(c.ctx, height)
return c.env.ConsensusParams(c.ctx, height)
}
func (c *Local) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
return core.Health(c.ctx)
return c.env.Health(c.ctx)
}
func (c *Local) DialSeeds(ctx context.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(c.ctx, seeds)
return c.env.UnsafeDialSeeds(c.ctx, seeds)
}
func (c *Local) DialPeers(
@@ -142,51 +140,51 @@ func (c *Local) DialPeers(
unconditional,
private bool,
) (*ctypes.ResultDialPeers, error) {
return core.UnsafeDialPeers(c.ctx, peers, persistent, unconditional, private)
return c.env.UnsafeDialPeers(c.ctx, peers, persistent, unconditional, private)
}
func (c *Local) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(c.ctx, minHeight, maxHeight)
return c.env.BlockchainInfo(c.ctx, minHeight, maxHeight)
}
func (c *Local) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) {
return core.Genesis(c.ctx)
return c.env.Genesis(c.ctx)
}
func (c *Local) GenesisChunked(ctx context.Context, id uint) (*ctypes.ResultGenesisChunk, error) {
return core.GenesisChunked(c.ctx, id)
return c.env.GenesisChunked(c.ctx, id)
}
func (c *Local) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
return core.Block(c.ctx, height)
return c.env.Block(c.ctx, height)
}
func (c *Local) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) {
return core.BlockByHash(c.ctx, hash)
return c.env.BlockByHash(c.ctx, hash)
}
func (c *Local) BlockResults(ctx context.Context, height *int64) (*ctypes.ResultBlockResults, error) {
return core.BlockResults(c.ctx, height)
return c.env.BlockResults(c.ctx, height)
}
func (c *Local) Header(ctx context.Context, height *int64) (*ctypes.ResultHeader, error) {
return core.Header(c.ctx, height)
return c.env.Header(c.ctx, height)
}
func (c *Local) HeaderByHash(ctx context.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) {
return core.HeaderByHash(c.ctx, hash)
return c.env.HeaderByHash(c.ctx, hash)
}
func (c *Local) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) {
return core.Commit(c.ctx, height)
return c.env.Commit(c.ctx, height)
}
func (c *Local) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) {
return core.Validators(c.ctx, height, page, perPage)
return c.env.Validators(c.ctx, height, page, perPage)
}
func (c *Local) Tx(ctx context.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(c.ctx, hash, prove)
return c.env.Tx(c.ctx, hash, prove)
}
func (c *Local) TxSearch(
@@ -197,7 +195,7 @@ func (c *Local) TxSearch(
perPage *int,
orderBy string,
) (*ctypes.ResultTxSearch, error) {
return core.TxSearch(c.ctx, query, prove, page, perPage, orderBy)
return c.env.TxSearch(c.ctx, query, prove, page, perPage, orderBy)
}
func (c *Local) BlockSearch(
@@ -206,11 +204,11 @@ func (c *Local) BlockSearch(
page, perPage *int,
orderBy string,
) (*ctypes.ResultBlockSearch, error) {
return core.BlockSearch(c.ctx, query, page, perPage, orderBy)
return c.env.BlockSearch(c.ctx, query, page, perPage, orderBy)
}
func (c *Local) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
return core.BroadcastEvidence(c.ctx, ev)
return c.env.BroadcastEvidence(c.ctx, ev)
}
func (c *Local) Subscribe(
@@ -262,7 +260,7 @@ func (c *Local) eventsRoutine(
c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
}
}
case <-sub.Cancelled():
case <-sub.Canceled():
if sub.Err() == tmpubsub.ErrUnsubscribed {
return
}

View File

@@ -28,10 +28,6 @@ import (
)
// Client wraps arbitrary implementations of the various interfaces.
//
// We provide a few choices to mock out each one in this package.
// Nothing hidden here, so no New function, just construct it from
// some parts, and swap them out them during the tests.
type Client struct {
client.ABCIClient
client.SignClient
@@ -41,6 +37,14 @@ type Client struct {
client.EvidenceClient
client.MempoolClient
service.Service
env *core.Environment
}
func New() Client {
return Client{
env: &core.Environment{},
}
}
var _ client.Client = Client{}
@@ -80,11 +84,11 @@ func (c Call) GetResponse(args interface{}) (interface{}, error) {
}
func (c Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
return core.Status(&rpctypes.Context{})
return c.env.Status(&rpctypes.Context{})
}
func (c Client) ABCIInfo(ctx context.Context) (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo(&rpctypes.Context{})
return c.env.ABCIInfo(&rpctypes.Context{})
}
func (c Client) ABCIQuery(ctx context.Context, path string, data bytes.HexBytes) (*ctypes.ResultABCIQuery, error) {
@@ -96,47 +100,47 @@ func (c Client) ABCIQueryWithOptions(
path string,
data bytes.HexBytes,
opts client.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove)
return c.env.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove)
}
func (c Client) BroadcastTxCommit(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(&rpctypes.Context{}, tx)
return c.env.BroadcastTxCommit(&rpctypes.Context{}, tx)
}
func (c Client) BroadcastTxAsync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(&rpctypes.Context{}, tx)
return c.env.BroadcastTxAsync(&rpctypes.Context{}, tx)
}
func (c Client) BroadcastTxSync(ctx context.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(&rpctypes.Context{}, tx)
return c.env.BroadcastTxSync(&rpctypes.Context{}, tx)
}
func (c Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
return core.CheckTx(&rpctypes.Context{}, tx)
return c.env.CheckTx(&rpctypes.Context{}, tx)
}
func (c Client) NetInfo(ctx context.Context) (*ctypes.ResultNetInfo, error) {
return core.NetInfo(&rpctypes.Context{})
return c.env.NetInfo(&rpctypes.Context{})
}
func (c Client) ConsensusState(ctx context.Context) (*ctypes.ResultConsensusState, error) {
return core.ConsensusState(&rpctypes.Context{})
return c.env.GetConsensusState(&rpctypes.Context{})
}
func (c Client) DumpConsensusState(ctx context.Context) (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState(&rpctypes.Context{})
return c.env.DumpConsensusState(&rpctypes.Context{})
}
func (c Client) ConsensusParams(ctx context.Context, height *int64) (*ctypes.ResultConsensusParams, error) {
return core.ConsensusParams(&rpctypes.Context{}, height)
return c.env.ConsensusParams(&rpctypes.Context{}, height)
}
func (c Client) Health(ctx context.Context) (*ctypes.ResultHealth, error) {
return core.Health(&rpctypes.Context{})
return c.env.Health(&rpctypes.Context{})
}
func (c Client) DialSeeds(ctx context.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds)
return c.env.UnsafeDialSeeds(&rpctypes.Context{}, seeds)
}
func (c Client) DialPeers(
@@ -146,33 +150,33 @@ func (c Client) DialPeers(
unconditional,
private bool,
) (*ctypes.ResultDialPeers, error) {
return core.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent, unconditional, private)
return c.env.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent, unconditional, private)
}
func (c Client) BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight)
return c.env.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight)
}
func (c Client) Genesis(ctx context.Context) (*ctypes.ResultGenesis, error) {
return core.Genesis(&rpctypes.Context{})
return c.env.Genesis(&rpctypes.Context{})
}
func (c Client) Block(ctx context.Context, height *int64) (*ctypes.ResultBlock, error) {
return core.Block(&rpctypes.Context{}, height)
return c.env.Block(&rpctypes.Context{}, height)
}
func (c Client) BlockByHash(ctx context.Context, hash []byte) (*ctypes.ResultBlock, error) {
return core.BlockByHash(&rpctypes.Context{}, hash)
return c.env.BlockByHash(&rpctypes.Context{}, hash)
}
func (c Client) Commit(ctx context.Context, height *int64) (*ctypes.ResultCommit, error) {
return core.Commit(&rpctypes.Context{}, height)
return c.env.Commit(&rpctypes.Context{}, height)
}
func (c Client) Validators(ctx context.Context, height *int64, page, perPage *int) (*ctypes.ResultValidators, error) {
return core.Validators(&rpctypes.Context{}, height, page, perPage)
return c.env.Validators(&rpctypes.Context{}, height, page, perPage)
}
func (c Client) BroadcastEvidence(ctx context.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
return core.BroadcastEvidence(&rpctypes.Context{}, ev)
return c.env.BroadcastEvidence(&rpctypes.Context{}, ev)
}

View File

@@ -9,8 +9,8 @@ import (
)
// ABCIQuery queries the application for some information.
// More: https://docs.tendermint.com/main/rpc/#/ABCI/abci_query
func ABCIQuery(
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_query
func (env *Environment) ABCIQuery(
ctx *rpctypes.Context,
path string,
data bytes.HexBytes,
@@ -31,8 +31,8 @@ func ABCIQuery(
}
// ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/main/rpc/#/ABCI/abci_info
func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func (env *Environment) ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err

View File

@@ -15,10 +15,19 @@ import (
)
// BlockchainInfo gets block headers for minHeight <= height <= maxHeight.
// Block headers are returned in descending order (highest first).
// More: https://docs.tendermint.com/main/rpc/#/Info/blockchain
func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
// maximum 20 block metas
//
// If maxHeight does not yet exist, blocks up to the current height will be
// returned. If minHeight does not exist (due to pruning), earliest existing
// height will be used.
//
// At most 20 items will be returned. Block headers are returned in descending
// order (highest first).
//
// More: https://docs.tendermint.com/master/rpc/#/Info/blockchain
func (env *Environment) BlockchainInfo(
ctx *rpctypes.Context,
minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) {
const limit int64 = 20
var err error
minHeight, maxHeight, err = filterMinMax(
@@ -79,8 +88,8 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) {
// Header gets block header at a given height.
// If no height is provided, it will fetch the latest header.
// More: https://docs.tendermint.com/master/rpc/#/Info/header
func Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
func (env *Environment) Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, error) {
height, err := env.getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}
@@ -95,7 +104,7 @@ func Header(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultHeader, erro
// HeaderByHash gets header by hash.
// More: https://docs.tendermint.com/master/rpc/#/Info/header_by_hash
func HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) {
func (env *Environment) HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHeader, error) {
// N.B. The hash parameter is HexBytes so that the reflective parameter
// decoding logic in the HTTP service will correctly translate from JSON.
// See https://github.com/tendermint/tendermint/issues/6802 for context.
@@ -110,9 +119,9 @@ func HeaderByHash(ctx *rpctypes.Context, hash bytes.HexBytes) (*ctypes.ResultHea
// Block gets block at a given height.
// If no height is provided, it will fetch the latest block.
// More: https://docs.tendermint.com/main/rpc/#/Info/block
func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
// More: https://docs.tendermint.com/master/rpc/#/Info/block
func (env *Environment) Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
height, err := env.getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}
@@ -126,8 +135,8 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error)
}
// BlockByHash gets block by hash.
// More: https://docs.tendermint.com/main/rpc/#/Info/block_by_hash
func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash
func (env *Environment) BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) {
block := env.BlockStore.LoadBlockByHash(hash)
if block == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil
@@ -139,9 +148,9 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error
// Commit gets block commit at a given height.
// If no height is provided, it will fetch the commit for the latest block.
// More: https://docs.tendermint.com/main/rpc/#/Info/commit
func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
// More: https://docs.tendermint.com/master/rpc/#/Info/commit
func (env *Environment) Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
height, err := env.getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}
@@ -170,9 +179,9 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
// Results are for the height of the block containing the txs.
// Thus response.results.deliver_tx[5] is the results of executing
// getBlock(h).Txs[5]
// More: https://docs.tendermint.com/main/rpc/#/Info/block_results
func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
// More: https://docs.tendermint.com/master/rpc/#/Info/block_results
func (env *Environment) BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
height, err := env.getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}
@@ -194,7 +203,7 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
// BlockSearch searches for a paginated set of blocks matching BeginBlock and
// EndBlock event search criteria.
func BlockSearch(
func (env *Environment) BlockSearch(
ctx *rpctypes.Context,
query string,
pagePtr, perPagePtr *int,
@@ -230,7 +239,7 @@ func BlockSearch(
// paginate results
totalCount := len(results)
perPage := validatePerPage(perPagePtr)
perPage := env.validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {

View File

@@ -80,7 +80,7 @@ func TestBlockResults(t *testing.T) {
BeginBlock: &abci.ResponseBeginBlock{},
}
env = &Environment{}
env := &Environment{}
env.StateStore = sm.NewStore(dbm.NewMemDB(), sm.StoreOptions{
DiscardABCIResponses: false,
})
@@ -110,7 +110,7 @@ func TestBlockResults(t *testing.T) {
}
for _, tc := range testCases {
res, err := BlockResults(&rpctypes.Context{}, &tc.height)
res, err := env.BlockResults(&rpctypes.Context{}, &tc.height)
if tc.wantErr {
assert.Error(t, err)
} else {

View File

@@ -14,10 +14,14 @@ import (
// validators are sorted by their voting power - this is the canonical order
// for the validators in the set as used in computing their Merkle root.
//
// More: https://docs.tendermint.com/main/rpc/#/Info/validators
func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/validators
func (env *Environment) Validators(
ctx *rpctypes.Context,
heightPtr *int64,
pagePtr, perPagePtr *int) (*ctypes.ResultValidators, error) {
// The latest validator that we know is the NextValidator of the last block.
height, err := getHeight(latestUncommittedHeight(), heightPtr)
height, err := env.getHeight(env.latestUncommittedHeight(), heightPtr)
if err != nil {
return nil, err
}
@@ -28,7 +32,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
}
totalCount := len(validators.Validators)
perPage := validatePerPage(perPagePtr)
perPage := env.validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {
return nil, err
@@ -47,8 +51,8 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
// DumpConsensusState dumps consensus state.
// UNSTABLE
// More: https://docs.tendermint.com/main/rpc/#/Info/dump_consensus_state
func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state
func (env *Environment) DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := env.P2PPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
@@ -80,8 +84,8 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
// ConsensusState returns a concise summary of the consensus state.
// UNSTABLE
// More: https://docs.tendermint.com/main/rpc/#/Info/consensus_state
func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state
func (env *Environment) GetConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// Get self round state.
bz, err := env.ConsensusState.GetRoundStateSimpleJSON()
return &ctypes.ResultConsensusState{RoundState: bz}, err
@@ -89,11 +93,14 @@ func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error)
// ConsensusParams gets the consensus parameters at the given block height.
// If no height is provided, it will fetch the latest consensus params.
// More: https://docs.tendermint.com/main/rpc/#/Info/consensus_params
func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_params
func (env *Environment) ConsensusParams(
ctx *rpctypes.Context,
heightPtr *int64) (*ctypes.ResultConsensusParams, error) {
// The latest consensus params that we know is the consensus params after the
// last block.
height, err := getHeight(latestUncommittedHeight(), heightPtr)
height, err := env.getHeight(env.latestUncommittedHeight(), heightPtr)
if err != nil {
return nil, err
}

View File

@@ -6,7 +6,7 @@ import (
)
// UnsafeFlushMempool removes all transactions from the mempool.
func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
func (env *Environment) UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
env.Mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}

View File

@@ -6,7 +6,6 @@ import (
"time"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto"
tmjson "github.com/tendermint/tendermint/libs/json"
"github.com/tendermint/tendermint/libs/log"
@@ -33,17 +32,6 @@ const (
genesisChunkSize = 16 * 1024 * 1024 // 16
)
var (
// set by Node
env *Environment
)
// SetEnvironment sets up the given Environment.
// It will race if multiple Node call SetEnvironment.
func SetEnvironment(e *Environment) {
env = e
}
//----------------------------------------------
// These interfaces are used by RPC and must be thread safe
@@ -69,6 +57,10 @@ type peers interface {
Peers() p2p.IPeerSet
}
type consensusReactor interface {
WaitSync() bool
}
// ----------------------------------------------
// Environment contains objects and interfaces used by the RPC. It is expected
// to be setup once during startup.
@@ -78,21 +70,21 @@ type Environment struct {
ProxyAppMempool proxy.AppConnMempool
// interfaces defined in types and above
StateStore sm.Store
BlockStore sm.BlockStore
EvidencePool sm.EvidencePool
ConsensusState Consensus
P2PPeers peers
P2PTransport transport
StateStore sm.Store
BlockStore sm.BlockStore
EvidencePool sm.EvidencePool
ConsensusState Consensus
ConsensusReactor consensusReactor
P2PPeers peers
P2PTransport transport
// objects
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
ConsensusReactor *consensus.Reactor
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool
PubKey crypto.PubKey
GenDoc *types.GenesisDoc // cache the genesis structure
TxIndexer txindex.TxIndexer
BlockIndexer indexer.BlockIndexer
EventBus *types.EventBus // thread safe
Mempool mempl.Mempool
Logger log.Logger
@@ -125,7 +117,7 @@ func validatePage(pagePtr *int, perPage, totalCount int) (int, error) {
return page, nil
}
func validatePerPage(perPagePtr *int) int {
func (env *Environment) validatePerPage(perPagePtr *int) int {
if perPagePtr == nil { // no per_page parameter
return defaultPerPage
}
@@ -141,7 +133,7 @@ func validatePerPage(perPagePtr *int) int {
// InitGenesisChunks configures the environment and should be called on service
// startup.
func InitGenesisChunks() error {
func (env *Environment) InitGenesisChunks() error {
if env.genChunks != nil {
return nil
}
@@ -178,7 +170,7 @@ func validateSkipCount(page, perPage int) int {
}
// latestHeight can be either latest committed or uncommitted (+1) height.
func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
func (env *Environment) getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
if heightPtr != nil {
height := *heightPtr
if height <= 0 {
@@ -198,7 +190,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
return latestHeight, nil
}
func latestUncommittedHeight() int64 {
func (env *Environment) latestUncommittedHeight() int64 {
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()

View File

@@ -70,13 +70,13 @@ func TestPaginationPerPage(t *testing.T) {
{5, maxPerPage, maxPerPage},
{5, maxPerPage + 1, maxPerPage},
}
env := &Environment{}
for _, c := range cases {
p := validatePerPage(&c.perPage)
p := env.validatePerPage(&c.perPage)
assert.Equal(t, c.newPerPage, p, fmt.Sprintf("%v", c))
}
// nil case
p := validatePerPage(nil)
p := env.validatePerPage(nil)
assert.Equal(t, defaultPerPage, p)
}

View File

@@ -19,8 +19,8 @@ const (
)
// Subscribe for events via WebSocket.
// More: https://docs.tendermint.com/main/rpc/#/Websocket/subscribe
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func (env *Environment) Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()
if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
@@ -76,7 +76,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
return
}
}
case <-sub.Cancelled():
case <-sub.Canceled():
if sub.Err() != tmpubsub.ErrUnsubscribed {
var reason string
if sub.Err() == nil {
@@ -102,8 +102,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
}
// Unsubscribe from events via WebSocket.
// More: https://docs.tendermint.com/main/rpc/#/Websocket/unsubscribe
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
func (env *Environment) Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, err := tmquery.New(query)
@@ -118,8 +118,8 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe
}
// UnsubscribeAll from all events via WebSocket.
// More: https://docs.tendermint.com/main/rpc/#/Websocket/unsubscribe_all
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
func (env *Environment) UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env.Logger.Info("Unsubscribe from all", "remote", addr)
err := env.EventBus.UnsubscribeAll(context.Background(), addr)

View File

@@ -10,8 +10,11 @@ import (
)
// BroadcastEvidence broadcasts evidence of the misbehavior.
// More: https://docs.tendermint.com/main/rpc/#/Info/broadcast_evidence
func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
// More: https://docs.tendermint.com/master/rpc/#/Evidence/broadcast_evidence
func (env *Environment) BroadcastEvidence(
ctx *rpctypes.Context,
ev types.Evidence) (*ctypes.ResultBroadcastEvidence, error) {
if ev == nil {
return nil, errors.New("no evidence was provided")
}

View File

@@ -7,7 +7,7 @@ import (
// Health gets node health. Returns empty result (200 OK) on success, no
// response - in case of an error.
// More: https://docs.tendermint.com/main/rpc/#/Info/health
func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/health
func (env *Environment) Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) {
return &ctypes.ResultHealth{}, nil
}

View File

@@ -18,8 +18,8 @@ import (
// BroadcastTxAsync returns right away, with no response. Does not wait for
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func (env *Environment) BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{})
if err != nil {
@@ -30,8 +30,8 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
// BroadcastTxSync returns with the response from CheckTx. Does not wait for
// DeliverTx result.
// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func (env *Environment) BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(tx, func(res *abci.Response) {
select {
@@ -60,8 +60,8 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
}
// BroadcastTxCommit returns with the responses from CheckTx and DeliverTx.
// More: https://docs.tendermint.com/main/rpc/#/Tx/broadcast_tx_commit
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func (env *Environment) BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()
if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
@@ -121,7 +121,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
Hash: tx.Hash(),
Height: deliverTxRes.Height,
}, nil
case <-deliverTxSub.Cancelled():
case <-deliverTxSub.Canceled():
var reason string
if deliverTxSub.Err() == nil {
reason = "Tendermint exited"
@@ -149,10 +149,10 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
// UnconfirmedTxs gets unconfirmed transactions (maximum ?limit entries)
// including their number.
// More: https://docs.tendermint.com/main/rpc/#/Info/unconfirmed_txs
func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs
func (env *Environment) UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := validatePerPage(limitPtr)
limit := env.validatePerPage(limitPtr)
txs := env.Mempool.ReapMaxTxs(limit)
return &ctypes.ResultUnconfirmedTxs{
@@ -163,8 +163,8 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi
}
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/main/rpc/#/Info/num_unconfirmed_txs
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func (env *Environment) NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
return &ctypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
@@ -173,8 +173,8 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err
// CheckTx checks the transaction without executing it. The transaction won't
// be added to the mempool either.
// More: https://docs.tendermint.com/main/rpc/#/Tx/check_tx
func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func (env *Environment) CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err

View File

@@ -11,8 +11,8 @@ import (
)
// NetInfo returns network info.
// More: https://docs.tendermint.com/main/rpc/#/Info/net_info
func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/net_info
func (env *Environment) NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
peersList := env.P2PPeers.Peers().List()
peers := make([]ctypes.Peer, 0, len(peersList))
for _, peer := range peersList {
@@ -39,7 +39,7 @@ func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
}
// UnsafeDialSeeds dials the given seeds (comma-separated id@IP:PORT).
func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
func (env *Environment) UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) {
if len(seeds) == 0 {
return &ctypes.ResultDialSeeds{}, errors.New("no seeds provided")
}
@@ -52,8 +52,11 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS
// UnsafeDialPeers dials the given peers (comma-separated id@IP:PORT),
// optionally making them persistent.
func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, unconditional, private bool) (
*ctypes.ResultDialPeers, error) {
func (env *Environment) UnsafeDialPeers(
ctx *rpctypes.Context,
peers []string,
persistent, unconditional, private bool) (*ctypes.ResultDialPeers, error) {
if len(peers) == 0 {
return &ctypes.ResultDialPeers{}, errors.New("no peers provided")
}
@@ -92,8 +95,8 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit
}
// Genesis returns genesis file.
// More: https://docs.tendermint.com/main/rpc/#/Info/genesis
func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/genesis
func (env *Environment) Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
if len(env.genChunks) > 1 {
return nil, errors.New("genesis response is large, please use the genesis_chunked API instead")
}
@@ -101,7 +104,7 @@ func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
return &ctypes.ResultGenesis{Genesis: env.GenDoc}, nil
}
func GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) {
func (env *Environment) GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) {
if env.genChunks == nil {
return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized")
}

View File

@@ -23,6 +23,7 @@ func TestUnsafeDialSeeds(t *testing.T) {
}
})
env := &Environment{}
env.Logger = log.TestingLogger()
env.P2PPeers = sw
@@ -36,7 +37,7 @@ func TestUnsafeDialSeeds(t *testing.T) {
}
for _, tc := range testCases {
res, err := UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds)
res, err := env.UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds)
if tc.isErr {
assert.Error(t, err)
} else {
@@ -62,6 +63,7 @@ func TestUnsafeDialPeers(t *testing.T) {
}
})
env := &Environment{}
env.Logger = log.TestingLogger()
env.P2PPeers = sw
@@ -76,7 +78,7 @@ func TestUnsafeDialPeers(t *testing.T) {
}
for _, tc := range testCases {
res, err := UnsafeDialPeers(&rpctypes.Context{}, tc.peers, tc.persistence, tc.unconditional, tc.private)
res, err := env.UnsafeDialPeers(&rpctypes.Context{}, tc.peers, tc.persistence, tc.unconditional, tc.private)
if tc.isErr {
assert.Error(t, err)
} else {

View File

@@ -6,54 +6,58 @@ import (
// TODO: better system than "unsafe" prefix
type RoutesMap map[string]*rpc.RPCFunc
// Routes is a map of available routes.
var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),
func (env *Environment) GetRoutes() RoutesMap {
return RoutesMap{
// subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(env.Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(env.Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(env.UnsubscribeAll, ""),
// info API
"health": rpc.NewRPCFunc(Health, ""),
"status": rpc.NewRPCFunc(Status, ""),
"net_info": rpc.NewRPCFunc(NetInfo, ""),
"blockchain": rpc.NewRPCFunc(BlockchainInfo, "minHeight,maxHeight", rpc.Cacheable()),
"genesis": rpc.NewRPCFunc(Genesis, "", rpc.Cacheable()),
"genesis_chunked": rpc.NewRPCFunc(GenesisChunked, "chunk", rpc.Cacheable()),
"block": rpc.NewRPCFunc(Block, "height", rpc.Cacheable("height")),
"block_by_hash": rpc.NewRPCFunc(BlockByHash, "hash", rpc.Cacheable()),
"block_results": rpc.NewRPCFunc(BlockResults, "height", rpc.Cacheable("height")),
"commit": rpc.NewRPCFunc(Commit, "height", rpc.Cacheable("height")),
"header": rpc.NewRPCFunc(Header, "height", rpc.Cacheable("height")),
"header_by_hash": rpc.NewRPCFunc(HeaderByHash, "hash", rpc.Cacheable()),
"check_tx": rpc.NewRPCFunc(CheckTx, "tx"),
"tx": rpc.NewRPCFunc(Tx, "hash,prove", rpc.Cacheable()),
"tx_search": rpc.NewRPCFunc(TxSearch, "query,prove,page,per_page,order_by"),
"block_search": rpc.NewRPCFunc(BlockSearch, "query,page,per_page,order_by"),
"validators": rpc.NewRPCFunc(Validators, "height,page,per_page", rpc.Cacheable("height")),
"dump_consensus_state": rpc.NewRPCFunc(DumpConsensusState, ""),
"consensus_state": rpc.NewRPCFunc(ConsensusState, ""),
"consensus_params": rpc.NewRPCFunc(ConsensusParams, "height", rpc.Cacheable("height")),
"unconfirmed_txs": rpc.NewRPCFunc(UnconfirmedTxs, "limit"),
"num_unconfirmed_txs": rpc.NewRPCFunc(NumUnconfirmedTxs, ""),
// info AP
"health": rpc.NewRPCFunc(env.Health, ""),
"status": rpc.NewRPCFunc(env.Status, ""),
"net_info": rpc.NewRPCFunc(env.NetInfo, ""),
"blockchain": rpc.NewRPCFunc(env.BlockchainInfo, "minHeight,maxHeight", rpc.Cacheable()),
"genesis": rpc.NewRPCFunc(env.Genesis, "", rpc.Cacheable()),
"genesis_chunked": rpc.NewRPCFunc(env.GenesisChunked, "chunk", rpc.Cacheable()),
"block": rpc.NewRPCFunc(env.Block, "height", rpc.Cacheable("height")),
"block_by_hash": rpc.NewRPCFunc(env.BlockByHash, "hash", rpc.Cacheable()),
"block_results": rpc.NewRPCFunc(env.BlockResults, "height", rpc.Cacheable("height")),
"commit": rpc.NewRPCFunc(env.Commit, "height", rpc.Cacheable("height")),
"header": rpc.NewRPCFunc(env.Header, "height", rpc.Cacheable("height")),
"header_by_hash": rpc.NewRPCFunc(env.HeaderByHash, "hash", rpc.Cacheable()),
"check_tx": rpc.NewRPCFunc(env.CheckTx, "tx"),
"tx": rpc.NewRPCFunc(env.Tx, "hash,prove", rpc.Cacheable()),
"tx_search": rpc.NewRPCFunc(env.TxSearch, "query,prove,page,per_page,order_by"),
"block_search": rpc.NewRPCFunc(env.BlockSearch, "query,page,per_page,order_by"),
"validators": rpc.NewRPCFunc(env.Validators, "height,page,per_page", rpc.Cacheable("height")),
"dump_consensus_state": rpc.NewRPCFunc(env.DumpConsensusState, ""),
"consensus_state": rpc.NewRPCFunc(env.GetConsensusState, ""),
"consensus_params": rpc.NewRPCFunc(env.ConsensusParams, "height", rpc.Cacheable("height")),
"unconfirmed_txs": rpc.NewRPCFunc(env.UnconfirmedTxs, "limit"),
"num_unconfirmed_txs": rpc.NewRPCFunc(env.NumUnconfirmedTxs, ""),
// tx broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(BroadcastTxCommit, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(BroadcastTxSync, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(BroadcastTxAsync, "tx"),
// tx broadcast API
"broadcast_tx_commit": rpc.NewRPCFunc(env.BroadcastTxCommit, "tx"),
"broadcast_tx_sync": rpc.NewRPCFunc(env.BroadcastTxSync, "tx"),
"broadcast_tx_async": rpc.NewRPCFunc(env.BroadcastTxAsync, "tx"),
// abci API
"abci_query": rpc.NewRPCFunc(ABCIQuery, "path,data,height,prove"),
"abci_info": rpc.NewRPCFunc(ABCIInfo, "", rpc.Cacheable()),
// abci API
"abci_query": rpc.NewRPCFunc(env.ABCIQuery, "path,data,height,prove"),
"abci_info": rpc.NewRPCFunc(env.ABCIInfo, "", rpc.Cacheable()),
// evidence API
"broadcast_evidence": rpc.NewRPCFunc(BroadcastEvidence, "evidence"),
// evidence API
"broadcast_evidence": rpc.NewRPCFunc(env.BroadcastEvidence, "evidence"),
}
}
// AddUnsafeRoutes adds unsafe routes.
func AddUnsafeRoutes() {
func (env *Environment) AddUnsafeRoutes(routes RoutesMap) {
// control API
Routes["dial_seeds"] = rpc.NewRPCFunc(UnsafeDialSeeds, "seeds")
Routes["dial_peers"] = rpc.NewRPCFunc(UnsafeDialPeers, "peers,persistent,unconditional,private")
Routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(UnsafeFlushMempool, "")
routes["dial_seeds"] = rpc.NewRPCFunc(env.UnsafeDialSeeds, "seeds")
routes["dial_peers"] = rpc.NewRPCFunc(env.UnsafeDialPeers, "peers,persistent,unconditional,private")
routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(env.UnsafeFlushMempool, "")
}

View File

@@ -12,8 +12,8 @@ import (
// Status returns Tendermint status including node info, pubkey, latest block
// hash, app hash, block height and time.
// More: https://docs.tendermint.com/main/rpc/#/Info/status
func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
// More: https://docs.tendermint.com/master/rpc/#/Info/status
func (env *Environment) Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
var (
earliestBlockHeight int64
earliestBlockHash tmbytes.HexBytes
@@ -47,7 +47,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
// Return the very last voting power, not the voting power of this validator
// during the last block.
var votingPower int64
if val := validatorAtHeight(latestUncommittedHeight()); val != nil {
if val := env.validatorAtHeight(env.latestUncommittedHeight()); val != nil {
votingPower = val.VotingPower
}
@@ -74,12 +74,12 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
return result, nil
}
func validatorAtHeight(h int64) *types.Validator {
vals, err := env.StateStore.LoadValidators(h)
func (env *Environment) validatorAtHeight(h int64) *types.Validator {
valsWithH, err := env.StateStore.LoadValidators(h)
if err != nil {
return nil
}
privValAddress := env.PubKey.Address()
_, val := vals.GetByAddress(privValAddress)
_, val := valsWithH.GetByAddress(privValAddress)
return val
}

View File

@@ -17,7 +17,7 @@ import (
// transaction is in the mempool, invalidated, or was not sent in the first
// place.
// More: https://docs.tendermint.com/main/rpc/#/Info/tx
func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
func (env *Environment) Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) {
// if index is disabled, return error
if _, ok := env.TxIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("transaction indexing is disabled")
@@ -51,7 +51,7 @@ func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error
// TxSearch allows you to query for multiple transactions results. It returns a
// list of transactions (maximum ?per_page entries) and the total count.
// More: https://docs.tendermint.com/main/rpc/#/Info/tx_search
func TxSearch(
func (env *Environment) TxSearch(
ctx *rpctypes.Context,
query string,
prove bool,
@@ -98,7 +98,7 @@ func TxSearch(
// paginate results
totalCount := len(results)
perPage := validatePerPage(perPagePtr)
perPage := env.validatePerPage(perPagePtr)
page, err := validatePage(pagePtr, perPage, totalCount)
if err != nil {

View File

@@ -9,6 +9,7 @@ import (
)
type broadcastAPI struct {
env *core.Environment
}
func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*ResponsePing, error) {
@@ -19,7 +20,7 @@ func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*Response
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
// NOTE: there's no way to get client's remote address
// see https://stackoverflow.com/questions/33684570/session-and-remote-ip-address-in-grpc-go
res, err := core.BroadcastTxCommit(&rpctypes.Context{}, req.Tx)
res, err := bapi.env.BroadcastTxCommit(&rpctypes.Context{}, req.Tx)
if err != nil {
return nil, err
}

View File

@@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
tmnet "github.com/tendermint/tendermint/libs/net"
"github.com/tendermint/tendermint/rpc/core"
)
// Config is an gRPC server configuration.
@@ -17,16 +18,16 @@ type Config struct {
// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given
// net.Listener.
// NOTE: This function blocks - you may want to call it in a go-routine.
func StartGRPCServer(ln net.Listener) error {
func StartGRPCServer(env *core.Environment, ln net.Listener) error {
grpcServer := grpc.NewServer()
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{env: env})
return grpcServer.Serve(ln)
}
// StartGRPCClient dials the gRPC server using protoAddr and returns a new
// BroadcastAPIClient.
func StartGRPCClient(protoAddr string) BroadcastAPIClient {
//nolint:staticcheck // SA1019 Existing use of deprecated but supported dial option.
//nolint: staticcheck // SA1019 Existing use of deprecated but supported dial option.
conn, err := grpc.Dial(protoAddr, grpc.WithInsecure(), grpc.WithContextDialer(dialerFunc))
if err != nil {
panic(err)

View File

@@ -138,7 +138,7 @@ func setup() {
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
config := server.DefaultConfig()
listener1, err := server.Listen(tcpAddr, config)
listener1, err := server.Listen(tcpAddr, config.MaxOpenConnections)
if err != nil {
panic(err)
}
@@ -154,7 +154,7 @@ func setup() {
wm = server.NewWebsocketManager(Routes)
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
listener2, err := server.Listen(unixAddr, config)
listener2, err := server.Listen(unixAddr, config.MaxOpenConnections)
if err != nil {
panic(err)
}

View File

@@ -258,7 +258,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Listen starts a new net.Listener on the given address.
// It returns an error if the address is invalid or the call to Listen() fails.
func Listen(addr string, config *Config) (listener net.Listener, err error) {
func Listen(addr string, maxOpenConnections int) (listener net.Listener, err error) {
parts := strings.SplitN(addr, "://", 2)
if len(parts) != 2 {
return nil, fmt.Errorf(
@@ -271,8 +271,8 @@ func Listen(addr string, config *Config) (listener net.Listener, err error) {
if err != nil {
return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
if maxOpenConnections > 0 {
listener = netutil.LimitListener(listener, maxOpenConnections)
}
return listener, nil

View File

@@ -39,8 +39,7 @@ func TestMaxOpenConnections(t *testing.T) {
fmt.Fprint(w, "some body")
})
config := DefaultConfig()
config.MaxOpenConnections = max
l, err := Listen("tcp://127.0.0.1:0", config)
l, err := Listen("tcp://127.0.0.1:0", max)
require.NoError(t, err)
defer l.Close()
go Serve(l, mux, log.TestingLogger(), config) //nolint:errcheck // ignore for tests

View File

@@ -34,7 +34,7 @@ func main() {
rpcserver.RegisterRPCFuncs(mux, routes, logger)
config := rpcserver.DefaultConfig()
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config)
listener, err := rpcserver.Listen("tcp://127.0.0.1:8008", config.MaxOpenConnections)
if err != nil {
tmos.Exit(err.Error())
}

View File

@@ -170,7 +170,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
}
node, err := nm.NewNode(config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
nm.DefaultDBProvider,
cfg.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
logger)
if err != nil {

View File

@@ -549,7 +549,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
}
case <-updatesSub.Cancelled():
case <-updatesSub.Canceled():
t.Fatalf("updatesSub was canceled (reason: %v)", updatesSub.Err())
case <-time.After(1 * time.Second):
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")

View File

@@ -0,0 +1,47 @@
package block
import (
"errors"
"fmt"
dbm "github.com/tendermint/tm-db"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/state/indexer"
blockidxkv "github.com/tendermint/tendermint/state/indexer/block/kv"
blockidxnull "github.com/tendermint/tendermint/state/indexer/block/null"
"github.com/tendermint/tendermint/state/indexer/sink/psql"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/state/txindex/kv"
"github.com/tendermint/tendermint/state/txindex/null"
)
// EventSinksFromConfig constructs a slice of indexer.EventSink using the provided
// configuration.
//
//nolint:lll
func IndexerFromConfig(cfg *config.Config, dbProvider config.DBProvider, chainID string) (txindex.TxIndexer, indexer.BlockIndexer, error) {
switch cfg.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&config.DBContext{ID: "tx_index", Config: cfg})
if err != nil {
return nil, nil, err
}
return kv.NewTxIndex(store), blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))), nil
case "psql":
conn := cfg.TxIndex.PsqlConn
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}
es, err := psql.NewEventSink(cfg.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, nil, fmt.Errorf("creating psql indexer: %w", err)
}
return es.TxIndexer(), es.BlockIndexer(), nil
default:
return &null.TxIndex{}, &blockidxnull.BlockerIndexer{}, nil
}
}

View File

@@ -28,6 +28,20 @@ func (_m *BlockStore) Base() int64 {
return r0
}
// Close provides a mock function with given fields:
func (_m *BlockStore) Close() error {
ret := _m.Called()
var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
// DeleteLatestBlock provides a mock function with given fields:
func (_m *BlockStore) DeleteLatestBlock() error {
ret := _m.Called()

View File

@@ -36,6 +36,8 @@ type BlockStore interface {
LoadSeenCommit(height int64) *types.Commit
DeleteLatestBlock() error
Close() error
}
//-----------------------------------------------------------------------------

View File

@@ -59,54 +59,59 @@ func (is *IndexerService) OnStart() error {
go func() {
for {
msg := <-blockHeadersSub.Out()
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
select {
case <-blockHeadersSub.Canceled():
return
case msg := <-blockHeadersSub.Out():
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
eventDataHeader := msg.Data().(types.EventDataNewBlockHeader)
height := eventDataHeader.Header.Height
batch := NewBatch(eventDataHeader.NumTxs)
if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)
for i := int64(0); i < eventDataHeader.NumTxs; i++ {
msg2 := <-txsSub.Out()
txResult := msg2.Data().(types.EventDataTx).TxResult
if err = batch.Add(&txResult); err != nil {
is.Logger.Error(
"failed to add tx to batch",
"height", height,
"index", txResult.Index,
"err", err,
)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Info("indexed block exents", "height", height)
}
}
if err := is.blockIdxr.Index(eventDataHeader); err != nil {
is.Logger.Error("failed to index block", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
return
} else {
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
}
} else {
is.Logger.Info("indexed block exents", "height", height)
}
if err = is.txIdxr.AddBatch(batch); err != nil {
is.Logger.Error("failed to index block txs", "height", height, "err", err)
if is.terminateOnError {
if err := is.Stop(); err != nil {
is.Logger.Error("failed to stop", "err", err)
}
return
}
} else {
is.Logger.Debug("indexed transactions", "height", height, "num_txs", eventDataHeader.NumTxs)
}
}
}()

View File

@@ -141,7 +141,7 @@ func startNode(cfg *Config) error {
nodeKey,
cc,
node.DefaultGenesisDocProviderFunc(tmcfg),
node.DefaultDBProvider,
config.DefaultDBProvider,
node.DefaultMetricsProvider(tmcfg.Instrumentation),
nodeLogger,
)
@@ -157,8 +157,8 @@ func startLightClient(cfg *Config) error {
return err
}
dbContext := &node.DBContext{ID: "light", Config: tmcfg}
lightDB, err := node.DefaultDBProvider(dbContext)
dbContext := &config.DBContext{ID: "light", Config: tmcfg}
lightDB, err := config.DefaultDBProvider(dbContext)
if err != nil {
return err
}

View File

@@ -23,7 +23,7 @@ type EventBusSubscriber interface {
type Subscription interface {
Out() <-chan tmpubsub.Message
Cancelled() <-chan struct{} //nolint: misspell
Canceled() <-chan struct{}
Err() error
}

View File

@@ -439,7 +439,7 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
for {
select {
case <-sub.Out():
case <-sub.Cancelled():
case <-sub.Canceled():
return
}
}