mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-07 05:46:32 +00:00
test: cleanup rpc/client and node test fixtures (#7112)
This commit is contained in:
14
node/node.go
14
node/node.go
@@ -641,12 +641,16 @@ func (n *nodeImpl) OnStop() {
|
||||
|
||||
n.Logger.Info("Stopping Node")
|
||||
|
||||
// first stop the non-reactor services
|
||||
if err := n.eventBus.Stop(); err != nil {
|
||||
n.Logger.Error("Error closing eventBus", "err", err)
|
||||
if n.eventBus != nil {
|
||||
// first stop the non-reactor services
|
||||
if err := n.eventBus.Stop(); err != nil {
|
||||
n.Logger.Error("Error closing eventBus", "err", err)
|
||||
}
|
||||
}
|
||||
if err := n.indexerService.Stop(); err != nil {
|
||||
n.Logger.Error("Error closing indexerService", "err", err)
|
||||
if n.indexerService != nil {
|
||||
if err := n.indexerService.Stop(); err != nil {
|
||||
n.Logger.Error("Error closing indexerService", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if n.config.Mode != config.ModeSeed {
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/tendermint/tendermint/internal/test/factory"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmrand "github.com/tendermint/tendermint/libs/rand"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
tmtime "github.com/tendermint/tendermint/libs/time"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -46,11 +47,21 @@ func TestNodeStartStop(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, ns.Start())
|
||||
|
||||
t.Cleanup(func() {
|
||||
if ns.IsRunning() {
|
||||
assert.NoError(t, ns.Stop())
|
||||
ns.Wait()
|
||||
}
|
||||
})
|
||||
|
||||
n, ok := ns.(*nodeImpl)
|
||||
require.True(t, ok)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// wait for the node to produce a block
|
||||
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
|
||||
blocksSub, err := n.EventBus().Subscribe(ctx, "node_test", types.EventQueryNewBlock)
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case <-blocksSub.Out():
|
||||
@@ -87,6 +98,14 @@ func getTestNode(t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl
|
||||
|
||||
n, ok := ns.(*nodeImpl)
|
||||
require.True(t, ok)
|
||||
|
||||
t.Cleanup(func() {
|
||||
if ns.IsRunning() {
|
||||
assert.NoError(t, ns.Stop())
|
||||
ns.Wait()
|
||||
}
|
||||
})
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
@@ -100,7 +119,6 @@ func TestNodeDelayedStart(t *testing.T) {
|
||||
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
|
||||
|
||||
require.NoError(t, n.Start())
|
||||
defer n.Stop() //nolint:errcheck // ignore for tests
|
||||
|
||||
startTime := tmtime.Now()
|
||||
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
|
||||
@@ -165,8 +183,13 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cfg.PrivValidator.ListenAddr = addrNoPrefix
|
||||
|
||||
_, err := newDefaultNode(cfg, log.TestingLogger())
|
||||
n, err := newDefaultNode(cfg, log.TestingLogger())
|
||||
assert.Error(t, err)
|
||||
|
||||
if n != nil && n.IsRunning() {
|
||||
assert.NoError(t, n.Stop())
|
||||
n.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
@@ -211,6 +234,9 @@ func testFreeAddr(t *testing.T) string {
|
||||
// create a proposal block using real and full
|
||||
// mempool and evidence pool and validate it.
|
||||
func TestCreateProposalBlock(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
@@ -222,7 +248,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
logger := log.TestingLogger()
|
||||
|
||||
const height int64 = 1
|
||||
state, stateDB, privVals := state(1, height)
|
||||
state, stateDB, privVals := state(t, 1, height)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
maxBytes := 16384
|
||||
const partSize uint32 = 256
|
||||
@@ -266,7 +292,7 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
txLength := 100
|
||||
for i := 0; i <= maxBytes/txLength; i++ {
|
||||
tx := tmrand.Bytes(txLength)
|
||||
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
|
||||
err := mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
@@ -303,6 +329,9 @@ func TestCreateProposalBlock(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
@@ -314,7 +343,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
logger := log.TestingLogger()
|
||||
|
||||
const height int64 = 1
|
||||
state, stateDB, _ := state(1, height)
|
||||
state, stateDB, _ := state(t, 1, height)
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
const maxBytes int64 = 16384
|
||||
@@ -336,7 +365,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
// fill the mempool with one txs just below the maximum size
|
||||
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))
|
||||
tx := tmrand.Bytes(txLength - 4) // to account for the varint
|
||||
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
|
||||
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
|
||||
assert.NoError(t, err)
|
||||
|
||||
blockExec := sm.NewBlockExecutor(
|
||||
@@ -365,6 +394,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestMaxProposalBlockSize(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
cfg := config.ResetTestRoot("node_create_proposal")
|
||||
defer os.RemoveAll(cfg.RootDir)
|
||||
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
|
||||
@@ -375,7 +407,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
|
||||
logger := log.TestingLogger()
|
||||
|
||||
state, stateDB, _ := state(types.MaxVotesCount, int64(1))
|
||||
state, stateDB, _ := state(t, types.MaxVotesCount, int64(1))
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
blockStore := store.NewBlockStore(dbm.NewMemDB())
|
||||
const maxBytes int64 = 1024 * 1024 * 2
|
||||
@@ -402,7 +434,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
|
||||
// At the end of the test, only the single big tx should be added
|
||||
for i := 0; i < 10; i++ {
|
||||
tx := tmrand.Bytes(10)
|
||||
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
|
||||
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
@@ -493,14 +525,17 @@ func TestNodeNewSeedNode(t *testing.T) {
|
||||
defaultGenesisDocProviderFunc(cfg),
|
||||
log.TestingLogger(),
|
||||
)
|
||||
|
||||
require.NoError(t, err)
|
||||
n, ok := ns.(*nodeImpl)
|
||||
require.True(t, ok)
|
||||
|
||||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.True(t, n.pexReactor.IsRunning())
|
||||
|
||||
require.NoError(t, n.Stop())
|
||||
|
||||
}
|
||||
|
||||
func TestNodeSetEventSink(t *testing.T) {
|
||||
@@ -511,7 +546,7 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
|
||||
eventBus, err := createAndStartEventBus(logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() { require.NoError(t, eventBus.Stop()) })
|
||||
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -521,6 +556,22 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
|
||||
return eventSinks
|
||||
}
|
||||
cleanup := func(ns service.Service) func() {
|
||||
return func() {
|
||||
n, ok := ns.(*nodeImpl)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if n == nil {
|
||||
return
|
||||
}
|
||||
if !n.IsRunning() {
|
||||
return
|
||||
}
|
||||
assert.NoError(t, n.Stop())
|
||||
n.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
eventSinks := setupTest(t, cfg)
|
||||
assert.Equal(t, 1, len(eventSinks))
|
||||
@@ -542,6 +593,7 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
ns, err := newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("unsupported event sink type"), err)
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
cfg.TxIndex.Indexer = []string{}
|
||||
eventSinks = setupTest(t, cfg)
|
||||
@@ -553,6 +605,7 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
ns, err = newDefaultNode(cfg, logger)
|
||||
assert.Nil(t, ns)
|
||||
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
var psqlConn = "test"
|
||||
|
||||
@@ -591,18 +644,21 @@ func TestNodeSetEventSink(t *testing.T) {
|
||||
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
|
||||
cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
ns, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
t.Cleanup(cleanup(ns))
|
||||
|
||||
cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
|
||||
cfg.TxIndex.PsqlConn = psqlConn
|
||||
_, err = newDefaultNode(cfg, logger)
|
||||
ns, err = newDefaultNode(cfg, logger)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, e, err)
|
||||
t.Cleanup(cleanup(ns))
|
||||
}
|
||||
|
||||
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
|
||||
func state(t *testing.T, nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
|
||||
t.Helper()
|
||||
privVals := make([]types.PrivValidator, nVals)
|
||||
vals := make([]types.GenesisValidator, nVals)
|
||||
for i := 0; i < nVals; i++ {
|
||||
@@ -623,17 +679,15 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
|
||||
|
||||
// save validators to db for 2 heights
|
||||
stateDB := dbm.NewMemDB()
|
||||
t.Cleanup(func() { require.NoError(t, stateDB.Close()) })
|
||||
|
||||
stateStore := sm.NewStore(stateDB)
|
||||
if err := stateStore.Save(s); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
require.NoError(t, stateStore.Save(s))
|
||||
|
||||
for i := 1; i < int(height); i++ {
|
||||
s.LastBlockHeight++
|
||||
s.LastValidators = s.Validators.Copy()
|
||||
if err := stateStore.Save(s); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
require.NoError(t, stateStore.Save(s))
|
||||
}
|
||||
return s, stateDB, privVals
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package client_test
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -17,7 +16,7 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var waitForEventTimeout = 8 * time.Second
|
||||
const waitForEventTimeout = 2 * time.Second
|
||||
|
||||
// MakeTxKV returns a text transaction, allong with expected key, value pair
|
||||
func MakeTxKV() ([]byte, []byte, []byte) {
|
||||
@@ -26,164 +25,41 @@ func MakeTxKV() ([]byte, []byte, []byte) {
|
||||
return k, v, append(k, append([]byte("="), v...)...)
|
||||
}
|
||||
|
||||
func TestHeaderEvents(t *testing.T) {
|
||||
n, conf := NodeSuite(t)
|
||||
func testTxEventsSent(ctx context.Context, t *testing.T, broadcastMethod string, c client.Client) {
|
||||
// make the tx
|
||||
_, _, tx := MakeTxKV()
|
||||
|
||||
for i, c := range GetClients(t, n, conf) {
|
||||
i, c := i, c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
// if so, then we start it, listen, and stop it.
|
||||
err := c.Start()
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
t.Cleanup(func() {
|
||||
if err := c.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
// send
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
var (
|
||||
txres *coretypes.ResultBroadcastTx
|
||||
err error
|
||||
)
|
||||
switch broadcastMethod {
|
||||
case "async":
|
||||
txres, err = c.BroadcastTxAsync(ctx, tx)
|
||||
case "sync":
|
||||
txres, err = c.BroadcastTxSync(ctx, tx)
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
|
||||
}
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, txres.Code, abci.CodeTypeOK)
|
||||
}
|
||||
}()
|
||||
|
||||
evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout)
|
||||
require.Nil(t, err, "%d: %+v", i, err)
|
||||
_, ok := evt.(types.EventDataNewBlockHeader)
|
||||
require.True(t, ok, "%d: %#v", i, evt)
|
||||
// TODO: more checks...
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to new blocks and make sure height increments by 1
|
||||
func TestBlockEvents(t *testing.T) {
|
||||
n, conf := NodeSuite(t)
|
||||
for _, c := range GetClients(t, n, conf) {
|
||||
c := c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
// if so, then we start it, listen, and stop it.
|
||||
err := c.Start()
|
||||
require.Nil(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := c.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const subscriber = "TestBlockEvents"
|
||||
|
||||
eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlockValue).String())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
|
||||
var firstBlockHeight int64
|
||||
for i := int64(0); i < 3; i++ {
|
||||
event := <-eventCh
|
||||
blockEvent, ok := event.Data.(types.EventDataNewBlock)
|
||||
require.True(t, ok)
|
||||
|
||||
block := blockEvent.Block
|
||||
|
||||
if firstBlockHeight == 0 {
|
||||
firstBlockHeight = block.Header.Height
|
||||
}
|
||||
|
||||
require.Equal(t, firstBlockHeight+i, block.Header.Height)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
|
||||
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
|
||||
|
||||
func testTxEventsSent(t *testing.T, broadcastMethod string) {
|
||||
n, conf := NodeSuite(t)
|
||||
for _, c := range GetClients(t, n, conf) {
|
||||
c := c
|
||||
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
||||
|
||||
// start for this test it if it wasn't already running
|
||||
if !c.IsRunning() {
|
||||
// if so, then we start it, listen, and stop it.
|
||||
err := c.Start()
|
||||
require.Nil(t, err)
|
||||
t.Cleanup(func() {
|
||||
if err := c.Stop(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// make the tx
|
||||
_, _, tx := MakeTxKV()
|
||||
|
||||
// send
|
||||
go func() {
|
||||
var (
|
||||
txres *coretypes.ResultBroadcastTx
|
||||
err error
|
||||
ctx = context.Background()
|
||||
)
|
||||
switch broadcastMethod {
|
||||
case "async":
|
||||
txres, err = c.BroadcastTxAsync(ctx, tx)
|
||||
case "sync":
|
||||
txres, err = c.BroadcastTxSync(ctx, tx)
|
||||
default:
|
||||
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
|
||||
}
|
||||
if assert.NoError(t, err) {
|
||||
assert.Equal(t, txres.Code, abci.CodeTypeOK)
|
||||
}
|
||||
}()
|
||||
|
||||
// and wait for confirmation
|
||||
evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout)
|
||||
require.Nil(t, err)
|
||||
|
||||
// and make sure it has the proper info
|
||||
txe, ok := evt.(types.EventDataTx)
|
||||
require.True(t, ok)
|
||||
|
||||
// make sure this is the proper tx
|
||||
require.EqualValues(t, tx, txe.Tx)
|
||||
require.True(t, txe.Result.IsOK())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test HTTPClient resubscribes upon disconnect && subscription error.
|
||||
// Test Local client resubscribes upon subscription error.
|
||||
func TestClientsResubscribe(t *testing.T) {
|
||||
// TODO(melekes)
|
||||
}
|
||||
|
||||
func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
_, conf := NodeSuite(t)
|
||||
|
||||
c := getHTTPClient(t, conf)
|
||||
|
||||
// on Subscribe
|
||||
_, err := c.Subscribe(ctx, "TestHeaderEvents",
|
||||
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
|
||||
assert.Error(t, err)
|
||||
|
||||
// on Unsubscribe
|
||||
err = c.Unsubscribe(ctx, "TestHeaderEvents",
|
||||
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
|
||||
assert.Error(t, err)
|
||||
|
||||
// on UnsubscribeAll
|
||||
err = c.UnsubscribeAll(ctx, "TestHeaderEvents")
|
||||
assert.Error(t, err)
|
||||
// and wait for confirmation
|
||||
evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout)
|
||||
require.Nil(t, err)
|
||||
|
||||
// and make sure it has the proper info
|
||||
txe, ok := evt.(types.EventDataTx)
|
||||
require.True(t, ok)
|
||||
|
||||
// make sure this is the proper tx
|
||||
require.EqualValues(t, tx, txe.Tx)
|
||||
require.True(t, txe.Result.IsOK())
|
||||
<-done
|
||||
}
|
||||
|
||||
@@ -3,16 +3,20 @@ package client_test
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
|
||||
"github.com/tendermint/tendermint/rpc/coretypes"
|
||||
rpctest "github.com/tendermint/tendermint/rpc/test"
|
||||
)
|
||||
|
||||
func ExampleHTTP_simple() {
|
||||
func TestHTTPSimple(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -29,9 +33,7 @@ func ExampleHTTP_simple() {
|
||||
// Create our RPC client
|
||||
rpcAddr := conf.RPC.ListenAddress
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create a transaction
|
||||
k := []byte("name")
|
||||
@@ -41,6 +43,7 @@ func ExampleHTTP_simple() {
|
||||
// Broadcast the transaction and wait for it to commit (rather use
|
||||
// c.BroadcastTxSync though in production).
|
||||
bres, err := c.BroadcastTxCommit(context.Background(), tx)
|
||||
require.NoError(t, err)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -50,30 +53,19 @@ func ExampleHTTP_simple() {
|
||||
|
||||
// Now try to fetch the value for the key
|
||||
qres, err := c.ABCIQuery(context.Background(), "/key", k)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if qres.Response.IsErr() {
|
||||
log.Fatal("ABCIQuery failed")
|
||||
}
|
||||
if !bytes.Equal(qres.Response.Key, k) {
|
||||
log.Fatal("returned key does not match queried key")
|
||||
}
|
||||
if !bytes.Equal(qres.Response.Value, v) {
|
||||
log.Fatal("returned value does not match sent value")
|
||||
}
|
||||
require.NoError(t, err)
|
||||
require.False(t, qres.Response.IsErr(), "ABCIQuery failed")
|
||||
require.True(t, bytes.Equal(qres.Response.Key, k),
|
||||
"returned key does not match queried key")
|
||||
require.True(t, bytes.Equal(qres.Response.Value, v),
|
||||
"returned value does not match sent value [%s]", string(v))
|
||||
|
||||
fmt.Println("Sent tx :", string(tx))
|
||||
fmt.Println("Queried for :", string(qres.Response.Key))
|
||||
fmt.Println("Got value :", string(qres.Response.Value))
|
||||
|
||||
// Output:
|
||||
// Sent tx : name=satoshi
|
||||
// Queried for : name
|
||||
// Got value : satoshi
|
||||
assert.Equal(t, "name=satoshi", string(tx), "sent tx")
|
||||
assert.Equal(t, "name", string(qres.Response.Key), "queried for")
|
||||
assert.Equal(t, "satoshi", string(qres.Response.Value), "got value")
|
||||
}
|
||||
|
||||
func ExampleHTTP_batching() {
|
||||
func TestHTTPBatching(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
@@ -88,10 +80,8 @@ func ExampleHTTP_batching() {
|
||||
defer func() { _ = closer(ctx) }()
|
||||
|
||||
rpcAddr := conf.RPC.ListenAddress
|
||||
c, err := rpchttp.New(rpcAddr)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Create our two transactions
|
||||
k1 := []byte("firstName")
|
||||
@@ -111,41 +101,51 @@ func ExampleHTTP_batching() {
|
||||
for _, tx := range txs {
|
||||
// Broadcast the transaction and wait for it to commit (rather use
|
||||
// c.BroadcastTxSync though in production).
|
||||
if _, err := batch.BroadcastTxCommit(context.Background(), tx); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
_, err := batch.BroadcastTxSync(ctx, tx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Send the batch of 2 transactions
|
||||
if _, err := batch.Send(context.Background()); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
_, err = batch.Send(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now let's query for the original results as a batch
|
||||
keys := [][]byte{k1, k2}
|
||||
for _, key := range keys {
|
||||
if _, err := batch.ABCIQuery(context.Background(), "/key", key); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
// wait for the transaction to land, we could poll more for
|
||||
// the transactions to land definitively.
|
||||
require.Eventually(t,
|
||||
func() bool {
|
||||
// Now let's query for the original results as a batch
|
||||
exists := 0
|
||||
for _, key := range [][]byte{k1, k2} {
|
||||
_, err := batch.ABCIQuery(context.Background(), "/key", key)
|
||||
if err == nil {
|
||||
exists++
|
||||
|
||||
}
|
||||
}
|
||||
return exists == 2
|
||||
},
|
||||
10*time.Second,
|
||||
time.Second,
|
||||
)
|
||||
|
||||
// Send the 2 queries and keep the results
|
||||
results, err := batch.Send(context.Background())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
results, err := batch.Send(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, results, 2)
|
||||
// Each result in the returned list is the deserialized result of each
|
||||
// respective ABCIQuery response
|
||||
for _, result := range results {
|
||||
qr, ok := result.(*coretypes.ResultABCIQuery)
|
||||
if !ok {
|
||||
log.Fatal("invalid result type from ABCIQuery request")
|
||||
}
|
||||
fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value))
|
||||
}
|
||||
require.True(t, ok, "invalid result type from ABCIQuery request")
|
||||
|
||||
// Output:
|
||||
// firstName = satoshi
|
||||
// lastName = nakamoto
|
||||
switch string(qr.Response.Key) {
|
||||
case "firstName":
|
||||
require.Equal(t, "satoshi", string(qr.Response.Value))
|
||||
case "lastName":
|
||||
require.Equal(t, "nakamoto", string(qr.Response.Value))
|
||||
default:
|
||||
t.Fatalf("encountered unknown key %q", string(qr.Response.Key))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
@@ -120,20 +121,20 @@ func NewWithTimeout(remote string, t time.Duration) (*HTTP, error) {
|
||||
}
|
||||
|
||||
// NewWithClient allows you to set a custom http client. An error is returned
|
||||
// on invalid remote. The function panics when client is nil.
|
||||
// on invalid remote. The function returns an error when client is nil
|
||||
// or an invalid remote.
|
||||
func NewWithClient(remote string, c *http.Client) (*HTTP, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
return nil, errors.New("nil client")
|
||||
}
|
||||
return NewWithClientAndWSOptions(remote, c, DefaultWSOptions())
|
||||
}
|
||||
|
||||
// NewWithClientAndWSOptions allows you to set a custom http client and
|
||||
// WebSocket options. An error is returned on invalid remote. The function
|
||||
// panics when client is nil.
|
||||
// WebSocket options. An error is returned on invalid remote or nil client.
|
||||
func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*HTTP, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
return nil, errors.New("nil client")
|
||||
}
|
||||
rpc, err := jsonrpcclient.NewWithHTTPClient(remote, c)
|
||||
if err != nil {
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
@@ -30,9 +31,11 @@ func NodeSuite(t *testing.T) (service.Service, *config.Config) {
|
||||
node, closer, err := rpctest.StartTendermint(ctx, conf, app, rpctest.SuppressStdout)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
_ = closer(ctx)
|
||||
cancel()
|
||||
app.Close()
|
||||
assert.NoError(t, node.Stop())
|
||||
assert.NoError(t, closer(ctx))
|
||||
assert.NoError(t, app.Close())
|
||||
node.Wait()
|
||||
_ = os.RemoveAll(dir)
|
||||
})
|
||||
return node, conf
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
@@ -155,7 +156,7 @@ func New(remote string) (*Client, error) {
|
||||
// panics when client is nil.
|
||||
func NewWithHTTPClient(remote string, c *http.Client) (*Client, error) {
|
||||
if c == nil {
|
||||
panic("nil http.Client")
|
||||
return nil, errors.New("nil client")
|
||||
}
|
||||
|
||||
parsedURL, err := newParsedURL(remote)
|
||||
|
||||
Reference in New Issue
Block a user