Mirror of https://github.com/tendermint/tendermint.git (synced 2026-01-15 09:12:50 +00:00)

Compare commits: jae/fix...main-libp2 (42 commits)
Commit SHA1s in this comparison (author and date columns were empty in the mirror):

37cce2542a, 1cc4509029, 9456b7975f, 2994fe00e4, 86cf79a9d9, fff97326a1,
6dd0e8d83a, 2cecf7f9c7, 4452949c81, ffa7af3c36, 8e9614d625, 9db41016ab,
b43336b706, b7ccee6240, d30a1821cc, f1c9a56d57, 2bc4f12aae, 0d844825f1,
379136a55c, 229ec9dd6e, 1e2fd769c2, 6e16cd1ff8, 03879849d9, 82f66026a5,
9f94161163, e7ed0720fd, 83239d2b06, 516ff45392, 4868bb48f5, 4844af2b8d,
72405b450f, 99b862228f, 385ad1e1e3, 251e41895f, 0dda3ff88b, be501981e7,
d5b9bc4c18, 4859631a6a, 4da31ca2c8, e41ba263be, da277baa69, 19ba3c6375
@@ -662,6 +662,11 @@ type P2PConfig struct { //nolint: maligned
    // layer uses. Options are: "fifo" and "priority",
    // with the default being "priority".
    QueueType string `mapstructure:"queue-type"`
+
+   // UseLibP2P switches to using the new networking layer based
+   // on libp2p. This option is unlikely to persist into a
+   // release of tendermint, but will ease the transition.
+   UseLibP2P bool `mapstructure:"experimental-use-lib-p2p"`
}

// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
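The struct field above maps, via its mapstructure tag, to an `experimental-use-lib-p2p` key in the `[p2p]` section of the node configuration. As a minimal sketch (not part of this diff), the flag could be toggled programmatically like so:

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/config"
)

func main() {
	// Sketch only: enable the experimental libp2p transport on a default
	// P2P config; the field corresponds to `experimental-use-lib-p2p`
	// under [p2p] in config.toml.
	cfg := config.DefaultP2PConfig()
	cfg.UseLibP2P = true
	fmt.Println("libp2p networking enabled:", cfg.UseLibP2P)
}
```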
go.mod (44 changed lines)
@@ -67,6 +67,7 @@ require (
    github.com/bombsimon/wsl/v3 v3.3.0 // indirect
    github.com/breml/bidichk v0.2.3 // indirect
    github.com/breml/errchkjson v0.3.0 // indirect
+   github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
    github.com/butuzov/ireturn v0.1.1 // indirect
    github.com/cenkalti/backoff v2.2.1+incompatible // indirect
    github.com/cespare/xxhash v1.1.0 // indirect
@@ -77,6 +78,7 @@ require (
    github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
    github.com/daixiang0/gci v0.3.3 // indirect
    github.com/davecgh/go-spew v1.1.1 // indirect
+   github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
    github.com/denis-tingaikin/go-header v0.4.3 // indirect
    github.com/dgraph-io/badger/v2 v2.2007.2 // indirect
    github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de // indirect
@@ -118,6 +120,7 @@ require (
    github.com/golangci/revgrep v0.0.0-20210930125155-c22e5001d4f2 // indirect
    github.com/golangci/unconvert v0.0.0-20180507085042-28b1c447d1f4 // indirect
    github.com/google/btree v1.0.0 // indirect
+   github.com/google/gopacket v1.1.19 // indirect
    github.com/gordonklaus/ineffassign v0.0.0-20210914165742-4cc7213b9bc8 // indirect
    github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
    github.com/gostaticanalysis/comment v1.4.2 // indirect
@@ -127,9 +130,16 @@ require (
    github.com/hashicorp/errwrap v1.0.0 // indirect
    github.com/hashicorp/go-multierror v1.1.1 // indirect
    github.com/hashicorp/go-version v1.4.0 // indirect
+   github.com/hashicorp/golang-lru v0.5.4 // indirect
    github.com/hashicorp/hcl v1.0.0 // indirect
    github.com/hexops/gotextdiff v1.0.3 // indirect
+   github.com/huin/goupnp v1.0.0 // indirect
    github.com/inconshreveable/mousetrap v1.0.0 // indirect
+   github.com/ipfs/go-cid v0.2.0 // indirect
+   github.com/ipfs/go-ipfs-util v0.0.2 // indirect
+   github.com/ipfs/go-log v1.0.5 // indirect
+   github.com/ipfs/go-log/v2 v2.5.1 // indirect
+   github.com/jackpal/go-nat-pmp v1.0.2 // indirect
    github.com/jdxcode/netrc v0.0.0-20210204082910-926c7f70242a // indirect
    github.com/jgautheron/goconst v1.5.1 // indirect
    github.com/jhump/protocompile v0.0.0-20220216033700-d705409f108f // indirect
@@ -141,13 +151,27 @@ require (
    github.com/kisielk/errcheck v1.6.0 // indirect
    github.com/kisielk/gotool v1.0.0 // indirect
    github.com/klauspost/compress v1.15.1 // indirect
+   github.com/klauspost/cpuid/v2 v2.0.9 // indirect
    github.com/klauspost/pgzip v1.2.5 // indirect
+   github.com/koron/go-ssdp v0.0.0-20191105050749-2e1c40ed0b5d // indirect
    github.com/kulti/thelper v0.6.2 // indirect
    github.com/kunwardeep/paralleltest v1.0.3 // indirect
    github.com/kyoh86/exportloopref v0.1.8 // indirect
    github.com/ldez/gomoddirectives v0.2.3 // indirect
    github.com/ldez/tagliatelle v0.3.1 // indirect
    github.com/leonklingele/grouper v1.1.0 // indirect
+   github.com/libp2p/go-cidranger v1.1.0 // indirect
+   github.com/libp2p/go-eventbus v0.2.1 // indirect
+   github.com/libp2p/go-libp2p v0.20.3 // indirect
+   github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
+   github.com/libp2p/go-libp2p-core v0.19.1 // indirect
+   github.com/libp2p/go-libp2p-discovery v0.6.0 // indirect
+   github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
+   github.com/libp2p/go-libp2p-pubsub v0.7.1 // indirect
+   github.com/libp2p/go-msgio v0.2.0 // indirect
+   github.com/libp2p/go-nat v0.1.0 // indirect
+   github.com/libp2p/go-netroute v0.2.0 // indirect
+   github.com/libp2p/go-openssl v0.0.7 // indirect
    github.com/lufeee/execinquery v1.0.0 // indirect
    github.com/magiconair/properties v1.8.6 // indirect
    github.com/maratori/testpackage v1.0.1 // indirect
@@ -158,9 +182,23 @@ require (
    github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
    github.com/mbilski/exhaustivestruct v1.2.0 // indirect
    github.com/mgechev/revive v1.2.1 // indirect
+   github.com/miekg/dns v1.1.43 // indirect
+   github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
+   github.com/minio/sha256-simd v1.0.0 // indirect
    github.com/mitchellh/go-homedir v1.1.0 // indirect
    github.com/mitchellh/mapstructure v1.5.0 // indirect
    github.com/moricho/tparallel v0.2.1 // indirect
+   github.com/mr-tron/base58 v1.2.0 // indirect
+   github.com/multiformats/go-base32 v0.0.3 // indirect
+   github.com/multiformats/go-base36 v0.1.0 // indirect
+   github.com/multiformats/go-multiaddr v0.5.0 // indirect
+   github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
+   github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
+   github.com/multiformats/go-multibase v0.0.3 // indirect
+   github.com/multiformats/go-multicodec v0.4.1 // indirect
+   github.com/multiformats/go-multihash v0.1.0 // indirect
+   github.com/multiformats/go-multistream v0.3.3 // indirect
+   github.com/multiformats/go-varint v0.0.6 // indirect
    github.com/nakabonne/nestif v0.3.1 // indirect
    github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 // indirect
    github.com/nishanths/exhaustive v0.7.11 // indirect
@@ -169,6 +207,7 @@ require (
    github.com/opencontainers/go-digest v1.0.0 // indirect
    github.com/opencontainers/image-spec v1.0.2 // indirect
    github.com/opencontainers/runc v1.1.3 // indirect
+   github.com/opentracing/opentracing-go v1.2.0 // indirect
    github.com/pelletier/go-toml v1.9.5 // indirect
    github.com/pelletier/go-toml/v2 v2.0.2 // indirect
    github.com/phayes/checkstyle v0.0.0-20170904204023-bfd46e6a821d // indirect
@@ -193,6 +232,8 @@ require (
    github.com/sivchari/tenv v1.5.0 // indirect
    github.com/sonatard/noctx v0.0.1 // indirect
    github.com/sourcegraph/go-diff v0.6.1 // indirect
+   github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
+   github.com/spaolacci/murmur3 v1.1.0 // indirect
    github.com/spf13/afero v1.8.2 // indirect
    github.com/spf13/cast v1.5.0 // indirect
    github.com/spf13/jwalterweatherman v1.1.0 // indirect
@@ -208,9 +249,11 @@ require (
    github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 // indirect
    github.com/tomarrell/wrapcheck/v2 v2.6.1 // indirect
    github.com/tommy-muehle/go-mnd/v2 v2.5.0 // indirect
+   github.com/tychoish/emt v0.1.0 // indirect
    github.com/ultraware/funlen v0.0.3 // indirect
    github.com/ultraware/whitespace v0.0.5 // indirect
    github.com/uudashr/gocognit v1.0.5 // indirect
+   github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
    github.com/yagipy/maintidx v1.0.0 // indirect
    github.com/yeya24/promlinter v0.2.0 // indirect
    gitlab.com/bosi/decorder v0.2.1 // indirect
@@ -231,6 +274,7 @@ require (
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    honnef.co/go/tools v0.3.1 // indirect
+   lukechampine.com/blake3 v1.1.6 // indirect
    mvdan.cc/gofumpt v0.3.1 // indirect
    mvdan.cc/interfacer v0.0.0-20180901003855-c20040233aed // indirect
    mvdan.cc/lint v0.0.0-20170908181259-adc824a0674b // indirect
@@ -135,7 +135,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
    if err != nil {
        return err
    }
-   r.chCreator = func(context.Context, *conn.ChannelDescriptor) (*p2p.Channel, error) { return blockSyncCh, nil }
+   r.chCreator = func(context.Context, *conn.ChannelDescriptor) (p2p.Channel, error) { return blockSyncCh, nil }

    state, err := r.stateStore.Load()
    if err != nil {
@@ -183,7 +183,7 @@ func (r *Reactor) OnStop() {

// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
-func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh *p2p.Channel) error {
+func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest, peerID types.NodeID, blockSyncCh p2p.Channel) error {
    block := r.store.LoadBlock(msg.Height)
    if block == nil {
        r.logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
@@ -223,7 +223,7 @@ func (r *Reactor) respondToPeer(ctx context.Context, msg *bcproto.BlockRequest,
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
-func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh *p2p.Channel) (err error) {
+func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blockSyncCh p2p.Channel) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = fmt.Errorf("panic in processing message: %v", e)
@@ -298,7 +298,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
// message execution will result in a PeerError being sent on the BlockSyncChannel.
// When the reactor is stopped, we will catch the signal and close the p2p Channel
// gracefully.
-func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Channel) {
+func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh p2p.Channel) {
    iter := blockSyncCh.Receive(ctx)
    for iter.Next(ctx) {
        envelope := iter.Envelope()
@@ -319,7 +319,7 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context, blockSyncCh *p2p.Chann
}

// processPeerUpdate processes a PeerUpdate.
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, blockSyncCh p2p.Channel) {
    r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

    // XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -354,7 +354,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, blockSyncCh p2p.Channel) {
    for {
        select {
        case <-ctx.Done():
@@ -396,7 +396,7 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
    return nil
}

-func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel) {
+func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh p2p.Channel) {
    statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
    defer statusUpdateTicker.Stop()

@@ -438,7 +438,7 @@ func (r *Reactor) requestRoutine(ctx context.Context, blockSyncCh *p2p.Channel)
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
-func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh *p2p.Channel) {
+func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh p2p.Channel) {
    var (
        trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
        switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)

@@ -37,7 +37,7 @@ type reactorTestSuite struct {
    reactors map[types.NodeID]*Reactor
    app map[types.NodeID]abciclient.Client

-   blockSyncChannels map[types.NodeID]*p2p.Channel
+   blockSyncChannels map[types.NodeID]p2p.Channel
    peerChans map[types.NodeID]chan p2p.PeerUpdate
    peerUpdates map[types.NodeID]*p2p.PeerUpdates
}
@@ -64,7 +64,7 @@ func setup(
    nodes: make([]types.NodeID, 0, numNodes),
    reactors: make(map[types.NodeID]*Reactor, numNodes),
    app: make(map[types.NodeID]abciclient.Client, numNodes),
-   blockSyncChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
+   blockSyncChannels: make(map[types.NodeID]p2p.Channel, numNodes),
    peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
    peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}
@@ -177,7 +177,7 @@ func (rts *reactorTestSuite) addNode(
    rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
    rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])

-   chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return rts.blockSyncChannels[nodeID], nil
    }
@@ -107,7 +107,7 @@ func invalidDoPrevoteFunc(
    round int32,
    cs *State,
    r *Reactor,
-   voteCh *p2p.Channel,
+   voteCh p2p.Channel,
    pv types.PrivValidator,
) {
    // routine to:

@@ -165,10 +165,10 @@ func NewReactor(
}

type channelBundle struct {
-   state *p2p.Channel
-   data *p2p.Channel
-   vote *p2p.Channel
-   votSet *p2p.Channel
+   state p2p.Channel
+   data p2p.Channel
+   vote p2p.Channel
+   votSet p2p.Channel
}

// OnStart starts separate go routines for each p2p Channel and listens for
@@ -310,14 +310,14 @@ func (r *Reactor) GetPeerState(peerID types.NodeID) (*PeerState, bool) {
    return ps, ok
}

-func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
+func (r *Reactor) broadcastNewRoundStepMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
    return stateCh.Send(ctx, p2p.Envelope{
        Broadcast: true,
        Message: makeRoundStepMessage(rs),
    })
}

-func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh *p2p.Channel) error {
+func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes.RoundState, stateCh p2p.Channel) error {
    psHeader := rs.ProposalBlockParts.Header()
    return stateCh.Send(ctx, p2p.Envelope{
        Broadcast: true,
@@ -331,7 +331,7 @@ func (r *Reactor) broadcastNewValidBlockMessage(ctx context.Context, rs *cstypes
    })
}

-func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh *p2p.Channel) error {
+func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote, stateCh p2p.Channel) error {
    return stateCh.Send(ctx, p2p.Envelope{
        Broadcast: true,
        Message: &tmcons.HasVote{
@@ -346,7 +346,7 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote,
// subscribeToBroadcastEvents subscribes for new round steps and votes using the
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
-func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh *p2p.Channel) {
+func (r *Reactor) subscribeToBroadcastEvents(ctx context.Context, stateCh p2p.Channel) {
    onStopCh := r.state.getOnStopCh()

    err := r.state.evsw.AddListenerForEvent(
@@ -403,7 +403,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) *tmcons.NewRoundStep {
    }
}

-func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh *p2p.Channel) error {
+func (r *Reactor) sendNewRoundStepMessage(ctx context.Context, peerID types.NodeID, stateCh p2p.Channel) error {
    return stateCh.Send(ctx, p2p.Envelope{
        To: peerID,
        Message: makeRoundStepMessage(r.getRoundState()),
@@ -433,7 +433,7 @@ func (r *Reactor) getRoundState() *cstypes.RoundState {
    return r.rs
}

-func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh *p2p.Channel) {
+func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, dataCh p2p.Channel) {
    logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)

    if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
@@ -497,7 +497,7 @@ func (r *Reactor) gossipDataForCatchup(ctx context.Context, rs *cstypes.RoundSta
    time.Sleep(r.state.config.PeerGossipSleepDuration)
}

-func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh *p2p.Channel) {
+func (r *Reactor) gossipDataRoutine(ctx context.Context, ps *PeerState, dataCh p2p.Channel) {
    logger := r.logger.With("peer", ps.peerID)

    timer := time.NewTimer(0)
@@ -632,7 +632,7 @@ OUTER_LOOP:

// pickSendVote picks a vote and sends it to the peer. It will return true if
// there is a vote to send and false otherwise.
-func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh *p2p.Channel) (bool, error) {
+func (r *Reactor) pickSendVote(ctx context.Context, ps *PeerState, votes types.VoteSetReader, voteCh p2p.Channel) (bool, error) {
    vote, ok := ps.PickVoteToSend(votes)
    if !ok {
        return false, nil
@@ -660,7 +660,7 @@ func (r *Reactor) gossipVotesForHeight(
    rs *cstypes.RoundState,
    prs *cstypes.PeerRoundState,
    ps *PeerState,
-   voteCh *p2p.Channel,
+   voteCh p2p.Channel,
) (bool, error) {
    logger := r.logger.With("height", prs.Height).With("peer", ps.peerID)

@@ -732,7 +732,7 @@ func (r *Reactor) gossipVotesForHeight(
    return false, nil
}

-func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh *p2p.Channel) {
+func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh p2p.Channel) {
    logger := r.logger.With("peer", ps.peerID)

    timer := time.NewTimer(0)
@@ -804,7 +804,7 @@ func (r *Reactor) gossipVotesRoutine(ctx context.Context, ps *PeerState, voteCh

// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
-func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh *p2p.Channel) {
+func (r *Reactor) queryMaj23Routine(ctx context.Context, ps *PeerState, stateCh p2p.Channel) {
    timer := time.NewTimer(0)
    defer timer.Stop()

@@ -1015,7 +1015,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// If we fail to find the peer state for the envelope sender, we perform a no-op
// and return. This can happen when we process the envelope after the peer is
// removed.
-func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh *p2p.Channel) error {
+func (r *Reactor) handleStateMessage(ctx context.Context, envelope *p2p.Envelope, msgI Message, voteSetCh p2p.Channel) error {
    ps, ok := r.GetPeerState(envelope.From)
    if !ok || ps == nil {
        r.logger.Debug("failed to find peer state", "peer", envelope.From, "ch_id", "StateChannel")

@@ -46,10 +46,10 @@ type reactorTestSuite struct {
    reactors map[types.NodeID]*Reactor
    subs map[types.NodeID]eventbus.Subscription
    blocksyncSubs map[types.NodeID]eventbus.Subscription
-   stateChannels map[types.NodeID]*p2p.Channel
-   dataChannels map[types.NodeID]*p2p.Channel
-   voteChannels map[types.NodeID]*p2p.Channel
-   voteSetBitsChannels map[types.NodeID]*p2p.Channel
+   stateChannels map[types.NodeID]p2p.Channel
+   dataChannels map[types.NodeID]p2p.Channel
+   voteChannels map[types.NodeID]p2p.Channel
+   voteSetBitsChannels map[types.NodeID]p2p.Channel
}

func chDesc(chID p2p.ChannelID, size int) *p2p.ChannelDescriptor {
@@ -86,7 +86,7 @@ func setup(
    t.Cleanup(cancel)

    chCreator := func(nodeID types.NodeID) p2p.ChannelCreator {
-       return func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+       return func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
            switch desc.ID {
            case StateChannel:
                return rts.stateChannels[nodeID], nil
@@ -159,7 +159,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er

// processEvidenceCh implements a blocking event loop where we listen for p2p
// Envelope messages from the evidenceCh.
-func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel) {
+func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh p2p.Channel) {
    iter := evidenceCh.Receive(ctx)
    for iter.Next(ctx) {
        envelope := iter.Envelope()
@@ -186,7 +186,7 @@ func (r *Reactor) processEvidenceCh(ctx context.Context, evidenceCh *p2p.Channel
// connects/disconnects frequently from the broadcasting peer(s).
//
// REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, evidenceCh p2p.Channel) {
    r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

    r.mtx.Lock()
@@ -227,7 +227,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, evidenceCh p2p.Channel) {
    for {
        select {
        case peerUpdate := <-peerUpdates.Updates():
@@ -249,7 +249,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
// that the peer has already received or may not be ready for.
//
// REF: https://github.com/tendermint/tendermint/issues/4727
-func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh *p2p.Channel) {
+func (r *Reactor) broadcastEvidenceLoop(ctx context.Context, peerID types.NodeID, evidenceCh p2p.Channel) {
    var next *clist.CElement

    defer func() {

@@ -38,7 +38,7 @@ type reactorTestSuite struct {
    logger log.Logger
    reactors map[types.NodeID]*evidence.Reactor
    pools map[types.NodeID]*evidence.Pool
-   evidenceChannels map[types.NodeID]*p2p.Channel
+   evidenceChannels map[types.NodeID]p2p.Channel
    peerUpdates map[types.NodeID]*p2p.PeerUpdates
    peerChans map[types.NodeID]chan p2p.PeerUpdate
    nodes []*p2ptest.Node
@@ -96,7 +96,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
    rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
    rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])

-   chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return rts.evidenceChannels[nodeID], nil
    }
@@ -194,7 +194,7 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope) (er

// processMempoolCh implements a blocking event loop where we listen for p2p
// Envelope messages from the mempoolCh.
-func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel) {
+func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh p2p.Channel) {
    iter := mempoolCh.Receive(ctx)
    for iter.Next(ctx) {
        envelope := iter.Envelope()
@@ -215,7 +215,7 @@ func (r *Reactor) processMempoolCh(ctx context.Context, mempoolCh *p2p.Channel)
// goroutine or not. If not, we start one for the newly added peer. For down or
// removed peers, we remove the peer from the mempool peer ID set and signal to
// stop the tx broadcasting goroutine.
-func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate, mempoolCh p2p.Channel) {
    r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)

    r.mtx.Lock()
@@ -264,7 +264,7 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
// processPeerUpdates initiates a blocking process where we listen for and handle
// PeerUpdate messages. When the reactor is stopped, we will catch the signal and
// close the p2p PeerUpdatesCh gracefully.
-func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh *p2p.Channel) {
+func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerUpdates, mempoolCh p2p.Channel) {
    for {
        select {
        case <-ctx.Done():
@@ -275,7 +275,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
    }
}

-func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh *p2p.Channel) {
+func (r *Reactor) broadcastTxRoutine(ctx context.Context, peerID types.NodeID, mempoolCh p2p.Channel) {
    peerMempoolID := r.ids.GetForPeer(peerID)
    var nextGossipTx *clist.CElement

@@ -30,7 +30,7 @@ type reactorTestSuite struct {
    logger log.Logger

    reactors map[types.NodeID]*Reactor
-   mempoolChannels map[types.NodeID]*p2p.Channel
+   mempoolChannels map[types.NodeID]p2p.Channel
    mempools map[types.NodeID]*TxMempool
    kvstores map[types.NodeID]*kvstore.Application

@@ -51,7 +51,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
    logger: log.NewNopLogger().With("testCase", t.Name()),
    network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
    reactors: make(map[types.NodeID]*Reactor, numNodes),
-   mempoolChannels: make(map[types.NodeID]*p2p.Channel, numNodes),
+   mempoolChannels: make(map[types.NodeID]p2p.Channel, numNodes),
    mempools: make(map[types.NodeID]*TxMempool, numNodes),
    kvstores: make(map[types.NodeID]*kvstore.Application, numNodes),
    peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, numNodes),
@@ -75,7 +75,7 @@ func setupReactors(ctx context.Context, t *testing.T, logger log.Logger, numNode
    rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
    rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])

-   chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(ctx context.Context, chDesc *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return rts.mempoolChannels[nodeID], nil
    }
@@ -1,12 +1,22 @@
package p2p

import (
+   "bufio"
    "context"
    "errors"
    "fmt"
+   "io"
    "sync"

    "github.com/gogo/protobuf/proto"
+   "github.com/libp2p/go-libp2p-core/host"
+   "github.com/libp2p/go-libp2p-core/network"
+   "github.com/libp2p/go-libp2p-core/peer"
+   "github.com/libp2p/go-libp2p-core/protocol"
+   pubsub "github.com/libp2p/go-libp2p-pubsub"
+   "github.com/tychoish/emt"

+   "github.com/tendermint/tendermint/internal/libs/protoio"
+   "github.com/tendermint/tendermint/types"
)

@@ -37,6 +47,16 @@ type Wrapper interface {
    Unwrap() (proto.Message, error)
}

+type Channel interface {
+   fmt.Stringer
+
+   Err() error
+
+   Send(context.Context, Envelope) error
+   SendError(context.Context, PeerError) error
+   Receive(context.Context) *ChannelIterator
+}
+
// PeerError is a peer error reported via Channel.Error.
//
// FIXME: This currently just disconnects the peer, which is too simplistic.
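This new Channel interface is what the reactor signature changes elsewhere in the diff switch to: callers take `p2p.Channel` instead of `*p2p.Channel`, so either the legacy implementation or a libp2p-backed one can be passed in. A hedged sketch of a consumer (the function name and import path are illustrative, not part of the diff):

```go
package demo

import (
	"context"

	"github.com/tendermint/tendermint/internal/p2p"
)

// echoLoop drains a channel and echoes every message back to its sender.
// It works against any p2p.Channel implementation, legacy or libp2p-backed.
func echoLoop(ctx context.Context, ch p2p.Channel) error {
	iter := ch.Receive(ctx)
	for iter.Next(ctx) {
		e := iter.Envelope()
		if err := ch.Send(ctx, p2p.Envelope{To: e.From, Message: e.Message}); err != nil {
			return err
		}
	}
	// Err surfaces errors collected internally; the libp2p implementation uses it.
	return ch.Err()
}
```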
@@ -56,9 +76,9 @@ type PeerError struct {
func (pe PeerError) Error() string { return fmt.Sprintf("peer=%q: %s", pe.NodeID, pe.Err.Error()) }
func (pe PeerError) Unwrap() error { return pe.Err }

-// Channel is a bidirectional channel to exchange Protobuf messages with peers.
+// legacyChannel is a bidirectional channel to exchange Protobuf messages with peers.
// Each message is wrapped in an Envelope to specify its sender and receiver.
-type Channel struct {
+type legacyChannel struct {
    ID ChannelID
    inCh <-chan Envelope // inbound messages (peers to reactors)
    outCh chan<- Envelope // outbound messages (reactors to peers)
@@ -69,9 +89,10 @@ type Channel struct {

// NewChannel creates a new channel. It is primarily for internal and test
// use, reactors should use Router.OpenChannel().
-func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) *Channel {
-   return &Channel{
+func NewChannel(id ChannelID, name string, inCh <-chan Envelope, outCh chan<- Envelope, errCh chan<- PeerError) Channel {
+   return &legacyChannel{
        ID: id,
+       name: name,
        inCh: inCh,
        outCh: outCh,
        errCh: errCh,
@@ -80,7 +101,7 @@ func NewChannel(id ChannelID, inCh <-chan Envelope, outCh chan<- Envelope, errCh

// Send blocks until the envelope has been sent, or until ctx ends.
// An error only occurs if the context ends before the send completes.
-func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
+func (ch *legacyChannel) Send(ctx context.Context, envelope Envelope) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
@@ -89,9 +110,15 @@ func (ch *Channel) Send(ctx context.Context, envelope Envelope) error {
    }
}

+func (ch *legacyChannel) Err() error { return nil }
+
// SendError blocks until the given error has been sent, or ctx ends.
// An error only occurs if the context ends before the send completes.
-func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
+func (ch *legacyChannel) SendError(ctx context.Context, pe PeerError) error {
+   if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) {
+       return nil
+   }
+
    select {
    case <-ctx.Done():
        return ctx.Err()
@@ -100,18 +127,29 @@ func (ch *Channel) SendError(ctx context.Context, pe PeerError) error {
    }
}

-func (ch *Channel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }
+func (ch *legacyChannel) String() string { return fmt.Sprintf("p2p.Channel<%d:%s>", ch.ID, ch.name) }

// Receive returns a new unbuffered iterator to receive messages from ch.
// The iterator runs until ctx ends.
-func (ch *Channel) Receive(ctx context.Context) *ChannelIterator {
+func (ch *legacyChannel) Receive(ctx context.Context) *ChannelIterator {
    iter := &ChannelIterator{
        pipe: make(chan Envelope), // unbuffered
    }
-   go func() {
+   go func(pipe chan Envelope) {
        defer close(iter.pipe)
-       iteratorWorker(ctx, ch, iter.pipe)
-   }()
+       for {
+           select {
+           case <-ctx.Done():
+               return
+           case envelope := <-ch.inCh:
+               select {
+               case <-ctx.Done():
+                   return
+               case pipe <- envelope:
+               }
+           }
+       }
+   }(iter.pipe)
    return iter
}

@@ -126,21 +164,6 @@ type ChannelIterator struct {
    current *Envelope
}

-func iteratorWorker(ctx context.Context, ch *Channel, pipe chan Envelope) {
-   for {
-       select {
-       case <-ctx.Done():
-           return
-       case envelope := <-ch.inCh:
-           select {
-           case <-ctx.Done():
-               return
-           case pipe <- envelope:
-           }
-       }
-   }
-}
-
// Next returns true when the Envelope value has advanced, and false
// when the context is canceled or iteration should stop. If an iterator has returned false,
// it will never return true again.
@@ -179,7 +202,7 @@ func (iter *ChannelIterator) Envelope() *Envelope { return iter.current }
//
// This allows the caller to consume messages from multiple channels
// without needing to manage the concurrency separately.
-func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterator {
+func MergedChannelIterator(ctx context.Context, chs ...Channel) *ChannelIterator {
    iter := &ChannelIterator{
        pipe: make(chan Envelope), // unbuffered
    }
@@ -187,10 +210,17 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato

    for _, ch := range chs {
        wg.Add(1)
-       go func(ch *Channel) {
+       go func(ch Channel, pipe chan Envelope) {
            defer wg.Done()
-           iteratorWorker(ctx, ch, iter.pipe)
-       }(ch)
+           iter := ch.Receive(ctx)
+           for iter.Next(ctx) {
+               select {
+               case <-ctx.Done():
+                   return
+               case pipe <- *iter.Envelope():
+               }
+           }
+       }(ch, iter.pipe)
    }

    done := make(chan struct{})
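With the iterator worker removed, MergedChannelIterator now fans in by calling Receive on each channel, which keeps it working for any Channel implementation. A small usage sketch (function and variable names assumed, not part of the diff):

```go
package demo

import (
	"context"
	"log"

	"github.com/tendermint/tendermint/internal/p2p"
)

// drainBoth consumes two channels through a single merged iterator; stateCh
// and dataCh are assumed to have been opened via Router.OpenChannel.
func drainBoth(ctx context.Context, stateCh, dataCh p2p.Channel) {
	iter := p2p.MergedChannelIterator(ctx, stateCh, dataCh)
	for iter.Next(ctx) {
		e := iter.Envelope()
		log.Printf("message %T on channel %d from %s", e.Message, e.ChannelID, e.From)
	}
}
```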
@@ -207,3 +237,254 @@ func MergedChannelIterator(ctx context.Context, chs ...*Channel) *ChannelIterato

    return iter
}
+
+type libp2pChannelImpl struct {
+   chDesc *ChannelDescriptor
+   pubsub *pubsub.PubSub
+   host host.Host
+   topic *pubsub.Topic
+   recvCh chan Envelope
+   chainID string
+   wrapper Wrapper
+
+   // thread-safe error aggregator used to collect errors seen
+   // during receiving messages into an iterator.
+   errs emt.Catcher
+
+   // the context is passed when the channel is opened and should
+   // cover the lifecycle of the channel itself. The contexts
+   // passed into methods should cover the lifecycle of the
+   // operations they represent.
+   ctx context.Context
+}
+
+func NewLibP2PChannel(ctx context.Context, chainID string, chDesc *ChannelDescriptor, ps *pubsub.PubSub, h host.Host) (Channel, error) {
+   ch := &libp2pChannelImpl{
+       ctx: ctx,
+       chDesc: chDesc,
+       pubsub: ps,
+       host: h,
+       chainID: chainID,
+       recvCh: make(chan Envelope, chDesc.RecvMessageCapacity),
+       errs: emt.NewCatcher(),
+   }
+   topic, err := ps.Join(ch.canonicalizedTopicName())
+   if err != nil {
+       return nil, err
+   }
+   ch.topic = topic
+
+   if w, ok := chDesc.MessageType.(Wrapper); ok {
+       ch.wrapper = w
+   }
+
+   return ch, nil
+}
+
+func (ch *libp2pChannelImpl) String() string {
+   return fmt.Sprintf("Channel<%s>", ch.canonicalizedTopicName())
+}
+
+func (ch *libp2pChannelImpl) Err() error { return ch.errs.Resolve() }
+
+func (ch *libp2pChannelImpl) canonicalizedTopicName() string {
+   return fmt.Sprintf("%s.%s.%d", ch.chainID, ch.chDesc.Name, ch.chDesc.ID)
+}
+
+func (ch *libp2pChannelImpl) Receive(ctx context.Context) *ChannelIterator {
+   // TODO: consider caching an iterator in the channel, or
+   // erroring if this gets called more than once.
+   //
+   // While it's safe to register a handler more than once, we
+   // could get into a dodgy situation where if you call receive
+   // more than once, the subsequently messages won't be routed
+   // correctly.
+   iter := &ChannelIterator{
+       pipe: make(chan Envelope),
+   }
+
+   ch.host.SetStreamHandler(protocol.ID(ch.canonicalizedTopicName()), func(stream network.Stream) {
+       // TODO: properly capture the max message size here.
+       reader := protoio.NewDelimitedReader(bufio.NewReader(stream), ch.chDesc.RecvBufferCapacity*2)
+
+       remote := stream.Conn().RemotePeer()
+
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       go func() { <-ctx.Done(); ch.errs.Add(stream.Close()) }()
+
+       for {
+           payload := proto.Clone(ch.chDesc.MessageType)
+
+           if _, err := reader.ReadMsg(payload); err != nil {
+               if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+                   return
+               }
+               ch.errs.Add(err)
+               continue
+           }
+           select {
+           case <-ctx.Done():
+               return
+           case iter.pipe <- Envelope{
+               From: types.NodeID(remote),
+               Message: payload,
+               ChannelID: ch.chDesc.ID,
+           }:
+           }
+       }
+   })
+
+   sub, err := ch.topic.Subscribe()
+   if err != nil {
+       ch.errs.Add(err)
+       return nil
+   }
+
+   wg := &sync.WaitGroup{}
+
+   wg.Add(1)
+   go func() {
+       defer wg.Done()
+       for {
+           select {
+           case <-ctx.Done():
+               return
+           case e := <-ch.recvCh:
+               select {
+               case <-ctx.Done():
+                   return
+               case iter.pipe <- e:
+                   continue
+               }
+           }
+       }
+   }()
+
+   wg.Add(1)
+   go func() {
+       defer wg.Done()
+       for {
+           msg, err := sub.Next(ctx)
+           if err != nil {
+               if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+                   return
+               }
+
+               ch.errs.Add(err)
+               return
+           }
+
+           payload := proto.Clone(ch.chDesc.MessageType)
+           if err := proto.Unmarshal(msg.Data, payload); err != nil {
+               ch.errs.Add(err)
+               return
+           }
+           if wrapper, ok := payload.(Wrapper); ok {
+               if payload, err = wrapper.Unwrap(); err != nil {
+                   ch.errs.Add(err)
+                   return
+               }
+           }
+
+           select {
+           case <-ctx.Done():
+               return
+           case iter.pipe <- Envelope{
+               From: types.NodeID(msg.From),
+               Message: payload,
+               ChannelID: ch.chDesc.ID,
+           }:
+           }
+       }
+   }()
+
+   // TODO: this is probably wrong in it's current form: the
+   // handler for point-to-point messages could still end up
+   // trying to send into the pipe after things close.
+   go func() { wg.Wait(); defer close(iter.pipe) }()
+
+   return iter
+}
+
+func (ch *libp2pChannelImpl) Send(ctx context.Context, e Envelope) error {
+   if ch.wrapper != nil {
+       msg := proto.Clone(ch.wrapper)
+       if err := msg.(Wrapper).Wrap(e.Message); err != nil {
+           return err
+       }
+
+       e.Message = msg
+   }
+
+   if e.Broadcast {
+       e.From = types.NodeID(ch.host.ID())
+       bz, err := proto.Marshal(e.Message)
+       if err != nil {
+           return err
+       }
+
+       return ch.topic.Publish(ctx, bz)
+   }
+
+   switch ch.host.Network().Connectedness(peer.ID(e.To)) {
+   case network.CannotConnect:
+       return fmt.Errorf("cannot connect to %q", e.To)
+   default:
+       stream, err := ch.getStream(peer.ID(e.To))
+       if err != nil {
+           return err
+       }
+
+       writer := protoio.NewDelimitedWriter(bufio.NewWriter(stream))
+       _, err = writer.WriteMsg(e.Message)
+       if err != nil {
+           return err
+       }
+   }
+   return nil
+}
+
+func (ch *libp2pChannelImpl) getStream(peer peer.ID) (network.Stream, error) {
+   conns := ch.host.Network().ConnsToPeer(peer)
+   pid := protocol.ID(ch.canonicalizedTopicName())
+   if len(conns) > 0 {
+       for cidx := range conns {
+           streams := conns[cidx].GetStreams()
+           for sidx := range streams {
+               stream := streams[sidx]
+               if stream.Protocol() == pid && stream.Stat().Direction == network.DirOutbound {
+                   return stream, nil
+               }
+           }
+       }
+
+   }
+
+   conn, err := ch.host.Network().DialPeer(ch.ctx, peer)
+   if err != nil {
+       return nil, err
+   }
+
+   stream, err := ch.host.NewStream(ch.ctx, conn.RemotePeer(), pid)
+   if err != nil {
+       return nil, err
+   }
+
+   return stream, nil
+}
+
+func (ch *libp2pChannelImpl) SendError(ctx context.Context, pe PeerError) error {
+   if errors.Is(pe.Err, context.Canceled) || errors.Is(pe.Err, context.DeadlineExceeded) || ctx.Err() != nil {
+       return nil
+   }
+
+   // TODO: change handling of errors to peers. This problably
+   // shouldn't be handled as a property of the channel, and
+   // rather as part of some peer-info/network-management
+   // interface, but we can do it here for now, to ensure compatibility.
+   //
+   // Closing the peer is the same behavior as the legacy system,
+   // and seems less drastic than blacklisting the peer forever.
+   return ch.host.Network().ClosePeer(peer.ID(pe.NodeID))
+}
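For orientation, a hedged sketch of wiring the new implementation by hand; in practice this is presumably done inside the router when UseLibP2P is set, and the helper name here is illustrative only:

```go
package demo

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"

	"github.com/tendermint/tendermint/internal/p2p"
)

// openLibP2PChannel builds a libp2p-backed channel for one descriptor.
// Broadcast envelopes are published to the gossipsub topic
// "<chainID>.<name>.<id>" (see canonicalizedTopicName above); direct sends
// use a protobuf-delimited stream on a protocol with the same ID.
func openLibP2PChannel(ctx context.Context, chainID string, chDesc *p2p.ChannelDescriptor, ps *pubsub.PubSub, h host.Host) (p2p.Channel, error) {
	return p2p.NewLibP2PChannel(ctx, chainID, chDesc, ps, h)
}
```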
@@ -16,13 +16,13 @@ type channelInternal struct {
    Error chan PeerError
}

-func testChannel(size int) (*channelInternal, *Channel) {
+func testChannel(size int) (*channelInternal, *legacyChannel) {
    in := &channelInternal{
        In: make(chan Envelope, size),
        Out: make(chan Envelope, size),
        Error: make(chan PeerError, size),
    }
-   ch := &Channel{
+   ch := &legacyChannel{
        inCh: in.In,
        outCh: in.Out,
        errCh: in.Error,
internal/p2p/host.go (new file, 19 lines)
@@ -0,0 +1,19 @@
+package p2p
+
+import (
+   "context"
+   "errors"
+
+   "github.com/libp2p/go-libp2p-core/host"
+   pubsub "github.com/libp2p/go-libp2p-pubsub"
+   "github.com/tendermint/tendermint/config"
+)
+
+// NewHost constructs a default networking connection for a libp2p
+// network and returns the top level host object.
+func NewHost(conf *config.P2PConfig) (host.Host, error) { return nil, errors.New("not implemented") }
+
+// NewPubSub constructs a pubsub protocol using a libp2p host object.
+func NewPubSub(ctx context.Context, conf *config.P2PConfig, host host.Host) (*pubsub.PubSub, error) {
+   return nil, errors.New("not implemented")
+}
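Both constructors are stubbed out in this commit. One plausible shape for them, using the default go-libp2p host and gossipsub, is sketched below; this is an assumption about where the branch is headed, not part of the diff:

```go
package p2p

import (
	"context"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p-core/host"
	pubsub "github.com/libp2p/go-libp2p-pubsub"

	"github.com/tendermint/tendermint/config"
)

// newHostSketch builds a host listening on a placeholder multiaddr; a real
// implementation would translate conf.ListenAddress into a multiaddr.
func newHostSketch(conf *config.P2PConfig) (host.Host, error) {
	return libp2p.New(
		libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/26656"),
	)
}

// newPubSubSketch layers a gossipsub router over the host.
func newPubSubSketch(ctx context.Context, conf *config.P2PConfig, h host.Host) (*pubsub.PubSub, error) {
	return pubsub.NewGossipSub(ctx, h)
}
```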
@@ -146,8 +146,8 @@ func (n *Network) MakeChannels(
    ctx context.Context,
    t *testing.T,
    chDesc *p2p.ChannelDescriptor,
-) map[types.NodeID]*p2p.Channel {
-   channels := map[types.NodeID]*p2p.Channel{}
+) map[types.NodeID]p2p.Channel {
+   channels := map[types.NodeID]p2p.Channel{}
    for _, node := range n.Nodes {
        channels[node.NodeID] = node.MakeChannel(ctx, t, chDesc)
    }
@@ -161,8 +161,8 @@ func (n *Network) MakeChannelsNoCleanup(
    ctx context.Context,
    t *testing.T,
    chDesc *p2p.ChannelDescriptor,
-) map[types.NodeID]*p2p.Channel {
-   channels := map[types.NodeID]*p2p.Channel{}
+) map[types.NodeID]p2p.Channel {
+   channels := map[types.NodeID]p2p.Channel{}
    for _, node := range n.Nodes {
        channels[node.NodeID] = node.MakeChannelNoCleanup(ctx, t, chDesc)
    }
@@ -267,10 +267,11 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
        p2p.NopMetrics(),
        privKey,
        peerManager,
-       func() *types.NodeInfo { return &nodeInfo },
-       transport,
-       ep,
-       p2p.RouterOptions{},
+       p2p.RouterOptions{
+           NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
+           LegacyTransport: transport,
+           LegacyEndpoint: ep,
+       },
    )

    require.NoError(t, err)
@@ -304,7 +305,7 @@ func (n *Node) MakeChannel(
    ctx context.Context,
    t *testing.T,
    chDesc *p2p.ChannelDescriptor,
-) *p2p.Channel {
+) p2p.Channel {
    ctx, cancel := context.WithCancel(ctx)
    channel, err := n.Router.OpenChannel(ctx, chDesc)
    require.NoError(t, err)
@@ -321,7 +322,7 @@ func (n *Node) MakeChannelNoCleanup(
    ctx context.Context,
    t *testing.T,
    chDesc *p2p.ChannelDescriptor,
-) *p2p.Channel {
+) p2p.Channel {
    channel, err := n.Router.OpenChannel(ctx, chDesc)
    require.NoError(t, err)
    return channel
@@ -15,7 +15,7 @@ import (
)

// RequireEmpty requires that the given channel is empty.
-func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
+func RequireEmpty(ctx context.Context, t *testing.T, channels ...p2p.Channel) {
    t.Helper()

    ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
@@ -32,7 +32,7 @@ func RequireEmpty(ctx context.Context, t *testing.T, channels ...*p2p.Channel) {
}

// RequireReceive requires that the given envelope is received on the channel.
-func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
+func RequireReceive(ctx context.Context, t *testing.T, channel p2p.Channel, expect p2p.Envelope) {
    t.Helper()

    ctx, cancel := context.WithTimeout(ctx, time.Second)
@@ -54,7 +54,7 @@ func RequireReceive(ctx context.Context, t *testing.T, channel *p2p.Channel, exp

// RequireReceiveUnordered requires that the given envelopes are all received on
// the channel, ignoring order.
-func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Channel, expect []*p2p.Envelope) {
+func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel p2p.Channel, expect []*p2p.Envelope) {
    ctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()

@@ -75,7 +75,7 @@ func RequireReceiveUnordered(ctx context.Context, t *testing.T, channel *p2p.Cha
}

// RequireSend requires that the given envelope is sent on the channel.
-func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) {
+func RequireSend(ctx context.Context, t *testing.T, channel p2p.Channel, envelope p2p.Envelope) {
    tctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()

@@ -93,7 +93,7 @@ func RequireSend(ctx context.Context, t *testing.T, channel *p2p.Channel, envelo
func RequireSendReceive(
    ctx context.Context,
    t *testing.T,
-   channel *p2p.Channel,
+   channel p2p.Channel,
    peerID types.NodeID,
    send proto.Message,
    receive proto.Message,
@@ -116,7 +116,7 @@ func RequireNoUpdates(ctx context.Context, t *testing.T, peerUpdates *p2p.PeerUp
}

// RequireError requires that the given peer error is submitted for a peer.
-func RequireError(ctx context.Context, t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
+func RequireError(ctx context.Context, t *testing.T, channel p2p.Channel, peerError p2p.PeerError) {
    tctx, tcancel := context.WithTimeout(ctx, time.Second)
    defer tcancel()
@@ -145,7 +145,7 @@ func (r *Reactor) OnStop() {}

// processPexCh implements a blocking event loop where we listen for p2p
// Envelope messages from the pexCh.
-func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
+func (r *Reactor) processPexCh(ctx context.Context, pexCh p2p.Channel) {
    incoming := make(chan *p2p.Envelope)
    go func() {
        defer close(incoming)
@@ -192,8 +192,7 @@ func (r *Reactor) processPexCh(ctx context.Context, pexCh *p2p.Channel) {
        // A request from another peer, or a response to one of our requests.
        dur, err := r.handlePexMessage(ctx, envelope, pexCh)
        if err != nil {
-           r.logger.Error("failed to process message",
-               "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
+           r.logger.Error("failed to process message", "ch_id", envelope.ChannelID, "envelope", envelope, "err", err)
            if serr := pexCh.SendError(ctx, p2p.PeerError{
                NodeID: envelope.From,
                Err: err,
@@ -225,7 +224,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context, peerUpdates *p2p.PeerU
// handlePexMessage handles envelopes sent from peers on the PexChannel.
// If an update was received, a new polling interval is returned; otherwise the
// duration is 0.
-func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh *p2p.Channel) (time.Duration, error) {
+func (r *Reactor) handlePexMessage(ctx context.Context, envelope *p2p.Envelope, pexCh p2p.Channel) (time.Duration, error) {
    logger := r.logger.With("peer", envelope.From)

    switch msg := envelope.Message.(type) {
@@ -308,7 +307,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
// that peer a request for more peer addresses. The chosen peer is moved into
// the requestsSent bucket so that we will not attempt to contact them again
// until they've replied or updated.
-func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh *p2p.Channel) error {
+func (r *Reactor) sendRequestForPeers(ctx context.Context, pexCh p2p.Channel) error {
    r.mtx.Lock()
    defer r.mtx.Unlock()
    if len(r.availablePeers) == 0 {
@@ -275,7 +275,7 @@ type singleTestReactor struct {
    pexInCh chan p2p.Envelope
    pexOutCh chan p2p.Envelope
    pexErrCh chan p2p.PeerError
-   pexCh *p2p.Channel
+   pexCh p2p.Channel
    peerCh chan p2p.PeerUpdate
    manager *p2p.PeerManager
}
@@ -287,8 +287,11 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
    pexInCh := make(chan p2p.Envelope, chBuf)
    pexOutCh := make(chan p2p.Envelope, chBuf)
    pexErrCh := make(chan p2p.PeerError, chBuf)

+   chDesc := pex.ChannelDescriptor()
    pexCh := p2p.NewChannel(
-       p2p.ChannelID(pex.PexChannel),
+       chDesc.ID,
+       chDesc.Name,
        pexInCh,
        pexOutCh,
        pexErrCh,
@@ -299,7 +302,7 @@ func setupSingle(ctx context.Context, t *testing.T) *singleTestReactor {
    peerManager, err := p2p.NewPeerManager(nodeID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
    require.NoError(t, err)

-   chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return pexCh, nil
    }
@@ -324,7 +327,7 @@ type reactorTestSuite struct {
    logger log.Logger

    reactors map[types.NodeID]*pex.Reactor
-   pexChannels map[types.NodeID]*p2p.Channel
+   pexChannels map[types.NodeID]p2p.Channel

    peerChans map[types.NodeID]chan p2p.PeerUpdate
    peerUpdates map[types.NodeID]*p2p.PeerUpdates
@@ -367,7 +370,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
    logger: log.NewNopLogger().With("testCase", t.Name()),
    network: p2ptest.MakeNetwork(ctx, t, networkOpts),
    reactors: make(map[types.NodeID]*pex.Reactor, realNodes),
-   pexChannels: make(map[types.NodeID]*p2p.Channel, opts.TotalNodes),
+   pexChannels: make(map[types.NodeID]p2p.Channel, opts.TotalNodes),
    peerChans: make(map[types.NodeID]chan p2p.PeerUpdate, opts.TotalNodes),
    peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, opts.TotalNodes),
    total: opts.TotalNodes,
@@ -388,7 +391,7 @@ func setupNetwork(ctx context.Context, t *testing.T, opts testOptions) *reactorT
    rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], chBuf)
    rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])

-   chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return rts.pexChannels[nodeID], nil
    }
@@ -448,7 +451,7 @@ func (r *reactorTestSuite) addNodes(ctx context.Context, t *testing.T, nodes int
    r.peerUpdates[nodeID] = p2p.NewPeerUpdates(r.peerChans[nodeID], r.opts.BufferSize)
    r.network.Nodes[nodeID].PeerManager.Register(ctx, r.peerUpdates[nodeID])

-   chCreator := func(context.Context, *p2p.ChannelDescriptor) (*p2p.Channel, error) {
+   chCreator := func(context.Context, *p2p.ChannelDescriptor) (p2p.Channel, error) {
        return r.pexChannels[nodeID], nil
    }
@@ -11,6 +11,8 @@ import (
    "time"

    "github.com/gogo/protobuf/proto"
+   "github.com/libp2p/go-libp2p-core/host"
+   pubsub "github.com/libp2p/go-libp2p-pubsub"

    "github.com/tendermint/tendermint/crypto"
    "github.com/tendermint/tendermint/libs/log"
@@ -65,6 +67,20 @@ type RouterOptions struct {
    // are used to dial peers. This defaults to the value of
    // runtime.NumCPU.
    NumConcurrentDials func() int
+
+   // NodeInfoProducer returns a reference to the current
+   // NodeInfo object for use in adding channels.
+   NodeInfoProducer func() *types.NodeInfo
+
+   // UseLibP2P toggles the use of the new networking layer
+   // within the router.
+   UseLibP2P bool
+
+   LegacyTransport Transport
+   LegacyEndpoint *Endpoint
+
+   NetworkHost host.Host
+   NetworkPubSub *pubsub.PubSub
}

const (
@@ -84,6 +100,38 @@ func (o *RouterOptions) Validate() error {
|
||||
return fmt.Errorf("queue type %q is not supported", o.QueueType)
|
||||
}
|
||||
|
||||
if o.NodeInfoProducer == nil {
|
||||
return errors.New("must specify a NodeInfoProducer")
|
||||
}
|
||||
|
||||
if o.UseLibP2P {
|
||||
if o.LegacyTransport != nil {
|
||||
return errors.New("when using libp2p you must not specify legacy components (transport)")
|
||||
}
|
||||
if o.LegacyEndpoint != nil {
|
||||
return errors.New("when using libp2p you must not specify legacy components (endpoint)")
|
||||
}
|
||||
if o.NetworkHost == nil {
|
||||
return errors.New("when using libp2p you must specify network components (host)")
|
||||
}
|
||||
if o.NetworkPubSub == nil {
|
||||
return errors.New("when using libp2p you must specify network components (pubsub)")
|
||||
}
|
||||
} else {
|
||||
if o.LegacyTransport == nil {
|
||||
return errors.New("when using legacy p2p you must specify a transport")
|
||||
}
|
||||
if o.LegacyEndpoint == nil {
|
||||
return errors.New("when using legacy p2p you must specify an endpoint")
|
||||
}
|
||||
if o.NetworkHost != nil {
|
||||
return errors.New("when using legacy p2p you must not specify libp2p components (host)")
|
||||
}
|
||||
if o.NetworkPubSub != nil {
|
||||
return errors.New("when using legacy p2p you must not specify libp2p components (pubsub)")
|
||||
}
|
||||
}
|
||||
|
||||
switch {
|
||||
case o.IncomingConnectionWindow == 0:
|
||||
o.IncomingConnectionWindow = 100 * time.Millisecond
|
||||
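As a rough illustration of how these validation rules pair up, the following sketch mirrors the TestRouterConstruction cases later in this diff; the zero-value transport, endpoint, host, and pubsub instances are stand-ins rather than working components:

// Legacy mode: a transport and endpoint are required, libp2p components must be absent.
legacyOpts := p2p.RouterOptions{
    LegacyTransport:  &p2p.MemoryTransport{},
    LegacyEndpoint:   &p2p.Endpoint{},
    NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
err := legacyOpts.Validate() // nil: all legacy components are present

// LibP2P mode: a host and pubsub are required instead, and legacy components must be nil.
libp2pOpts := p2p.RouterOptions{
    UseLibP2P:        true,
    NetworkHost:      &basichost.BasicHost{},
    NetworkPubSub:    &pubsub.PubSub{},
    NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
err = libp2pOpts.Validate() // nil: the libp2p components are present
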
@@ -140,76 +188,87 @@ func (o *RouterOptions) Validate() error {
// quality of service.
type Router struct {
*service.BaseService
logger log.Logger

logger log.Logger
metrics *Metrics
lc *metricsLabelCache

options RouterOptions
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
transport Transport
endpoint *Endpoint
connTracker connectionTracker
options RouterOptions
privKey crypto.PrivKey
chDescs []*ChannelDescriptor

peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
nodeInfoProducer func() *types.NodeInfo
chainID string

// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
legacy struct {
peerManager *PeerManager
transport Transport
endpoint *Endpoint
connTracker connectionTracker
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue

// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
}

network struct {
host host.Host // network handle for ourselves
ps *pubsub.PubSub

mtx sync.Mutex
channels map[string]Channel
}
}

// NewRouter creates a new Router. The given Transports must already be
// listening on appropriate interfaces, and will be closed by the Router when it
// stops.
func NewRouter(
logger log.Logger,
metrics *Metrics,
privKey crypto.PrivKey,
peerManager *PeerManager,
nodeInfoProducer func() *types.NodeInfo,
transport Transport,
endpoint *Endpoint,
options RouterOptions,
) (*Router, error) {

if err := options.Validate(); err != nil {
func NewRouter(logger log.Logger, metrics *Metrics, key crypto.PrivKey, pm *PeerManager, opts RouterOptions) (*Router, error) {
if err := opts.Validate(); err != nil {
return nil, err
}

router := &Router{
logger: logger,
metrics: metrics,
lc: newMetricsLabelCache(),
privKey: privKey,
nodeInfoProducer: nodeInfoProducer,
connTracker: newConnTracker(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
transport: transport,
endpoint: endpoint,
peerManager: peerManager,
options: options,
channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{},
peerChannels: make(map[types.NodeID]ChannelIDSet),
logger: logger,
metrics: metrics,
lc: newMetricsLabelCache(),

privKey: key,
nodeInfoProducer: opts.NodeInfoProducer,
options: opts,
chDescs: make([]*ChannelDescriptor, 0),
}

router.BaseService = service.NewBaseService(logger, "router", router)

return router, nil
switch {
case opts.UseLibP2P:
router.network.host = opts.NetworkHost
router.options.NetworkPubSub = opts.NetworkPubSub

return nil, errors.New("libp2p is not (yet) supported")
default:
router.legacy.connTracker = newConnTracker(
opts.MaxIncomingConnectionAttempts,
opts.IncomingConnectionWindow,
)
router.legacy.transport = opts.LegacyTransport
router.legacy.endpoint = opts.LegacyEndpoint
router.legacy.peerManager = pm
router.legacy.channelQueues = map[ChannelID]queue{}
router.legacy.channelMessages = map[ChannelID]proto.Message{}
router.legacy.peerQueues = map[types.NodeID]queue{}
router.legacy.peerChannels = make(map[types.NodeID]ChannelIDSet)

return router, nil
}
}

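A minimal sketch of the new five-argument constructor in legacy mode, matching the test call sites later in this diff; the transport, endpoint, peerManager, nodeKey, and nodeInfo variables here are assumed to exist in the caller:

// Sketch only: the transport and endpoint now travel inside RouterOptions
// instead of being positional arguments to NewRouter.
router, err := p2p.NewRouter(
    log.NewNopLogger(),
    p2p.NopMetrics(),
    nodeKey.PrivKey, // assumed node key
    peerManager,     // assumed *p2p.PeerManager
    p2p.RouterOptions{
        LegacyTransport:  transport,
        LegacyEndpoint:   endpoint,
        NodeInfoProducer: func() *types.NodeInfo { return &nodeInfo },
    },
)
// With UseLibP2P set instead, NewRouter currently fails with "libp2p is not (yet) supported".
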
func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error) {
@@ -239,7 +298,7 @@ func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error
// ChannelCreator allows routers to construct their own channels,
// either by receiving a reference to Router.OpenChannel or using some
// kind shim for testing purposes.
type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
type ChannelCreator func(context.Context, *ChannelDescriptor) (Channel, error)

// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, before stopping the Router. messageType is the
@@ -247,50 +306,74 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
// implement Wrapper to automatically (un)wrap multiple message types in a
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Channel, error) {
switch {
case r.options.UseLibP2P:
info := r.nodeInfoProducer()
ch, err := NewLibP2PChannel(ctx, info.Network, chDesc, r.options.NetworkPubSub, r.options.NetworkHost)
if err != nil {
return nil, err
}

id := chDesc.ID
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
// TODO(tychoish): might be nice (though ultimately
// not particularly impactful(?)) to be able to get the
// canonical name for the channel without constructing
// it.

messageType := chDesc.MessageType
name := ch.String()
r.network.mtx.Lock()
defer r.network.mtx.Unlock()
if _, ok := r.network.channels[name]; ok {
// TODO(tychoish) actually maybe it would be ok to just
// return the existing channel.
return nil, fmt.Errorf("cannot construct channel %q more than once", name)
}
r.network.channels[name] = ch

queue := r.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, queue.dequeue(), outCh, errCh)
channel.name = chDesc.Name
return ch, nil
default:
r.legacy.channelMtx.Lock()
defer r.legacy.channelMtx.Unlock()

var wrapper Wrapper
if w, ok := messageType.(Wrapper); ok {
wrapper = w
}
if _, ok := r.legacy.channelQueues[chDesc.ID]; ok {
return nil, fmt.Errorf("channel %v already exists", chDesc.ID)
}
r.chDescs = append(r.chDescs, chDesc)

r.channelQueues[id] = queue
r.channelMessages[id] = messageType
messageType := chDesc.MessageType

// add the channel to the nodeInfo if it's not already there.
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
queue := r.legacy.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(chDesc.ID, chDesc.Name, queue.dequeue(), outCh, errCh)

r.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
var wrapper Wrapper
if w, ok := chDesc.MessageType.(Wrapper); ok {
wrapper = w
}

go func() {
defer func() {
r.channelMtx.Lock()
delete(r.channelQueues, id)
delete(r.channelMessages, id)
r.channelMtx.Unlock()
queue.close()
r.legacy.channelQueues[chDesc.ID] = queue
r.legacy.channelMessages[chDesc.ID] = messageType

// add the channel to the nodeInfo if it's not already there.
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))

r.legacy.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})

go func() {
defer func() {
r.legacy.channelMtx.Lock()
delete(r.legacy.channelQueues, chDesc.ID)
delete(r.legacy.channelMessages, chDesc.ID)
r.legacy.channelMtx.Unlock()
queue.close()
}()

r.routeChannel(ctx, chDesc.ID, outCh, errCh, wrapper)
}()

r.routeChannel(ctx, id, outCh, errCh, wrapper)
}()

return channel, nil
return channel, nil
}
}

// routeChannel receives outbound channel messages and routes them to the
@@ -329,11 +412,11 @@ func (r *Router) routeChannel(
// collect peer queues to pass the message via
var queues []queue
if envelope.Broadcast {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()

queues = make([]queue, 0, len(r.peerQueues))
for nodeID, q := range r.peerQueues {
peerChs := r.peerChannels[nodeID]
queues = make([]queue, 0, len(r.legacy.peerQueues))
for nodeID, q := range r.legacy.peerQueues {
peerChs := r.legacy.peerChannels[nodeID]

// check whether the peer is receiving on that channel
if _, ok := peerChs[chID]; ok {
@@ -341,19 +424,19 @@ func (r *Router) routeChannel(
}
}

r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
} else {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()

q, ok := r.peerQueues[envelope.To]
q, ok := r.legacy.peerQueues[envelope.To]
contains := false
if ok {
peerChs := r.peerChannels[envelope.To]
peerChs := r.legacy.peerChannels[envelope.To]

// check whether the peer is receiving on that channel
_, contains = peerChs[chID]
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()

if !ok {
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
@@ -387,8 +470,13 @@ func (r *Router) routeChannel(
}
}

case peerError := <-errCh:
maxPeerCapacity := r.peerManager.HasMaxPeerCapacity()
case peerError, ok := <-errCh:
if !ok {
return
}

maxPeerCapacity := r.legacy.peerManager.HasMaxPeerCapacity()

r.logger.Error("peer error",
"peer", peerError.NodeID,
"err", peerError.Err,
@@ -399,16 +487,15 @@ func (r *Router) routeChannel(
// if the error is fatal or all peer
// slots are in use, we can error
// (disconnect) from the peer.
r.peerManager.Errored(peerError.NodeID, peerError.Err)
r.legacy.peerManager.Errored(peerError.NodeID, peerError.Err)
} else {
// this just decrements the peer
// score.
r.peerManager.processPeerEvent(ctx, PeerUpdate{
r.legacy.peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: peerError.NodeID,
Status: PeerStatusBad,
})
}

case <-ctx.Done():
return
}
@@ -455,10 +542,12 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
// in this case we got an error from the net.Listener.
r.logger.Error("failed to accept connection", "transport", transport, "err", err)
continue
case conn == nil:
continue
}

incomingIP := conn.RemoteEndpoint().IP
if err := r.connTracker.AddConn(incomingIP); err != nil {
if err := r.legacy.connTracker.AddConn(incomingIP); err != nil {
closeErr := conn.Close()
r.logger.Debug("rate limiting incoming peer",
"err", err,
@@ -477,7 +566,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {

func (r *Router) openConnection(ctx context.Context, conn Connection) {
defer conn.Close()
defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
defer r.legacy.connTracker.RemoveConn(conn.RemoteEndpoint().IP)

re := conn.RemoteEndpoint()
incomingIP := re.IP
@@ -515,7 +604,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
return
}

if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err)
return
@@ -553,7 +642,7 @@ func (r *Router) dialPeers(ctx context.Context) {

LOOP:
for {
address, err := r.peerManager.DialNext(ctx)
address, err := r.legacy.peerManager.DialNext(ctx)
switch {
case errors.Is(err, context.Canceled):
break LOOP
@@ -580,7 +669,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Debug("failed to dial peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
@@ -593,16 +682,16 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
conn.Close()
return
}

if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err)
r.peerManager.dialWaker.Wake()
r.legacy.peerManager.dialWaker.Wake()
conn.Close()
return
}
@@ -612,16 +701,16 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
}

func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()

if peerQueue, ok := r.peerQueues[peerID]; ok {
if peerQueue, ok := r.legacy.peerQueues[peerID]; ok {
return peerQueue
}

peerQueue := r.queueFactory(queueBufferDefault)
r.peerQueues[peerID] = peerQueue
r.peerChannels[peerID] = channels
peerQueue := r.legacy.queueFactory(queueBufferDefault)
r.legacy.peerQueues[peerID] = peerQueue
r.legacy.peerChannels[peerID] = channels
return peerQueue
}

@@ -658,7 +747,7 @@ func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection,
// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
// on a private address on this endpoint, but a peer on the public
// Internet can't and needs a different public address.
conn, err := r.transport.Dial(dialCtx, endpoint)
conn, err := r.legacy.transport.Dial(dialCtx, endpoint)
if err != nil {
r.logger.Debug("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err)
} else {
@@ -697,7 +786,7 @@ func (r *Router) handshakePeer(
}

if err := nodeInfo.CompatibleWith(peerInfo); err != nil {
if err := r.peerManager.Inactivate(peerInfo.NodeID); err != nil {
if err := r.legacy.peerManager.Inactivate(peerInfo.NodeID); err != nil {
return peerInfo, fmt.Errorf("problem inactivating peer %q: %w", peerInfo.ID(), err)
}

@@ -711,8 +800,8 @@ func (r *Router) handshakePeer(
}

func (r *Router) runWithPeerMutex(fn func() error) error {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()
return fn()
}

@@ -721,18 +810,18 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels ChannelIDSet) {
r.metrics.PeersConnected.Add(1)
r.peerManager.Ready(ctx, peerID, channels)
r.legacy.peerManager.Ready(ctx, peerID, channels)

sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
delete(r.peerChannels, peerID)
r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
delete(r.legacy.peerQueues, peerID)
delete(r.legacy.peerChannels, peerID)
r.legacy.peerMtx.Unlock()

sendQueue.close()

r.peerManager.Disconnected(ctx, peerID)
r.legacy.peerManager.Disconnected(ctx, peerID)
r.metrics.PeersConnected.Add(-1)
}()

@@ -795,10 +884,10 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
return err
}

r.channelMtx.RLock()
queue, ok := r.channelQueues[chID]
messageType := r.channelMessages[chID]
r.channelMtx.RUnlock()
r.legacy.channelMtx.RLock()
queue, ok := r.legacy.channelQueues[chID]
messageType := r.legacy.channelMessages[chID]
r.legacy.channelMtx.RUnlock()

if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
@@ -876,7 +965,7 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers(ctx context.Context) {
for {
peerID, err := r.peerManager.EvictNext(ctx)
peerID, err := r.legacy.peerManager.EvictNext(ctx)

switch {
case errors.Is(err, context.Canceled):
@@ -888,9 +977,9 @@ func (r *Router) evictPeers(ctx context.Context) {

r.logger.Info("evicting peer", "peer", peerID)

r.peerMtx.RLock()
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
r.legacy.peerMtx.RLock()
queue, ok := r.legacy.peerQueues[peerID]
r.legacy.peerMtx.RUnlock()

r.metrics.PeersEvicted.Add(1)

@@ -906,23 +995,29 @@ func (r *Router) setupQueueFactory(ctx context.Context) error {
return err
}

r.queueFactory = qf
r.legacy.queueFactory = qf
return nil
}

// OnStart implements service.Service.
func (r *Router) OnStart(ctx context.Context) error {
if r.options.UseLibP2P {
return nil
}

r.chainID = r.nodeInfoProducer().Network

if err := r.setupQueueFactory(ctx); err != nil {
return err
}

if err := r.transport.Listen(r.endpoint); err != nil {
if err := r.legacy.transport.Listen(r.legacy.endpoint); err != nil {
return err
}

go r.dialPeers(ctx)
go r.evictPeers(ctx)
go r.acceptPeers(ctx, r.transport)
go r.acceptPeers(ctx, r.legacy.transport)

return nil
}
@@ -934,25 +1029,38 @@ func (r *Router) OnStart(ctx context.Context) error {
// here, since that would cause any reactor senders to panic, so it is the
// sender's responsibility.
func (r *Router) OnStop() {
if r.options.UseLibP2P {
for name, ch := range r.network.channels {
if err := ch.Err(); err != nil {
r.logger.Error("shutting down channel",
"name", name,
"err", ch.Err(),
)
}
}

return
}

// Close transport listeners (unblocks Accept calls).
if err := r.transport.Close(); err != nil {
if err := r.legacy.transport.Close(); err != nil {
r.logger.Error("failed to close transport", "err", err)
}

// Collect all remaining queues, and wait for them to close.
queues := []queue{}

r.channelMtx.RLock()
for _, q := range r.channelQueues {
r.legacy.channelMtx.RLock()
for _, q := range r.legacy.channelQueues {
queues = append(queues, q)
}
r.channelMtx.RUnlock()
r.legacy.channelMtx.RUnlock()

r.peerMtx.RLock()
for _, q := range r.peerQueues {
r.legacy.peerMtx.RLock()
for _, q := range r.legacy.peerQueues {
queues = append(queues, q)
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()

for _, q := range queues {
q.close()

@@ -19,8 +19,7 @@ func TestConnectionFiltering(t *testing.T) {

filterByIPCount := 0
router := &Router{
logger: logger,
connTracker: newConnTracker(1, time.Second),
logger: logger,
options: RouterOptions{
FilterPeerByIP: func(ctx context.Context, ip net.IP, port uint16) error {
filterByIPCount++
@@ -28,6 +27,8 @@ func TestConnectionFiltering(t *testing.T) {
},
},
}
router.legacy.connTracker = newConnTracker(1, time.Second)

require.Equal(t, 0, filterByIPCount)
router.openConnection(ctx, &MemoryConnection{logger: logger, closeFn: func() {}})
require.Equal(t, 1, filterByIPCount)

@@ -11,47 +11,57 @@ import (
"github.com/tendermint/tendermint/types"
)

func getDefaultRouterOptions() RouterOptions {
return RouterOptions{
LegacyTransport: &MemoryTransport{},
LegacyEndpoint: &Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
}
func TestRouter_ConstructQueueFactory(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Run("ValidateOptionsPopulatesDefaultQueue", func(t *testing.T) {
opts := RouterOptions{}
opts := getDefaultRouterOptions()
require.NoError(t, opts.Validate())
require.Equal(t, "fifo", opts.QueueType)
})
t.Run("Default", func(t *testing.T) {
require.Zero(t, os.Getenv("TM_P2P_QUEUE"))
opts := RouterOptions{}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))

_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypeFifo}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypeFifo
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))

_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
opts := RouterOptions{QueueType: queueTypePriority}
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = queueTypePriority
r, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))

q, ok := r.queueFactory(1).(*pqScheduler)
q, ok := r.legacy.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})
t.Run("NonExistant", func(t *testing.T) {
opts := RouterOptions{QueueType: "fast"}
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, func() *types.NodeInfo { return &types.NodeInfo{} }, nil, nil, opts)
opts := getDefaultRouterOptions()
opts.QueueType = "fast"
_, err := NewRouter(log.NewNopLogger(), nil, nil, nil, opts)
require.Error(t, err)
require.Contains(t, err.Error(), "fast")
})

@@ -14,6 +14,8 @@ import (
"github.com/fortytw2/leaktest"
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@@ -26,7 +28,70 @@ import (
"github.com/tendermint/tendermint/types"
)

func echoReactor(ctx context.Context, channel *p2p.Channel) {
func TestRouterConstruction(t *testing.T) {
t.Run("Legacy", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: false,
LegacyEndpoint: &p2p.Endpoint{},
LegacyTransport: &p2p.MemoryTransport{},
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}

logger := log.NewNopLogger()
metrics := p2p.NopMetrics()

router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err != nil {
t.Fatal("problem constructing legacy router", err)
}
if router == nil {
t.Error("router was not constructed when it should have been")
}
})
t.Run("LibP2P", func(t *testing.T) {
opts := p2p.RouterOptions{
UseLibP2P: true,
LegacyEndpoint: nil,
LegacyTransport: nil,
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
NetworkHost: &basichost.BasicHost{},
NetworkPubSub: &pubsub.PubSub{},
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
}

logger := log.NewNopLogger()
metrics := p2p.NopMetrics()

router, err := p2p.NewRouter(
logger,
metrics,
nil, // privkey
nil, // peermanager
opts,
)
if err == nil {
t.Error("support for libp2p does not exist, and should prevent the router from being constructed")
} else if err.Error() != "libp2p is not (yet) supported" {
t.Errorf("incorrect error: %q", err.Error())
}
if router != nil {
t.Error("router was constructed and should not have been")
}
})
}

func echoReactor(ctx context.Context, channel p2p.Channel) {
iter := channel.Receive(ctx)
for iter.Next(ctx) {
envelope := iter.Envelope()
@@ -112,10 +177,11 @@ func TestRouter_Channel_Basic(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
testnet.RandomNode().Transport,
&p2p.Endpoint{},
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: testnet.RandomNode().Transport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)

@@ -411,10 +477,11 @@ func TestRouter_AcceptPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -451,27 +518,27 @@ func TestRouter_AcceptPeers_Errors(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)

// Set up a mock transport that returns io.EOF once, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(nil, err)
mockTransport.On("Accept", mock.Anything).Once().Return(nil, io.EOF)
mockTransport.On("Close").Return(nil)
mockTransport.On("Listen", mock.Anything).Return(nil)

// Set up and start the router.
peerManager, err := p2p.NewPeerManager(selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{})
require.NoError(t, err)

router, err := p2p.NewRouter(
log.NewNopLogger(),
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)

@@ -523,10 +590,11 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -626,10 +694,11 @@ func TestRouter_DialPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -711,10 +780,9 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {
@@ -784,10 +852,11 @@ func TestRouter_EvictPeers(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -846,10 +915,11 @@ func TestRouter_ChannelCompatability(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))
@@ -901,10 +971,11 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
p2p.NopMetrics(),
selfKey,
peerManager,
func() *types.NodeInfo { return &selfInfo },
mockTransport,
nil,
p2p.RouterOptions{},
p2p.RouterOptions{
NodeInfoProducer: func() *types.NodeInfo { return &selfInfo },
LegacyTransport: mockTransport,
LegacyEndpoint: &p2p.Endpoint{},
},
)
require.NoError(t, err)
require.NoError(t, router.Start(ctx))

@@ -26,14 +26,14 @@ var (
// NOTE: It is not the responsibility of the dispatcher to verify the light blocks.
type Dispatcher struct {
// the channel with which to send light block requests on
requestCh *p2p.Channel
requestCh p2p.Channel

mtx sync.Mutex
// all pending calls that have been dispatched and are awaiting an answer
calls map[types.NodeID]chan *types.LightBlock
}

func NewDispatcher(requestChannel *p2p.Channel) *Dispatcher {
func NewDispatcher(requestChannel p2p.Channel) *Dispatcher {
return &Dispatcher{
requestCh: requestChannel,
calls: make(map[types.NodeID]chan *types.LightBlock),

@@ -24,13 +24,13 @@ type channelInternal struct {
Error chan p2p.PeerError
}

func testChannel(size int) (*channelInternal, *p2p.Channel) {
func testChannel(size int) (*channelInternal, p2p.Channel) {
in := &channelInternal{
In: make(chan p2p.Envelope, size),
Out: make(chan p2p.Envelope, size),
Error: make(chan p2p.PeerError, size),
}
return in, p2p.NewChannel(0, in.In, in.Out, in.Error)
return in, p2p.NewChannel(0, "test", in.In, in.Out, in.Error)
}

func TestDispatcherBasic(t *testing.T) {

|
||||
return nil
|
||||
}
|
||||
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]*p2p.Channel{
|
||||
go r.processChannels(ctx, map[p2p.ChannelID]p2p.Channel{
|
||||
SnapshotChannel: snapshotCh,
|
||||
ChunkChannel: chunkCh,
|
||||
LightBlockChannel: blockCh,
|
||||
@@ -611,7 +611,7 @@ func (r *Reactor) backfill(
|
||||
// handleSnapshotMessage handles envelopes sent from peers on the
|
||||
// SnapshotChannel. It returns an error only if the Envelope.Message is unknown
|
||||
// for this channel. This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envelope, snapshotCh p2p.Channel) error {
|
||||
logger := r.logger.With("peer", envelope.From)
|
||||
|
||||
switch msg := envelope.Message.(type) {
|
||||
@@ -683,7 +683,7 @@ func (r *Reactor) handleSnapshotMessage(ctx context.Context, envelope *p2p.Envel
|
||||
// handleChunkMessage handles envelopes sent from peers on the ChunkChannel.
|
||||
// It returns an error only if the Envelope.Message is unknown for this channel.
|
||||
// This should never be called outside of handleMessage.
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope, chunkCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ChunkRequest:
|
||||
r.logger.Debug(
|
||||
@@ -772,7 +772,7 @@ func (r *Reactor) handleChunkMessage(ctx context.Context, envelope *p2p.Envelope
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Envelope, blockCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.LightBlockRequest:
|
||||
r.logger.Info("received light block request", "height", msg.Height)
|
||||
@@ -829,7 +829,7 @@ func (r *Reactor) handleLightBlockMessage(ctx context.Context, envelope *p2p.Env
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh *p2p.Channel) error {
|
||||
func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelope, paramsCh p2p.Channel) error {
|
||||
switch msg := envelope.Message.(type) {
|
||||
case *ssproto.ParamsRequest:
|
||||
r.logger.Debug("received consensus params request", "height", msg.Height)
|
||||
@@ -878,7 +878,7 @@ func (r *Reactor) handleParamsMessage(ctx context.Context, envelope *p2p.Envelop
|
||||
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
|
||||
// It will handle errors and any possible panics gracefully. A caller can handle
|
||||
// any error returned by sending a PeerError on the respective channel.
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]*p2p.Channel) (err error) {
|
||||
func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, chans map[p2p.ChannelID]p2p.Channel) (err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("panic in processing message: %v", e)
|
||||
@@ -912,12 +912,12 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, cha
|
||||
// encountered during message execution will result in a PeerError being sent on
|
||||
// the respective channel. When the reactor is stopped, we will catch the signal
|
||||
// and close the p2p Channel gracefully.
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]*p2p.Channel) {
|
||||
// make sure that the iterator gets cleaned up in case of error
|
||||
func (r *Reactor) processChannels(ctx context.Context, chanTable map[p2p.ChannelID]p2p.Channel) {
|
||||
// make sure tht the iterator gets cleaned up in case of error
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
chs := make([]*p2p.Channel, 0, len(chanTable))
|
||||
chs := make([]p2p.Channel, 0, len(chanTable))
|
||||
for key := range chanTable {
|
||||
chs = append(chs, chanTable[key])
|
||||
}
|
||||
|
||||
@@ -40,22 +40,22 @@ type reactorTestSuite struct {
conn *clientmocks.Client
stateProvider *mocks.StateProvider

snapshotChannel *p2p.Channel
snapshotChannel p2p.Channel
snapshotInCh chan p2p.Envelope
snapshotOutCh chan p2p.Envelope
snapshotPeerErrCh chan p2p.PeerError

chunkChannel *p2p.Channel
chunkChannel p2p.Channel
chunkInCh chan p2p.Envelope
chunkOutCh chan p2p.Envelope
chunkPeerErrCh chan p2p.PeerError

blockChannel *p2p.Channel
blockChannel p2p.Channel
blockInCh chan p2p.Envelope
blockOutCh chan p2p.Envelope
blockPeerErrCh chan p2p.PeerError

paramsChannel *p2p.Channel
paramsChannel p2p.Channel
paramsInCh chan p2p.Envelope
paramsOutCh chan p2p.Envelope
paramsPeerErrCh chan p2p.PeerError
@@ -102,6 +102,7 @@ func setup(

rts.snapshotChannel = p2p.NewChannel(
SnapshotChannel,
"snapshot",
rts.snapshotInCh,
rts.snapshotOutCh,
rts.snapshotPeerErrCh,
@@ -109,6 +110,7 @@ func setup(

rts.chunkChannel = p2p.NewChannel(
ChunkChannel,
"chunk",
rts.chunkInCh,
rts.chunkOutCh,
rts.chunkPeerErrCh,
@@ -116,6 +118,7 @@ func setup(

rts.blockChannel = p2p.NewChannel(
LightBlockChannel,
"lightblock",
rts.blockInCh,
rts.blockOutCh,
rts.blockPeerErrCh,
@@ -123,6 +126,7 @@ func setup(

rts.paramsChannel = p2p.NewChannel(
ParamsChannel,
"params",
rts.paramsInCh,
rts.paramsOutCh,
rts.paramsPeerErrCh,
@@ -133,7 +137,7 @@ func setup(

cfg := config.DefaultStateSyncConfig()

chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
chCreator := func(ctx context.Context, desc *p2p.ChannelDescriptor) (p2p.Channel, error) {
switch desc.ID {
case SnapshotChannel:
return rts.snapshotChannel, nil

@@ -208,7 +208,7 @@ type stateProviderP2P struct {
sync.Mutex // light.Client is not concurrency-safe
lc *light.Client
initialHeight int64
paramsSendCh *p2p.Channel
paramsSendCh p2p.Channel
paramsRecvCh chan types.ConsensusParams
}

@@ -220,7 +220,7 @@ func NewP2PStateProvider(
initialHeight int64,
providers []lightprovider.Provider,
trustOptions light.TrustOptions,
paramsSendCh *p2p.Channel,
paramsSendCh p2p.Channel,
logger log.Logger,
) (StateProvider, error) {
if len(providers) < 2 {

@@ -56,8 +56,8 @@ type syncer struct {
stateProvider StateProvider
conn abciclient.Client
snapshots *snapshotPool
snapshotCh *p2p.Channel
chunkCh *p2p.Channel
snapshotCh p2p.Channel
chunkCh p2p.Channel
tempDir string
fetchers int32
retryTimeout time.Duration

@@ -88,6 +88,7 @@ func newDefaultNode(
}
if cfg.Mode == config.ModeSeed {
return makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,
@@ -248,7 +249,7 @@ func makeNode(
},
}

node.router, err = createRouter(logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
node.router, err = createRouter(ctx, logger, nodeMetrics.p2p, node.NodeInfo, nodeKey, peerManager, cfg, proxyApp)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),
@@ -716,6 +717,7 @@ func loadStateFromDBOrGenesisDocProvider(stateStore sm.Store, genDoc *types.Gene
func getRouterConfig(conf *config.Config, appClient abciclient.Client) p2p.RouterOptions {
opts := p2p.RouterOptions{
QueueType: conf.P2P.QueueType,
UseLibP2P: conf.P2P.UseLibP2P,
HandshakeTimeout: conf.P2P.HandshakeTimeout,
DialTimeout: conf.P2P.DialTimeout,
}

@@ -598,6 +598,7 @@ func TestNodeNewSeedNode(t *testing.T) {
logger := log.NewNopLogger()

ns, err := makeSeedNode(
ctx,
logger,
cfg,
config.DefaultDBProvider,

@@ -31,8 +31,7 @@ func NewDefault(
// Genesis document: if the value is nil, the genesis document is read
// from the file specified in the config, and otherwise the node uses
// value of the final argument.
func New(
ctx context.Context,
func New(ctx context.Context,
conf *config.Config,
logger log.Logger,
cf abciclient.Client,
@@ -68,7 +67,7 @@ func New(
config.DefaultDBProvider,
logger)
case config.ModeSeed:
return makeSeedNode(logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
return makeSeedNode(ctx, logger, conf, config.DefaultDBProvider, nodeKey, genProvider)
default:
return nil, fmt.Errorf("%q is not a valid mode", conf.Mode)
}

@@ -39,6 +39,7 @@ type seedNodeImpl struct {

// makeSeedNode returns a new seed node, containing only p2p, pex reactor
func makeSeedNode(
ctx context.Context,
logger log.Logger,
cfg *config.Config,
dbProvider config.DBProvider,
@@ -74,7 +75,7 @@ func makeSeedNode(
closer)
}

router, err := createRouter(logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
router, err := createRouter(ctx, logger, p2pMetrics, func() *types.NodeInfo { return &nodeInfo }, nodeKey, peerManager, cfg, nil)
if err != nil {
return nil, combineCloseError(
fmt.Errorf("failed to create router: %w", err),

@@ -288,6 +288,7 @@ func createPeerManager(
}

func createRouter(
ctx context.Context,
logger log.Logger,
p2pMetrics *p2p.Metrics,
nodeInfoProducer func() *types.NodeInfo,
@@ -298,22 +299,40 @@ func createRouter(
) (*p2p.Router, error) {

p2pLogger := logger.With("module", "p2p")
opts := getRouterConfig(cfg, appClient)
opts.NodeInfoProducer = nodeInfoProducer

transportConf := conn.DefaultMConnConfig()
transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
transportConf.SendRate = cfg.P2P.SendRate
transportConf.RecvRate = cfg.P2P.RecvRate
transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
transport := p2p.NewMConnTransport(
p2pLogger, transportConf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)
if cfg.P2P.UseLibP2P {
host, err := p2p.NewHost(cfg.P2P)
if err != nil {
return nil, err
}
opts.NetworkHost = host

ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, err
ps, err := p2p.NewPubSub(ctx, cfg.P2P, host)
if err != nil {
return nil, err
}
opts.NetworkPubSub = ps
} else {
transportConf := conn.DefaultMConnConfig()
transportConf.FlushThrottle = cfg.P2P.FlushThrottleTimeout
transportConf.SendRate = cfg.P2P.SendRate
transportConf.RecvRate = cfg.P2P.RecvRate
transportConf.MaxPacketMsgPayloadSize = cfg.P2P.MaxPacketMsgPayloadSize
transport := p2p.NewMConnTransport(
p2pLogger, transportConf, []*p2p.ChannelDescriptor{},
p2p.MConnTransportOptions{
MaxAcceptedConnections: uint32(cfg.P2P.MaxConnections),
},
)

ep, err := p2p.NewEndpoint(nodeKey.ID.AddressString(cfg.P2P.ListenAddress))
if err != nil {
return nil, err
}
opts.LegacyEndpoint = ep
opts.LegacyTransport = transport
}

return p2p.NewRouter(
@@ -321,10 +340,7 @@ func createRouter(
p2pMetrics,
nodeKey.PrivKey,
peerManager,
nodeInfoProducer,
transport,
ep,
getRouterConfig(cfg, appClient),
opts,
)
}