tendermint/p2p/router.go
Cyrus Goh 5182ffee25 docs: master → docs-staging (#5990)
* Makefile: always pull image in proto-gen-docker. (#5953)

The `proto-gen-docker` target didn't pull an updated Docker image, and would use a local image if present, which could be outdated and produce wrong results.

* test: fix TestPEXReactorRunning data race (#5955)

Fixes #5941.

Not entirely sure that this will fix the problem (couldn't reproduce), but in any case this is an artifact of a hack in the P2P transport refactor to make it work with the legacy P2P stack, and will be removed when the refactor is done anyway.

* test/fuzz: move fuzz tests into this repo (#5918)

Co-authored-by: Emmanuel T Odeke <emmanuel@orijtech.com>

Closes #5907

- add init-corpus to blockchain reactor
- remove validator-set FromBytes test
now that we have proto, we don't need to test it! bye amino
- simplify mempool test
do we want to test remote ABCI app?
- do not recreate mux on every crash in jsonrpc test
- update p2p pex reactor test
- remove p2p/listener test
the API has changed + I did not understand what it tested anyway
- update secretconnection test
- add readme and makefile
- list inputs in readme
- add nightly workflow
- remove blockchain fuzz test
EncodeMsg / DecodeMsg no longer exist

* docker: dont login when in PR (#5961)

* docker: release Linux/ARM64 image (#5925)

Co-authored-by: Marko <marbar3778@yahoo.com>

* p2p: make PeerManager.DialNext() and EvictNext() block (#5947)

See #5936 and #5938 for background.

The plan was initially to have `DialNext()` and `EvictNext()` return a channel. However, implementing this became unnecessarily complicated and error-prone. As an example, the channel would be both consumed and populated (via method calls) by the same driving method (e.g. `Router.dialPeers()`) which could easily cause deadlocks where a method call blocked while sending on the channel that the caller itself was responsible for consuming (but couldn't since it was busy making the method call). It would also require a set of goroutines in the peer manager that would interact with the goroutines in the router in non-obvious ways, and fully populating the channel on startup could cause deadlocks with other startup tasks. Several issues like these made the solution hard to reason about.

I therefore simply made `DialNext()` and `EvictNext()` block until the next peer was available, using internal triggers to wake these methods up in a non-blocking fashion when any relevant state changes occurred. This proved much simpler to reason about, since there are no goroutines in the peer manager (except for trivial retry timers), nor any blocking channel sends, and it instead relies entirely on the existing goroutine structure of the router for concurrency. This also happens to be the same pattern used by the `Transport.Accept()` API, following Go stdlib conventions, so all router goroutines end up using a consistent pattern as well.
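
The blocking pattern described above can be sketched roughly as follows. This is a minimal illustration, not the actual `PeerManager` code: `toyManager`, `wakeCh`, `Add` and `tryDialNext` are hypothetical names, and the sketch assumes a single consumer calling `DialNext()`.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// toyManager is a hypothetical stand-in for the peer manager. It only tracks
// a list of candidate peer IDs and has no goroutines of its own.
type toyManager struct {
	mtx        sync.Mutex
	candidates []string
	wakeCh     chan struct{} // buffered with size 1, so triggers never block
}

func newToyManager() *toyManager {
	return &toyManager{wakeCh: make(chan struct{}, 1)}
}

// wake signals a state change without blocking. If a signal is already
// pending, the new one is dropped, since one pending signal is enough.
func (m *toyManager) wake() {
	select {
	case m.wakeCh <- struct{}{}:
	default:
	}
}

// Add registers a candidate and wakes up a blocked DialNext call.
func (m *toyManager) Add(id string) {
	m.mtx.Lock()
	m.candidates = append(m.candidates, id)
	m.mtx.Unlock()
	m.wake()
}

// tryDialNext returns the next candidate, if any, without blocking.
func (m *toyManager) tryDialNext() (string, bool) {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	if len(m.candidates) == 0 {
		return "", false
	}
	id := m.candidates[0]
	m.candidates = m.candidates[1:]
	return id, true
}

// DialNext blocks until a candidate is available or ctx is cancelled.
func (m *toyManager) DialNext(ctx context.Context) (string, error) {
	for {
		if id, ok := m.tryDialNext(); ok {
			return id, nil
		}
		select {
		case <-m.wakeCh:
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

// demoDialNext adds a peer from another goroutine while DialNext is waiting.
func demoDialNext() (string, error) {
	m := newToyManager()
	go func() {
		time.Sleep(10 * time.Millisecond)
		m.Add("peer-a")
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return m.DialNext(ctx)
}

func main() {
	id, err := demoDialNext()
	fmt.Println(id, err)
}
```

The caller's goroutine does all the waiting, so the manager needs no goroutines and no blocking channel sends.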

* libs/log: format []byte as hexadecimal string (uppercased) (#5960)

Closes: #5806 

Co-authored-by: Lanie Hei <heixx011@umn.edu>

* docs: log level docs (#5945)

## Description

add section on configuring log levels

Closes: #XXX

* .github: fix fuzz-nightly job (#5965)

outputs is a property of the job, not an individual step.

* e2e: add control over the log level of nodes (#5958)

* mempool: fix reactor tests (#5967)

## Description

Update the faux router to either drop channel errors or handle them based on an argument. This prevents deadlocks in tests where we try to send an error on the mempool channel but there is no reader.
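
As a rough illustration of why a reader is needed, here is a minimal sketch of a consumer that either drops or handles errors depending on an argument. The names `fauxRoute` and `demoFauxRoute` are hypothetical and are not taken from the test code.

```go
package main

import (
	"errors"
	"fmt"
)

// fauxRoute consumes errCh until done is closed. If handle is nil the errors
// are simply dropped; otherwise each error is passed to handle.
func fauxRoute(errCh <-chan error, done <-chan struct{}, handle func(error)) {
	for {
		select {
		case err := <-errCh:
			if handle != nil {
				handle(err)
			}
		case <-done:
			return
		}
	}
}

// demoFauxRoute sends n errors on an unbuffered channel. Without the reader
// goroutine, the first send would block forever.
func demoFauxRoute(n int) int {
	errCh := make(chan error)
	done := make(chan struct{})
	exited := make(chan struct{})
	handled := 0

	go func() {
		defer close(exited)
		fauxRoute(errCh, done, func(error) { handled++ })
	}()

	for i := 0; i < n; i++ {
		errCh <- errors.New("peer error")
	}
	close(done)
	<-exited // wait for the reader before inspecting handled
	return handled
}

func main() {
	fmt.Println("handled errors:", demoFauxRoute(3))
}
```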

Closes: #5956

* p2p: improve peerStore prototype (#5954)

This improves the `peerStore` prototype by e.g.:

* Using a database with Protobuf for persistence, but also keeping full peer set in memory for performance.
* Simplifying the API, by taking/returning struct copies for safety, and removing errors for in-memory operations.
* Caching the ranked peer set, as a temporary solution until a better data structure is implemented.
* Adding `PeerManagerOptions.MaxPeers` and pruning the peer store (based on rank) when it's full.
* Rewriting `PeerAddress` to be independent of `url.URL`, normalizing it and tightening semantics.

* p2p: simplify PeerManager upgrade logic (#5962)

Follow-up from #5947, branched off of #5954.

This simplifies the upgrade logic by adding explicit eviction requests, which can also be useful for other use-cases (e.g. if we need to ban a peer that's misbehaving). Changes:

* Add `evict` map which queues up peers to explicitly evict.
* `upgrading` now only tracks peers that we're upgrading via dialing (`DialNext` → `Dialed`/`DialFailed`).
* `Dialed` will unmark `upgrading`, and queue `evict` if still beyond capacity.
* `Accepted` will pick a random lower-scored peer to upgrade to, if appropriate, and doesn't care about `upgrading` (the dial will fail later, since it's already connected).
* `EvictNext` will return a peer scheduled in `evict` if any, otherwise if beyond capacity just evict the lowest-scored peer.

This limits all of the `upgrading` logic to `DialNext`, `Dialed`, and `DialFailed`, making it much simpler, and it should generally do the right thing in all cases I can think of.
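
The eviction order described in the list above might look roughly like this. It is a non-blocking sketch under simplifying assumptions: `toyPeers`, its fields and `evictNext` are hypothetical, scores are plain integers, and an empty string means "nothing to evict".

```go
package main

import "fmt"

// toyPeers is a hypothetical, simplified view of the peer manager state.
type toyPeers struct {
	maxConnected int
	connected    map[string]int  // peer ID -> score
	evict        map[string]bool // peers explicitly queued for eviction
}

// evictNext returns the next peer to evict, or "" if there is none.
func (p *toyPeers) evictNext() string {
	// Explicit eviction requests take priority.
	for id := range p.evict {
		delete(p.evict, id)
		if _, ok := p.connected[id]; ok {
			return id
		}
	}
	// Otherwise only evict when we are beyond capacity.
	if len(p.connected) <= p.maxConnected {
		return ""
	}
	// Pick the lowest-scored peer, breaking ties by ID for determinism.
	lowest := ""
	for id, score := range p.connected {
		if lowest == "" || score < p.connected[lowest] ||
			(score == p.connected[lowest] && id < lowest) {
			lowest = id
		}
	}
	return lowest
}

// demoEvict evicts until nothing is left to evict, simulating the disconnect
// that the caller would perform after each eviction.
func demoEvict() []string {
	p := &toyPeers{
		maxConnected: 1,
		connected:    map[string]int{"a": 5, "b": 1, "c": 3},
		evict:        map[string]bool{"c": true},
	}
	var order []string
	for i := 0; i < 3; i++ {
		id := p.evictNext()
		order = append(order, id)
		delete(p.connected, id)
	}
	return order
}

func main() {
	fmt.Printf("%q\n", demoEvict())
}
```

Here `c` goes first because it was explicitly queued, then `b` as the lowest-scored peer while still over capacity, and the third call finds nothing to evict.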

* p2p: add PeerManager.Advertise() (#5957)

Adds a naïve `PeerManager.Advertise()` method that the new PEX reactor can use to fetch addresses to advertise, as well as some other `FIXME`s on address advertisement.

* blockchain v0: fix waitgroup data race (#5970)

## Description

Fixes the data race in usage of `WaitGroup`. Specifically, the case where we invoke `Wait` _before_ the first delta `Add` call when the current waitgroup counter is zero. See https://golang.org/pkg/sync/#WaitGroup.Add.

Still not sure how this manifests itself in a test since the reactor has to be stopped virtually immediately after being started (I think?).

Regardless, this is the appropriate fix.
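
For reference, the rule from the `sync.WaitGroup` documentation is that an `Add` with a positive delta that occurs while the counter is zero must happen before `Wait`. A minimal sketch of the safe ordering, using a hypothetical `toyPool` type rather than the reactor itself:

```go
package main

import (
	"fmt"
	"sync"
)

// toyPool is a hypothetical worker pool used only to show the ordering.
type toyPool struct {
	wg      sync.WaitGroup
	mtx     sync.Mutex
	handled int
}

// start calls Add in the caller's goroutine, before the worker is spawned.
// Calling Add inside the worker instead would race with a concurrent Wait.
func (p *toyPool) start(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			p.mtx.Lock()
			p.handled++
			p.mtx.Unlock()
		}()
	}
}

// stop waits for all workers and reports how many ran.
func (p *toyPool) stop() int {
	p.wg.Wait()
	p.mtx.Lock()
	defer p.mtx.Unlock()
	return p.handled
}

// runPool starts n workers and stops the pool immediately afterwards.
func runPool(n int) int {
	p := &toyPool{}
	p.start(n)
	return p.stop()
}

func main() {
	fmt.Println("workers finished:", runPool(4))
}
```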

closes: #5968

* tests: fix `make test` (#5966)

## Description
 
- bump deadlock dep to master
  - fixes `make test` since we now use `deadlock.Once`

Closes: #XXX

* terminate go-fuzz gracefully (w/ SIGINT) (#5973)

and preserve exit code.

```
2021/01/26 03:34:49 workers: 2, corpus: 4 (8m28s ago), crashers: 0, restarts: 1/9976, execs: 11013732 (21596/sec), cover: 121, uptime: 8m30s
make: *** [fuzz-mempool] Terminated
Makefile:5: recipe for target 'fuzz-mempool' failed
Error: Process completed with exit code 124.
```

https://github.com/tendermint/tendermint/runs/1766661614

`continue-on-error` should make GH ignore any error codes.

* p2p: add prototype PEX reactor for new stack (#5971)

This adds a prototype PEX reactor for the new P2P stack.

* proto/p2p: rename PEX messages and fields (#5974)

Fixes #5899 by renaming a bunch of P2P Protobuf entities (while maintaining wire compatibility):

* `Message` to `PexMessage` (as it's only used for PEX messages).
* `PexAddrs` to `PexResponse`.
* `PexResponse.Addrs` to `PexResponse.Addresses`.
* `NetAddress` to `PexAddress` (as it's only used by PEX).

* p2p: resolve PEX addresses in PEX reactor (#5980)

This changes the new prototype PEX reactor to resolve peer address URLs into IP/port PEX addresses itself. Branched off of #5974.

I've spent some time thinking about address handling in the P2P stack. We currently use `PeerAddress` URLs everywhere, except for two places: when dialing a peer, and when exchanging addresses via PEX. We had two options:

1. Resolve addresses to endpoints inside `PeerManager`. This would introduce a lot of added complexity: we would have to track connection statistics per endpoint, have goroutines that asynchronously resolve and refresh these endpoints, deal with resolve scheduling before dialing (which is trickier than it sounds since it involves multiple goroutines in the peer manager and router and messes with peer rating order), handle IP address visibility issues, and so on.

2. Resolve addresses to endpoints (IP/port) only where they're used: when dialing, and in PEX. Everywhere else we use URLs.

I went with 2, because this significantly simplifies the handling of hostname resolution, and because I really think the PEX reactor should migrate to exchanging URLs instead of IP/port numbers anyway -- this allows operators to use DNS names for validators (so they can easily migrate them to new IPs and/or load-balance requests), and also allows different protocols (e.g. QUIC and `MemoryTransport`). Happy to discuss this.
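
To make option 2 concrete, here is a minimal sketch of resolving a URL-style peer address into IP/port endpoints only at the point of use. The `endpoint` type, the `resolve` function and the example URL are illustrative assumptions, not the actual `PeerAddress.Resolve()` implementation.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// endpoint is a hypothetical resolved address: a protocol plus IP and port.
type endpoint struct {
	Protocol string
	IP       net.IP
	Port     uint16
}

// resolve parses a URL-style peer address and looks up its host, returning
// one endpoint per IP address found. IP literals are returned as-is without
// a DNS query.
func resolve(ctx context.Context, rawURL string) ([]endpoint, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return nil, err
	}
	port, err := strconv.ParseUint(u.Port(), 10, 16)
	if err != nil {
		return nil, fmt.Errorf("invalid port in %q: %w", rawURL, err)
	}
	addrs, err := net.DefaultResolver.LookupIPAddr(ctx, u.Hostname())
	if err != nil {
		return nil, err
	}
	endpoints := make([]endpoint, 0, len(addrs))
	for _, a := range addrs {
		endpoints = append(endpoints, endpoint{
			Protocol: u.Scheme,
			IP:       a.IP,
			Port:     uint16(port),
		})
	}
	return endpoints, nil
}

// demoResolve resolves an illustrative address that uses an IP literal, so
// no network access is required.
func demoResolve() string {
	eps, err := resolve(context.Background(), "mconn://abcdef@127.0.0.1:26656")
	if err != nil || len(eps) != 1 {
		return ""
	}
	hostPort := net.JoinHostPort(eps[0].IP.String(), strconv.Itoa(int(eps[0].Port)))
	return eps[0].Protocol + "://" + hostPort
}

func main() {
	fmt.Println(demoResolve())
}
```

Everything outside the dialer and PEX keeps passing the URL around, so hostnames are re-resolved at each use and can change without updating stored addresses.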

* test/p2p: close transports to avoid goroutine leak failures (#5982)

* mempool: fix TestReactorNoBroadcastToSender (#5984)

## Description

Looks like I missed a test in the original PR when fixing the tests.

Closes: #5956

* mempool: fix mempool tests timeout (#5988)

* p2p: use stopCtx when dialing peers in Router (#5983)

This ensures we don't leak dial goroutines when shutting down the router.
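
A minimal sketch of the idea: a context derived from the stop channel is passed to the dial, so closing the channel unblocks any dial in flight. `slowDial` and `demoStop` are hypothetical stand-ins; `stopCtx` here takes the channel as a parameter instead of being a `Router` method.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// stopCtx returns a context that is cancelled when stopCh is closed.
func stopCtx(stopCh <-chan struct{}) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-stopCh
		cancel()
	}()
	return ctx
}

// slowDial is a hypothetical dial that takes ten seconds unless the context
// is cancelled first.
func slowDial(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoStop starts a dial, then signals shutdown, and reports whether the
// dial goroutine returned with context.Canceled instead of leaking.
func demoStop() bool {
	stopCh := make(chan struct{})
	ctx := stopCtx(stopCh)

	errCh := make(chan error, 1)
	go func() {
		errCh <- slowDial(ctx)
	}()

	close(stopCh)
	return errors.Is(<-errCh, context.Canceled)
}

func main() {
	fmt.Println("dial cancelled on stop:", demoStop())
}
```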

* docs: fix typo in state sync example (#5989)

Co-authored-by: Erik Grinaker <erik@interchain.berlin>
Co-authored-by: Anton Kaliaev <anton.kalyaev@gmail.com>
Co-authored-by: Marko <marbar3778@yahoo.com>
Co-authored-by: odidev <odidev@puresoftware.com>
Co-authored-by: Lanie Hei <heixx011@umn.edu>
Co-authored-by: Callum Waters <cmwaters19@gmail.com>
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: Sergey <52304443+c29r3@users.noreply.github.com>
2021-01-26 11:46:21 -08:00


package p2p

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/gogo/protobuf/proto"

	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/libs/service"
)

// Router manages peer connections and routes messages between peers and reactor
// channels. This is an early prototype.
//
// Channels are registered via OpenChannel(). When called, we register an input
// message queue for the channel in channelQueues and spawn off a goroutine for
// Router.routeChannel(). This goroutine reads off outbound messages and puts
// them in the appropriate peer message queue, and processes peer errors which
// will close (and thus disconnect) the appropriate peer queue. It runs until
// either the channel is closed by the caller or the router is stopped, at which
// point the input message queue is closed and removed.
//
// On startup, the router spawns off three primary goroutines that maintain
// connections to peers and run for the lifetime of the router:
//
// Router.dialPeers(): in a loop, asks the PeerManager for the next peer
// address to contact, resolves it into endpoints, and attempts to dial
// each one.
//
// Router.acceptPeers(): in a loop, waits for the next inbound connection
// from a peer, and checks with the PeerManager if it should be accepted.
//
// Router.evictPeers(): in a loop, asks the PeerManager for any connected
// peers to evict, and disconnects them.
//
// Once either an inbound or outbound connection has been made, an outbound
// message queue is registered in Router.peerQueues and a goroutine is spawned
// off for Router.routePeer() which will spawn off additional goroutines for
// Router.sendPeer() that sends outbound messages from the peer queue over the
// connection and for Router.receivePeer() that reads inbound messages from
// the connection and places them in the appropriate channel queue. When either
// goroutine exits, the connection and peer queue are closed, which will cause
// the other goroutines to close as well.
//
// The PeerManager is used to coordinate peer connections. For outbound
// connections, PeerManager.DialNext() blocks until there is a peer to dial,
// and the outcome is reported back via PeerManager.Dialed() or
// PeerManager.DialFailed(). For inbound connections, PeerManager.Accepted()
// checks whether the peer should be accepted. Once the peer queue has been
// registered the peer is marked via PeerManager.Ready(), and
// PeerManager.Disconnected() is called when routing for the peer ends. Over
// time, the PeerManager should also do peer scheduling and prioritization,
// e.g. ensuring we do exponential backoff on dial failures and connecting to
// more important peers first (such as persistent peers and validators).
//
// An additional goroutine Router.broadcastPeerUpdates() is also spawned off
// on startup, which consumes peer updates from Router.peerUpdatesCh (currently
// only connections and disconnections), and broadcasts them to all peer update
// subscriptions registered via SubscribePeerUpdates().
//
// On router shutdown, we close Router.stopCh which will signal to all
// goroutines to terminate. This in turn will cause all pending channel/peer
// queues to close, and we wait for this as a signal that goroutines have ended.
//
// All message scheduling should be limited to the queue implementations used
// for channel queues and peer queues. All message sending throughout the router
// is blocking, and if any messages should be dropped or buffered this is the
// sole responsibility of the queue, such that we can limit this logic to a
// single place. There is currently only a FIFO queue implementation that always
// blocks and never drops messages, but this must be improved with other
// implementations. The only exception is that all message sending must also
// select on appropriate channel/queue/router closure signals, to avoid blocking
// forever on a channel that has no consumer.
type Router struct {
	*service.BaseService

	logger      log.Logger
	transports  map[Protocol]Transport
	peerManager *PeerManager

	// FIXME: Consider using sync.Map.
	peerMtx    sync.RWMutex
	peerQueues map[NodeID]queue

	// FIXME: We don't strictly need to use a mutex for this if we seal the
	// channels on router start. This depends on whether we want to allow
	// dynamic channels in the future.
	channelMtx      sync.RWMutex
	channelQueues   map[ChannelID]queue
	channelMessages map[ChannelID]proto.Message

	// stopCh is used to signal router shutdown, by closing the channel.
	stopCh chan struct{}
}

// NewRouter creates a new Router. Peers are not dialed until the router is
// started.
//
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
// consider adding Protocols() to the Transport interface instead and register
// protocol/transport mappings automatically on a first-come basis.
func NewRouter(logger log.Logger, peerManager *PeerManager, transports map[Protocol]Transport) *Router {
	router := &Router{
		logger:          logger,
		transports:      transports,
		peerManager:     peerManager,
		stopCh:          make(chan struct{}),
		channelQueues:   map[ChannelID]queue{},
		channelMessages: map[ChannelID]proto.Message{},
		peerQueues:      map[NodeID]queue{},
	}
	router.BaseService = service.NewBaseService(logger, "router", router)
	return router
}

// OpenChannel opens a new channel for the given message type. The caller must
// close the channel when done, and this must happen before the router stops.
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
	// FIXME: NewChannel should take directional channels so we can pass
	// queue.dequeue() instead of reaching inside for queue.queueCh.
	queue := newFIFOQueue()
	channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))

	r.channelMtx.Lock()
	defer r.channelMtx.Unlock()

	if _, ok := r.channelQueues[id]; ok {
		return nil, fmt.Errorf("channel %v already exists", id)
	}
	r.channelQueues[id] = queue
	r.channelMessages[id] = messageType

	go func() {
		defer func() {
			r.channelMtx.Lock()
			delete(r.channelQueues, id)
			delete(r.channelMessages, id)
			r.channelMtx.Unlock()
			queue.close()
		}()
		r.routeChannel(channel)
	}()

	return channel, nil
}

// routeChannel receives outbound messages and errors from a channel and routes
// them to the appropriate peer. It returns when either the channel is closed or
// the router is shutting down.
func (r *Router) routeChannel(channel *Channel) {
	for {
		select {
		case envelope, ok := <-channel.outCh:
			if !ok {
				return
			}

			// FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
			// to return a wrapped copy.
			if _, ok := channel.messageType.(Wrapper); ok {
				wrapper := proto.Clone(channel.messageType)
				if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
					r.logger.Error("failed to wrap message", "err", err)
					continue
				}
				envelope.Message = wrapper
			}
			envelope.channelID = channel.id

			if envelope.Broadcast {
				// Copy the peer queues under the lock, so we don't hold it
				// while sending.
				r.peerMtx.RLock()
				peerQueues := make(map[NodeID]queue, len(r.peerQueues))
				for peerID, peerQueue := range r.peerQueues {
					peerQueues[peerID] = peerQueue
				}
				r.peerMtx.RUnlock()

				for peerID, peerQueue := range peerQueues {
					e := envelope
					e.Broadcast = false
					e.To = peerID
					select {
					case peerQueue.enqueue() <- e:
					case <-peerQueue.closed():
					case <-r.stopCh:
						return
					}
				}
			} else {
				r.peerMtx.RLock()
				peerQueue, ok := r.peerQueues[envelope.To]
				r.peerMtx.RUnlock()
				if !ok {
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
					continue
				}

				select {
				case peerQueue.enqueue() <- envelope:
				case <-peerQueue.closed():
					r.logger.Error("dropping message for non-connected peer",
						"peer", envelope.To, "channel", channel.id)
				case <-r.stopCh:
					return
				}
			}

		case peerError, ok := <-channel.errCh:
			if !ok {
				return
			}
			// FIXME: We just disconnect the peer for now
			r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
			r.peerMtx.RLock()
			peerQueue, ok := r.peerQueues[peerError.PeerID]
			r.peerMtx.RUnlock()
			if ok {
				peerQueue.close()
			}

		case <-channel.Done():
			return

		case <-r.stopCh:
			return
		}
	}
}

// acceptPeers accepts inbound connections from peers on the given transport.
func (r *Router) acceptPeers(transport Transport) {
	ctx := r.stopCtx()
	for {
		// FIXME: We may need transports to enforce some sort of rate limiting
		// here (e.g. by IP address), or alternatively have PeerManager.Accepted()
		// do it for us.
		conn, err := transport.Accept(ctx)
		switch err {
		case nil:
		case ErrTransportClosed{}, io.EOF, context.Canceled:
			r.logger.Debug("stopping accept routine", "transport", transport)
			return
		default:
			r.logger.Error("failed to accept connection", "transport", transport, "err", err)
			continue
		}

		go func() {
			defer func() {
				_ = conn.Close()
			}()

			peerID := conn.NodeInfo().NodeID
			if err := r.peerManager.Accepted(peerID); err != nil {
				r.logger.Error("failed to accept connection", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeers maintains outbound connections to peers.
func (r *Router) dialPeers() {
	ctx := r.stopCtx()
	for {
		peerID, address, err := r.peerManager.DialNext(ctx)
		switch err {
		case nil:
		case context.Canceled:
			r.logger.Debug("stopping dial routine")
			return
		default:
			r.logger.Error("failed to find next peer to dial", "err", err)
			return
		}

		go func() {
			conn, err := r.dialPeer(ctx, address)
			if errors.Is(err, context.Canceled) {
				return
			} else if err != nil {
				// Include the dial error itself, so the failure is diagnosable.
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				if err = r.peerManager.DialFailed(peerID, address); err != nil {
					r.logger.Error("failed to report dial failure", "peer", peerID, "err", err)
				}
				return
			}
			defer conn.Close()

			if err = r.peerManager.Dialed(peerID, address); err != nil {
				r.logger.Error("failed to dial peer", "peer", peerID, "err", err)
				return
			}

			queue := newFIFOQueue()
			r.peerMtx.Lock()
			r.peerQueues[peerID] = queue
			r.peerMtx.Unlock()
			r.peerManager.Ready(peerID)

			defer func() {
				r.peerMtx.Lock()
				delete(r.peerQueues, peerID)
				r.peerMtx.Unlock()
				queue.close()
				if err := r.peerManager.Disconnected(peerID); err != nil {
					r.logger.Error("failed to disconnect peer", "peer", peerID, "err", err)
				}
			}()

			r.routePeer(peerID, conn, queue)
		}()
	}
}

// dialPeer attempts to connect to a peer.
func (r *Router) dialPeer(ctx context.Context, address PeerAddress) (Connection, error) {
	resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	r.logger.Info("resolving peer address", "address", address)
	endpoints, err := address.Resolve(resolveCtx)
	if err != nil {
		return nil, fmt.Errorf("failed to resolve address %q: %w", address, err)
	}

	for _, endpoint := range endpoints {
		t, ok := r.transports[endpoint.Protocol]
		if !ok {
			r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
			continue
		}
		dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		// FIXME: When we dial and handshake the peer, we should pass it
		// appropriate address(es) it can use to dial us back. It can't use our
		// remote endpoint, since TCP uses different port numbers for outbound
		// connections than it does for inbound. Also, we may need to vary this
		// by the peer's endpoint, since e.g. a peer on 192.168.0.0 can reach us
		// on a private address on this endpoint, but a peer on the public
		// Internet can't and needs a different public address.
		conn, err := t.Dial(dialCtx, endpoint)
		if err != nil {
			r.logger.Error("failed to dial endpoint", "endpoint", endpoint, "err", err)
		} else {
			r.logger.Info("connected to peer", "peer", address.ID, "endpoint", endpoint)
			return conn, nil
		}
	}
	return nil, fmt.Errorf("failed to connect to peer via %q", address)
}

// routePeer routes inbound messages from a peer to channels, and also sends
// outbound queued messages to the peer. It will close the connection and send
// queue, using this as a signal to coordinate the internal receivePeer() and
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
// connection or queue is closed.
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
	r.logger.Info("routing peer", "peer", peerID)
	resultsCh := make(chan error, 2)
	go func() {
		resultsCh <- r.receivePeer(peerID, conn)
	}()
	go func() {
		resultsCh <- r.sendPeer(peerID, conn, sendQueue)
	}()

	err := <-resultsCh
	_ = conn.Close()
	sendQueue.close()
	if e := <-resultsCh; err == nil {
		// The first err was nil, so we update it with the second result,
		// which may or may not be nil.
		err = e
	}
	switch err {
	case nil, io.EOF, ErrTransportClosed{}:
		r.logger.Info("peer disconnected", "peer", peerID)
	default:
		r.logger.Error("peer failure", "peer", peerID, "err", err)
	}
}

// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
	for {
		chID, bz, err := conn.ReceiveMessage()
		if err != nil {
			return err
		}

		r.channelMtx.RLock()
		queue, ok := r.channelQueues[ChannelID(chID)]
		messageType := r.channelMessages[ChannelID(chID)]
		r.channelMtx.RUnlock()
		if !ok {
			r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
			continue
		}

		msg := proto.Clone(messageType)
		if err := proto.Unmarshal(bz, msg); err != nil {
			r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
			continue
		}
		if wrapper, ok := msg.(Wrapper); ok {
			msg, err = wrapper.Unwrap()
			if err != nil {
				r.logger.Error("failed to unwrap message", "err", err)
				continue
			}
		}

		select {
		// FIXME: ReceiveMessage() should return ChannelID.
		case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
			r.logger.Debug("received message", "peer", peerID, "message", msg)
		case <-queue.closed():
			r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
		case <-r.stopCh:
			return nil
		}
	}
}

// sendPeer sends queued messages to a peer.
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
	for {
		select {
		case envelope := <-queue.dequeue():
			bz, err := proto.Marshal(envelope.Message)
			if err != nil {
				r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
				continue
			}

			// FIXME: SendMessage() should take ChannelID.
			_, err = conn.SendMessage(byte(envelope.channelID), bz)
			if err != nil {
				return err
			}
			r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)

		case <-queue.closed():
			return nil

		case <-r.stopCh:
			return nil
		}
	}
}

// evictPeers evicts connected peers as requested by the peer manager.
func (r *Router) evictPeers() {
	ctx := r.stopCtx()
	for {
		peerID, err := r.peerManager.EvictNext(ctx)
		switch err {
		case nil:
		case context.Canceled:
			r.logger.Debug("stopping evict routine")
			return
		default:
			r.logger.Error("failed to find next peer to evict", "err", err)
			return
		}

		r.logger.Info("evicting peer", "peer", peerID)
		r.peerMtx.RLock()
		if queue, ok := r.peerQueues[peerID]; ok {
			queue.close()
		}
		r.peerMtx.RUnlock()
	}
}

// OnStart implements service.Service.
func (r *Router) OnStart() error {
	go r.dialPeers()
	for _, transport := range r.transports {
		go r.acceptPeers(transport)
	}
	go r.evictPeers()
	return nil
}

// OnStop implements service.Service.
func (r *Router) OnStop() {
	// Collect all active queues, so we can wait for them to close.
	queues := []queue{}
	r.channelMtx.RLock()
	for _, q := range r.channelQueues {
		queues = append(queues, q)
	}
	r.channelMtx.RUnlock()
	r.peerMtx.RLock()
	for _, q := range r.peerQueues {
		queues = append(queues, q)
	}
	r.peerMtx.RUnlock()

	// Signal router shutdown, and wait for queues (and thus goroutines)
	// to complete.
	close(r.stopCh)
	for _, q := range queues {
		<-q.closed()
	}
}

// stopCtx returns a context that is cancelled when the router stops.
func (r *Router) stopCtx() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		<-r.stopCh
		cancel()
	}()
	return ctx
}