p2p: scope and add libp2p components (#8443)

This commit is contained in:
Sam Kleinman
2022-04-29 11:57:40 -04:00
committed by GitHub
parent e7ed0720fd
commit 9f94161163
6 changed files with 781 additions and 109 deletions

99
go.mod
View File

@@ -30,10 +30,10 @@ require (
github.com/stretchr/testify v1.7.1
github.com/tendermint/tm-db v0.6.6
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/net v0.0.0-20220412020605-290c469a71a5
golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.46.0
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
pgregory.net/rapid v0.4.7
)
@@ -46,7 +46,98 @@ require (
gotest.tools v2.2.0+incompatible
)
require github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/containerd/cgroups v1.0.3 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/huin/goupnp v1.0.3 // indirect
github.com/ipfs/go-cid v0.1.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/koron/go-ssdp v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-conn-security-multistream v0.3.0 // indirect
github.com/libp2p/go-eventbus v0.2.1 // indirect
github.com/libp2p/go-flow-metrics v0.0.3 // indirect
github.com/libp2p/go-libp2p v0.19.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.1.0 // indirect
github.com/libp2p/go-libp2p-blankhost v0.3.0 // indirect
github.com/libp2p/go-libp2p-core v0.15.1 // indirect
github.com/libp2p/go-libp2p-discovery v0.6.0 // indirect
github.com/libp2p/go-libp2p-nat v0.1.0 // indirect
github.com/libp2p/go-libp2p-noise v0.4.0 // indirect
github.com/libp2p/go-libp2p-peerstore v0.6.0 // indirect
github.com/libp2p/go-libp2p-pnet v0.2.0 // indirect
github.com/libp2p/go-libp2p-pubsub v0.6.1 // indirect
github.com/libp2p/go-libp2p-quic-transport v0.17.0 // indirect
github.com/libp2p/go-libp2p-resource-manager v0.2.1 // indirect
github.com/libp2p/go-libp2p-swarm v0.10.2 // indirect
github.com/libp2p/go-libp2p-tls v0.4.1 // indirect
github.com/libp2p/go-libp2p-transport-upgrader v0.7.1 // indirect
github.com/libp2p/go-libp2p-yamux v0.9.1 // indirect
github.com/libp2p/go-msgio v0.2.0 // indirect
github.com/libp2p/go-nat v0.1.0 // indirect
github.com/libp2p/go-netroute v0.2.0 // indirect
github.com/libp2p/go-openssl v0.0.7 // indirect
github.com/libp2p/go-reuseport v0.1.0 // indirect
github.com/libp2p/go-reuseport-transport v0.1.0 // indirect
github.com/libp2p/go-stream-muxer-multistream v0.4.0 // indirect
github.com/libp2p/go-tcp-transport v0.5.1 // indirect
github.com/libp2p/go-ws-transport v0.6.0 // indirect
github.com/libp2p/go-yamux/v3 v3.1.1 // indirect
github.com/lucas-clemente/quic-go v0.27.0 // indirect
github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect
github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect
github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/miekg/dns v1.1.48 // indirect
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect
github.com/multiformats/go-base36 v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.5.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.0.3 // indirect
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/multiformats/go-multihash v0.1.0 // indirect
github.com/multiformats/go-multistream v0.3.0 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opencontainers/runtime-spec v1.0.3-0.20210326190908-1c3f411f0417 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pelletier/go-toml/v2 v2.0.0-beta.8 // indirect
github.com/raulk/clock v1.1.0 // indirect
github.com/raulk/go-watchdog v1.2.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
require (
4d63.com/gochecknoglobals v0.1.0 // indirect
@@ -166,7 +257,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polyfloyd/go-errorlint v0.0.0-20211125173453-6d6d39c5bb8b // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/common v0.33.0 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/quasilyte/go-ruleguard v0.3.15 // indirect
github.com/quasilyte/gogrep v0.0.0-20220103110004-ffaa07af02e3 // indirect

546
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -12,6 +12,8 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/libp2p/go-libp2p-core/host"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/libs/log"
@@ -83,6 +85,9 @@ type RouterOptions struct {
LegacyTransport Transport
LegacyEndpoint *Endpoint
NetworkHost host.Host
NetworkPubSub *pubsub.PubSub
}
const (
@@ -112,6 +117,12 @@ func (o *RouterOptions) Validate() error {
if o.LegacyEndpoint != nil {
return errors.New("when using libp2p you must not specify legacy components (endpoint)")
}
if o.NetworkHost == nil {
return errors.New("when using libp2p you must specify network components (host)")
}
if o.NetworkPubSub == nil {
return errors.New("when using libp2p you must specify network components (pubsub)")
}
} else {
if o.LegacyTransport == nil {
return errors.New("when using legacy p2p you must specify a transport")
@@ -119,6 +130,12 @@ func (o *RouterOptions) Validate() error {
if o.LegacyEndpoint == nil {
return errors.New("when using legacy p2p you must specify an endpoint")
}
if o.NetworkHost != nil {
return errors.New("when using legacy p2p you must not specify libp2p components (host)")
}
if o.NetworkPubSub != nil {
return errors.New("when using legacy p2p you must not specify libp2p components (pubsub)")
}
}
switch {
@@ -177,30 +194,38 @@ func (o *RouterOptions) Validate() error {
// quality of service.
type Router struct {
*service.BaseService
logger log.Logger
logger log.Logger
metrics *Metrics
metrics *Metrics
options RouterOptions
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
transport Transport
endpoint *Endpoint
connTracker connectionTracker
options RouterOptions
privKey crypto.PrivKey
chDescs []*ChannelDescriptor
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
nodeInfoProducer func() *types.NodeInfo
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
legacy struct {
peerManager *PeerManager
transport Transport
endpoint *Endpoint
connTracker connectionTracker
peerMtx sync.RWMutex
peerQueues map[types.NodeID]queue // outbound messages per peer for all channels
// the channels that the peer queue has open
peerChannels map[types.NodeID]ChannelIDSet
queueFactory func(int) queue
// FIXME: We don't strictly need to use a mutex for this if we seal the
// channels on router start. This depends on whether we want to allow
// dynamic channels in the future.
channelMtx sync.RWMutex
channelQueues map[ChannelID]queue // inbound messages from all peers to a single channel
channelMessages map[ChannelID]proto.Message
}
network struct {
host host.Host // network handle for ourselves
ps *pubsub.PubSub
}
}
// NewRouter creates a new Router. The given Transports must already be
@@ -211,33 +236,38 @@ func NewRouter(logger log.Logger, metrics *Metrics, key crypto.PrivKey, pm *Peer
return nil, err
}
if opts.UseLibP2P {
return nil, errors.New("libp2p is not supported")
}
router := &Router{
logger: logger,
metrics: metrics,
privKey: key,
nodeInfoProducer: opts.NodeInfoProducer,
connTracker: newConnTracker(
opts.MaxIncomingConnectionAttempts,
opts.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
transport: opts.LegacyTransport,
endpoint: opts.LegacyEndpoint,
peerManager: pm,
options: opts,
channelQueues: map[ChannelID]queue{},
channelMessages: map[ChannelID]proto.Message{},
peerQueues: map[types.NodeID]queue{},
peerChannels: make(map[types.NodeID]ChannelIDSet),
options: opts,
chDescs: make([]*ChannelDescriptor, 0),
}
router.BaseService = service.NewBaseService(logger, "router", router)
return router, nil
switch {
case opts.UseLibP2P:
router.network.host = opts.NetworkHost
router.options.NetworkPubSub = opts.NetworkPubSub
return nil, errors.New("libp2p is not (yet) supported")
default:
router.legacy.connTracker = newConnTracker(
opts.MaxIncomingConnectionAttempts,
opts.IncomingConnectionWindow,
)
router.legacy.transport = opts.LegacyTransport
router.legacy.endpoint = opts.LegacyEndpoint
router.legacy.peerManager = pm
router.legacy.channelQueues = map[ChannelID]queue{}
router.legacy.channelMessages = map[ChannelID]proto.Message{}
router.legacy.peerQueues = map[types.NodeID]queue{}
router.legacy.peerChannels = make(map[types.NodeID]ChannelIDSet)
return router, nil
}
}
func (r *Router) createQueueFactory(ctx context.Context) (func(int) queue, error) {
@@ -273,18 +303,18 @@ type ChannelCreator func(context.Context, *ChannelDescriptor) (*Channel, error)
// wrapper message. The caller may provide a size to make the channel buffered,
// which internally makes the inbound, outbound, and error channel buffered.
func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*Channel, error) {
r.channelMtx.Lock()
defer r.channelMtx.Unlock()
r.legacy.channelMtx.Lock()
defer r.legacy.channelMtx.Unlock()
id := chDesc.ID
if _, ok := r.channelQueues[id]; ok {
if _, ok := r.legacy.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
messageType := chDesc.MessageType
queue := r.queueFactory(chDesc.RecvBufferCapacity)
queue := r.legacy.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
errCh := make(chan PeerError, chDesc.RecvBufferCapacity)
channel := NewChannel(id, messageType, queue.dequeue(), outCh, errCh)
@@ -295,20 +325,20 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (*C
wrapper = w
}
r.channelQueues[id] = queue
r.channelMessages[id] = messageType
r.legacy.channelQueues[id] = queue
r.legacy.channelMessages[id] = messageType
// add the channel to the nodeInfo if it's not already there.
r.nodeInfoProducer().AddChannel(uint16(chDesc.ID))
r.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
r.legacy.transport.AddChannelDescriptors([]*ChannelDescriptor{chDesc})
go func() {
defer func() {
r.channelMtx.Lock()
delete(r.channelQueues, id)
delete(r.channelMessages, id)
r.channelMtx.Unlock()
r.legacy.channelMtx.Lock()
delete(r.legacy.channelQueues, id)
delete(r.legacy.channelMessages, id)
r.legacy.channelMtx.Unlock()
queue.close()
}()
@@ -355,11 +385,11 @@ func (r *Router) routeChannel(
// collect peer queues to pass the message via
var queues []queue
if envelope.Broadcast {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()
queues = make([]queue, 0, len(r.peerQueues))
for nodeID, q := range r.peerQueues {
peerChs := r.peerChannels[nodeID]
queues = make([]queue, 0, len(r.legacy.peerQueues))
for nodeID, q := range r.legacy.peerQueues {
peerChs := r.legacy.peerChannels[nodeID]
// check whether the peer is receiving on that channel
if _, ok := peerChs[chID]; ok {
@@ -367,19 +397,19 @@ func (r *Router) routeChannel(
}
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
} else {
r.peerMtx.RLock()
r.legacy.peerMtx.RLock()
q, ok := r.peerQueues[envelope.To]
q, ok := r.legacy.peerQueues[envelope.To]
contains := false
if ok {
peerChs := r.peerChannels[envelope.To]
peerChs := r.legacy.peerChannels[envelope.To]
// check whether the peer is receiving on that channel
_, contains = peerChs[chID]
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unconnected peer", "peer", envelope.To, "channel", chID)
@@ -420,7 +450,7 @@ func (r *Router) routeChannel(
r.logger.Error("peer error, evicting", "peer", peerError.NodeID, "err", peerError.Err)
r.peerManager.Errored(peerError.NodeID, peerError.Err)
r.legacy.peerManager.Errored(peerError.NodeID, peerError.Err)
case <-ctx.Done():
return
}
@@ -491,7 +521,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
}
incomingIP := conn.RemoteEndpoint().IP
if err := r.connTracker.AddConn(incomingIP); err != nil {
if err := r.legacy.connTracker.AddConn(incomingIP); err != nil {
closeErr := conn.Close()
r.logger.Debug("rate limiting incoming peer",
"err", err,
@@ -510,7 +540,7 @@ func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
func (r *Router) openConnection(ctx context.Context, conn Connection) {
defer conn.Close()
defer r.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
defer r.legacy.connTracker.RemoveConn(conn.RemoteEndpoint().IP)
re := conn.RemoteEndpoint()
incomingIP := re.IP
@@ -548,7 +578,7 @@ func (r *Router) openConnection(ctx context.Context, conn Connection) {
return
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Accepted(peerInfo.NodeID) }); err != nil {
r.logger.Error("failed to accept connection",
"op", "incoming/accepted", "peer", peerInfo.NodeID, "err", err)
return
@@ -586,7 +616,7 @@ func (r *Router) dialPeers(ctx context.Context) {
LOOP:
for {
address, err := r.peerManager.DialNext(ctx)
address, err := r.legacy.peerManager.DialNext(ctx)
switch {
case errors.Is(err, context.Canceled):
break LOOP
@@ -619,7 +649,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Error("failed to dial peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
return
@@ -632,14 +662,14 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
return
case err != nil:
r.logger.Error("failed to handshake with peer", "peer", address, "err", err)
if err = r.peerManager.DialFailed(ctx, address); err != nil {
if err = r.legacy.peerManager.DialFailed(ctx, address); err != nil {
r.logger.Error("failed to report dial failure", "peer", address, "err", err)
}
conn.Close()
return
}
if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil {
if err := r.runWithPeerMutex(func() error { return r.legacy.peerManager.Dialed(address) }); err != nil {
r.logger.Error("failed to dial peer",
"op", "outgoing/dialing", "peer", address.NodeID, "err", err)
conn.Close()
@@ -651,16 +681,16 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) {
}
func (r *Router) getOrMakeQueue(peerID types.NodeID, channels ChannelIDSet) queue {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()
if peerQueue, ok := r.peerQueues[peerID]; ok {
if peerQueue, ok := r.legacy.peerQueues[peerID]; ok {
return peerQueue
}
peerQueue := r.queueFactory(queueBufferDefault)
r.peerQueues[peerID] = peerQueue
r.peerChannels[peerID] = channels
peerQueue := r.legacy.queueFactory(queueBufferDefault)
r.legacy.peerQueues[peerID] = peerQueue
r.legacy.peerChannels[peerID] = channels
return peerQueue
}
@@ -697,7 +727,7 @@ func (r *Router) dialPeer(ctx context.Context, address NodeAddress) (Connection,
// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
// on a private address on this endpoint, but a peer on the public
// Internet can't and needs a different public address.
conn, err := r.transport.Dial(dialCtx, endpoint)
conn, err := r.legacy.transport.Dial(dialCtx, endpoint)
if err != nil {
r.logger.Error("failed to dial endpoint", "peer", address.NodeID, "endpoint", endpoint, "err", err)
} else {
@@ -749,8 +779,8 @@ func (r *Router) handshakePeer(
}
func (r *Router) runWithPeerMutex(fn func() error) error {
r.peerMtx.Lock()
defer r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
defer r.legacy.peerMtx.Unlock()
return fn()
}
@@ -759,18 +789,18 @@ func (r *Router) runWithPeerMutex(fn func() error) error {
// they are closed elsewhere it will cause this method to shut down and return.
func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connection, channels ChannelIDSet) {
r.metrics.Peers.Add(1)
r.peerManager.Ready(ctx, peerID, channels)
r.legacy.peerManager.Ready(ctx, peerID, channels)
sendQueue := r.getOrMakeQueue(peerID, channels)
defer func() {
r.peerMtx.Lock()
delete(r.peerQueues, peerID)
delete(r.peerChannels, peerID)
r.peerMtx.Unlock()
r.legacy.peerMtx.Lock()
delete(r.legacy.peerQueues, peerID)
delete(r.legacy.peerChannels, peerID)
r.legacy.peerMtx.Unlock()
sendQueue.close()
r.peerManager.Disconnected(ctx, peerID)
r.legacy.peerManager.Disconnected(ctx, peerID)
r.metrics.Peers.Add(-1)
}()
@@ -833,10 +863,10 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
return err
}
r.channelMtx.RLock()
queue, ok := r.channelQueues[chID]
messageType := r.channelMessages[chID]
r.channelMtx.RUnlock()
r.legacy.channelMtx.RLock()
queue, ok := r.legacy.channelQueues[chID]
messageType := r.legacy.channelMessages[chID]
r.legacy.channelMtx.RUnlock()
if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
@@ -914,7 +944,7 @@ func (r *Router) sendPeer(ctx context.Context, peerID types.NodeID, conn Connect
// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers(ctx context.Context) {
for {
peerID, err := r.peerManager.EvictNext(ctx)
peerID, err := r.legacy.peerManager.EvictNext(ctx)
switch {
case errors.Is(err, context.Canceled):
@@ -926,9 +956,9 @@ func (r *Router) evictPeers(ctx context.Context) {
r.logger.Info("evicting peer", "peer", peerID)
r.peerMtx.RLock()
queue, ok := r.peerQueues[peerID]
r.peerMtx.RUnlock()
r.legacy.peerMtx.RLock()
queue, ok := r.legacy.peerQueues[peerID]
r.legacy.peerMtx.RUnlock()
if ok {
queue.close()
@@ -942,7 +972,7 @@ func (r *Router) setupQueueFactory(ctx context.Context) error {
return err
}
r.queueFactory = qf
r.legacy.queueFactory = qf
return nil
}
@@ -952,13 +982,13 @@ func (r *Router) OnStart(ctx context.Context) error {
return err
}
if err := r.transport.Listen(r.endpoint); err != nil {
if err := r.legacy.transport.Listen(r.legacy.endpoint); err != nil {
return err
}
go r.dialPeers(ctx)
go r.evictPeers(ctx)
go r.acceptPeers(ctx, r.transport)
go r.acceptPeers(ctx, r.legacy.transport)
return nil
}
@@ -971,24 +1001,24 @@ func (r *Router) OnStart(ctx context.Context) error {
// sender's responsibility.
func (r *Router) OnStop() {
// Close transport listeners (unblocks Accept calls).
if err := r.transport.Close(); err != nil {
if err := r.legacy.transport.Close(); err != nil {
r.logger.Error("failed to close transport", "err", err)
}
// Collect all remaining queues, and wait for them to close.
queues := []queue{}
r.channelMtx.RLock()
for _, q := range r.channelQueues {
r.legacy.channelMtx.RLock()
for _, q := range r.legacy.channelQueues {
queues = append(queues, q)
}
r.channelMtx.RUnlock()
r.legacy.channelMtx.RUnlock()
r.peerMtx.RLock()
for _, q := range r.peerQueues {
r.legacy.peerMtx.RLock()
for _, q := range r.legacy.peerQueues {
queues = append(queues, q)
}
r.peerMtx.RUnlock()
r.legacy.peerMtx.RUnlock()
for _, q := range queues {
q.close()

View File

@@ -19,8 +19,7 @@ func TestConnectionFiltering(t *testing.T) {
filterByIPCount := 0
router := &Router{
logger: logger,
connTracker: newConnTracker(1, time.Second),
logger: logger,
options: RouterOptions{
FilterPeerByIP: func(ctx context.Context, ip net.IP, port uint16) error {
filterByIPCount++
@@ -28,6 +27,8 @@ func TestConnectionFiltering(t *testing.T) {
},
},
}
router.legacy.connTracker = newConnTracker(1, time.Second)
require.Equal(t, 0, filterByIPCount)
router.openConnection(ctx, &MemoryConnection{logger: logger, closeFn: func() {}})
require.Equal(t, 1, filterByIPCount)

View File

@@ -34,7 +34,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Fifo", func(t *testing.T) {
@@ -44,7 +44,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
_, ok := r.queueFactory(1).(*fifoQueue)
_, ok := r.legacy.queueFactory(1).(*fifoQueue)
require.True(t, ok)
})
t.Run("Priority", func(t *testing.T) {
@@ -54,7 +54,7 @@ func TestRouter_ConstructQueueFactory(t *testing.T) {
require.NoError(t, err)
require.NoError(t, r.setupQueueFactory(ctx))
q, ok := r.queueFactory(1).(*pqScheduler)
q, ok := r.legacy.queueFactory(1).(*pqScheduler)
require.True(t, ok)
defer q.close()
})

View File

@@ -14,6 +14,8 @@ import (
"github.com/fortytw2/leaktest"
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
pubsub "github.com/libp2p/go-libp2p-pubsub"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"
@@ -61,6 +63,8 @@ func TestRouterConstruction(t *testing.T) {
LegacyEndpoint: nil,
LegacyTransport: nil,
NodeInfoProducer: func() *types.NodeInfo { return &types.NodeInfo{} },
NetworkHost: &basichost.BasicHost{},
NetworkPubSub: &pubsub.PubSub{},
}
if err := opts.Validate(); err != nil {
t.Fatalf("options should validate: %v", err)
@@ -78,7 +82,7 @@ func TestRouterConstruction(t *testing.T) {
)
if err == nil {
t.Error("support for libp2p does not exist, and should prevent the router from being constructed")
} else if err.Error() != "libp2p is not supported" {
} else if err.Error() != "libp2p is not (yet) supported" {
t.Errorf("incorrect error: %q", err.Error())
}
if router != nil {