Merge remote-tracking branch 'origin' into jasmina/4457_block_sync_verification

This commit is contained in:
Jasmina Malicevic
2022-04-07 14:42:57 +02:00
46 changed files with 830 additions and 751 deletions


@@ -230,6 +230,26 @@ Special thanks to external contributors on this release: @JayT106,
- [cmd/tendermint/commands] [\#6623](https://github.com/tendermint/tendermint/pull/6623) replace `$HOME/.some/test/dir` with `t.TempDir` (@tanyabouman)
- [statesync] \#6807 Implement P2P state provider as an alternative to RPC (@cmwaters)
## v0.34.18
### BREAKING CHANGES
- CLI/RPC/Config
- [cli] [\#8258](https://github.com/tendermint/tendermint/pull/8258) Fix a bug in the cli that caused `unsafe-reset-all` to panic
## v0.34.17
### BREAKING CHANGES
- CLI/RPC/Config
- [cli] [\#8081](https://github.com/tendermint/tendermint/issues/8081) make the reset command safe to use (@marbar3778).
### BUG FIXES
- [consensus] [\#8079](https://github.com/tendermint/tendermint/issues/8079) start the timeout ticker before relay (backport #7844) (@creachadair).
- [consensus] [\#7992](https://github.com/tendermint/tendermint/issues/7992) [\#7994](https://github.com/tendermint/tendermint/issues/7994) change lock handling in handleMsg and reactor to alleviate issues gossiping during long ABCI calls (@williambanfield).
## v0.34.16
Special thanks to external contributors on this release: @yihuang
@@ -1953,7 +1973,7 @@ more details.
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique clientIDs with open subscriptions. Configurable via `rpc.max_subscription_clients`
- [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique queries a given client can subscribe to at once. Configurable via `rpc.max_subscriptions_per_client`.
- [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) Default ReadTimeout and WriteTimeout changed to 10s. WriteTimeout can be increased by setting `rpc.timeout_broadcast_tx_commit` in the config.
- [rpc/client] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Update `EventsClient` interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md). This includes `Subscribe`, `Unsubscribe`, and `UnsubscribeAll` methods.
- [rpc/client] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Update `EventsClient` interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md). This includes `Subscribe`, `Unsubscribe`, and `UnsubscribeAll` methods.
* Apps
- [abci] [\#3403](https://github.com/tendermint/tendermint/issues/3403) Remove `time_iota_ms` from BlockParams. This is a
@@ -2006,7 +2026,7 @@ more details.
- [blockchain] [\#3358](https://github.com/tendermint/tendermint/pull/3358) Fix timer leak in `BlockPool` (@guagualvcha)
- [cmd] [\#3408](https://github.com/tendermint/tendermint/issues/3408) Fix `testnet` command's panic when creating non-validator configs (using `--n` flag) (@srmo)
- [libs/db/remotedb/grpcdb] [\#3402](https://github.com/tendermint/tendermint/issues/3402) Close Iterator/ReverseIterator after use
- [libs/pubsub] [\#951](https://github.com/tendermint/tendermint/issues/951), [\#1880](https://github.com/tendermint/tendermint/issues/1880) Use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
- [libs/pubsub] [\#951](https://github.com/tendermint/tendermint/issues/951), [\#1880](https://github.com/tendermint/tendermint/issues/1880) Use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md)
- [lite] [\#3364](https://github.com/tendermint/tendermint/issues/3364) Fix `/validators` and `/abci_query` proxy endpoints
(@guagualvcha)
- [p2p/conn] [\#3347](https://github.com/tendermint/tendermint/issues/3347) Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection
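The all-zero shared-secret check noted in the entry above guards against a peer supplying a low-order public key, for which the Diffie-Hellman output degenerates to zero. A minimal sketch of such a guard in Go; the helper name and the fixed 32-byte secret size are illustrative assumptions, not Tendermint's actual secret-connection code:

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
)

// errZeroSecret signals that the Diffie-Hellman output was all zeros,
// which occurs when the remote side presents a low-order public key.
var errZeroSecret = errors.New("secret-connection: all-zero shared secret")

// checkSharedSecret rejects a 32-byte X25519 output that is entirely zero.
// The comparison is constant-time so the check does not leak how many
// leading bytes of the secret matched zero.
func checkSharedSecret(secret [32]byte) error {
	var zero [32]byte
	if subtle.ConstantTimeCompare(secret[:], zero[:]) == 1 {
		return errZeroSecret
	}
	return nil
}

func main() {
	var bad [32]byte // all zeros: must be rejected
	fmt.Println(checkSharedSecret(bad) != nil) // true

	good := [32]byte{1}
	fmt.Println(checkSharedSecret(good) == nil) // true
}
```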
@@ -2710,7 +2730,7 @@ Special thanks to external contributors on this release:
This release is mostly about the ConsensusParams - removing fields and enforcing MaxGas.
It also addresses some issues found via security audit, removes various unused
functions from `libs/common`, and implements
[ADR-012](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-012-peer-transport.md).
[ADR-012](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-012-peer-transport.md).
BREAKING CHANGES:
@@ -2775,7 +2795,7 @@ are affected by a change.
A few more breaking changes are in the works - each will come with a clear
Architecture Decision Record (ADR) explaining the change. You can review ADRs
[here](https://github.com/tendermint/tendermint/tree/develop/docs/architecture)
[here](https://github.com/tendermint/tendermint/tree/master/docs/architecture)
or in the [open Pull Requests](https://github.com/tendermint/tendermint/pulls).
You can also check in on the [issues marked as
breaking](https://github.com/tendermint/tendermint/issues?q=is%3Aopen+is%3Aissue+label%3Abreaking).
@@ -2791,7 +2811,7 @@ BREAKING CHANGES:
- [abci] Added address of the original proposer of the block to Header
- [abci] Change ABCI Header to match Tendermint exactly
- [abci] [\#2159](https://github.com/tendermint/tendermint/issues/2159) Update use of `Validator` (see
[ADR-018](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-018-ABCI-Validators.md)):
[ADR-018](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-018-ABCI-Validators.md)):
- Remove PubKey from `Validator` (so it's just Address and Power)
- Introduce `ValidatorUpdate` (with just PubKey and Power)
- InitChain and EndBlock use ValidatorUpdate
@@ -2813,7 +2833,7 @@ BREAKING CHANGES:
- [state] [\#1815](https://github.com/tendermint/tendermint/issues/1815) Validator set changes are now delayed by one block (!)
- Add NextValidatorSet to State, changes on-disk representation of state
- [state] [\#2184](https://github.com/tendermint/tendermint/issues/2184) Enforce ConsensusParams.BlockSize.MaxBytes (See
[ADR-020](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-020-block-size.md)).
[ADR-020](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-020-block-size.md)).
- Remove ConsensusParams.BlockSize.MaxTxs
- Introduce maximum sizes for all components of a block, including ChainID
- [types] Updates to the block Header:
@@ -2824,7 +2844,7 @@ BREAKING CHANGES:
- [consensus] [\#2203](https://github.com/tendermint/tendermint/issues/2203) Implement BFT time
- Timestamp in block must be monotonic and equal the median of timestamps in block's LastCommit
- [crypto] [\#2239](https://github.com/tendermint/tendermint/issues/2239) Secp256k1 signature changes (See
[ADR-014](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-014-secp-malleability.md)):
[ADR-014](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-014-secp-malleability.md)):
- format changed from DER to `r || s`, both little endian encoded as 32 bytes.
- malleability removed by requiring `s` to be in canonical form.
@@ -3054,7 +3074,7 @@ BREAKING CHANGES:
FEATURES
- [cmd] Added metrics (served under `/metrics` using a Prometheus client;
disabled by default). See the new `instrumentation` section in the config and
[metrics](https://tendermint.readthedocs.io/projects/tools/en/develop/metrics.html)
[metrics](https://github.com/tendermint/tendermint/blob/master/docs/nodes/metrics.md)
guide.
- [p2p] Add IPv6 support to peering.
- [p2p] Add `external_address` to config to allow specifying the address for
@@ -3168,7 +3188,7 @@ BREAKING:
FEATURES
- [rpc] the RPC documentation is now published to https://tendermint.github.io/slate
- [rpc] the RPC documentation is now published to `https://tendermint.github.io/slate`
- [p2p] AllowDuplicateIP config option to refuse connections from same IP.
- true by default for now, false by default in next breaking release
- [docs] Add docs for query, tx indexing, events, pubsub


@@ -8,7 +8,7 @@ Official releases can be found [here](https://github.com/tendermint/tendermint/r
The Dockerfile for tendermint is not expected to change in the near future. The master file used for all builds can be found [here](https://raw.githubusercontent.com/tendermint/tendermint/master/DOCKER/Dockerfile).
Respective versioned files can be found <https://raw.githubusercontent.com/tendermint/tendermint/vX.XX.XX/DOCKER/Dockerfile> (replace the Xs with the version number).
Respective versioned files can be found at `https://raw.githubusercontent.com/tendermint/tendermint/vX.XX.XX/DOCKER/Dockerfile` (replace the Xs with the version number).
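Substituting a concrete release tag into that URL pattern can be sketched as below; the helper name is ours, while the URL template comes from the README:

```go
package main

import "fmt"

// dockerfileURL builds the raw URL for a versioned Dockerfile by substituting
// a release tag (e.g. "v0.34.19") into the pattern documented above.
func dockerfileURL(version string) string {
	return fmt.Sprintf(
		"https://raw.githubusercontent.com/tendermint/tendermint/%s/DOCKER/Dockerfile",
		version,
	)
}

func main() {
	fmt.Println(dockerfileURL("v0.34.19"))
	// → https://raw.githubusercontent.com/tendermint/tendermint/v0.34.19/DOCKER/Dockerfile
}
```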
## Quick reference


@@ -100,7 +100,7 @@ these parameters may do so by setting the `ConsensusParams.Timeout` field of the
As a safety measure in case of unusual timing issues during the upgrade to
v0.36, an operator may override the consensus timeout values for a single node.
Note, however, that these overrides will be removed in Tendermint v0.37. See
[configuration](https://github.com/tendermint/tendermint/blob/wb/issue-8182/docs/nodes/configuration.md)
[configuration](https://github.com/tendermint/tendermint/blob/master/docs/nodes/configuration.md)
for more information about these overrides.
For more discussion of this, see [ADR 074](https://tinyurl.com/adr074), which
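As a hedged illustration of such a per-node override, a `config.toml` fragment might look like the following. The key names (and whether a given version spells them with dashes or underscores) are assumptions to be checked against the linked configuration documentation for the exact release in use:

```toml
# Hypothetical consensus timeout overrides for a single node during the
# v0.36 upgrade window. Key names are illustrative; verify them against
# the configuration reference for your version before relying on them.
[consensus]
timeout-propose   = "3s"
timeout-prevote   = "1s"
timeout-precommit = "1s"
timeout-commit    = "1s"
```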


@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
)


@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/abci/types"
)


@@ -12,7 +12,7 @@ For any specific algorithm, use its specific module e.g.
## Binary encoding
For Binary encoding, please refer to the [Tendermint encoding specification](https://docs.tendermint.com/master/spec/blockchain/encoding.html).
For Binary encoding, please refer to the [Tendermint encoding specification](https://docs.tendermint.com/master/spec/core/encoding.html).
## JSON Encoding

go.mod

@@ -3,7 +3,7 @@ module github.com/tendermint/tendermint
go 1.17
require (
github.com/BurntSushi/toml v1.0.0
github.com/BurntSushi/toml v1.1.0
github.com/adlio/schema v1.3.0
github.com/btcsuite/btcd v0.22.0-beta
github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce
@@ -41,7 +41,7 @@ require (
github.com/creachadair/atomicfile v0.2.4
github.com/golangci/golangci-lint v1.45.2
github.com/google/go-cmp v0.5.7
github.com/vektra/mockery/v2 v2.10.2
github.com/vektra/mockery/v2 v2.10.4
gotest.tools v2.2.0+incompatible
)

go.sum

@@ -68,8 +68,9 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU=
github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60=
github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
@@ -1035,8 +1036,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.30.0/go.mod h1:2rsYD01CKFrjjsvFxx75KlEUNpWNBY9JWD3K/7o2Cus=
github.com/valyala/quicktemplate v1.7.0/go.mod h1:sqKJnoaOF88V07vkO+9FL8fb9uZg/VPSJnLYn+LmLk8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/vektra/mockery/v2 v2.10.2 h1:ISAFkB3rQS6Y3aDZzAKtDwgeyDknwNa1aBE3Zgx0h+I=
github.com/vektra/mockery/v2 v2.10.2/go.mod h1:m/WO2UzWzqgVX3nvqpRQq70I4Z7jbSCRhdmkgtp+Ab4=
github.com/vektra/mockery/v2 v2.10.4 h1:nMdsCKIS7ZdNTRNS/77Bx6Q/UbasGcfc3Nx7JO7HGTg=
github.com/vektra/mockery/v2 v2.10.4/go.mod h1:m/WO2UzWzqgVX3nvqpRQq70I4Z7jbSCRhdmkgtp+Ab4=
github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE=
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=


@@ -12,6 +12,7 @@ import (
"github.com/tendermint/tendermint/internal/consensus"
"github.com/tendermint/tendermint/internal/eventbus"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/p2p/conn"
sm "github.com/tendermint/tendermint/internal/state"
"github.com/tendermint/tendermint/internal/store"
"github.com/tendermint/tendermint/libs/log"
@@ -47,6 +48,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor {
SendQueueCapacity: 1000,
RecvBufferCapacity: 1024,
RecvMessageCapacity: MaxMsgSize,
Name: "blockSync",
}
}
@@ -130,8 +132,8 @@ type Reactor struct {
consReactor consensusReactor
blockSync *atomicBool
blockSyncCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
@@ -146,39 +148,33 @@ type Reactor struct {
// NewReactor returns a new reactor instance.
func NewReactor(
ctx context.Context,
logger log.Logger,
stateStore sm.Store,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
blockSync bool,
metrics *consensus.Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
blockSyncCh, err := channelCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
chCreator: channelCreator,
peerEvents: peerEvents,
metrics: metrics,
eventBus: eventBus,
lastTrustedBlock: nil,
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -189,6 +185,12 @@ func NewReactor(
// If blockSync is enabled, we also start the pool and the pool processing
// goroutine. If the pool fails to start, an error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
blockSyncCh, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
state, err := r.stateStore.Load()
if err != nil {
return err
@@ -214,13 +216,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
if err := r.pool.Start(ctx); err != nil {
return err
}
go r.requestRoutine(ctx)
go r.requestRoutine(ctx, blockSyncCh)
go r.poolRoutine(ctx, false)
go r.poolRoutine(ctx, false, blockSyncCh)
}
go r.processBlockSyncCh(ctx)
go r.processPeerUpdates(ctx)
go r.processBlockSyncCh(ctx, blockSyncCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), blockSyncCh)
return nil
}
@@ -235,12 +237,12 @@ func (r *Reactor) OnStop() {
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID) error {
func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
block := r.store.LoadBlockProto(msg.Height)
if block != nil {
blockCommit := r.store.LoadBlockCommitProto(msg.Height)
if blockCommit != nil {
return r.blockSyncCh.Send(ctx, p2p.Envelope{
return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: block, Commit: blockCommit},
})
@@ -249,16 +251,17 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
return r.blockSyncCh.Send(ctx, p2p.Envelope{
return blockSyncCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
})
}
<<<<<<< HEAD
// handleBlockSyncMessage handles envelopes sent from peers on the
// BlockSyncChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -299,10 +302,12 @@ func (r *Reactor) handleBlockSyncMessage(ctx context.Context, envelope *p2p.Enve
return nil
}
=======
>>>>>>> origin
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -318,7 +323,39 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case BlockSyncChannel:
err = r.handleBlockSyncMessage(ctx, envelope)
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
r.logger.Error("failed to convert block from proto",
"peer", envelope.From,
"err", err)
return err
}
r.pool.AddBlock(envelope.From, block, block.Size())
case *bcproto.StatusRequest:
return blockSyncCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
Base: r.store.Base(),
},
})
case *bcproto.StatusResponse:
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
r.logger.Debug("peer does not have the requested block",
"peer", envelope.From,
"height", msg.Height)
default:
return fmt.Errorf("received unknown message: %T", msg)
}
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
@@ -332,17 +369,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processBlockSyncCh(ctx context.Context) {
iter := r.blockSyncCh.Receive(ctx)
func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
iter := blockSyncCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.blockSyncCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, blockSyncCh.ID, envelope, blockSyncCh); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
r.logger.Error("failed to process message", "ch_id", blockSyncCh.ID, "envelope", envelope, "err", err)
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -353,7 +390,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
}
// processPeerUpdate processes a PeerUpdate.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -364,7 +401,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// send a status update to the newly added peer
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
@@ -372,7 +409,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
},
}); err != nil {
r.pool.RemovePeer(peerUpdate.NodeID)
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerUpdate.NodeID,
Err: err,
}); err != nil {
@@ -388,13 +425,13 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, blockSyncCh)
}
}
}
@@ -412,13 +449,18 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
r.syncStartTime = time.Now()
go r.requestRoutine(ctx)
go r.poolRoutine(ctx, true)
bsCh, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
go r.requestRoutine(ctx, bsCh)
go r.poolRoutine(ctx, true, bsCh)
return nil
}
func (r *Reactor) requestRoutine(ctx context.Context) {
func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
@@ -427,11 +469,11 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
case <-ctx.Done():
return
case request := <-r.requestsCh:
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}); err != nil {
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: request.PeerID,
Err: err,
}); err != nil {
@@ -439,14 +481,14 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
}
}
case pErr := <-r.errorsCh:
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
if err := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: pErr.peerID,
Err: pErr.err,
}); err != nil {
return
}
case <-statusUpdateTicker.C:
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
if err := blockSyncCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}); err != nil {
@@ -460,7 +502,7 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
@@ -563,6 +605,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
newBlockID = types.BlockID{Hash: newBlock.block.Hash(), PartSetHeader: newBlockPartSetHeader}
)
<<<<<<< HEAD
if r.lastTrustedBlock != nil && r.lastTrustedBlock.block != nil {
if newBlock.block.Height != r.lastTrustedBlock.block.Height+1 {
panic("Need last block")
@@ -587,6 +630,36 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool) {
peerID := r.pool.RedoRequest(r.lastTrustedBlock.block.Height + 1)
if serr := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
=======
// Finally, verify the first block using the second's commit.
//
// NOTE: We can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
if err = state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit); err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
r.logger.Error(
err.Error(),
"last_commit", second.LastCommit,
"block_id", firstID,
"height", first.Height,
)
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID,
Err: err,
}); serr != nil {
return
}
peerID2 := r.pool.RedoRequest(second.Height)
if peerID2 != peerID {
if serr := blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerID2,
>>>>>>> origin
Err: err,
}); serr != nil {
return


@@ -190,20 +190,18 @@ func (rts *reactorTestSuite) addNode(
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
stateStore,
blockExec,
blockStore,
nil,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.blockSync,
consensus.NopMetrics(),
nil, // eventbus, can be nil
)
require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())


@@ -141,8 +141,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// send two votes to all peers (1st to one half, 2nd to another half)
i := 0
for _, ps := range bzReactor.peers {
voteCh := rts.voteChannels[bzNodeID]
if i < len(bzReactor.peers)/2 {
require.NoError(t, bzReactor.voteCh.Send(ctx,
require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
@@ -150,7 +152,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
},
}))
} else {
require.NoError(t, bzReactor.voteCh.Send(ctx,
require.NoError(t, voteCh.Send(ctx,
p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{


@@ -57,7 +57,7 @@ func TestReactorInvalidPrecommit(t *testing.T) {
privVal := byzState.privValidator
byzState.doPrevote = func(ctx context.Context, height int64, round int32) {
defer close(signal)
invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, privVal)
invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal)
}
byzState.mtx.Unlock()
@@ -107,6 +107,7 @@ func invalidDoPrevoteFunc(
round int32,
cs *State,
r *Reactor,
voteCh *p2p.Channel,
pv types.PrivValidator,
) {
// routine to:
@@ -155,7 +156,7 @@ func invalidDoPrevoteFunc(
count := 0
for _, peerID := range ids {
count++
err := r.voteCh.Send(ctx, p2p.Envelope{
err := voteCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &tmcons.Vote{
Vote: precommit.ToProto(),


@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
)


@@ -38,6 +38,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "state",
},
DataChannel: {
// TODO: Consider a split between gossiping current block and catchup
@@ -49,6 +50,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvBufferCapacity: 512,
RecvMessageCapacity: maxMsgSize,
Name: "data",
},
VoteChannel: {
ID: VoteChannel,
@@ -57,6 +59,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 64,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "vote",
},
VoteSetBitsChannel: {
ID: VoteSetBitsChannel,
@@ -65,6 +68,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 8,
RecvBufferCapacity: 128,
RecvMessageCapacity: maxMsgSize,
Name: "voteSet",
},
}
}
@@ -122,11 +126,8 @@ type Reactor struct {
rs *cstypes.RoundState
readySignal chan struct{} // closed when the node is ready to start consensus
stateCh *p2p.Channel
dataCh *p2p.Channel
voteCh *p2p.Channel
voteSetBitsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
}
// NewReactor returns a reference to a new consensus reactor, which implements
@@ -134,49 +135,25 @@ type Reactor struct {
// to relevant p2p Channels and a channel to listen for peer updates on. The
// reactor will close all p2p Channels when stopping.
func NewReactor(
ctx context.Context,
logger log.Logger,
cs *State,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
eventBus *eventbus.EventBus,
waitSync bool,
metrics *Metrics,
) (*Reactor, error) {
chans := getChannelDescriptors()
stateCh, err := channelCreator(ctx, chans[StateChannel])
if err != nil {
return nil, err
}
dataCh, err := channelCreator(ctx, chans[DataChannel])
if err != nil {
return nil, err
}
voteCh, err := channelCreator(ctx, chans[VoteChannel])
if err != nil {
return nil, err
}
voteSetBitsCh, err := channelCreator(ctx, chans[VoteSetBitsChannel])
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
stateCh: stateCh,
dataCh: dataCh,
voteCh: voteCh,
voteSetBitsCh: voteSetBitsCh,
peerUpdates: peerUpdates,
readySignal: make(chan struct{}),
logger: logger,
state: cs,
waitSync: waitSync,
rs: cs.GetRoundState(),
peers: make(map[types.NodeID]*PeerState),
eventBus: eventBus,
Metrics: metrics,
peerEvents: peerEvents,
chCreator: channelCreator,
readySignal: make(chan struct{}),
}
r.BaseService = *service.NewBaseService(logger, "Consensus", r)
@@ -184,7 +161,14 @@ func NewReactor(
close(r.readySignal)
}
return r, nil
return r
}
type channelBundle struct {
state *p2p.Channel
data *p2p.Channel
vote *p2p.Channel
votSet *p2p.Channel
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -194,13 +178,39 @@ func NewReactor(
func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Debug("consensus wait sync", "wait_sync", r.WaitSync())
peerUpdates := r.peerEvents(ctx)
var chBundle channelBundle
var err error
chans := getChannelDescriptors()
chBundle.state, err = r.chCreator(ctx, chans[StateChannel])
if err != nil {
return err
}
chBundle.data, err = r.chCreator(ctx, chans[DataChannel])
if err != nil {
return err
}
chBundle.vote, err = r.chCreator(ctx, chans[VoteChannel])
if err != nil {
return err
}
chBundle.votSet, err = r.chCreator(ctx, chans[VoteSetBitsChannel])
if err != nil {
return err
}
// start routine that computes peer statistics for evaluating peer quality
//
// TODO: Evaluate if we need this to be synchronized via WaitGroup as to not
// leak the goroutine when stopping the reactor.
go r.peerStatsRoutine(ctx)
go r.peerStatsRoutine(ctx, peerUpdates)
r.subscribeToBroadcastEvents()
r.subscribeToBroadcastEvents(chBundle.state)
go r.updateRoundStateRoutine()
if !r.WaitSync() {
@@ -209,11 +219,11 @@ func (r *Reactor) OnStart(ctx context.Context) error {
}
}
go r.processStateCh(ctx)
go r.processDataCh(ctx)
go r.processVoteCh(ctx)
go r.processVoteSetBitsCh(ctx)
go r.processPeerUpdates(ctx)
go r.processStateCh(ctx, chBundle)
go r.processDataCh(ctx, chBundle)
go r.processVoteCh(ctx, chBundle)
go r.processVoteSetBitsCh(ctx, chBundle)
go r.processPeerUpdates(ctx, peerUpdates, chBundle)
return nil
}
@@ -289,22 +299,6 @@ func (r *Reactor) String() string {
return "ConsensusReactor"
}
// StringIndented returns an indented string representation of the Reactor.
func (r *Reactor) StringIndented(indent string) string {
r.mtx.RLock()
defer r.mtx.RUnlock()
s := "ConsensusReactor{\n"
s += indent + " " + r.state.StringIndented(indent+" ") + "\n"
for _, ps := range r.peers {
s += indent + " " + ps.StringIndented(indent+" ") + "\n"
}
s += indent + "}"
return s
}
// GetPeerState returns PeerState for a given NodeID.
func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
r.mtx.RLock()
@@ -314,16 +308,16 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
return ps, ok
}
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: makeRoundStepMessage(rs),
})
}
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState) error {
func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
psHeader := rs.ProposalBlockParts.Header()
return r.stateCh.Send(ctx, p2p.Envelope{
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.NewValidBlock{
Height: rs.Height,
@@ -335,8 +329,8 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
})
}
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &tmcons.HasVote{
Height: vote.Height,
@@ -350,14 +344,14 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents() {
func (r *Reactor) subscribeToBroadcastEvents(stateCh *p2p.Channel) {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
func(ctx context.Context, data tmevents.EventData) error {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState)); err != nil {
if err := r.broadcastNewRoundStepMessage(ctx, data.(*cstypes.RoundState), stateCh); err != nil {
return err
}
select {
@@ -378,7 +372,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventValidBlockValue,
func(ctx context.Context, data tmevents.EventData) error {
return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState))
return r.broadcastNewValidBlockMessage(ctx, data.(*cstypes.RoundState), stateCh)
},
)
if err != nil {
@@ -389,7 +383,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
listenerIDConsensus,
types.EventVoteValue,
func(ctx context.Context, data tmevents.EventData) error {
return r.broadcastHasVoteMessage(ctx, data.(*types.Vote))
return r.broadcastHasVoteMessage(ctx, data.(*types.Vote), stateCh)
},
)
if err != nil {
@@ -407,8 +401,8 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
}
}
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID) error {
return r.stateCh.Send(ctx, p2p.Envelope{
func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
return stateCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: makeRoundStepMessage(r.getRoundState()),
})
@@ -434,7 +428,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
return r.rs
}
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState) {
func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
@@ -483,7 +477,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
}
logger.Debug("sending block part for catchup", "round", prs.Round, "index", index)
_ = r.dataCh.Send(ctx, p2p.Envelope{
_ = dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: prs.Height, // not our height, so it does not matter.
@@ -498,7 +492,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
time.Sleep(r.state.config.PeerGossipSleepDuration)
}
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState) {
func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
timer := time.NewTimer(0)
@@ -530,7 +524,7 @@ OUTER_LOOP:
}
logger.Debug("sending block part", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.BlockPart{
Height: rs.Height, // this tells peer that this part applies to us
@@ -577,7 +571,7 @@ OUTER_LOOP:
continue OUTER_LOOP
}
r.gossipDataForCatchup(ctx, rs, prs, ps)
r.gossipDataForCatchup(ctx, rs, prs, ps, dataCh)
continue OUTER_LOOP
}
@@ -604,7 +598,7 @@ OUTER_LOOP:
propProto := rs.Proposal.ToProto()
logger.Debug("sending proposal", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Proposal{
Proposal: *propProto,
@@ -627,7 +621,7 @@ OUTER_LOOP:
pPolProto := pPol.ToProto()
logger.Debug("sending POL", "height", prs.Height, "round", prs.Round)
if err := r.dataCh.Send(ctx, p2p.Envelope{
if err := dataCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.ProposalPOL{
Height: rs.Height,
@@ -655,14 +649,14 @@ OUTER_LOOP:
// pickSendVote picks a vote and sends it to the peer. It will return true if
// there is a vote to send and false otherwise.
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader) (bool, error) {
func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
vote, ok := ps.PickVoteToSend(votes)
if !ok {
return false, nil
}
r.logger.Debug("sending vote message", "ps", ps, "vote", vote)
if err := r.voteCh.Send(ctx, p2p.Envelope{
if err := voteCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.Vote{
Vote: vote.ToProto(),
@@ -683,12 +677,13 @@ func (r *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
voteCh *p2p.Channel,
) (bool, error) {
logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)
// if there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.LastCommit to send")
@@ -700,7 +695,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -711,7 +706,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -721,7 +716,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Precommits(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Precommits(prs.Round) to send", "round", prs.Round)
@@ -731,7 +726,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are prevotes to send...(which are needed because of validBlock mechanism)
if prs.Round != -1 && prs.Round <= rs.Round {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round)); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.Votes.Prevotes(prs.Round), voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
@@ -742,7 +737,7 @@ func (r *Reactor) gossipVotesForHeight(
// if there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes); err != nil {
if ok, err := r.pickSendVote(ctx, ps, polPrevotes, voteCh); err != nil {
return false, err
} else if ok {
logger.Debug("picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound)
@@ -754,7 +749,7 @@ func (r *Reactor) gossipVotesForHeight(
return false, nil
}
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
logger := r.logger.With("peer", ps.peerID)
// XXX: simple hack to throttle logs upon sleep
@@ -786,7 +781,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// if height matches, then send LastCommit, Prevotes, and Precommits
if rs.Height == prs.Height {
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps); err != nil {
if ok, err := r.gossipVotesForHeight(ctx, rs, prs, ps, voteCh); err != nil {
return
} else if ok {
continue
@@ -795,7 +790,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// special catchup logic -- if peer is lagging by height 1, send LastCommit
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, rs.LastCommit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked rs.LastCommit to send", "height", prs.Height)
@@ -809,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// Load the block commit for prs.Height, which contains precommit
// signatures for prs.Height.
if commit := r.state.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ok, err := r.pickSendVote(ctx, ps, commit); err != nil {
if ok, err := r.pickSendVote(ctx, ps, commit, voteCh); err != nil {
return
} else if ok {
logger.Debug("picked Catchup commit to send", "height", prs.Height)
@@ -843,7 +838,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState) {
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
timer := time.NewTimer(0)
defer timer.Stop()
@@ -879,7 +874,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Prevotes
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -900,7 +895,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/ProposalPOL
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -921,7 +916,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// maybe send Height/Round/Precommits
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -946,7 +941,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
if prs.Height <= r.state.blockStore.Height() && prs.Height >= r.state.blockStore.Base() {
// maybe send Height/CatchupCommitRound/CatchupCommit
if commit := r.state.LoadCommit(prs.Height); commit != nil {
if err := r.stateCh.Send(ctx, p2p.Envelope{
if err := stateCh.Send(ctx, p2p.Envelope{
To: ps.peerID,
Message: &tmcons.VoteSetMaj23{
Height: prs.Height,
@@ -979,7 +974,7 @@ func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState) {
// be the case, and we spawn all the relevant goroutines to broadcast messages to
// the peer. During peer removal, we remove the peer from our set of peers and
// signal to all spawned goroutines to gracefully exit in a non-blocking manner.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, chans channelBundle) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -1020,14 +1015,14 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
return
}
// start goroutines for this peer
go r.gossipDataRoutine(ctx, ps)
go r.gossipVotesRoutine(ctx, ps)
go r.queryMaj23Routine(ctx, ps)
go r.gossipDataRoutine(ctx, ps, chans.data)
go r.gossipVotesRoutine(ctx, ps, chans.vote)
go r.queryMaj23Routine(ctx, ps, chans.state)
// Send our state to the peer. If we're block-syncing, broadcast a
// RoundStepMessage later upon SwitchToConsensus().
if !r.WaitSync() {
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID) }()
go func() { _ = r.sendNewRoundStepMessage(ctx, ps.peerID, chans.state) }()
}
}()
@@ -1054,7 +1049,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is
// removed.
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message) error {
func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
ps, ok := r.GetPeerState(envelope.From)
if !ok || ps == nil {
r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")
@@ -1124,7 +1119,7 @@ func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope
eMsg.Votes = *votesProto
}
if err := r.voteSetBitsCh.Send(ctx, p2p.Envelope{
if err := voteSetCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: eMsg,
}); err != nil {
@@ -1292,7 +1287,7 @@ func (r *Reactor) handleVoteSetBitsMessage(ctx context.Context, envelope *p2p.En
// the p2p channel.
//
// NOTE: We block on consensus state for proposals, block parts, and votes.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope, chans channelBundle) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -1323,17 +1318,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
switch chID {
case StateChannel:
err = r.handleStateMessage(ctx, envelope, msgI)
err = r.handleStateMessage(ctx, envelope, msgI, chans.votSet)
case DataChannel:
err = r.handleDataMessage(ctx, envelope, msgI)
case VoteChannel:
err = r.handleVoteMessage(ctx, envelope, msgI)
case VoteSetBitsChannel:
err = r.handleVoteSetBitsMessage(ctx, envelope, msgI)
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
}
@@ -1346,13 +1337,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// execution will result in a PeerError being sent on the StateChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processStateCh(ctx context.Context) {
iter := r.stateCh.Receive(ctx)
func (r *Reactor) processStateCh(ctx context.Context, chans channelBundle) {
iter := chans.state.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.stateCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.stateCh.ID, "envelope", envelope, "err", err)
if serr := r.stateCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.state.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.state.ID, "envelope", envelope, "err", err)
if serr := chans.state.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1367,13 +1358,13 @@ func (r *Reactor) processStateCh(ctx context.Context) {
// execution will result in a PeerError being sent on the DataChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processDataCh(ctx context.Context) {
iter := r.dataCh.Receive(ctx)
func (r *Reactor) processDataCh(ctx context.Context, chans channelBundle) {
iter := chans.data.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.dataCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.dataCh.ID, "envelope", envelope, "err", err)
if serr := r.dataCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.data.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.data.ID, "envelope", envelope, "err", err)
if serr := chans.data.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1388,13 +1379,13 @@ func (r *Reactor) processDataCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteChannel. When
// the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
func (r *Reactor) processVoteCh(ctx context.Context) {
iter := r.voteCh.Receive(ctx)
func (r *Reactor) processVoteCh(ctx context.Context, chans channelBundle) {
iter := chans.vote.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.voteCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.voteCh.ID, "envelope", envelope, "err", err)
if serr := r.voteCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, chans.vote.ID, envelope, chans); err != nil {
r.logger.Error("failed to process message", "ch_id", chans.vote.ID, "envelope", envelope, "err", err)
if serr := chans.vote.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1409,18 +1400,18 @@ func (r *Reactor) processVoteCh(ctx context.Context) {
// execution will result in a PeerError being sent on the VoteSetBitsChannel.
// When the reactor is stopped, we will catch the signal and close the p2p
// Channel gracefully.
func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
iter := r.voteSetBitsCh.Receive(ctx)
func (r *Reactor) processVoteSetBitsCh(ctx context.Context, chans channelBundle) {
iter := chans.votSet.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.voteSetBitsCh.ID, envelope); err != nil {
if err := r.handleMessage(ctx, chans.votSet.ID, envelope, chans); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return
}
r.logger.Error("failed to process message", "ch_id", r.voteSetBitsCh.ID, "envelope", envelope, "err", err)
if serr := r.voteSetBitsCh.SendError(ctx, p2p.PeerError{
r.logger.Error("failed to process message", "ch_id", chans.votSet.ID, "envelope", envelope, "err", err)
if serr := chans.votSet.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -1433,18 +1424,18 @@ func (r *Reactor) processVoteSetBitsCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, chans channelBundle) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, chans)
}
}
}
func (r *Reactor) peerStatsRoutine(ctx context.Context) {
func (r *Reactor) peerStatsRoutine(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
if !r.IsRunning() {
r.logger.Info("stopping peerStatsRoutine")
@@ -1462,7 +1453,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
switch msg.Msg.(type) {
case *VoteMessage:
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})
@@ -1470,7 +1461,7 @@ func (r *Reactor) peerStatsRoutine(ctx context.Context) {
case *BlockPartMessage:
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
r.peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
peerUpdates.SendUpdate(ctx, p2p.PeerUpdate{
NodeID: msg.PeerID,
Status: p2p.PeerStatusGood,
})


@@ -104,16 +104,15 @@ func setup(
for nodeID, node := range rts.network.Nodes {
state := states[i]
reactor, err := NewReactor(ctx,
reactor := NewReactor(
state.logger.With("node", nodeID),
state,
chCreator(nodeID),
node.MakePeerUpdates(ctx, t),
func(ctx context.Context) *p2p.PeerUpdates { return node.MakePeerUpdates(ctx, t) },
state.eventBus,
true,
NopMetrics(),
)
require.NoError(t, err)
blocksSub, err := state.eventBus.SubscribeWithArgs(ctx, tmpubsub.SubscribeArgs{
ClientID: testSubscriber,


@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
types "github.com/tendermint/tendermint/types"
)


@@ -38,6 +38,7 @@ func GetChannelDescriptor() *p2p.ChannelDescriptor {
Priority: 6,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 32,
Name: "evidence",
}
}
@@ -46,9 +47,9 @@ type Reactor struct {
service.BaseService
logger log.Logger
evpool *Pool
evidenceCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
evpool *Pool
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
mtx sync.Mutex
@@ -59,28 +60,22 @@ type Reactor struct {
// service.Service interface. It accepts a p2p Channel dedicated for handling
// envelopes with EvidenceList messages.
func NewReactor(
ctx context.Context,
logger log.Logger,
chCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
evpool *Pool,
) (*Reactor, error) {
evidenceCh, err := chCreator(ctx, GetChannelDescriptor())
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
evpool: evpool,
evidenceCh: evidenceCh,
peerUpdates: peerUpdates,
chCreator: chCreator,
peerEvents: peerEvents,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
}
r.BaseService = *service.NewBaseService(logger, "Evidence", r)
return r, err
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -88,18 +83,20 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processEvidenceCh(ctx)
go r.processPeerUpdates(ctx)
ch, err := r.chCreator(ctx, GetChannelDescriptor())
if err != nil {
return err
}
go r.processEvidenceCh(ctx, ch)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
// OnStop stops the reactor by signaling to all spawned goroutines to exit and
// blocking until they all exit.
func (r *Reactor) OnStop() {
// Close the evidence db
r.evpool.Close()
}
func (r *Reactor) OnStop() { r.evpool.Close() }
// handleEvidenceMessage handles envelopes sent from peers on the EvidenceChannel.
// It returns an error only if the Envelope.Message is unknown for this channel
@@ -163,13 +160,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
func (r *Reactor) processEvidenceCh(ctx context.Context) {
iter := r.evidenceCh.Receive(ctx)
func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
iter := evidenceCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.evidenceCh.ID, "envelope", envelope, "err", err)
if serr := r.evidenceCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, evidenceCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", evidenceCh.ID, "envelope", envelope, "err", err)
if serr := evidenceCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -190,7 +187,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context) {
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -213,7 +210,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
if !ok {
pctx, pcancel := context.WithCancel(ctx)
r.peerRoutines[peerUpdate.NodeID] = pcancel
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID)
go r.broadcastEvidenceLoop(pctx, peerUpdate.NodeID, evidenceCh)
}
case p2p.PeerStatusDown:
@@ -231,11 +228,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, evidenceCh)
case <-ctx.Done():
return
}
@@ -253,7 +250,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID) {
func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
var next *clist.CElement
defer func() {
@@ -300,7 +297,7 @@ func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID
// peer may receive this piece of evidence multiple times if it added and
// removed frequently from the broadcasting peer.
if err := r.evidenceCh.Send(ctx, p2p.Envelope{
if err := evidenceCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: evProto,
}); err != nil {



@@ -92,21 +92,20 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store, chBuf uint
require.NoError(t, err)
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
pu := p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.peerUpdates[nodeID] = pu
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.evidenceChannels[nodeID], nil
}
rts.reactors[nodeID], err = evidence.NewReactor(
ctx,
rts.reactors[nodeID] = evidence.NewReactor(
logger,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return pu },
rts.pools[nodeID])
require.NoError(t, err)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())


@@ -40,13 +40,9 @@ type Reactor struct {
mempool *TxMempool
ids *IDs
// XXX: Currently, this is the only way to get information about a peer. Ideally,
// we rely on message-oriented communication to get necessary peer data.
// ref: https://github.com/tendermint/tendermint/issues/5670
peerMgr PeerManager
mempoolCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
getPeerHeight func(types.NodeID) int64
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
// observePanic is a function for observing panics that were recovered in methods on
// Reactor. observePanic is called with the recovered value.
@@ -58,34 +54,27 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
ctx context.Context,
logger log.Logger,
cfg *config.MempoolConfig,
peerMgr PeerManager,
txmp *TxMempool,
chCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
) (*Reactor, error) {
ch, err := chCreator(ctx, getChannelDescriptor(cfg))
if err != nil {
return nil, err
}
peerEvents p2p.PeerEventSubscriber,
getPeerHeight func(types.NodeID) int64,
) *Reactor {
r := &Reactor{
logger: logger,
cfg: cfg,
peerMgr: peerMgr,
mempool: txmp,
ids: NewMempoolIDs(),
mempoolCh: ch,
peerUpdates: peerUpdates,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
observePanic: defaultObservePanic,
logger: logger,
cfg: cfg,
mempool: txmp,
ids: NewMempoolIDs(),
chCreator: chCreator,
peerEvents: peerEvents,
getPeerHeight: getPeerHeight,
peerRoutines: make(map[types.NodeID]context.CancelFunc),
observePanic: defaultObservePanic,
}
r.BaseService = *service.NewBaseService(logger, "Mempool", r)
return r, nil
return r
}
func defaultObservePanic(r interface{}) {}
@@ -106,6 +95,7 @@ func getChannelDescriptor(cfg *config.MempoolConfig) *p2p.ChannelDescriptor {
Priority: 5,
RecvMessageCapacity: batchMsg.Size(),
RecvBufferCapacity: 128,
Name: "mempool",
}
}
@@ -118,8 +108,13 @@ func (r *Reactor) OnStart(ctx context.Context) error {
r.logger.Info("tx broadcasting is disabled")
}
go r.processMempoolCh(ctx)
go r.processPeerUpdates(ctx)
ch, err := r.chCreator(ctx, getChannelDescriptor(r.cfg))
if err != nil {
return err
}
go r.processMempoolCh(ctx, ch)
go r.processPeerUpdates(ctx, r.peerEvents(ctx), ch)
return nil
}
@@ -202,13 +197,13 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh.
func (r *Reactor) processMempoolCh(ctx context.Context) {
iter := r.mempoolCh.Receive(ctx)
func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
iter := mempoolCh.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, r.mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", r.mempoolCh.ID, "envelope", envelope, "err", err)
if serr := r.mempoolCh.SendError(ctx, p2p.PeerError{
if err := r.handleMessage(ctx, mempoolCh.ID, envelope); err != nil {
r.logger.Error("failed to process message", "ch_id", mempoolCh.ID, "envelope", envelope, "err", err)
if serr := mempoolCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -223,7 +218,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context) {
// goroutine or not. If not, we start one for the newly added peer. For down or
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
r.mtx.Lock()
@@ -251,7 +246,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
r.ids.ReserveForPeer(peerUpdate.NodeID)
// start a broadcast routine ensuring all txs are forwarded to the peer
go r.broadcastTxRoutine(pctx, peerUpdate.NodeID)
go r.broadcastTxRoutine(pctx, peerUpdate.NodeID, mempoolCh)
}
}
@@ -272,18 +267,18 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate, mempoolCh)
}
}
}
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
peerMempoolID := r.ids.GetForPeer(peerID)
var nextGossipTx *clist.CElement
@@ -324,8 +319,8 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
memTx := nextGossipTx.Value.(*WrappedTx)
if r.peerMgr != nil {
height := r.peerMgr.GetHeight(peerID)
if r.getPeerHeight != nil {
height := r.getPeerHeight(peerID)
if height > 0 && height < memTx.height-1 {
// allow for a lag of one block
time.Sleep(PeerCatchupSleepIntervalMS * time.Millisecond)
@@ -338,7 +333,7 @@ func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID) {
if ok := r.mempool.txStore.TxHasPeer(memTx.hash, peerMempoolID); !ok {
// Send the mempool tx to the corresponding peer. Note, the peer may be
// behind and thus would not be able to process the mempool tx correctly.
if err := r.mempoolCh.Send(ctx, p2p.Envelope{
if err := mempoolCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protomem.Txs{
Txs: [][]byte{memTx.tx},


@@ -79,17 +79,14 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
return rts.mempoolChannels[nodeID], nil
}
rts.reactors[nodeID], err = NewReactor(
ctx,
rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
cfg.Mempool,
rts.network.Nodes[nodeID].PeerManager,
mempool,
chCreator,
rts.peerUpdates[nodeID],
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.network.Nodes[nodeID].PeerManager.GetHeight,
)
require.NoError(t, err)
rts.nodes = append(rts.nodes, nodeID)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
@@ -179,7 +176,7 @@ func TestReactorBroadcastDoesNotPanic(t *testing.T) {
// run the router
rts.start(ctx, t)
go primaryReactor.broadcastTxRoutine(ctx, secondary)
go primaryReactor.broadcastTxRoutine(ctx, secondary, rts.mempoolChannels[primary])
wg := &sync.WaitGroup{}
for i := 0; i < 50; i++ {


@@ -7,5 +7,5 @@ Docs:
- [Connection](https://docs.tendermint.com/master/spec/p2p/connection.html) for details on how connections and multiplexing work
- [Peer](https://docs.tendermint.com/master/spec/p2p/node.html) for details on peer ID, handshakes, and peer exchange
- [Node](https://docs.tendermint.com/master/spec/p2p/node.html) for details about different types of nodes and how they should work
- [Pex](https://docs.tendermint.com/master/spec/reactors/pex/pex.html) for details on peer discovery and exchange
- [Pex](https://docs.tendermint.com/master/spec/p2p/messages/pex.html) for details on peer discovery and exchange
- [Config](https://docs.tendermint.com/master/spec/p2p/config.html) for details on some config options


@@ -60,6 +60,7 @@ type Channel struct {
errCh chan<- PeerError // peer error reporting
messageType proto.Message // the channel's message type, used for unmarshaling
name string
}
// NewChannel creates a new channel. It is primarily for internal and test
@@ -102,6 +103,8 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
}
}
func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {


@@ -616,6 +616,10 @@ type ChannelDescriptor struct {
// RecvBufferCapacity defines the max buffer size of inbound messages for a
// given p2p Channel queue.
RecvBufferCapacity int
// Human readable name of the channel, used in logging and
// diagnostics.
Name string
}
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {


@@ -259,12 +259,11 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
n.logger,
p2p.NopMetrics(),
nodeInfo,
privKey,
peerManager,
func() *types.NodeInfo { return &nodeInfo },
[]p2p.Transport{transport},
transport.Endpoints(),
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},


@@ -828,6 +828,11 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
return addresses
}
// PeerEventSubscriber describes the type of the subscription method, to assist
// in isolating reactors specific construction and lifecycle from the
// peer manager.
type PeerEventSubscriber func(context.Context) *PeerUpdates
// Subscribe subscribes to peer updates. The caller must consume the peer
// updates in a timely fashion and close the subscription when done, otherwise
// the PeerManager will halt.


@@ -63,6 +63,7 @@ func ChannelDescriptor() *conn.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: maxMsgSize,
RecvBufferCapacity: 128,
Name: "pex",
}
}
@@ -79,9 +80,8 @@ type Reactor struct {
logger log.Logger
peerManager *p2p.PeerManager
pexCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
chCreator p2p.ChannelCreator
peerEvents p2p.PeerEventSubscriber
// list of available peers to loop through and send peer requests to
availablePeers map[types.NodeID]struct{}
@@ -104,30 +104,23 @@ type Reactor struct {
// NewReactor returns a reference to a new reactor.
func NewReactor(
ctx context.Context,
logger log.Logger,
peerManager *p2p.PeerManager,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
) (*Reactor, error) {
channel, err := channelCreator(ctx, ChannelDescriptor())
if err != nil {
return nil, err
}
peerEvents p2p.PeerEventSubscriber,
) *Reactor {
r := &Reactor{
logger: logger,
peerManager: peerManager,
pexCh: channel,
peerUpdates: peerUpdates,
chCreator: channelCreator,
peerEvents: peerEvents,
availablePeers: make(map[types.NodeID]struct{}),
requestsSent: make(map[types.NodeID]struct{}),
lastReceivedRequests: make(map[types.NodeID]time.Time),
}
r.BaseService = *service.NewBaseService(logger, "PEX", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -135,8 +128,14 @@ func NewReactor(
// messages on that p2p channel accordingly. The caller must be sure to execute
// OnStop to ensure the outbound p2p Channels are closed.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processPexCh(ctx)
go r.processPeerUpdates(ctx)
channel, err := r.chCreator(ctx, ChannelDescriptor())
if err != nil {
return err
}
peerUpdates := r.peerEvents(ctx)
go r.processPexCh(ctx, channel)
go r.processPeerUpdates(ctx, peerUpdates)
return nil
}
@@ -146,11 +145,11 @@ func (r *Reactor) OnStop() {}
// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
func (r *Reactor) processPexCh(ctx context.Context) {
func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
incoming := make(chan *p2p.Envelope)
go func() {
defer close(incoming)
iter := r.pexCh.Receive(ctx)
iter := pexCh.Receive(ctx)
for iter.Next(ctx) {
select {
case <-ctx.Done():
@@ -176,7 +175,7 @@ func (r *Reactor) processPexCh(ctx context.Context) {
case <-timer.C:
// Send a request for more peer addresses.
if err := r.sendRequestForPeers(ctx); err != nil {
if err := r.sendRequestForPeers(ctx, pexCh); err != nil {
return
// TODO(creachadair): Do we really want to stop processing the PEX
// channel just because of an error here?
@@ -191,11 +190,11 @@ func (r *Reactor) processPexCh(ctx context.Context) {
}
// A request from another peer, or a response to one of our requests.
dur, err := r.handlePexMessage(ctx, envelope)
dur, err := r.handlePexMessage(ctx, envelope, pexCh)
if err != nil {
r.logger.Error("failed to process message",
"ch_id", r.pexCh.ID, "envelope", envelope, "err", err)
if serr := r.pexCh.SendError(ctx, p2p.PeerError{
"ch_id", pexCh.ID, "envelope", envelope, "err", err)
if serr := pexCh.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
Err: err,
}); serr != nil {
@@ -212,12 +211,12 @@ func (r *Reactor) processPexCh(ctx context.Context) {
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
}
}
@@ -226,7 +225,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope) (time.Duration, error) {
func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -245,7 +244,7 @@ func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope)
URL: addr.String(),
}
}
return 0, r.pexCh.Send(ctx, p2p.Envelope{
return 0, pexCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
})
@@ -309,7 +308,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
r.mtx.Lock()
defer r.mtx.Unlock()
if len(r.availablePeers) == 0 {
@@ -324,7 +323,7 @@ func (r *Reactor) sendRequestForPeers(ctx context.Context) error {
break
}
if err := r.pexCh.Send(ctx, p2p.Envelope{
if err := pexCh.Send(ctx, p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}); err != nil {


@@ -23,8 +23,8 @@ import (
const (
checkFrequency = 500 * time.Millisecond
defaultBufferSize = 2
shortWait = 10 * time.Second
longWait = 60 * time.Second
shortWait = 5 * time.Second
longWait = 20 * time.Second
firstNode = 0
secondNode = 1
@@ -211,7 +211,8 @@ func TestReactorSmallPeerStoreInALargeNetwork(t *testing.T) {
require.Eventually(t, func() bool {
// nolint:scopelint
return testNet.network.Nodes[nodeID].PeerManager.PeerRatio() >= 0.9
}, longWait, checkFrequency)
}, longWait, checkFrequency,
"peer ratio is: %f", testNet.network.Nodes[nodeID].PeerManager.PeerRatio())
}
}
@@ -303,8 +304,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
return pexCh, nil
}
reactor, err := pex.NewReactor(ctx, log.NewNopLogger(), peerManager, chCreator, peerUpdates)
require.NoError(t, err)
reactor := pex.NewReactor(log.NewNopLogger(), peerManager, chCreator, func(_ context.Context) *p2p.PeerUpdates { return peerUpdates })
require.NoError(t, reactor.Start(ctx))
t.Cleanup(reactor.Wait)
@@ -381,6 +381,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
idx := 0
for nodeID := range rts.network.Nodes {
// make a copy to avoid getting hit by the range ref
// confusion:
nodeID := nodeID
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate, chBuf)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
@@ -393,15 +397,12 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
if idx < opts.MockNodes {
rts.mocks = append(rts.mocks, nodeID)
} else {
var err error
rts.reactors[nodeID], err = pex.NewReactor(
ctx,
rts.reactors[nodeID] = pex.NewReactor(
rts.logger.With("nodeID", nodeID),
rts.network.Nodes[nodeID].PeerManager,
chCreator,
rts.peerUpdates[nodeID],
func(_ context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
)
require.NoError(t, err)
}
rts.nodes = append(rts.nodes, nodeID)
@@ -426,9 +427,10 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
func (r *reactorTestSuite) start(ctx context.Context, t *testing.T) {
t.Helper()
for _, reactor := range r.reactors {
for name, reactor := range r.reactors {
require.NoError(t, reactor.Start(ctx))
require.True(t, reactor.IsRunning())
t.Log("started", name)
}
}
@@ -451,15 +453,12 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
return r.pexChannels[nodeID], nil
}
var err error
r.reactors[nodeID], err = pex.NewReactor(
ctx,
r.reactors[nodeID] = pex.NewReactor(
r.logger.With("nodeID", nodeID),
r.network.Nodes[nodeID].PeerManager,
chCreator,
r.peerUpdates[nodeID],
func(_ context.Context) *p2p.PeerUpdates { return r.peerUpdates[nodeID] },
)
require.NoError(t, err)
r.nodes = append(r.nodes, nodeID)
r.total++
}


@@ -150,7 +150,6 @@ type Router struct {
metrics *Metrics
options RouterOptions
nodeInfo types.NodeInfo
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
@@ -162,8 +161,9 @@ type Router struct {
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
nodeInfoProducer func() *types.NodeInfo
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
@@ -177,12 +177,11 @@ type Router struct {
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
ctx context.Context,
logger log.Logger,
metrics *Metrics,
nodeInfo types.NodeInfo,
privKey crypto.PrivKey,
peerManager *PeerManager,
nodeInfoProducer func() *types.NodeInfo,
transports []Transport,
endpoints []Endpoint,
options RouterOptions,
@@ -193,10 +192,10 @@ func NewRouter(
}
router := &Router{
logger: logger,
metrics: metrics,
nodeInfo: nodeInfo,
privKey: privKey,
logger: logger,
metrics: metrics,
privKey: privKey,
nodeInfoProducer: nodeInfoProducer,
connTracker: newConnTracker(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
@@ -215,13 +214,6 @@ func NewRouter(
router.BaseService = service.NewBaseService(logger, "router", router)
qf, err := router.createQueueFactory(ctx)
if err != nil {
return nil, err
}
router.queueFactory = qf
for _, transport := range transports {
for _, protocol := range transport.Protocols() {
if _, ok := router.protocolTransports[protocol]; !ok {
@@ -281,6 +273,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
channel.name = chDesc.Name
var wrapper Wrapper
if w, ok := messageType.(Wrapper); ok {
@@ -291,7 +284,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
r.channelMessages[id] = messageType
// add the channel to the nodeInfo if it's not already there.
r.nodeInfo.AddChannel(uint16(chDesc.ID))
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
for _, t := range r.transports {
t.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
@@ -722,7 +715,8 @@ func (r *Router) handshakePeer(
defer cancel()
}
peerInfo, peerKey, err := conn.Handshake(ctx, r.nodeInfo, r.privKey)
nodeInfo := r.nodeInfoProducer()
peerInfo, peerKey, err := conn.Handshake(ctx, *nodeInfo, r.privKey)
if err != nil {
return peerInfo, err
}
@@ -737,7 +731,7 @@ func (r *Router) handshakePeer(
return peerInfo, fmt.Errorf("expected to connect with peer %q, got %q",
expectID, peerInfo.NodeID)
}
if err := r.nodeInfo.CompatibleWith(peerInfo); err != nil {
if err := r.nodeInfoProducer().CompatibleWith(peerInfo); err != nil {
return peerInfo, ErrRejected{
err: err,
id: peerInfo.ID(),
@@ -937,11 +931,25 @@ func (r *Router) evictPeers(ctx context.Context) {
// NodeInfo returns a copy of the current NodeInfo. Used for testing.
func (r *Router) NodeInfo() types.NodeInfo {
return r.nodeInfo.Copy()
return r.nodeInfoProducer().Copy()
}
func (r *Router) setupQueueFactory(ctx context.Context) error {
qf, err := r.createQueueFactory(ctx)
if err != nil {
return err
}
r.queueFactory = qf
return nil
}
// OnStart implements service.Service.
func (r *Router) OnStart(ctx context.Context) error {
if err := r.setupQueueFactory(ctx); err != nil {
return err
}
for _, transport := range r.transports {
for _, endpoint := range r.endpoints {
if err := transport.Listen(endpoint); err != nil {
@@ -950,11 +958,12 @@ func (r *Router) OnStart(ctx context.Context) error {
}
}
nodeInfo := r.nodeInfoProducer()
r.logger.Info(
"starting router",
"node_id", r.nodeInfo.NodeID,
"channels", r.nodeInfo.Channels,
"listen_addr", r.nodeInfo.ListenAddr,
"node_id", nodeInfo.NodeID,
"channels", nodeInfo.Channels,
"listen_addr", nodeInfo.ListenAddr,
"transports", len(r.transports),
)


@@ -23,29 +23,35 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
q, ok := r.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(ctx, log.NewNopLogger(), nil, types.NodeInfo{}, nil, nil, nil, nil, opts)
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})


@@ -106,12 +106,11 @@ func TestRouter_Channel_Basic(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
nil,
nil,
p2p.RouterOptions{},
@@ -409,12 +408,11 @@ func TestRouter_AcceptPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -464,12 +462,11 @@ func TestRouter_AcceptPeers_Error(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -502,12 +499,11 @@ func TestRouter_AcceptPeers_ErrorEOF(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -554,12 +550,11 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -658,12 +653,11 @@ func TestRouter_DialPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -744,12 +738,11 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
require.True(t, added)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{
@@ -819,12 +812,11 @@ func TestRouter_EvictPeers(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -882,12 +874,11 @@ func TestRouter_ChannelCompatability(t *testing.T) {
require.NoError(t, err)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},
@@ -938,12 +929,11 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
sub := peerManager.Subscribe(ctx)
router, err := p2p.NewRouter(
ctx,
log.NewNopLogger(),
p2p.NopMetrics(),
selfInfo,
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
[]p2p.Transport{mockTransport},
nil,
p2p.RouterOptions{},


@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
indexer "github.com/tendermint/tendermint/internal/state/indexer"
query "github.com/tendermint/tendermint/internal/pubsub/query"


@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"


@@ -4,7 +4,6 @@ package mocks
import (
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
tendermintstate "github.com/tendermint/tendermint/proto/tendermint/state"


@@ -6,7 +6,6 @@ import (
context "context"
mock "github.com/stretchr/testify/mock"
state "github.com/tendermint/tendermint/internal/state"
types "github.com/tendermint/tendermint/types"


@@ -75,13 +75,13 @@ const (
func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
return map[p2p.ChannelID]*p2p.ChannelDescriptor{
SnapshotChannel: {
ID: SnapshotChannel,
MessageType: new(ssproto.Message),
Priority: 6,
SendQueueCapacity: 10,
RecvMessageCapacity: snapshotMsgSize,
RecvBufferCapacity: 128,
Name: "snapshot",
},
ChunkChannel: {
ID: ChunkChannel,
@@ -90,6 +90,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 4,
RecvMessageCapacity: chunkMsgSize,
RecvBufferCapacity: 128,
Name: "chunk",
},
LightBlockChannel: {
ID: LightBlockChannel,
@@ -98,6 +99,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: lightBlockMsgSize,
RecvBufferCapacity: 128,
Name: "light-block",
},
ParamsChannel: {
ID: ParamsChannel,
@@ -106,6 +108,7 @@ func getChannelDescriptors() map[p2p.ChannelID]*p2p.ChannelDescriptor {
SendQueueCapacity: 10,
RecvMessageCapacity: paramMsgSize,
RecvBufferCapacity: 128,
Name: "params",
},
}
@@ -135,13 +138,11 @@ type Reactor struct {
stateStore sm.Store
blockStore *store.BlockStore
conn abciclient.Client
tempDir string
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
blockCh *p2p.Channel
paramsCh *p2p.Channel
peerUpdates *p2p.PeerUpdates
conn abciclient.Client
tempDir string
peerEvents p2p.PeerEventSubscriber
chCreator p2p.ChannelCreator
sendBlockError func(context.Context, p2p.PeerError) error
// Dispatcher is used to multiplex light block requests and responses over multiple
// peers used by the p2p state provider and in reverse sync.
@@ -151,10 +152,13 @@ type Reactor struct {
// These will only be set when a state sync is in progress. It is used to feed
// received snapshots and chunks into the syncer and manage incoming and outgoing
// providers.
mtx sync.RWMutex
syncer *syncer
providers map[types.NodeID]*BlockProvider
stateProvider StateProvider
mtx sync.RWMutex
initSyncer func() *syncer
requestSnaphot func() error
syncer *syncer
providers map[types.NodeID]*BlockProvider
initStateProvider func(ctx context.Context, chainID string, initialHeight int64) error
stateProvider StateProvider
eventBus *eventbus.EventBus
metrics *Metrics
@@ -174,32 +178,13 @@ func NewReactor(
logger log.Logger,
conn abciclient.Client,
channelCreator p2p.ChannelCreator,
peerUpdates *p2p.PeerUpdates,
peerEvents p2p.PeerEventSubscriber,
stateStore sm.Store,
blockStore *store.BlockStore,
tempDir string,
ssMetrics *Metrics,
eventBus *eventbus.EventBus,
) (*Reactor, error) {
chDesc := getChannelDescriptors()
snapshotCh, err := channelCreator(ctx, chDesc[SnapshotChannel])
if err != nil {
return nil, err
}
chunkCh, err := channelCreator(ctx, chDesc[ChunkChannel])
if err != nil {
return nil, err
}
blockCh, err := channelCreator(ctx, chDesc[LightBlockChannel])
if err != nil {
return nil, err
}
paramsCh, err := channelCreator(ctx, chDesc[ParamsChannel])
if err != nil {
return nil, err
}
) *Reactor {
r := &Reactor{
logger: logger,
@@ -207,23 +192,19 @@ func NewReactor(
initialHeight: initialHeight,
cfg: cfg,
conn: conn,
snapshotCh: snapshotCh,
chunkCh: chunkCh,
blockCh: blockCh,
paramsCh: paramsCh,
peerUpdates: peerUpdates,
chCreator: channelCreator,
peerEvents: peerEvents,
tempDir: tempDir,
stateStore: stateStore,
blockStore: blockStore,
peers: newPeerList(),
dispatcher: NewDispatcher(blockCh),
providers: make(map[types.NodeID]*BlockProvider),
metrics: ssMetrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "StateSync", r)
return r, nil
return r
}
// OnStart starts separate go routines for each p2p Channel and listens for
@@ -233,11 +214,91 @@ func NewReactor(
// The caller must be sure to execute OnStop to ensure the outbound p2p Channels are
// closed. No error is returned.
func (r *Reactor) OnStart(ctx context.Context) error {
go r.processCh(ctx, r.snapshotCh, "snapshot")
go r.processCh(ctx, r.chunkCh, "chunk")
go r.processCh(ctx, r.blockCh, "light block")
go r.processCh(ctx, r.paramsCh, "consensus params")
go r.processPeerUpdates(ctx)
// construct channels
chDesc := getChannelDescriptors()
snapshotCh, err := r.chCreator(ctx, chDesc[SnapshotChannel])
if err != nil {
return err
}
chunkCh, err := r.chCreator(ctx, chDesc[ChunkChannel])
if err != nil {
return err
}
blockCh, err := r.chCreator(ctx, chDesc[LightBlockChannel])
if err != nil {
return err
}
paramsCh, err := r.chCreator(ctx, chDesc[ParamsChannel])
if err != nil {
return err
}
// define constructor and helper functions that hold
// references to these channels for use later. This is not
// ideal.
r.initSyncer = func() *syncer {
return &syncer{
logger: r.logger,
stateProvider: r.stateProvider,
conn: r.conn,
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: r.tempDir,
fetchers: r.cfg.Fetchers,
retryTimeout: r.cfg.ChunkRequestTimeout,
metrics: r.metrics,
}
}
r.dispatcher = NewDispatcher(blockCh)
r.requestSnaphot = func() error {
// request snapshots from all currently connected peers
return snapshotCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
})
}
r.sendBlockError = blockCh.SendError
r.initStateProvider = func(ctx context.Context, chainID string, initialHeight int64) error {
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.logger.With("module", "stateprovider")
spLogger.Info("initializing state provider", "trustPeriod", to.Period,
"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
if r.cfg.UseP2P {
if err := r.waitForEnoughPeers(ctx, 2); err != nil {
return err
}
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}
stateProvider, err := NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, paramsCh, r.logger.With("module", "stateprovider"))
if err != nil {
return fmt.Errorf("failed to initialize P2P state provider: %w", err)
}
r.stateProvider = stateProvider
return nil
}
stateProvider, err := NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
r.stateProvider = stateProvider
return nil
}
go r.processChannels(ctx, snapshotCh, chunkCh, blockCh, paramsCh)
go r.processPeerUpdates(ctx, r.peerEvents(ctx))
return nil
}
@@ -280,17 +341,9 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
return sm.State{}, err
}
r.syncer = newSyncer(
r.cfg,
r.logger,
r.conn,
r.stateProvider,
r.snapshotCh,
r.chunkCh,
r.tempDir,
r.metrics,
)
r.syncer = r.initSyncer()
r.mtx.Unlock()
defer func() {
r.mtx.Lock()
// reset syncing objects at the close of Sync
@@ -299,15 +352,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.mtx.Unlock()
}()
requestSnapshotsHook := func() error {
// request snapshots from all currently connected peers
return r.snapshotCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &ssproto.SnapshotsRequest{},
})
}
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, requestSnapshotsHook)
state, commit, err := r.syncer.SyncAny(ctx, r.cfg.DiscoveryTime, r.requestSnaphot)
if err != nil {
return sm.State{}, err
}
@@ -434,7 +479,7 @@ func (r *Reactor) backfill(
r.logger.Info("backfill: fetched light block failed validate basic, removing peer...",
"err", err, "height", height)
queue.retry(height)
if serr := r.blockCh.SendError(ctx, p2p.PeerError{
if serr := r.sendBlockError(ctx, p2p.PeerError{
NodeID: peer,
Err: fmt.Errorf("received invalid light block: %w", err),
}); serr != nil {
@@ -471,7 +516,7 @@ func (r *Reactor) backfill(
if w, g := trustedBlockID.Hash, resp.block.Hash(); !bytes.Equal(w, g) {
r.logger.Info("received invalid light block. header hash doesn't match trusted LastBlockID",
"trustedHash", w, "receivedHash", g, "height", resp.block.Height)
if err := r.blockCh.SendError(ctx, p2p.PeerError{
if err := r.sendBlockError(ctx, p2p.PeerError{
NodeID: resp.peer,
Err: fmt.Errorf("received invalid light block. Expected hash %v, got: %v", w, g),
}); err != nil {
@@ -532,7 +577,7 @@ func (r *Reactor) backfill(
// handleSnapshotMessage handles envelopes sent from peers on the
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
logger := r.logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
@@ -551,7 +596,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"peer", envelope.From,
)
if err := r.snapshotCh.Send(ctx, p2p.Envelope{
if err := snapshotCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
@@ -587,8 +632,8 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
"failed to add snapshot",
"height", msg.Height,
"format", msg.Format,
"channel", r.snapshotCh.ID,
"channel", snapshotCh.ID,
"err", err,
)
return nil
}
@@ -604,7 +649,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
// It returns an error only if the Envelope.Message is unknown for this channel.
// This should never be called outside of handleMessage.
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ChunkRequest:
r.logger.Debug(
@@ -638,7 +683,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
"chunk", msg.Index,
"peer", envelope.From,
)
if err := r.chunkCh.Send(ctx, p2p.Envelope{
if err := chunkCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
@@ -693,7 +738,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
return nil
}
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.LightBlockRequest:
r.logger.Info("received light block request", "height", msg.Height)
@@ -703,7 +748,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return err
}
if lb == nil {
if err := r.blockCh.Send(ctx, p2p.Envelope{
if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
@@ -722,7 +767,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
// NOTE: If we don't have the light block we will send a nil light block
// back to the requesting node, indicating that we don't have it.
if err := r.blockCh.Send(ctx, p2p.Envelope{
if err := blockCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.LightBlockResponse{
LightBlock: lbproto,
@@ -750,7 +795,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
return nil
}
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope) error {
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest:
r.logger.Debug("received consensus params request", "height", msg.Height)
@@ -761,7 +806,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
}
cpproto := cp.ToProto()
if err := r.paramsCh.Send(ctx, p2p.Envelope{
if err := paramsCh.Send(ctx, p2p.Envelope{
To: envelope.From,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
@@ -780,6 +825,8 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
select {
case sp.paramsRecvCh <- cp:
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second):
return errors.New("failed to send consensus params, stateprovider not ready for response")
}
@@ -797,7 +844,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelope *p2p.Envelope) (err error) {
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing message: %v", e)
@@ -811,17 +858,17 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
r.logger.Debug("received message", "message", reflect.TypeOf(envelope.Message), "peer", envelope.From)
switch chID {
switch envelope.ChannelID {
case SnapshotChannel:
err = r.handleSnapshotMessage(ctx, envelope)
err = r.handleSnapshotMessage(ctx, envelope, chans[SnapshotChannel])
case ChunkChannel:
err = r.handleChunkMessage(ctx, envelope)
err = r.handleChunkMessage(ctx, envelope, chans[ChunkChannel])
case LightBlockChannel:
err = r.handleLightBlockMessage(ctx, envelope)
err = r.handleLightBlockMessage(ctx, envelope, chans[LightBlockChannel])
case ParamsChannel:
err = r.handleParamsMessage(ctx, envelope)
err = r.handleParamsMessage(ctx, envelope, chans[ParamsChannel])
default:
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope)
err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", envelope.ChannelID, envelope)
}
return err
@@ -831,15 +878,35 @@ func (r *Reactor) handleMessage(ctx context.Context, chID p2p.ChannelID, envelop
// encountered during message execution will result in a PeerError being sent on
// the respective channel. When the reactor is stopped, we will catch the signal
// and close the p2p Channel gracefully.
func (r *Reactor) processCh(ctx context.Context, ch *p2p.Channel, chName string) {
iter := ch.Receive(ctx)
func (r *Reactor) processChannels(ctx context.Context, chs ...*p2p.Channel) {
// make sure that the iterator gets cleaned up in case of error
ctx, cancel := context.WithCancel(ctx)
defer cancel()
chanTable := make(map[p2p.ChannelID]*p2p.Channel, len(chs))
for idx := range chs {
ch := chs[idx]
chanTable[ch.ID] = ch
}
iter := p2p.MergedChannelIterator(ctx, chs...)
for iter.Next(ctx) {
envelope := iter.Envelope()
if err := r.handleMessage(ctx, ch.ID, envelope); err != nil {
if err := r.handleMessage(ctx, envelope, chanTable); err != nil {
ch, ok := chanTable[envelope.ChannelID]
if !ok {
r.logger.Error("received impossible message",
"envelope_from", envelope.From,
"envelope_ch", envelope.ChannelID,
"num_chs", len(chanTable),
"err", err,
)
return
}
r.logger.Error("failed to process message",
"err", err,
"channel", chName,
"ch_id", ch.ID,
"channel", ch.String(),
"ch_id", envelope.ChannelID,
"envelope", envelope)
if serr := ch.SendError(ctx, p2p.PeerError{
NodeID: envelope.From,
@@ -875,14 +942,15 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
r.mtx.Lock()
defer r.mtx.Unlock()
if r.syncer == nil {
return
}
switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
err := r.syncer.AddPeer(ctx, peerUpdate.NodeID)
if err != nil {
@@ -905,12 +973,12 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
func (r *Reactor) processPeerUpdates(ctx context.Context) {
func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates) {
for {
select {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
case peerUpdate := <-peerUpdates.Updates():
r.processPeerUpdate(ctx, peerUpdate)
}
}
@@ -1015,41 +1083,6 @@ func (r *Reactor) waitForEnoughPeers(ctx context.Context, numPeers int) error {
return nil
}
func (r *Reactor) initStateProvider(ctx context.Context, chainID string, initialHeight int64) error {
var err error
to := light.TrustOptions{
Period: r.cfg.TrustPeriod,
Height: r.cfg.TrustHeight,
Hash: r.cfg.TrustHashBytes(),
}
spLogger := r.logger.With("module", "stateprovider")
spLogger.Info("initializing state provider", "trustPeriod", to.Period,
"trustHeight", to.Height, "useP2P", r.cfg.UseP2P)
if r.cfg.UseP2P {
if err := r.waitForEnoughPeers(ctx, 2); err != nil {
return err
}
peers := r.peers.All()
providers := make([]provider.Provider, len(peers))
for idx, p := range peers {
providers[idx] = NewBlockProvider(p, chainID, r.dispatcher)
}
r.stateProvider, err = NewP2PStateProvider(ctx, chainID, initialHeight, providers, to, r.paramsCh, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize P2P state provider: %w", err)
}
} else {
r.stateProvider, err = NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.RPCServers, to, spLogger)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
}
return nil
}
func (r *Reactor) TotalSnapshots() int64 {
r.mtx.RLock()
defer r.mtx.RUnlock()

View File

@@ -154,8 +154,7 @@ func setup(
logger := log.NewNopLogger()
var err error
rts.reactor, err = NewReactor(
rts.reactor = NewReactor(
ctx,
factory.DefaultTestChainID,
1,
@@ -163,25 +162,26 @@ func setup(
logger.With("component", "reactor"),
conn,
chCreator,
rts.peerUpdates,
func(context.Context) *p2p.PeerUpdates { return rts.peerUpdates },
rts.stateStore,
rts.blockStore,
"",
m,
nil, // eventbus can be nil
)
require.NoError(t, err)
rts.syncer = newSyncer(
*cfg,
logger.With("component", "syncer"),
conn,
stateProvider,
rts.snapshotChannel,
rts.chunkChannel,
"",
rts.reactor.metrics,
)
rts.syncer = &syncer{
logger: logger,
stateProvider: stateProvider,
conn: conn,
snapshots: newSnapshotPool(),
snapshotCh: rts.snapshotChannel,
chunkCh: rts.chunkChannel,
tempDir: t.TempDir(),
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: rts.reactor.metrics,
}
ctx, cancel := context.WithCancel(ctx)
@@ -196,11 +196,11 @@ func setup(
}
func TestReactor_Sync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
const snapshotHeight = 7
rts := setup(ctx, t, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 100)
chain := buildLightBlockChain(ctx, t, 1, 10, time.Now())
// app accepts any snapshot
rts.conn.On("OfferSnapshot", ctx, mock.AnythingOfType("types.RequestOfferSnapshot")).
@@ -224,8 +224,7 @@ func TestReactor_Sync(t *testing.T) {
closeCh := make(chan struct{})
defer close(closeCh)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh,
rts.blockInCh, closeCh, 0)
go handleLightBlockRequests(ctx, t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go graduallyAddPeers(ctx, t, rts.peerUpdateCh, closeCh, 1*time.Second)
go handleSnapshotRequests(ctx, t, rts.snapshotOutCh, rts.snapshotInCh, closeCh, []snapshot{
{
@@ -257,8 +256,9 @@ func TestReactor_ChunkRequest_InvalidRequest(t *testing.T) {
rts := setup(ctx, t, nil, nil, 2)
rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"),
Message: &ssproto.SnapshotsRequest{},
From: types.NodeID("aa"),
ChannelID: ChunkChannel,
Message: &ssproto.SnapshotsRequest{},
}
response := <-rts.chunkPeerErrCh
@@ -315,8 +315,9 @@ func TestReactor_ChunkRequest(t *testing.T) {
rts := setup(ctx, t, conn, nil, 2)
rts.chunkInCh <- p2p.Envelope{
From: types.NodeID("aa"),
Message: tc.request,
From: types.NodeID("aa"),
ChannelID: ChunkChannel,
Message: tc.request,
}
response := <-rts.chunkOutCh
@@ -335,8 +336,9 @@ func TestReactor_SnapshotsRequest_InvalidRequest(t *testing.T) {
rts := setup(ctx, t, nil, nil, 2)
rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"),
Message: &ssproto.ChunkRequest{},
From: types.NodeID("aa"),
ChannelID: SnapshotChannel,
Message: &ssproto.ChunkRequest{},
}
response := <-rts.snapshotPeerErrCh
@@ -400,8 +402,9 @@ func TestReactor_SnapshotsRequest(t *testing.T) {
rts := setup(ctx, t, conn, nil, 100)
rts.snapshotInCh <- p2p.Envelope{
From: types.NodeID("aa"),
Message: &ssproto.SnapshotsRequest{},
From: types.NodeID("aa"),
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsRequest{},
}
if len(tc.expectResponses) > 0 {
@@ -457,7 +460,8 @@ func TestReactor_LightBlockResponse(t *testing.T) {
rts.stateStore.On("LoadValidators", height).Return(vals, nil)
rts.blockInCh <- p2p.Envelope{
From: types.NodeID("aa"),
From: types.NodeID("aa"),
ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockRequest{
Height: 10,
},
@@ -733,7 +737,8 @@ func handleLightBlockRequests(
require.NoError(t, err)
select {
case sending <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: lb,
},
@@ -750,7 +755,8 @@ func handleLightBlockRequests(
require.NoError(t, err)
select {
case sending <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: differntLB,
},
@@ -761,7 +767,8 @@ func handleLightBlockRequests(
case 1: // send nil block i.e. pretend we don't have it
select {
case sending <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: LightBlockChannel,
Message: &ssproto.LightBlockResponse{
LightBlock: nil,
},
@@ -802,7 +809,8 @@ func handleConsensusParamsRequest(
}
select {
case sending <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: ParamsChannel,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
ConsensusParams: paramsProto,
@@ -913,7 +921,8 @@ func handleSnapshotRequests(
require.True(t, ok)
for _, snapshot := range snapshots {
sendingCh <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: SnapshotChannel,
Message: &ssproto.SnapshotsResponse{
Height: snapshot.Height,
Format: snapshot.Format,
@@ -946,7 +955,8 @@ func handleChunkRequests(
msg, ok := envelope.Message.(*ssproto.ChunkRequest)
require.True(t, ok)
sendingCh <- p2p.Envelope{
From: envelope.To,
From: envelope.To,
ChannelID: ChunkChannel,
Message: &ssproto.ChunkResponse{
Height: msg.Height,
Format: msg.Format,

View File

@@ -10,7 +10,6 @@ import (
abciclient "github.com/tendermint/tendermint/abci/client"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/internal/proxy"
sm "github.com/tendermint/tendermint/internal/state"
@@ -72,31 +71,6 @@ type syncer struct {
processingSnapshot *snapshot
}
// newSyncer creates a new syncer.
func newSyncer(
cfg config.StateSyncConfig,
logger log.Logger,
conn abciclient.Client,
stateProvider StateProvider,
snapshotCh *p2p.Channel,
chunkCh *p2p.Channel,
tempDir string,
metrics *Metrics,
) *syncer {
return &syncer{
logger: logger,
stateProvider: stateProvider,
conn: conn,
snapshots: newSnapshotPool(),
snapshotCh: snapshotCh,
chunkCh: chunkCh,
tempDir: tempDir,
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: metrics,
}
}
// AddChunk adds a chunk to the chunk queue, if any. It returns false if the chunk has already
// been added to the queue, or an error if there's no sync in progress.
func (s *syncer) AddChunk(chunk *chunk) (bool, error) {

View File

@@ -1,3 +1,3 @@
# Local Cluster with Docker Compose
See the [docs](https://docs.tendermint.com/master/networks/docker-compose.html).
See the [docs](https://docs.tendermint.com/master/tools/docker-compose.html).

View File

@@ -88,12 +88,11 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
}
pval, err := makeDefaultPrivval(cfg)
@@ -214,13 +213,6 @@ func makeNode(
}
}
// Determine whether we should attempt state sync.
stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
if err := consensus.NewHandshaker(
@@ -244,7 +236,7 @@ func makeNode(
// TODO: Fetch and provide real options and do proper p2p bootstrapping.
// TODO: Use a persistent peer database.
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state)
nodeInfo, err := makeNodeInfo(cfg, nodeKey, eventSinks, genDoc, state.Version.Consensus)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
@@ -257,28 +249,67 @@ func makeNode(
makeCloser(closers))
}
router, err := createRouter(ctx, logger, nodeMetrics.p2p, nodeInfo, nodeKey,
peerManager, cfg, proxyApp)
// TODO construct node here:
node := &nodeImpl{
config: cfg,
logger: logger,
genesisDoc: genDoc,
privValidator: privValidator,
peerManager: peerManager,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
eventSinks: eventSinks,
services: []service.Service{eventBus},
stateStore: stateStore,
blockStore: blockStore,
shutdownOps: makeCloser(closers),
rpcEnv: &rpccore.Environment{
ProxyApp: proxyApp,
StateStore: stateStore,
BlockStore: blockStore,
PeerManager: peerManager,
GenDoc: genDoc,
EventSinks: eventSinks,
EventBus: eventBus,
EventLog: eventLog,
Logger: logger.With("module", "rpc"),
Config: *cfg.RPC,
},
}
node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
makeCloser(closers))
}
mpReactor, mp, err := createMempoolReactor(ctx,
cfg, proxyApp, stateStore, nodeMetrics.mempool, peerManager, router, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
evReactor, evPool, edbCloser, err := createEvidenceReactor(ctx,
cfg, dbProvider, stateStore, blockStore, peerManager, router, logger, nodeMetrics.evidence, eventBus,
)
evReactor, evPool, edbCloser, err := createEvidenceReactor(logger, cfg, dbProvider,
stateStore, blockStore, peerManager.Subscribe, node.router.OpenChannel, nodeMetrics.evidence, eventBus)
closers = append(closers, edbCloser)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
node.services = append(node.services, evReactor)
node.rpcEnv.EvidencePool = evPool
node.evPool = evPool
mpReactor, mp, err := createMempoolReactor(logger, cfg, proxyApp, stateStore, nodeMetrics.mempool,
peerManager.Subscribe, node.router.OpenChannel, peerManager.GetHeight)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
node.rpcEnv.Mempool = mp
node.services = append(node.services, mpReactor)
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(
@@ -292,38 +323,46 @@ func makeNode(
nodeMetrics.state,
)
// Determine whether we should attempt state sync.
stateSync := cfg.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}
// Determine whether we should do block sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
blockSync := !onlyValidatorIsUs(state, pubKey)
waitSync := stateSync || blockSync
csReactor, csState, err := createConsensusReactor(ctx,
cfg, stateStore, blockExec, blockStore, mp, evPool,
privValidator, nodeMetrics.consensus, stateSync || blockSync, eventBus,
peerManager, router, logger,
privValidator, nodeMetrics.consensus, waitSync, eventBus,
peerManager, node.router.OpenChannel, logger,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
node.services = append(node.services, csReactor)
node.rpcEnv.ConsensusState = csState
node.rpcEnv.ConsensusReactor = csReactor
// Create the blockchain reactor. Note, we do not start block sync if we're
// doing a state sync first.
bcReactor, err := blocksync.NewReactor(ctx,
bcReactor := blocksync.NewReactor(
logger.With("module", "blockchain"),
stateStore,
blockExec,
blockStore,
csReactor,
router.OpenChannel,
peerManager.Subscribe(ctx),
node.router.OpenChannel,
peerManager.Subscribe,
blockSync && !stateSync,
nodeMetrics.consensus,
eventBus,
)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("could not create blocksync reactor: %w", err),
makeCloser(closers))
}
node.services = append(node.services, bcReactor)
node.rpcEnv.BlockSyncReactor = bcReactor
// Make ConsensusReactor. Don't enable fully if doing a state sync and/or block sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
@@ -337,83 +376,25 @@ func makeNode(
// FIXME The way we do phased startups (e.g. replay -> block sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactor, err := statesync.NewReactor(
node.stateSync = stateSync
node.stateSyncReactor = statesync.NewReactor(
ctx,
genDoc.ChainID,
genDoc.InitialHeight,
*cfg.StateSync,
logger.With("module", "statesync"),
proxyApp,
router.OpenChannel,
peerManager.Subscribe(ctx),
node.router.OpenChannel,
peerManager.Subscribe,
stateStore,
blockStore,
cfg.StateSync.TempDir,
nodeMetrics.statesync,
eventBus,
)
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
var pexReactor service.Service = service.NopService{}
if cfg.P2P.PexReactor {
pexReactor, err = pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, makeCloser(closers))
}
}
node := &nodeImpl{
config: cfg,
logger: logger,
genesisDoc: genDoc,
privValidator: privValidator,
peerManager: peerManager,
router: router,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
eventSinks: eventSinks,
services: []service.Service{
eventBus,
evReactor,
mpReactor,
csReactor,
bcReactor,
pexReactor,
},
stateStore: stateStore,
blockStore: blockStore,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
evPool: evPool,
shutdownOps: makeCloser(closers),
rpcEnv: &rpccore.Environment{
ProxyApp: proxyApp,
EvidencePool: evPool,
ConsensusState: csState,
StateStore: stateStore,
BlockStore: blockStore,
ConsensusReactor: csReactor,
BlockSyncReactor: bcReactor,
PeerManager: peerManager,
GenDoc: genDoc,
EventSinks: eventSinks,
EventBus: eventBus,
EventLog: eventLog,
Mempool: mp,
Logger: logger.With("module", "rpc"),
Config: *cfg.RPC,
},
node.services = append(node.services, pex.NewReactor(logger, peerManager, node.router.OpenChannel, peerManager.Subscribe))
}
if cfg.Mode == config.ModeValidator {
@@ -658,6 +639,10 @@ func (n *nodeImpl) startPrometheusServer(ctx context.Context, addr string) *http
return srv
}
func (n *nodeImpl) NodeInfo() *types.NodeInfo {
return &n.nodeInfo
}
// EventBus returns the Node's EventBus.
func (n *nodeImpl) EventBus() *eventbus.EventBus {
return n.rpcEnv.EventBus

View File

@@ -581,12 +581,12 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()
ns, err := makeSeedNode(ctx,
ns, err := makeSeedNode(
logger,
cfg,
config.DefaultDBProvider,
nodeKey,
defaultGenesisDocProviderFunc(cfg),
logger,
)
t.Cleanup(ns.Wait)
t.Cleanup(leaktest.CheckTimeout(t, time.Second))

View File

@@ -68,7 +68,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
return makeSeedNode(ctx, conf, config.DefaultDBProvider, nodeKey, genProvider, logger)
return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}

View File

@@ -29,7 +29,6 @@ type seedNodeImpl struct {
// network
peerManager *p2p.PeerManager
router *p2p.Router
nodeInfo types.NodeInfo
nodeKey types.NodeKey // our node privkey
isListening bool
@@ -40,12 +39,11 @@ type seedNodeImpl struct {
// makeSeedNode returns a new seed node, containing only the p2p and PEX reactors
func makeSeedNode(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
nodeKey types.NodeKey,
genesisDocProvider genesisDocProvider,
logger log.Logger,
) (service.Service, error) {
if !cfg.P2P.PexReactor {
return nil, errors.New("cannot run seed nodes with PEX disabled")
@@ -76,32 +74,25 @@ func makeSeedNode(
closer)
}
router, err := createRouter(ctx, logger, p2pMetrics, nodeInfo, nodeKey,
peerManager, cfg, nil)
router, err := createRouter(logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
closer)
}
pexReactor, err := pex.NewReactor(ctx, logger, peerManager, router.OpenChannel, peerManager.Subscribe(ctx))
if err != nil {
return nil, combineCloseError(err, closer)
}
node := &seedNodeImpl{
config: cfg,
logger: logger,
genesisDoc: genDoc,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
peerManager: peerManager,
router: router,
shutdownOps: closer,
pexReactor: pexReactor,
pexReactor: pex.NewReactor(logger, peerManager, router.OpenChannel, peerManager.Subscribe),
}
node.BaseService = *service.NewBaseService(logger, "SeedNode", node)

View File

@@ -169,14 +169,14 @@ func onlyValidatorIsUs(state sm.State, pubKey crypto.PubKey) bool {
}
func createMempoolReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
appClient abciclient.Client,
store sm.Store,
memplMetrics *mempool.Metrics,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
peerHeight func(types.NodeID) int64,
) (service.Service, mempool.Mempool, error) {
logger = logger.With("module", "mempool")
@@ -189,18 +189,14 @@ func createMempoolReactor(
mempool.WithPostCheck(sm.TxPostCheckFromStore(store)),
)
reactor, err := mempool.NewReactor(
ctx,
reactor := mempool.NewReactor(
logger,
cfg.Mempool,
peerManager,
mp,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
peerHeight,
)
if err != nil {
return nil, nil, err
}
if cfg.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
@@ -210,14 +206,13 @@ func createMempoolReactor(
}
func createEvidenceReactor(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
store sm.Store,
blockStore *store.BlockStore,
peerManager *p2p.PeerManager,
router *p2p.Router,
logger log.Logger,
peerEvents p2p.PeerEventSubscriber,
chCreator p2p.ChannelCreator,
metrics *evidence.Metrics,
eventBus *eventbus.EventBus,
) (*evidence.Reactor, *evidence.Pool, closer, error) {
@@ -231,16 +226,12 @@ func createEvidenceReactor(
evidencePool := evidence.NewPool(logger, evidenceDB, store, blockStore, metrics, eventBus)
evidenceReactor, err := evidence.NewReactor(
ctx,
evidenceReactor := evidence.NewReactor(
logger,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerEvents,
evidencePool,
)
if err != nil {
return nil, nil, dbCloser, fmt.Errorf("creating evidence reactor: %w", err)
}
return evidenceReactor, evidencePool, dbCloser, nil
}
@@ -258,7 +249,7 @@ func createConsensusReactor(
waitSync bool,
eventBus *eventbus.EventBus,
peerManager *p2p.PeerManager,
router *p2p.Router,
chCreator p2p.ChannelCreator,
logger log.Logger,
) (*consensus.Reactor, *consensus.State, error) {
logger = logger.With("module", "consensus")
@@ -282,20 +273,15 @@ func createConsensusReactor(
consensusState.SetPrivValidator(ctx, privValidator)
}
reactor, err := consensus.NewReactor(
ctx,
reactor := consensus.NewReactor(
logger,
consensusState,
router.OpenChannel,
peerManager.Subscribe(ctx),
chCreator,
peerManager.Subscribe,
eventBus,
waitSync,
csMetrics,
)
if err != nil {
return nil, nil, err
}
return reactor, consensusState, nil
}
@@ -375,10 +361,9 @@ func createPeerManager(
}
func createRouter(
ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfo types.NodeInfo,
nodeInfoProducer func() *types.NodeInfo,
nodeKey types.NodeKey,
peerManager *p2p.PeerManager,
cfg *config.Config,
@@ -405,12 +390,11 @@ func createRouter(
}
return p2p.NewRouter(
ctx,
p2pLogger,
p2pMetrics,
nodeInfo,
nodeKey.PrivKey,
peerManager,
nodeInfoProducer,
[]p2p.Transport{transport},
[]p2p.Endpoint{ep},
getRouterConfig(cfg, appClient),
@@ -422,7 +406,7 @@ func makeNodeInfo(
nodeKey types.NodeKey,
eventSinks []indexer.EventSink,
genDoc *types.GenesisDoc,
state sm.State,
versionInfo version.Consensus,
) (types.NodeInfo, error) {
txIndexerStatus := "off"
@@ -434,8 +418,8 @@ func makeNodeInfo(
nodeInfo := types.NodeInfo{
ProtocolVersion: types.ProtocolVersion{
P2P: version.P2PProtocol, // global
Block: state.Version.Consensus.Block,
App: state.Version.Consensus.App,
Block: versionInfo.Block,
App: versionInfo.App,
},
NodeID: nodeKey.ID,
Network: genDoc.ChainID,

View File

@@ -411,6 +411,29 @@ func (_m *Client) DumpConsensusState(_a0 context.Context) (*coretypes.ResultDump
return r0, r1
}
// Events provides a mock function with given fields: ctx, req
func (_m *Client) Events(ctx context.Context, req *coretypes.RequestEvents) (*coretypes.ResultEvents, error) {
ret := _m.Called(ctx, req)
var r0 *coretypes.ResultEvents
if rf, ok := ret.Get(0).(func(context.Context, *coretypes.RequestEvents) *coretypes.ResultEvents); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*coretypes.ResultEvents)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *coretypes.RequestEvents) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Genesis provides a mock function with given fields: _a0
func (_m *Client) Genesis(_a0 context.Context) (*coretypes.ResultGenesis, error) {
ret := _m.Called(_a0)

View File

@@ -26,7 +26,7 @@ PexResponse is a list of net addresses provided to a peer to dial.
| Name | Type | Description | Field Number |
|-------|------------------------------------|------------------------------------------|--------------|
| addresses | repeated [PexAddress](#PexAddress) | List of peer addresses available to dial | 1 |
| addresses | repeated [PexAddress](#pexaddress) | List of peer addresses available to dial | 1 |
### PexAddress
@@ -41,7 +41,7 @@ into a `NodeAddress`. See [ParseNodeAddress](https://github.com/tendermint/tende
Message is a [`oneof` protobuf type](https://developers.google.com/protocol-buffers/docs/proto#oneof). The `oneof` consists of two messages.
| Name | Type | Description | Field Number |
|--------------|---------------------------|------------------------------------------------------|--------------|
| pex_request | [PexRequest](#PexRequest) | Empty request asking for a list of addresses to dial | 3 |
| pex_response | [PexResponse](#PexResponse) | List of addresses to dial | 4 |
| Name | Type | Description | Field Number |
|--------------|-----------------------------|------------------------------------------------------|--------------|
| pex_request | [PexRequest](#pexrequest) | Empty request asking for a list of addresses to dial | 3 |
| pex_response | [PexResponse](#pexresponse) | List of addresses to dial | 4 |

View File

@@ -11,7 +11,7 @@ This creates and runs a testnet named `ci` under `networks/ci/`.
## Conceptual Overview
End-to-end testnets are used to test Tendermint functionality as a user would use it, by spinning up a set of nodes with various configurations and making sure the nodes and network behave correctly. The background for the E2E test suite is outlined in [RFC-001](https://github.com/tendermint/tendermint/blob/master/docs/rfc/rfc-001-end-to-end-testing.md).
End-to-end testnets are used to test Tendermint functionality as a user would use it, by spinning up a set of nodes with various configurations and making sure the nodes and network behave correctly. The background for the E2E test suite is outlined in [RFC-001](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-066-e2e-testing.md).
The end-to-end tests can be thought of in this manner:
@@ -180,4 +180,4 @@ tendermint start
./build/node ./node.socket.toml
```
Check `node/config.go` to see how the settings of the test application can be tweaked.