Merge remote-tracking branch 'origin/wb/abci-metrics' into wb/abci-metrics

This commit is contained in:
William Banfield
2021-10-13 15:27:30 -04:00
44 changed files with 1116 additions and 1098 deletions

View File

@@ -10,7 +10,7 @@ on:
- cron: '0 2 * * *'
jobs:
e2e-nightly-test-2:
e2e-nightly-test:
# Run parallel jobs for the listed testnet groups (must match the
# ./build/generator -g flag)
strategy:
@@ -44,7 +44,7 @@ jobs:
run: ./run-multiple.sh networks/nightly/${{ matrix.p2p }}/*-group${{ matrix.group }}-*.toml
e2e-nightly-fail-2:
needs: e2e-nightly-test-2
needs: e2e-nightly-test
if: ${{ failure() }}
runs-on: ubuntu-latest
steps:
@@ -60,7 +60,7 @@ jobs:
SLACK_FOOTER: ''
e2e-nightly-success: # may turn this off once they seem to pass consistently
needs: e2e-nightly-test-2
needs: e2e-nightly-test
if: ${{ success() }}
runs-on: ubuntu-latest
steps:

View File

@@ -10,7 +10,7 @@ on:
- cron: '0 2 * * *'
jobs:
e2e-nightly-test-2:
e2e-nightly-test:
# Run parallel jobs for the listed testnet groups (must match the
# ./build/generator -g flag)
strategy:
@@ -41,7 +41,7 @@ jobs:
run: ./run-multiple.sh networks/nightly/*-group${{ matrix.group }}-*.toml
e2e-nightly-fail-2:
needs: e2e-nightly-test-2
needs: e2e-nightly-test
if: ${{ failure() }}
runs-on: ubuntu-latest
steps:
@@ -57,7 +57,7 @@ jobs:
SLACK_FOOTER: ''
e2e-nightly-success: # may turn this off once they seem to pass consistently
needs: e2e-nightly-test-2
needs: e2e-nightly-test
if: ${{ success() }}
runs-on: ubuntu-latest
steps:

View File

@@ -33,10 +33,6 @@ jobs:
- name: Run CI testnet
working-directory: test/e2e
run: ./build/runner -f networks/ci.toml
run: ./run-multiple.sh networks/ci.toml
if: "env.GIT_DIFF != ''"
- name: Emit logs on failure
if: ${{ failure() }}
working-directory: test/e2e
run: ./build/runner -f networks/ci.toml logs

View File

@@ -23,7 +23,7 @@ jobs:
- uses: golangci/golangci-lint-action@v2.5.2
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.38
version: v1.42.1
args: --timeout 10m
github-token: ${{ secrets.github_token }}
if: env.GIT_DIFF

View File

@@ -13,12 +13,12 @@ linters:
# - gochecknoinits
# - gocognit
- goconst
- gocritic
# - gocritic
# - gocyclo
# - godox
- gofmt
- goimports
- golint
- revive
- gosec
- gosimple
- govet

View File

@@ -178,6 +178,24 @@ Special thanks to external contributors on this release: @JayT106, @bipulprasad,
- [statesync] [\#6463](https://github.com/tendermint/tendermint/pull/6463) Adds Reverse Sync feature to fetch historical light blocks after state sync in order to verify any evidence (@cmwaters)
- [blocksync] [\#6590](https://github.com/tendermint/tendermint/pull/6590) Update the metrics during blocksync (@JayT106)
## v0.34.14
This release backports the `rollback` feature to allow recovery in the event of an incorrect app hash.
### FEATURES
- [\#6982](https://github.com/tendermint/tendermint/pull/6982) The tendermint binary now has built-in suppport for running the end-to-end test application (with state sync support) (@cmwaters).
- [cli] [#7033](https://github.com/tendermint/tendermint/pull/7033) Add a `rollback` command to rollback to the previous tendermint state. This may be useful in the event of non-determinstic app hash or when reverting an upgrade. @cmwaters
### IMPROVEMENTS
- [\#7103](https://github.com/tendermint/tendermint/pull/7104) Remove IAVL dependency (backport of #6550) (@cmwaters)
### BUG FIXES
- [\#7057](https://github.com/tendermint/tendermint/pull/7057) Import Postgres driver support for the psql indexer (@creachadair).
- [ABCI] [\#7110](https://github.com/tendermint/tendermint/issues/7110) Revert "change client to use multi-reader mutexes (#6873)" (@tychoish).
## v0.34.13
*September 6, 2021*

View File

@@ -36,3 +36,5 @@ Special thanks to external contributors on this release:
### IMPROVEMENTS
### BUG FIXES
- fix: assignment copies lock value in `BitArray.UnmarshalJSON()` (@lklimek)

View File

@@ -227,13 +227,13 @@ build-docs:
build-docker: build-linux
cp $(BUILDDIR)/tendermint DOCKER/tendermint
docker build --label=tendermint --tag="tendermint/tendermint" DOCKER
docker build --label=tendermint --tag="tendermint/tendermint" -f DOCKER/Dockerfile .
rm -rf DOCKER/tendermint
.PHONY: build-docker
###############################################################################
### Mocks ###
### Mocks ###
###############################################################################
mockery:

View File

@@ -87,7 +87,7 @@ type ReqRes struct {
*sync.WaitGroup
*types.Response // Not set atomically, so be sure to use WaitGroup.
mtx tmsync.RWMutex
mtx tmsync.Mutex
done bool // Gets set to true once *after* WaitGroup.Done().
cb func(*types.Response) // A single callback that may be set.
}
@@ -137,16 +137,16 @@ func (r *ReqRes) InvokeCallback() {
//
// ref: https://github.com/tendermint/tendermint/issues/5439
func (r *ReqRes) GetCallback() func(*types.Response) {
r.mtx.RLock()
defer r.mtx.RUnlock()
r.mtx.Lock()
defer r.mtx.Unlock()
return r.cb
}
// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetDone() {
r.mtx.Lock()
defer r.mtx.Unlock()
r.done = true
r.mtx.Unlock()
}
func waitGroup1() (wg *sync.WaitGroup) {

View File

@@ -13,7 +13,7 @@ type Creator func() (Client, error)
// NewLocalCreator returns a Creator for the given app,
// which will be running locally.
func NewLocalCreator(app types.Application) Creator {
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
return func() (Client, error) {
return NewLocalClient(mtx, app), nil

View File

@@ -24,7 +24,7 @@ type grpcClient struct {
conn *grpc.ClientConn
chReqRes chan *ReqRes // dispatches "async" responses to callbacks *in order*, needed by mempool
mtx tmsync.RWMutex
mtx tmsync.Mutex
addr string
err error
resCb func(*types.Request, *types.Response) // listens to all callbacks
@@ -149,8 +149,8 @@ func (cli *grpcClient) StopForError(err error) {
}
func (cli *grpcClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
@@ -158,8 +158,8 @@ func (cli *grpcClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}
//----------------------------------------

View File

@@ -15,7 +15,7 @@ import (
type localClient struct {
service.BaseService
mtx *tmsync.RWMutex
mtx *tmsync.Mutex
types.Application
Callback
}
@@ -26,24 +26,22 @@ var _ Client = (*localClient)(nil)
// methods of the given app.
//
// Both Async and Sync methods ignore the given context.Context parameter.
func NewLocalClient(mtx *tmsync.RWMutex, app types.Application) Client {
func NewLocalClient(mtx *tmsync.Mutex, app types.Application) Client {
if mtx == nil {
mtx = &tmsync.RWMutex{}
mtx = new(tmsync.Mutex)
}
cli := &localClient{
mtx: mtx,
Application: app,
}
cli.BaseService = *service.NewBaseService(nil, "localClient", cli)
return cli
}
func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Lock()
defer app.mtx.Unlock()
app.Callback = cb
app.mtx.Unlock()
}
// TODO: change types.Application to include Error()?
@@ -67,8 +65,8 @@ func (app *localClient) EchoAsync(ctx context.Context, msg string) (*ReqRes, err
}
func (app *localClient) InfoAsync(ctx context.Context, req types.RequestInfo) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Info(req)
return app.callback(
@@ -100,8 +98,8 @@ func (app *localClient) CheckTxAsync(ctx context.Context, req types.RequestCheck
}
func (app *localClient) QueryAsync(ctx context.Context, req types.RequestQuery) (*ReqRes, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Query(req)
return app.callback(
@@ -215,8 +213,8 @@ func (app *localClient) EchoSync(ctx context.Context, msg string) (*types.Respon
}
func (app *localClient) InfoSync(ctx context.Context, req types.RequestInfo) (*types.ResponseInfo, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Info(req)
return &res, nil
@@ -249,8 +247,8 @@ func (app *localClient) QuerySync(
ctx context.Context,
req types.RequestQuery,
) (*types.ResponseQuery, error) {
app.mtx.RLock()
defer app.mtx.RUnlock()
app.mtx.Lock()
defer app.mtx.Unlock()
res := app.Application.Query(req)
return &res, nil

View File

@@ -39,7 +39,7 @@ type socketClient struct {
reqQueue chan *reqResWithContext
mtx tmsync.RWMutex
mtx tmsync.Mutex
err error
reqSent *list.List // list of requests sent, waiting for response
resCb func(*types.Request, *types.Response) // called on all requests, if set.
@@ -102,8 +102,8 @@ func (cli *socketClient) OnStop() {
// Error returns an error if the client was stopped abruptly.
func (cli *socketClient) Error() error {
cli.mtx.RLock()
defer cli.mtx.RUnlock()
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
@@ -113,8 +113,8 @@ func (cli *socketClient) Error() error {
// NOTE: callback may get internally generated flush responses.
func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.mtx.Lock()
defer cli.mtx.Unlock()
cli.resCb = resCb
cli.mtx.Unlock()
}
//----------------------------------------

View File

@@ -62,7 +62,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

View File

@@ -407,7 +407,7 @@ func newStateWithConfigAndBlockStore(
blockStore *store.BlockStore,
) *State {
// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

View File

@@ -77,7 +77,7 @@ func (m *NewRoundStepMessage) ValidateHeight(initialHeight int64) error {
m.LastCommitRound, initialHeight)
}
if m.Height > initialHeight && m.LastCommitRound < 0 {
return fmt.Errorf("LastCommitRound can only be negative for initial height %v", // nolint
return fmt.Errorf("LastCommitRound can only be negative for initial height %v",
initialHeight)
}
return nil

View File

@@ -346,7 +346,7 @@ func TestReactorWithEvidence(t *testing.T) {
blockStore := store.NewBlockStore(blockDB)
// one for mempool, one for consensus
mtx := new(tmsync.RWMutex)
mtx := new(tmsync.Mutex)
proxyAppConnMem := abciclient.NewLocalClient(mtx, app)
proxyAppConnCon := abciclient.NewLocalClient(mtx, app)

View File

@@ -71,7 +71,7 @@ func iotest(writer protoio.WriteCloser, reader protoio.ReadCloser) error {
return err
}
if n != len(bz)+visize {
return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize) // nolint
return fmt.Errorf("WriteMsg() wrote %v bytes, expected %v", n, len(bz)+visize)
}
lens[i] = n
}

View File

@@ -7,17 +7,15 @@ import (
"github.com/tendermint/tendermint/types"
)
// nolint: golint
// TODO: Rename type.
type MempoolIDs struct {
type IDs struct {
mtx tmsync.RWMutex
peerMap map[types.NodeID]uint16
nextID uint16 // assumes that a node will never have over 65536 active peers
activeIDs map[uint16]struct{} // used to check if a given peerID key is used
}
func NewMempoolIDs() *MempoolIDs {
return &MempoolIDs{
func NewMempoolIDs() *IDs {
return &IDs{
peerMap: make(map[types.NodeID]uint16),
// reserve UnknownPeerID for mempoolReactor.BroadcastTx
@@ -28,7 +26,7 @@ func NewMempoolIDs() *MempoolIDs {
// ReserveForPeer searches for the next unused ID and assigns it to the provided
// peer.
func (ids *MempoolIDs) ReserveForPeer(peerID types.NodeID) {
func (ids *IDs) ReserveForPeer(peerID types.NodeID) {
ids.mtx.Lock()
defer ids.mtx.Unlock()
@@ -38,7 +36,7 @@ func (ids *MempoolIDs) ReserveForPeer(peerID types.NodeID) {
}
// Reclaim returns the ID reserved for the peer back to unused pool.
func (ids *MempoolIDs) Reclaim(peerID types.NodeID) {
func (ids *IDs) Reclaim(peerID types.NodeID) {
ids.mtx.Lock()
defer ids.mtx.Unlock()
@@ -50,7 +48,7 @@ func (ids *MempoolIDs) Reclaim(peerID types.NodeID) {
}
// GetForPeer returns an ID reserved for the peer.
func (ids *MempoolIDs) GetForPeer(peerID types.NodeID) uint16 {
func (ids *IDs) GetForPeer(peerID types.NodeID) uint16 {
ids.mtx.RLock()
defer ids.mtx.RUnlock()
@@ -59,7 +57,7 @@ func (ids *MempoolIDs) GetForPeer(peerID types.NodeID) uint16 {
// nextPeerID returns the next unused peer ID to use. We assume that the mutex
// is already held.
func (ids *MempoolIDs) nextPeerID() uint16 {
func (ids *IDs) nextPeerID() uint16 {
if len(ids.activeIDs) == MaxActiveIDs {
panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", MaxActiveIDs))
}

View File

@@ -39,7 +39,7 @@ type Reactor struct {
cfg *config.MempoolConfig
mempool *CListMempool
ids *mempool.MempoolIDs
ids *mempool.IDs
// XXX: Currently, this is the only way to get information about a peer. Ideally,
// we rely on message-oriented communication to get necessary peer data.

View File

@@ -39,7 +39,7 @@ type Reactor struct {
cfg *config.MempoolConfig
mempool *TxMempool
ids *mempool.MempoolIDs
ids *mempool.IDs
// XXX: Currently, this is the only way to get information about a peer. Ideally,
// we rely on message-oriented communication to get necessary peer data.

View File

@@ -180,7 +180,7 @@ func (o *PeerManagerOptions) Validate() error {
if o.MaxPeers > 0 {
if o.MaxConnected == 0 || o.MaxConnected+o.MaxConnectedUpgrade > o.MaxPeers {
return fmt.Errorf("MaxConnected %v and MaxConnectedUpgrade %v can't exceed MaxPeers %v", // nolint
return fmt.Errorf("MaxConnected %v and MaxConnectedUpgrade %v can't exceed MaxPeers %v",
o.MaxConnected, o.MaxConnectedUpgrade, o.MaxPeers)
}
}
@@ -190,7 +190,7 @@ func (o *PeerManagerOptions) Validate() error {
return errors.New("can't set MaxRetryTime without MinRetryTime")
}
if o.MinRetryTime > o.MaxRetryTime {
return fmt.Errorf("MinRetryTime %v is greater than MaxRetryTime %v", // nolint
return fmt.Errorf("MinRetryTime %v is greater than MaxRetryTime %v",
o.MinRetryTime, o.MaxRetryTime)
}
}
@@ -200,7 +200,7 @@ func (o *PeerManagerOptions) Validate() error {
return errors.New("can't set MaxRetryTimePersistent without MinRetryTime")
}
if o.MinRetryTime > o.MaxRetryTimePersistent {
return fmt.Errorf("MinRetryTime %v is greater than MaxRetryTimePersistent %v", // nolint
return fmt.Errorf("MinRetryTime %v is greater than MaxRetryTimePersistent %v",
o.MinRetryTime, o.MaxRetryTimePersistent)
}
}

View File

@@ -18,14 +18,16 @@ var _ indexer.EventSink = (*EventSink)(nil)
// The EventSink is an aggregator for redirecting the call path of the tx/block kvIndexer.
// For the implementation details please see the kv.go in the indexer/block and indexer/tx folder.
type EventSink struct {
txi *kvt.TxIndex
bi *kvb.BlockerIndexer
txi *kvt.TxIndex
bi *kvb.BlockerIndexer
store dbm.DB
}
func NewEventSink(store dbm.DB) indexer.EventSink {
return &EventSink{
txi: kvt.NewTxIndex(store),
bi: kvb.New(store),
txi: kvt.NewTxIndex(store),
bi: kvb.New(store),
store: store,
}
}
@@ -58,5 +60,5 @@ func (kves *EventSink) HasBlock(h int64) (bool, error) {
}
func (kves *EventSink) Stop() error {
return nil
return kves.store.Close()
}

View File

@@ -195,8 +195,8 @@ func (state *State) ToProto() (*tmstate.State, error) {
return sm, nil
}
// StateFromProto takes a state proto message & returns the local state type
func StateFromProto(pb *tmstate.State) (*State, error) { //nolint:golint
// FromProto takes a state proto message & returns the local state type
func FromProto(pb *tmstate.State) (*State, error) {
if pb == nil {
return nil, errors.New("nil State")
}

View File

@@ -1079,7 +1079,7 @@ func TestStateProto(t *testing.T) {
assert.NoError(t, err, tt.testName)
}
smt, err := sm.StateFromProto(pbs)
smt, err := sm.FromProto(pbs)
if tt.expPass2 {
require.NoError(t, err, tt.testName)
require.Equal(t, tt.state, smt, tt.testName)

View File

@@ -130,7 +130,7 @@ func (store dbStore) loadState(key []byte) (state State, err error) {
%v\n`, err))
}
sm, err := StateFromProto(sp)
sm, err := FromProto(sp)
if err != nil {
return state, err
}

View File

@@ -270,7 +270,7 @@ func TestValidateBlockEvidence(t *testing.T) {
A block with too much evidence fails
*/
evidence := make([]types.Evidence, 0)
var currentBytes int64 = 0
var currentBytes int64
// more bytes than the maximum allowed for evidence
for currentBytes <= maxBytesEvidence {
newEv := types.NewMockDuplicateVoteEvidenceWithValidator(height, time.Now(),
@@ -290,7 +290,7 @@ func TestValidateBlockEvidence(t *testing.T) {
A good block with several pieces of good evidence passes
*/
evidence := make([]types.Evidence, 0)
var currentBytes int64 = 0
var currentBytes int64
// precisely the amount of allowed evidence
for {
newEv := types.NewMockDuplicateVoteEvidenceWithValidator(height, defaultEvidenceTime,

View File

@@ -734,7 +734,7 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
}
case *ssproto.LightBlockResponse:
var height int64 = 0
var height int64
if msg.LightBlock != nil {
height = msg.LightBlock.SignedHeader.Header.Height
}

View File

@@ -345,7 +345,7 @@ func (bs *BlockStore) pruneRange(
var (
err error
pruned uint64
totalPruned uint64 = 0
totalPruned uint64
)
batch := bs.db.NewBatch()
@@ -392,7 +392,7 @@ func (bs *BlockStore) batchDelete(
start, end []byte,
preDeletionHook func(key, value []byte, batch dbm.Batch) error,
) (uint64, []byte, error) {
var pruned uint64 = 0
var pruned uint64
iter, err := bs.db.Iterator(start, end)
if err != nil {
return pruned, start, err

View File

@@ -30,9 +30,21 @@ func NewBitArray(bits int) *BitArray {
if bits <= 0 {
return nil
}
return &BitArray{
Bits: bits,
Elems: make([]uint64, numElems(bits)),
bA := &BitArray{}
bA.reset(bits)
return bA
}
// reset changes size of BitArray to `bits` and re-allocates (zeroed) data buffer
func (bA *BitArray) reset(bits int) {
bA.mtx.Lock()
defer bA.mtx.Unlock()
bA.Bits = bits
if bits == 0 {
bA.Elems = nil
} else {
bA.Elems = make([]uint64, numElems(bits))
}
}
@@ -399,8 +411,7 @@ func (bA *BitArray) UnmarshalJSON(bz []byte) error {
if b == "null" {
// This is required e.g. for encoding/json when decoding
// into a pointer with pre-allocated BitArray.
bA.Bits = 0
bA.Elems = nil
bA.reset(0)
return nil
}
@@ -410,16 +421,15 @@ func (bA *BitArray) UnmarshalJSON(bz []byte) error {
return fmt.Errorf("bitArray in JSON should be a string of format %q but got %s", bitArrayJSONRegexp.String(), b)
}
bits := match[1]
// Construct new BitArray and copy over.
numBits := len(bits)
bA2 := NewBitArray(numBits)
bA.reset(numBits)
for i := 0; i < numBits; i++ {
if bits[i] == 'x' {
bA2.SetIndex(i, true)
bA.SetIndex(i, true)
}
}
*bA = *bA2 //nolint:govet
return nil
}

View File

@@ -61,7 +61,6 @@ func (c CustomValue) MarshalJSON() ([]byte, error) {
}
func (c CustomValue) UnmarshalJSON(bz []byte) error {
c.Value = "custom"
return nil
}

View File

@@ -94,7 +94,7 @@ Check out other examples in example_test.go
## 2. Pure functions to verify a new header (see verifier.go)
Verify function verifies a new header against some trusted header. See
https://github.com/tendermint/spec/blob/master/spec/consensus/light-client/verification.md
https://github.com/tendermint/spec/blob/master/spec/light-client/verification/README.md
for details.
There are two methods of verification: sequential and bisection
@@ -118,10 +118,7 @@ as a wrapper, which verifies all the headers, using a light client connected to
some other node.
See
https://docs.tendermint.com/master/tendermint-core/light-client-protocol.html
for usage example.
Or see
https://github.com/tendermint/spec/tree/master/spec/consensus/light-client
for the full spec
https://github.com/tendermint/spec/tree/master/spec/light-client
for the light client specification.
*/
package light

View File

@@ -23,6 +23,7 @@ import (
"github.com/tendermint/tendermint/internal/proxy"
rpccore "github.com/tendermint/tendermint/internal/rpc/core"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/state/indexer"
"github.com/tendermint/tendermint/internal/statesync"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -62,6 +63,7 @@ type nodeImpl struct {
// services
eventBus *types.EventBus // pub/sub for services
eventSinks []indexer.EventSink
stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk
bcReactor service.Service // for block-syncing
@@ -73,6 +75,7 @@ type nodeImpl struct {
pexReactor service.Service // for exchanging peer addresses
evidenceReactor service.Service
rpcListeners []net.Listener // rpc servers
shutdownOps closer
indexerService service.Service
rpcEnv *rpccore.Environment
prometheusSrv *http.Server
@@ -106,6 +109,7 @@ func newDefaultNode(cfg *config.Config, logger log.Logger) (service.Service, err
}
appClient, _ := proxy.DefaultClientCreator(cfg.ProxyApp, cfg.ABCI, cfg.DBDir())
return makeNode(cfg,
pval,
nodeKey,
@@ -123,27 +127,34 @@ func makeNode(cfg *config.Config,
clientCreator abciclient.Creator,
genesisDocProvider genesisDocProvider,
dbProvider config.DBProvider,
logger log.Logger) (service.Service, error) {
logger log.Logger,
) (service.Service, error) {
closers := []closer{}
blockStore, stateDB, err := initDBs(cfg, dbProvider)
blockStore, stateDB, dbCloser, err := initDBs(cfg, dbProvider)
if err != nil {
return nil, err
return nil, combineCloseError(err, dbCloser)
}
closers = append(closers, dbCloser)
stateStore := sm.NewStore(stateDB)
genDoc, err := genesisDocProvider()
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
err = genDoc.ValidateAndComplete()
if err != nil {
return nil, fmt.Errorf("error in genesis doc: %w", err)
return nil, combineCloseError(
fmt.Errorf("error in genesis doc: %w", err),
makeCloser(closers))
}
state, err := loadStateFromDBOrGenesisDocProvider(stateStore, genDoc)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
nodeMetrics := defaultMetricsProvider(cfg.Instrumentation)(genDoc.ChainID)
@@ -151,7 +162,8 @@ func makeNode(cfg *config.Config,
// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger, nodeMetrics.proxy)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
// EventBus and IndexerService must be started before the handshake because
@@ -160,12 +172,13 @@ func makeNode(cfg *config.Config,
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
indexerService, eventSinks, err := createAndStartIndexerService(cfg, dbProvider, eventBus, logger, genDoc.ChainID)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
// If an address is provided, listen on the socket for a connection from an
@@ -177,12 +190,16 @@ func makeNode(cfg *config.Config,
case "grpc":
privValidator, err = createAndStartPrivValidatorGRPCClient(cfg, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator grpc client: %w", err)
return nil, combineCloseError(
fmt.Errorf("error with private validator grpc client: %w", err),
makeCloser(closers))
}
default:
privValidator, err = createAndStartPrivValidatorSocketClient(cfg.PrivValidator.ListenAddr, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator socket client: %w", err)
return nil, combineCloseError(
fmt.Errorf("error with private validator socket client: %w", err),
makeCloser(closers))
}
}
}
@@ -190,10 +207,14 @@ func makeNode(cfg *config.Config,
if cfg.Mode == config.ModeValidator {
pubKey, err = privValidator.GetPubKey(context.TODO())
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
return nil, combineCloseError(fmt.Errorf("can't get pubkey: %w", err),
makeCloser(closers))
}
if pubKey == nil {
return nil, errors.New("could not retrieve public key from private validator")
return nil, combineCloseError(
errors.New("could not retrieve public key from private validator"),
makeCloser(closers))
}
}
@@ -209,7 +230,8 @@ func makeNode(cfg *config.Config,
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
// Reload the state. It will have the Version.Consensus.App set by the
@@ -217,7 +239,9 @@ func makeNode(cfg *config.Config,
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
return nil, combineCloseError(
fmt.Errorf("cannot load state: %w", err),
makeCloser(closers))
}
}
@@ -231,35 +255,43 @@ func makeNode(cfg *config.Config,
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, cfg)
peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
peerManager, peerCloser, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
closers = append(closers, peerCloser)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
return nil, combineCloseError(
fmt.Errorf("failed to create peer manager: %w", err),
makeCloser(closers))
}
router, err := createRouter(p2pLogger, nodeMetrics.p2p, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, proxyApp))
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
makeCloser(closers))
}
mpReactorShim, mpReactor, mp, err := createMempoolReactor(
cfg, proxyApp, state, nodeMetrics.mempool, peerManager, router, logger,
)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
evReactorShim, evReactor, evPool, err := createEvidenceReactor(
cfg, dbProvider, stateDB, blockStore, peerManager, router, logger,
)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
// make block executor for consensus and blockchain reactors to execute blocks
@@ -286,7 +318,9 @@ func makeNode(cfg *config.Config,
peerManager, router, blockSync && !stateSync, nodeMetrics.consensus,
)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
return nil, combineCloseError(
fmt.Errorf("could not create blockchain reactor: %w", err),
makeCloser(closers))
}
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
@@ -304,6 +338,7 @@ func makeNode(cfg *config.Config,
ssLogger := logger.With("module", "statesync")
ssReactorShim := p2p.NewReactorShim(ssLogger, "StateSyncShim", statesync.ChannelShims)
channels := makeChannelsFromShims(router, statesync.ChannelShims)
peerUpdates := peerManager.Subscribe()
stateSyncReactor := statesync.NewReactor(
genDoc.ChainID,
@@ -353,7 +388,8 @@ func makeNode(cfg *config.Config,
pexReactor, err = createPEXReactor(logger, peerManager, router)
if err != nil {
return nil, err
return nil, combineCloseError(err, makeCloser(closers))
}
if cfg.RPC.PprofListenAddress != "" {
@@ -386,6 +422,9 @@ func makeNode(cfg *config.Config,
evidenceReactor: evReactor,
indexerService: indexerService,
eventBus: eventBus,
eventSinks: eventSinks,
shutdownOps: makeCloser(closers),
rpcEnv: &rpccore.Environment{
ProxyAppQuery: proxyApp.Query(),
@@ -433,6 +472,7 @@ func makeSeedNode(cfg *config.Config,
state, err := sm.MakeGenesisState(genDoc)
if err != nil {
return nil, err
}
nodeInfo, err := makeSeedNodeInfo(cfg, nodeKey, genDoc, state)
@@ -445,15 +485,19 @@ func makeSeedNode(cfg *config.Config,
p2pLogger := logger.With("module", "p2p")
transport := createTransport(p2pLogger, cfg)
peerManager, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
peerManager, closer, err := createPeerManager(cfg, dbProvider, nodeKey.ID)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
return nil, combineCloseError(
fmt.Errorf("failed to create peer manager: %w", err),
closer)
}
router, err := createRouter(p2pLogger, p2pMetrics, nodeInfo, nodeKey.PrivKey,
peerManager, transport, getRouterConfig(cfg, nil))
if err != nil {
return nil, fmt.Errorf("failed to create router: %w", err)
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
var pexReactor service.Service
@@ -467,7 +511,8 @@ func makeSeedNode(cfg *config.Config,
pexReactor, err = createPEXReactor(logger, peerManager, router)
if err != nil {
return nil, err
return nil, combineCloseError(err, closer)
}
if cfg.RPC.PprofListenAddress != "" {
@@ -487,6 +532,8 @@ func makeSeedNode(cfg *config.Config,
peerManager: peerManager,
router: router,
shutdownOps: closer,
pexReactor: pexReactor,
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)
@@ -640,12 +687,22 @@ func (n *nodeImpl) OnStop() {
n.Logger.Info("Stopping Node")
// first stop the non-reactor services
if err := n.eventBus.Stop(); err != nil {
n.Logger.Error("Error closing eventBus", "err", err)
if n.eventBus != nil {
// first stop the non-reactor services
if err := n.eventBus.Stop(); err != nil {
n.Logger.Error("Error closing eventBus", "err", err)
}
}
if err := n.indexerService.Stop(); err != nil {
n.Logger.Error("Error closing indexerService", "err", err)
if n.indexerService != nil {
if err := n.indexerService.Stop(); err != nil {
n.Logger.Error("Error closing indexerService", "err", err)
}
}
for _, es := range n.eventSinks {
if err := es.Stop(); err != nil {
n.Logger.Error("failed to stop event sink", "err", err)
}
}
if n.config.Mode != config.ModeSeed {
@@ -711,6 +768,10 @@ func (n *nodeImpl) OnStop() {
// Error from closing listeners, or context timeout:
n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
}
}
if err := n.shutdownOps(); err != nil {
n.Logger.Error("problem shutting down additional services", "err", err)
}
}

View File

@@ -31,6 +31,7 @@ import (
"github.com/tendermint/tendermint/internal/test/factory"
"github.com/tendermint/tendermint/libs/log"
tmrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
tmtime "github.com/tendermint/tendermint/libs/time"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/types"
@@ -46,11 +47,21 @@ func TestNodeStartStop(t *testing.T) {
require.NoError(t, err)
require.NoError(t, ns.Start())
t.Cleanup(func() {
if ns.IsRunning() {
assert.NoError(t, ns.Stop())
ns.Wait()
}
})
n, ok := ns.(*nodeImpl)
require.True(t, ok)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
blocksSub, err := n.EventBus().Subscribe(ctx, "node_test", types.EventQueryNewBlock)
require.NoError(t, err)
select {
case <-blocksSub.Out():
@@ -87,6 +98,14 @@ func getTestNode(t *testing.T, conf *config.Config, logger log.Logger) *nodeImpl
n, ok := ns.(*nodeImpl)
require.True(t, ok)
t.Cleanup(func() {
if ns.IsRunning() {
assert.NoError(t, ns.Stop())
ns.Wait()
}
})
return n
}
@@ -100,7 +119,6 @@ func TestNodeDelayedStart(t *testing.T) {
n.GenesisDoc().GenesisTime = now.Add(2 * time.Second)
require.NoError(t, n.Start())
defer n.Stop() //nolint:errcheck // ignore for tests
startTime := tmtime.Now()
assert.Equal(t, true, startTime.After(n.GenesisDoc().GenesisTime))
@@ -165,8 +183,13 @@ func TestPrivValidatorListenAddrNoProtocol(t *testing.T) {
defer os.RemoveAll(cfg.RootDir)
cfg.PrivValidator.ListenAddr = addrNoPrefix
_, err := newDefaultNode(cfg, log.TestingLogger())
n, err := newDefaultNode(cfg, log.TestingLogger())
assert.Error(t, err)
if n != nil && n.IsRunning() {
assert.NoError(t, n.Stop())
n.Wait()
}
}
func TestNodeSetPrivValIPC(t *testing.T) {
@@ -211,6 +234,9 @@ func testFreeAddr(t *testing.T) string {
// create a proposal block using real and full
// mempool and evidence pool and validate it.
func TestCreateProposalBlock(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
@@ -222,7 +248,7 @@ func TestCreateProposalBlock(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, privVals := state(1, height)
state, stateDB, privVals := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
maxBytes := 16384
const partSize uint32 = 256
@@ -249,7 +275,7 @@ func TestCreateProposalBlock(t *testing.T) {
// fill the evidence pool with more evidence
// than can fit in a block
var currentBytes int64 = 0
var currentBytes int64
for currentBytes <= maxEvidenceBytes {
ev := types.NewMockDuplicateVoteEvidenceWithValidator(height, time.Now(), privVals[0], "test-chain")
currentBytes += int64(len(ev.Bytes()))
@@ -266,7 +292,7 @@ func TestCreateProposalBlock(t *testing.T) {
txLength := 100
for i := 0; i <= maxBytes/txLength; i++ {
tx := tmrand.Bytes(txLength)
err := mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err := mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -303,6 +329,9 @@ func TestCreateProposalBlock(t *testing.T) {
}
func TestMaxTxsProposalBlockSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
@@ -314,7 +343,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
logger := log.TestingLogger()
const height int64 = 1
state, stateDB, _ := state(1, height)
state, stateDB, _ := state(t, 1, height)
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 16384
@@ -336,7 +365,7 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
// fill the mempool with one txs just below the maximum size
txLength := int(types.MaxDataBytesNoEvidence(maxBytes, 1))
tx := tmrand.Bytes(txLength - 4) // to account for the varint
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
blockExec := sm.NewBlockExecutor(
@@ -365,6 +394,9 @@ func TestMaxTxsProposalBlockSize(t *testing.T) {
}
func TestMaxProposalBlockSize(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg := config.ResetTestRoot("node_create_proposal")
defer os.RemoveAll(cfg.RootDir)
cc := abciclient.NewLocalCreator(kvstore.NewApplication())
@@ -375,7 +407,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
logger := log.TestingLogger()
state, stateDB, _ := state(types.MaxVotesCount, int64(1))
state, stateDB, _ := state(t, types.MaxVotesCount, int64(1))
stateStore := sm.NewStore(stateDB)
blockStore := store.NewBlockStore(dbm.NewMemDB())
const maxBytes int64 = 1024 * 1024 * 2
@@ -402,7 +434,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
// At the end of the test, only the single big tx should be added
for i := 0; i < 10; i++ {
tx := tmrand.Bytes(10)
err = mp.CheckTx(context.Background(), tx, nil, mempool.TxInfo{})
err = mp.CheckTx(ctx, tx, nil, mempool.TxInfo{})
assert.NoError(t, err)
}
@@ -493,14 +525,17 @@ func TestNodeNewSeedNode(t *testing.T) {
defaultGenesisDocProviderFunc(cfg),
log.TestingLogger(),
)
require.NoError(t, err)
n, ok := ns.(*nodeImpl)
require.True(t, ok)
err = n.Start()
require.NoError(t, err)
assert.True(t, n.pexReactor.IsRunning())
require.NoError(t, n.Stop())
}
func TestNodeSetEventSink(t *testing.T) {
@@ -511,7 +546,7 @@ func TestNodeSetEventSink(t *testing.T) {
setupTest := func(t *testing.T, conf *config.Config) []indexer.EventSink {
eventBus, err := createAndStartEventBus(logger)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, eventBus.Stop()) })
genDoc, err := types.GenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
@@ -521,6 +556,22 @@ func TestNodeSetEventSink(t *testing.T) {
t.Cleanup(func() { require.NoError(t, indexService.Stop()) })
return eventSinks
}
cleanup := func(ns service.Service) func() {
return func() {
n, ok := ns.(*nodeImpl)
if !ok {
return
}
if n == nil {
return
}
if !n.IsRunning() {
return
}
assert.NoError(t, n.Stop())
n.Wait()
}
}
eventSinks := setupTest(t, cfg)
assert.Equal(t, 1, len(eventSinks))
@@ -541,7 +592,8 @@ func TestNodeSetEventSink(t *testing.T) {
cfg.TxIndex.Indexer = []string{"kvv"}
ns, err := newDefaultNode(cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("unsupported event sink type"), err)
assert.Contains(t, err.Error(), "unsupported event sink type")
t.Cleanup(cleanup(ns))
cfg.TxIndex.Indexer = []string{}
eventSinks = setupTest(t, cfg)
@@ -552,7 +604,8 @@ func TestNodeSetEventSink(t *testing.T) {
cfg.TxIndex.Indexer = []string{"psql"}
ns, err = newDefaultNode(cfg, logger)
assert.Nil(t, ns)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)
assert.Contains(t, err.Error(), "the psql connection settings cannot be empty")
t.Cleanup(cleanup(ns))
var psqlConn = "test"
@@ -591,18 +644,21 @@ func TestNodeSetEventSink(t *testing.T) {
var e = errors.New("found duplicated sinks, please check the tx-index section in the config.toml")
cfg.TxIndex.Indexer = []string{"psql", "kv", "Kv"}
cfg.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(cfg, logger)
ns, err = newDefaultNode(cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
cfg.TxIndex.Indexer = []string{"Psql", "kV", "kv", "pSql"}
cfg.TxIndex.PsqlConn = psqlConn
_, err = newDefaultNode(cfg, logger)
ns, err = newDefaultNode(cfg, logger)
require.Error(t, err)
assert.Equal(t, e, err)
assert.Contains(t, err.Error(), e.Error())
t.Cleanup(cleanup(ns))
}
func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
func state(t *testing.T, nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
t.Helper()
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)
for i := 0; i < nVals; i++ {
@@ -623,17 +679,15 @@ func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
// save validators to db for 2 heights
stateDB := dbm.NewMemDB()
t.Cleanup(func() { require.NoError(t, stateDB.Close()) })
stateStore := sm.NewStore(stateDB)
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
for i := 1; i < int(height); i++ {
s.LastBlockHeight++
s.LastValidators = s.Validators.Copy()
if err := stateStore.Save(s); err != nil {
panic(err)
}
require.NoError(t, stateStore.Save(s))
}
return s, stateDB, privVals
}

View File

@@ -2,7 +2,9 @@ package node
import (
"bytes"
"errors"
"fmt"
"strings"
"time"
dbm "github.com/tendermint/tm-db"
@@ -35,16 +37,57 @@ import (
_ "net/http/pprof" // nolint: gosec // securely exposed on separate, optional port
)
func initDBs(cfg *config.Config, dbProvider config.DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) { //nolint:lll
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return
}
blockStore = store.NewBlockStore(blockStoreDB)
type closer func() error
stateDB, err = dbProvider(&config.DBContext{ID: "state", Config: cfg})
return
func makeCloser(cs []closer) closer {
return func() error {
errs := make([]string, 0, len(cs))
for _, cl := range cs {
if err := cl(); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) >= 0 {
return errors.New(strings.Join(errs, "; "))
}
return nil
}
}
func combineCloseError(err error, cl closer) error {
if err == nil {
return cl()
}
clerr := cl()
if clerr == nil {
return err
}
return fmt.Errorf("error=%q closerError=%q", err.Error(), clerr.Error())
}
func initDBs(
cfg *config.Config,
dbProvider config.DBProvider,
) (*store.BlockStore, dbm.DB, closer, error) {
blockStoreDB, err := dbProvider(&config.DBContext{ID: "blockstore", Config: cfg})
if err != nil {
return nil, nil, func() error { return nil }, err
}
closers := []closer{}
blockStore := store.NewBlockStore(blockStoreDB)
closers = append(closers, blockStoreDB.Close)
stateDB, err := dbProvider(&config.DBContext{ID: "state", Config: cfg})
if err != nil {
return nil, nil, makeCloser(closers), err
}
closers = append(closers, stateDB.Close)
return blockStore, stateDB, makeCloser(closers), nil
}
// nolint:lll
@@ -355,7 +398,7 @@ func createPeerManager(
cfg *config.Config,
dbProvider config.DBProvider,
nodeID types.NodeID,
) (*p2p.PeerManager, error) {
) (*p2p.PeerManager, closer, error) {
var maxConns uint16
@@ -386,7 +429,7 @@ func createPeerManager(
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.PersistentPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
@@ -396,28 +439,28 @@ func createPeerManager(
for _, p := range tmstrings.SplitAndTrimEmpty(cfg.P2P.BootstrapPeers, ",", " ") {
address, err := p2p.ParseNodeAddress(p)
if err != nil {
return nil, fmt.Errorf("invalid peer address %q: %w", p, err)
return nil, func() error { return nil }, fmt.Errorf("invalid peer address %q: %w", p, err)
}
peers = append(peers, address)
}
peerDB, err := dbProvider(&config.DBContext{ID: "peerstore", Config: cfg})
if err != nil {
return nil, err
return nil, func() error { return nil }, err
}
peerManager, err := p2p.NewPeerManager(nodeID, peerDB, options)
if err != nil {
return nil, fmt.Errorf("failed to create peer manager: %w", err)
return nil, peerDB.Close, fmt.Errorf("failed to create peer manager: %w", err)
}
for _, peer := range peers {
if _, err := peerManager.Add(peer); err != nil {
return nil, fmt.Errorf("failed to add peer %q: %w", peer, err)
return nil, peerDB.Close, fmt.Errorf("failed to add peer %q: %w", peer, err)
}
}
return peerManager, nil
return peerManager, peerDB.Close, nil
}
func createRouter(

View File

@@ -3,7 +3,6 @@ package client_test
import (
"context"
"fmt"
"reflect"
"testing"
"time"
@@ -17,7 +16,7 @@ import (
"github.com/tendermint/tendermint/types"
)
var waitForEventTimeout = 8 * time.Second
const waitForEventTimeout = 2 * time.Second
// MakeTxKV returns a text transaction, allong with expected key, value pair
func MakeTxKV() ([]byte, []byte, []byte) {
@@ -26,164 +25,41 @@ func MakeTxKV() ([]byte, []byte, []byte) {
return k, v, append(k, append([]byte("="), v...)...)
}
func TestHeaderEvents(t *testing.T) {
n, conf := NodeSuite(t)
func testTxEventsSent(ctx context.Context, t *testing.T, broadcastMethod string, c client.Client) {
// make the tx
_, _, tx := MakeTxKV()
for i, c := range GetClients(t, n, conf) {
i, c := i, c
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err, "%d: %+v", i, err)
t.Cleanup(func() {
if err := c.Stop(); err != nil {
t.Error(err)
}
})
}
// send
done := make(chan struct{})
go func() {
defer close(done)
var (
txres *coretypes.ResultBroadcastTx
err error
)
switch broadcastMethod {
case "async":
txres, err = c.BroadcastTxAsync(ctx, tx)
case "sync":
txres, err = c.BroadcastTxSync(ctx, tx)
default:
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
}
if assert.NoError(t, err) {
assert.Equal(t, txres.Code, abci.CodeTypeOK)
}
}()
evt, err := client.WaitForOneEvent(c, types.EventNewBlockHeaderValue, waitForEventTimeout)
require.Nil(t, err, "%d: %+v", i, err)
_, ok := evt.(types.EventDataNewBlockHeader)
require.True(t, ok, "%d: %#v", i, evt)
// TODO: more checks...
})
}
}
// subscribe to new blocks and make sure height increments by 1
func TestBlockEvents(t *testing.T) {
n, conf := NodeSuite(t)
for _, c := range GetClients(t, n, conf) {
c := c
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err)
t.Cleanup(func() {
if err := c.Stop(); err != nil {
t.Error(err)
}
})
}
const subscriber = "TestBlockEvents"
eventCh, err := c.Subscribe(context.Background(), subscriber, types.QueryForEvent(types.EventNewBlockValue).String())
require.NoError(t, err)
t.Cleanup(func() {
if err := c.UnsubscribeAll(context.Background(), subscriber); err != nil {
t.Error(err)
}
})
var firstBlockHeight int64
for i := int64(0); i < 3; i++ {
event := <-eventCh
blockEvent, ok := event.Data.(types.EventDataNewBlock)
require.True(t, ok)
block := blockEvent.Block
if firstBlockHeight == 0 {
firstBlockHeight = block.Header.Height
}
require.Equal(t, firstBlockHeight+i, block.Header.Height)
}
})
}
}
func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
func testTxEventsSent(t *testing.T, broadcastMethod string) {
n, conf := NodeSuite(t)
for _, c := range GetClients(t, n, conf) {
c := c
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
// start for this test it if it wasn't already running
if !c.IsRunning() {
// if so, then we start it, listen, and stop it.
err := c.Start()
require.Nil(t, err)
t.Cleanup(func() {
if err := c.Stop(); err != nil {
t.Error(err)
}
})
}
// make the tx
_, _, tx := MakeTxKV()
// send
go func() {
var (
txres *coretypes.ResultBroadcastTx
err error
ctx = context.Background()
)
switch broadcastMethod {
case "async":
txres, err = c.BroadcastTxAsync(ctx, tx)
case "sync":
txres, err = c.BroadcastTxSync(ctx, tx)
default:
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
}
if assert.NoError(t, err) {
assert.Equal(t, txres.Code, abci.CodeTypeOK)
}
}()
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout)
require.Nil(t, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(t, ok)
// make sure this is the proper tx
require.EqualValues(t, tx, txe.Tx)
require.True(t, txe.Result.IsOK())
})
}
}
// Test HTTPClient resubscribes upon disconnect && subscription error.
// Test Local client resubscribes upon subscription error.
func TestClientsResubscribe(t *testing.T) {
// TODO(melekes)
}
func TestHTTPReturnsErrorIfClientIsNotRunning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, conf := NodeSuite(t)
c := getHTTPClient(t, conf)
// on Subscribe
_, err := c.Subscribe(ctx, "TestHeaderEvents",
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
assert.Error(t, err)
// on Unsubscribe
err = c.Unsubscribe(ctx, "TestHeaderEvents",
types.QueryForEvent(types.EventNewBlockHeaderValue).String())
assert.Error(t, err)
// on UnsubscribeAll
err = c.UnsubscribeAll(ctx, "TestHeaderEvents")
assert.Error(t, err)
// and wait for confirmation
evt, err := client.WaitForOneEvent(c, types.EventTxValue, waitForEventTimeout)
require.Nil(t, err)
// and make sure it has the proper info
txe, ok := evt.(types.EventDataTx)
require.True(t, ok)
// make sure this is the proper tx
require.EqualValues(t, tx, txe.Tx)
require.True(t, txe.Result.IsOK())
<-done
}

View File

@@ -3,16 +3,20 @@ package client_test
import (
"bytes"
"context"
"fmt"
"log"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
"github.com/tendermint/tendermint/rpc/coretypes"
rpctest "github.com/tendermint/tendermint/rpc/test"
)
func ExampleHTTP_simple() {
func TestHTTPSimple(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -29,9 +33,7 @@ func ExampleHTTP_simple() {
// Create our RPC client
rpcAddr := conf.RPC.ListenAddress
c, err := rpchttp.New(rpcAddr)
if err != nil {
log.Fatal(err)
}
require.NoError(t, err)
// Create a transaction
k := []byte("name")
@@ -41,6 +43,7 @@ func ExampleHTTP_simple() {
// Broadcast the transaction and wait for it to commit (rather use
// c.BroadcastTxSync though in production).
bres, err := c.BroadcastTxCommit(context.Background(), tx)
require.NoError(t, err)
if err != nil {
log.Fatal(err)
}
@@ -50,30 +53,19 @@ func ExampleHTTP_simple() {
// Now try to fetch the value for the key
qres, err := c.ABCIQuery(context.Background(), "/key", k)
if err != nil {
log.Fatal(err)
}
if qres.Response.IsErr() {
log.Fatal("ABCIQuery failed")
}
if !bytes.Equal(qres.Response.Key, k) {
log.Fatal("returned key does not match queried key")
}
if !bytes.Equal(qres.Response.Value, v) {
log.Fatal("returned value does not match sent value")
}
require.NoError(t, err)
require.False(t, qres.Response.IsErr(), "ABCIQuery failed")
require.True(t, bytes.Equal(qres.Response.Key, k),
"returned key does not match queried key")
require.True(t, bytes.Equal(qres.Response.Value, v),
"returned value does not match sent value [%s]", string(v))
fmt.Println("Sent tx :", string(tx))
fmt.Println("Queried for :", string(qres.Response.Key))
fmt.Println("Got value :", string(qres.Response.Value))
// Output:
// Sent tx : name=satoshi
// Queried for : name
// Got value : satoshi
assert.Equal(t, "name=satoshi", string(tx), "sent tx")
assert.Equal(t, "name", string(qres.Response.Key), "queried for")
assert.Equal(t, "satoshi", string(qres.Response.Value), "got value")
}
func ExampleHTTP_batching() {
func TestHTTPBatching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -88,10 +80,8 @@ func ExampleHTTP_batching() {
defer func() { _ = closer(ctx) }()
rpcAddr := conf.RPC.ListenAddress
c, err := rpchttp.New(rpcAddr)
if err != nil {
log.Fatal(err)
}
c, err := rpchttp.NewWithClient(rpcAddr, http.DefaultClient)
require.NoError(t, err)
// Create our two transactions
k1 := []byte("firstName")
@@ -111,41 +101,51 @@ func ExampleHTTP_batching() {
for _, tx := range txs {
// Broadcast the transaction and wait for it to commit (rather use
// c.BroadcastTxSync though in production).
if _, err := batch.BroadcastTxCommit(context.Background(), tx); err != nil {
log.Fatal(err)
}
_, err := batch.BroadcastTxSync(ctx, tx)
require.NoError(t, err)
}
// Send the batch of 2 transactions
if _, err := batch.Send(context.Background()); err != nil {
log.Fatal(err)
}
_, err = batch.Send(ctx)
require.NoError(t, err)
// Now let's query for the original results as a batch
keys := [][]byte{k1, k2}
for _, key := range keys {
if _, err := batch.ABCIQuery(context.Background(), "/key", key); err != nil {
log.Fatal(err)
}
}
// wait for the transaction to land, we could poll more for
// the transactions to land definitively.
require.Eventually(t,
func() bool {
// Now let's query for the original results as a batch
exists := 0
for _, key := range [][]byte{k1, k2} {
_, err := batch.ABCIQuery(context.Background(), "/key", key)
if err == nil {
exists++
}
}
return exists == 2
},
10*time.Second,
time.Second,
)
// Send the 2 queries and keep the results
results, err := batch.Send(context.Background())
if err != nil {
log.Fatal(err)
}
results, err := batch.Send(ctx)
require.NoError(t, err)
require.Len(t, results, 2)
// Each result in the returned list is the deserialized result of each
// respective ABCIQuery response
for _, result := range results {
qr, ok := result.(*coretypes.ResultABCIQuery)
if !ok {
log.Fatal("invalid result type from ABCIQuery request")
}
fmt.Println(string(qr.Response.Key), "=", string(qr.Response.Value))
}
require.True(t, ok, "invalid result type from ABCIQuery request")
// Output:
// firstName = satoshi
// lastName = nakamoto
switch string(qr.Response.Key) {
case "firstName":
require.Equal(t, "satoshi", string(qr.Response.Value))
case "lastName":
require.Equal(t, "nakamoto", string(qr.Response.Value))
default:
t.Fatalf("encountered unknown key %q", string(qr.Response.Key))
}
}
}

View File

@@ -2,6 +2,7 @@ package http
import (
"context"
"errors"
"net/http"
"time"
@@ -120,20 +121,20 @@ func NewWithTimeout(remote string, t time.Duration) (*HTTP, error) {
}
// NewWithClient allows you to set a custom http client. An error is returned
// on invalid remote. The function panics when client is nil.
// on invalid remote. The function returns an error when client is nil
// or an invalid remote.
func NewWithClient(remote string, c *http.Client) (*HTTP, error) {
if c == nil {
panic("nil http.Client")
return nil, errors.New("nil client")
}
return NewWithClientAndWSOptions(remote, c, DefaultWSOptions())
}
// NewWithClientAndWSOptions allows you to set a custom http client and
// WebSocket options. An error is returned on invalid remote. The function
// panics when client is nil.
// WebSocket options. An error is returned on invalid remote or nil client.
func NewWithClientAndWSOptions(remote string, c *http.Client, wso WSOptions) (*HTTP, error) {
if c == nil {
panic("nil http.Client")
return nil, errors.New("nil client")
}
rpc, err := jsonrpcclient.NewWithHTTPClient(remote, c)
if err != nil {

View File

@@ -7,6 +7,7 @@ import (
"os"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/abci/example/kvstore"
"github.com/tendermint/tendermint/config"
@@ -30,9 +31,11 @@ func NodeSuite(t *testing.T) (service.Service, *config.Config) {
node, closer, err := rpctest.StartTendermint(ctx, conf, app, rpctest.SuppressStdout)
require.NoError(t, err)
t.Cleanup(func() {
_ = closer(ctx)
cancel()
app.Close()
assert.NoError(t, node.Stop())
assert.NoError(t, closer(ctx))
assert.NoError(t, app.Close())
node.Wait()
_ = os.RemoveAll(dir)
})
return node, conf

File diff suppressed because it is too large Load Diff

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
@@ -155,7 +156,7 @@ func New(remote string) (*Client, error) {
// panics when client is nil.
func NewWithHTTPClient(remote string, c *http.Client) (*Client, error) {
if c == nil {
panic("nil http.Client")
return nil, errors.New("nil client")
}
parsedURL, err := newParsedURL(remote)

View File

@@ -187,13 +187,15 @@ func LoadTestnet(file string) (*Testnet, error) {
LogLevel: manifest.LogLevel,
QueueType: manifest.QueueType,
}
if node.StartAt == testnet.InitialHeight {
node.StartAt = 0 // normalize to 0 for initial nodes, since code expects this
}
if nodeManifest.Mode != "" {
node.Mode = Mode(nodeManifest.Mode)
}
if node.Mode == ModeLight {
node.ABCIProtocol = ProtocolBuiltin
}
if nodeManifest.Database != "" {
node.Database = nodeManifest.Database
}

View File

@@ -162,9 +162,9 @@ func verifyCommitBatch(
var (
val *Validator
valIdx int32
seenVals = make(map[int32]int, len(commit.Signatures))
batchSigIdxs = make([]int, 0, len(commit.Signatures))
talliedVotingPower int64 = 0
talliedVotingPower int64
seenVals = make(map[int32]int, len(commit.Signatures))
batchSigIdxs = make([]int, 0, len(commit.Signatures))
)
// attempt to create a batch verifier
bv, ok := batch.CreateBatchVerifier(vals.GetProposer().PubKey)
@@ -275,9 +275,9 @@ func verifyCommitSingle(
var (
val *Validator
valIdx int32
seenVals = make(map[int32]int, len(commit.Signatures))
talliedVotingPower int64 = 0
talliedVotingPower int64
voteSignBytes []byte
seenVals = make(map[int32]int, len(commit.Signatures))
)
for idx, commitSig := range commit.Signatures {
if ignoreSig(commitSig) {

View File

@@ -508,7 +508,7 @@ func TestAveragingInIncrementProposerPriority(t *testing.T) {
{Address: []byte("c"), ProposerPriority: 1}}},
// this should average twice but the average should be 0 after the first iteration
// (voting power is 0 -> no changes)
11, 1 / 3},
11, 0},
2: {ValidatorSet{
Validators: []*Validator{
{Address: []byte("a"), ProposerPriority: 100},