mirror of
https://github.com/tendermint/tendermint.git
synced 2026-05-22 07:01:29 +00:00
p2p: add Router prototype (#5831)
Early but functional prototype of the new `p2p.Router`, see its GoDoc comment for details on how it works. Expect much of this logic to change and improve as we evolve the new P2P stack. There is a simple test that sets up an in-memory network of four routers with reactors and passes messages between them, but otherwise no exhaustive tests since this is very much a work-in-progress.
This commit is contained in:
@@ -15,6 +15,16 @@ type Envelope struct {
|
||||
To NodeID // Message receiver, or empty for inbound messages.
|
||||
Broadcast bool // Send message to all connected peers, ignoring To.
|
||||
Message proto.Message // Payload.
|
||||
|
||||
// For internal use in the Router.
|
||||
channelID ChannelID
|
||||
}
|
||||
|
||||
// Strip strips internal information from the envelope. Primarily used for
|
||||
// testing, such that returned envelopes can be compared with literals.
|
||||
func (e Envelope) Strip() Envelope {
|
||||
e.channelID = 0
|
||||
return e
|
||||
}
|
||||
|
||||
// Channel is a bidirectional channel for Protobuf message exchange with peers.
|
||||
|
||||
230
p2p/peer.go
230
p2p/peer.go
@@ -1,20 +1,128 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/cmap"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
|
||||
tmconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
)
|
||||
|
||||
// PeerAddress is a peer address URL.
|
||||
type PeerAddress struct {
|
||||
*url.URL
|
||||
}
|
||||
|
||||
// ParsePeerAddress parses a peer address URL into a PeerAddress.
|
||||
func ParsePeerAddress(address string) (PeerAddress, error) {
|
||||
u, err := url.Parse(address)
|
||||
if err != nil || u == nil {
|
||||
return PeerAddress{}, fmt.Errorf("unable to parse peer address %q: %w", address, err)
|
||||
}
|
||||
if u.Scheme == "" {
|
||||
u.Scheme = string(defaultProtocol)
|
||||
}
|
||||
pa := PeerAddress{URL: u}
|
||||
if err = pa.Validate(); err != nil {
|
||||
return PeerAddress{}, err
|
||||
}
|
||||
return pa, nil
|
||||
}
|
||||
|
||||
// NodeID returns the address node ID.
|
||||
func (a PeerAddress) NodeID() NodeID {
|
||||
return NodeID(a.User.Username())
|
||||
}
|
||||
|
||||
// Resolve resolves a PeerAddress into a set of Endpoints, by expanding
|
||||
// out a DNS name in Host to its IP addresses. Field mapping:
|
||||
//
|
||||
// Scheme → Endpoint.Protocol
|
||||
// Host → Endpoint.IP
|
||||
// User → Endpoint.PeerID
|
||||
// Port → Endpoint.Port
|
||||
// Path+Query+Fragment,Opaque → Endpoint.Path
|
||||
//
|
||||
func (a PeerAddress) Resolve(ctx context.Context) ([]Endpoint, error) {
|
||||
ips, err := net.DefaultResolver.LookupIP(ctx, "ip", a.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
port, err := a.parsePort()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path := a.Path
|
||||
if a.RawPath != "" {
|
||||
path = a.RawPath
|
||||
}
|
||||
if a.Opaque != "" { // used for e.g. "about:blank" style URLs
|
||||
path = a.Opaque
|
||||
}
|
||||
if a.RawQuery != "" {
|
||||
path += "?" + a.RawQuery
|
||||
}
|
||||
if a.RawFragment != "" {
|
||||
path += "#" + a.RawFragment
|
||||
}
|
||||
|
||||
endpoints := make([]Endpoint, len(ips))
|
||||
for i, ip := range ips {
|
||||
endpoints[i] = Endpoint{
|
||||
PeerID: a.NodeID(),
|
||||
Protocol: Protocol(a.Scheme),
|
||||
IP: ip,
|
||||
Port: port,
|
||||
Path: path,
|
||||
}
|
||||
}
|
||||
return endpoints, nil
|
||||
}
|
||||
|
||||
// Validates validates a PeerAddress.
|
||||
func (a PeerAddress) Validate() error {
|
||||
if a.Scheme == "" {
|
||||
return errors.New("no protocol")
|
||||
}
|
||||
if id := a.User.Username(); id == "" {
|
||||
return errors.New("no peer ID")
|
||||
} else if err := NodeID(id).Validate(); err != nil {
|
||||
return fmt.Errorf("invalid peer ID: %w", err)
|
||||
}
|
||||
if a.Hostname() == "" && len(a.Query()) == 0 && a.Opaque == "" {
|
||||
return errors.New("no host or path given")
|
||||
}
|
||||
if port, err := a.parsePort(); err != nil {
|
||||
return err
|
||||
} else if port > 0 && a.Hostname() == "" {
|
||||
return errors.New("cannot specify port without host")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// parsePort returns the port number as a uint16.
|
||||
func (a PeerAddress) parsePort() (uint16, error) {
|
||||
if portString := a.Port(); portString != "" {
|
||||
port64, err := strconv.ParseUint(portString, 10, 16)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("invalid port %q: %w", portString, err)
|
||||
}
|
||||
return uint16(port64), nil
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// PeerStatus specifies peer statuses.
|
||||
type PeerStatus string
|
||||
|
||||
@@ -106,6 +214,126 @@ type PeerUpdate struct {
|
||||
Status PeerStatus
|
||||
}
|
||||
|
||||
// peerStore manages information about peers. It is currently a bare-bones
|
||||
// in-memory store of peer addresses, and will be fleshed out later.
|
||||
//
|
||||
// The main function of peerStore is currently to dispense peers to connect to
|
||||
// (via peerStore.Dispense), giving the caller exclusive "ownership" of that
|
||||
// peer until the peer is returned (via peerStore.Return). This is used to
|
||||
// schedule and synchronize peer dialing and accepting in the Router, e.g.
|
||||
// making sure we only have a single connection (in either direction) to peers.
|
||||
type peerStore struct {
|
||||
mtx sync.Mutex
|
||||
peers map[NodeID]*peerInfo
|
||||
claimed map[NodeID]bool
|
||||
}
|
||||
|
||||
// newPeerStore creates a new peer store.
|
||||
func newPeerStore() *peerStore {
|
||||
return &peerStore{
|
||||
peers: map[NodeID]*peerInfo{},
|
||||
claimed: map[NodeID]bool{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a peer to the store, given as an address.
|
||||
func (s *peerStore) Add(address PeerAddress) error {
|
||||
if err := address.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
peerID := address.NodeID()
|
||||
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
|
||||
peer, ok := s.peers[peerID]
|
||||
if !ok {
|
||||
peer = newStorePeer(peerID)
|
||||
s.peers[peerID] = peer
|
||||
} else if s.claimed[peerID] {
|
||||
// FIXME: We need to handle modifications of claimed peers somehow.
|
||||
return fmt.Errorf("peer %q is claimed", peerID)
|
||||
}
|
||||
peer.AddAddress(address)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Claim claims a peer. The caller has exclusive ownership of the peer, and must
|
||||
// return it by calling Return(). Returns nil if the peer could not be claimed.
|
||||
// If the peer is not known to the store, it is registered and claimed.
|
||||
func (s *peerStore) Claim(id NodeID) *peerInfo {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
if s.claimed[id] {
|
||||
return nil
|
||||
}
|
||||
peer, ok := s.peers[id]
|
||||
if !ok {
|
||||
peer = newStorePeer(id)
|
||||
s.peers[id] = peer
|
||||
}
|
||||
s.claimed[id] = true
|
||||
return peer
|
||||
}
|
||||
|
||||
// Dispense finds an appropriate peer to contact and claims it. The caller has
|
||||
// exclusive ownership of the peer, and must return it by calling Return(). The
|
||||
// peer will not be dispensed again until returned.
|
||||
//
|
||||
// Returns nil if no appropriate peers are available.
|
||||
func (s *peerStore) Dispense() *peerInfo {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
for key, peer := range s.peers {
|
||||
switch {
|
||||
case len(peer.Addresses) == 0:
|
||||
case s.claimed[key]:
|
||||
default:
|
||||
s.claimed[key] = true
|
||||
return peer
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Return returns a claimed peer, making it available for other
|
||||
// callers to claim.
|
||||
func (s *peerStore) Return(id NodeID) {
|
||||
s.mtx.Lock()
|
||||
defer s.mtx.Unlock()
|
||||
delete(s.claimed, id)
|
||||
}
|
||||
|
||||
// peerInfo is a peer stored in the peerStore.
|
||||
//
|
||||
// FIXME: This should be renamed peer or something else once the old peer is
|
||||
// removed.
|
||||
type peerInfo struct {
|
||||
ID NodeID
|
||||
Addresses []PeerAddress
|
||||
}
|
||||
|
||||
// newStorePeer creates a new storePeer.
|
||||
func newStorePeer(id NodeID) *peerInfo {
|
||||
return &peerInfo{
|
||||
ID: id,
|
||||
Addresses: []PeerAddress{},
|
||||
}
|
||||
}
|
||||
|
||||
// AddAddress adds an address to a peer, unless it already exists. It does not
|
||||
// validate the address.
|
||||
func (p *peerInfo) AddAddress(address PeerAddress) {
|
||||
// We just do a linear search for now.
|
||||
addressString := address.String()
|
||||
for _, a := range p.Addresses {
|
||||
if a.String() == addressString {
|
||||
return
|
||||
}
|
||||
}
|
||||
p.Addresses = append(p.Addresses, address)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Types and business logic below may be deprecated.
|
||||
//
|
||||
|
||||
59
p2p/queue.go
Normal file
59
p2p/queue.go
Normal file
@@ -0,0 +1,59 @@
|
||||
package p2p
|
||||
|
||||
import "sync"
|
||||
|
||||
// queue does QoS scheduling for Envelopes, enqueueing and dequeueing according
|
||||
// to some policy. Queues are used at contention points, i.e.:
|
||||
//
|
||||
// - Receiving inbound messages to a single channel from all peers.
|
||||
// - Sending outbound messages to a single peer from all channels.
|
||||
type queue interface {
|
||||
// enqueue returns a channel for submitting envelopes.
|
||||
enqueue() chan<- Envelope
|
||||
|
||||
// dequeue returns a channel ordered according to some queueing policy.
|
||||
dequeue() <-chan Envelope
|
||||
|
||||
// close closes the queue. After this call enqueue() will block, so the
|
||||
// caller must select on closed() as well to avoid blocking forever. The
|
||||
// enqueue() and dequeue() channels will not be closed.
|
||||
close()
|
||||
|
||||
// closed returns a channel that's closed when the scheduler is closed.
|
||||
closed() <-chan struct{}
|
||||
}
|
||||
|
||||
// fifoQueue is a simple unbuffered lossless queue that passes messages through
|
||||
// in the order they were received, and blocks until message is received.
|
||||
type fifoQueue struct {
|
||||
queueCh chan Envelope
|
||||
closeCh chan struct{}
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
var _ queue = (*fifoQueue)(nil)
|
||||
|
||||
func newFIFOQueue() *fifoQueue {
|
||||
return &fifoQueue{
|
||||
queueCh: make(chan Envelope),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (q *fifoQueue) enqueue() chan<- Envelope {
|
||||
return q.queueCh
|
||||
}
|
||||
|
||||
func (q *fifoQueue) dequeue() <-chan Envelope {
|
||||
return q.queueCh
|
||||
}
|
||||
|
||||
func (q *fifoQueue) close() {
|
||||
q.closeOnce.Do(func() {
|
||||
close(q.closeCh)
|
||||
})
|
||||
}
|
||||
|
||||
func (q *fifoQueue) closed() <-chan struct{} {
|
||||
return q.closeCh
|
||||
}
|
||||
568
p2p/router.go
Normal file
568
p2p/router.go
Normal file
@@ -0,0 +1,568 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/libs/service"
|
||||
)
|
||||
|
||||
// Router manages peer connections and routes messages between peers and reactor
|
||||
// channels. This is an early prototype.
|
||||
//
|
||||
// Channels are registered via OpenChannel(). When called, we register an input
|
||||
// message queue for the channel in channelQueues and spawn off a goroutine for
|
||||
// Router.routeChannel(). This goroutine reads off outbound messages and puts
|
||||
// them in the appropriate peer message queue, and processes peer errors which
|
||||
// will close (and thus disconnect) the appriate peer queue. It runs until
|
||||
// either the channel is closed by the caller or the router is stopped, at which
|
||||
// point the input message queue is closed and removed.
|
||||
//
|
||||
// On startup, the router spawns off two primary goroutines that maintain
|
||||
// connections to peers and run for the lifetime of the router:
|
||||
//
|
||||
// Router.dialPeers(): in a loop, asks the peerStore to dispense an
|
||||
// eligible peer to connect to, and attempts to resolve and dial each
|
||||
// address until successful.
|
||||
//
|
||||
// Router.acceptPeers(): in a loop, waits for the next inbound connection
|
||||
// from a peer, and attempts to claim it in the peerStore.
|
||||
//
|
||||
// Once either an inbound or outbound connection has been made, an outbound
|
||||
// message queue is registered in Router.peerQueues and a goroutine is spawned
|
||||
// off for Router.routePeer() which will spawn off additional goroutines for
|
||||
// Router.sendPeer() that sends outbound messages from the peer queue over the
|
||||
// connection and for Router.receivePeer() that reads inbound messages from
|
||||
// the connection and places them in the appropriate channel queue. When either
|
||||
// goroutine exits, the connection and peer queue is closed, which will cause
|
||||
// the other goroutines to close as well.
|
||||
//
|
||||
// The peerStore is used to coordinate peer connections, by only allowing a peer
|
||||
// to be claimed (owned) by a single caller at a time (both for outbound and
|
||||
// inbound connections). This is done either via peerStore.Dispense() which
|
||||
// dispenses and claims an eligible peer to dial, or via peerStore.Claim() which
|
||||
// attempts to claim a given peer for an inbound connection. Peers must be
|
||||
// returned to the peerStore with peerStore.Return() to release the claim. Over
|
||||
// time, the peerStore will also do peer scheduling and prioritization, e.g.
|
||||
// ensuring we do exponential backoff on dial failures and connecting to
|
||||
// more important peers first (such as persistent peers and validators).
|
||||
//
|
||||
// An additional goroutine Router.broadcastPeerUpdates() is also spawned off
|
||||
// on startup, which consumes peer updates from Router.peerUpdatesCh (currently
|
||||
// only connections and disconnections), and broadcasts them to all peer update
|
||||
// subscriptions registered via SubscribePeerUpdates().
|
||||
//
|
||||
// On router shutdown, we close Router.stopCh which will signal to all
|
||||
// goroutines to terminate. This in turn will cause all pending channel/peer
|
||||
// queues to close, and we wait for this as a signal that goroutines have ended.
|
||||
//
|
||||
// All message scheduling should be limited to the queue implementations used
|
||||
// for channel queues and peer queues. All message sending throughout the router
|
||||
// is blocking, and if any messages should be dropped or buffered this is the
|
||||
// sole responsibility of the queue, such that we can limit this logic to a
|
||||
// single place. There is currently only a FIFO queue implementation that always
|
||||
// blocks and never drops messages, but this must be improved with other
|
||||
// implementations. The only exception is that all message sending must also
|
||||
// select on appropriate channel/queue/router closure signals, to avoid blocking
|
||||
// forever on a channel that has no consumer.
|
||||
type Router struct {
|
||||
*service.BaseService
|
||||
logger log.Logger
|
||||
transports map[Protocol]Transport
|
||||
store *peerStore
|
||||
|
||||
// FIXME: Consider using sync.Map.
|
||||
peerMtx sync.RWMutex
|
||||
peerQueues map[NodeID]queue
|
||||
|
||||
// FIXME: We don't strictly need to use a mutex for this if we seal the
|
||||
// channels on router start. This depends on whether we want to allow
|
||||
// dynamic channels in the future.
|
||||
channelMtx sync.RWMutex
|
||||
channelQueues map[ChannelID]queue
|
||||
channelMessages map[ChannelID]proto.Message
|
||||
|
||||
peerUpdatesCh chan PeerUpdate
|
||||
peerUpdatesMtx sync.RWMutex
|
||||
peerUpdatesSubs map[*PeerUpdatesCh]*PeerUpdatesCh // keyed by struct identity (address)
|
||||
|
||||
// stopCh is used to signal router shutdown, by closing the channel.
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewRouter creates a new Router, dialing the given peers.
|
||||
//
|
||||
// FIXME: providing protocol/transport maps is cumbersome in tests, we should
|
||||
// consider adding Protocols() to the Transport interface instead and register
|
||||
// protocol/transport mappings automatically on a first-come basis.
|
||||
func NewRouter(logger log.Logger, transports map[Protocol]Transport, peers []PeerAddress) *Router {
|
||||
router := &Router{
|
||||
logger: logger,
|
||||
transports: transports,
|
||||
store: newPeerStore(),
|
||||
stopCh: make(chan struct{}),
|
||||
channelQueues: map[ChannelID]queue{},
|
||||
channelMessages: map[ChannelID]proto.Message{},
|
||||
peerQueues: map[NodeID]queue{},
|
||||
peerUpdatesCh: make(chan PeerUpdate),
|
||||
peerUpdatesSubs: map[*PeerUpdatesCh]*PeerUpdatesCh{},
|
||||
}
|
||||
router.BaseService = service.NewBaseService(logger, "router", router)
|
||||
|
||||
for _, address := range peers {
|
||||
if err := router.store.Add(address); err != nil {
|
||||
logger.Error("failed to add peer", "address", address, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return router
|
||||
}
|
||||
|
||||
// OpenChannel opens a new channel for the given message type. The caller must
|
||||
// close the channel when done, and this must happen before the router stops.
|
||||
func (r *Router) OpenChannel(id ChannelID, messageType proto.Message) (*Channel, error) {
|
||||
// FIXME: NewChannel should take directional channels so we can pass
|
||||
// queue.dequeue() instead of reaching inside for queue.queueCh.
|
||||
queue := newFIFOQueue()
|
||||
channel := NewChannel(id, messageType, queue.queueCh, make(chan Envelope), make(chan PeerError))
|
||||
|
||||
r.channelMtx.Lock()
|
||||
defer r.channelMtx.Unlock()
|
||||
|
||||
if _, ok := r.channelQueues[id]; ok {
|
||||
return nil, fmt.Errorf("channel %v already exists", id)
|
||||
}
|
||||
r.channelQueues[id] = queue
|
||||
r.channelMessages[id] = messageType
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
r.channelMtx.Lock()
|
||||
delete(r.channelQueues, id)
|
||||
delete(r.channelMessages, id)
|
||||
r.channelMtx.Unlock()
|
||||
queue.close()
|
||||
}()
|
||||
r.routeChannel(channel)
|
||||
}()
|
||||
|
||||
return channel, nil
|
||||
}
|
||||
|
||||
// routeChannel receives outbound messages and errors from a channel and routes
|
||||
// them to the appropriate peer. It returns when either the channel is closed or
|
||||
// the router is shutting down.
|
||||
func (r *Router) routeChannel(channel *Channel) {
|
||||
for {
|
||||
select {
|
||||
case envelope, ok := <-channel.outCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// FIXME: This is a bit unergonomic, maybe it'd be better for Wrap()
|
||||
// to return a wrapped copy.
|
||||
if _, ok := channel.messageType.(Wrapper); ok {
|
||||
wrapper := proto.Clone(channel.messageType)
|
||||
if err := wrapper.(Wrapper).Wrap(envelope.Message); err != nil {
|
||||
r.Logger.Error("failed to wrap message", "err", err)
|
||||
continue
|
||||
}
|
||||
envelope.Message = wrapper
|
||||
}
|
||||
envelope.channelID = channel.id
|
||||
|
||||
if envelope.Broadcast {
|
||||
r.peerMtx.RLock()
|
||||
peerQueues := make(map[NodeID]queue, len(r.peerQueues))
|
||||
for peerID, peerQueue := range r.peerQueues {
|
||||
peerQueues[peerID] = peerQueue
|
||||
}
|
||||
r.peerMtx.RUnlock()
|
||||
|
||||
for peerID, peerQueue := range peerQueues {
|
||||
e := envelope
|
||||
e.Broadcast = false
|
||||
e.To = peerID
|
||||
select {
|
||||
case peerQueue.enqueue() <- e:
|
||||
case <-peerQueue.closed():
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
r.peerMtx.RLock()
|
||||
peerQueue, ok := r.peerQueues[envelope.To]
|
||||
r.peerMtx.RUnlock()
|
||||
if !ok {
|
||||
r.logger.Error("dropping message for non-connected peer",
|
||||
"peer", envelope.To, "channel", channel.id)
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case peerQueue.enqueue() <- envelope:
|
||||
case <-peerQueue.closed():
|
||||
r.logger.Error("dropping message for non-connected peer",
|
||||
"peer", envelope.To, "channel", channel.id)
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
case peerError, ok := <-channel.errCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// FIXME: We just disconnect the peer for now
|
||||
r.logger.Error("peer error, disconnecting", "peer", peerError.PeerID, "err", peerError.Err)
|
||||
r.peerMtx.RLock()
|
||||
peerQueue, ok := r.peerQueues[peerError.PeerID]
|
||||
r.peerMtx.RUnlock()
|
||||
if ok {
|
||||
peerQueue.close()
|
||||
}
|
||||
|
||||
case <-channel.Done():
|
||||
return
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// acceptPeers accepts inbound connections from peers on the given transport.
|
||||
func (r *Router) acceptPeers(transport Transport) {
|
||||
for {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
conn, err := transport.Accept(context.Background())
|
||||
switch err {
|
||||
case nil:
|
||||
case ErrTransportClosed{}, io.EOF:
|
||||
r.logger.Info("transport closed; stopping accept routine", "transport", transport)
|
||||
return
|
||||
default:
|
||||
r.logger.Error("failed to accept connection", "transport", transport, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
peerID := conn.NodeInfo().NodeID
|
||||
if r.store.Claim(peerID) == nil {
|
||||
r.logger.Error("already connected to peer, rejecting connection", "peer", peerID)
|
||||
_ = conn.Close()
|
||||
continue
|
||||
}
|
||||
|
||||
queue := newFIFOQueue()
|
||||
r.peerMtx.Lock()
|
||||
r.peerQueues[peerID] = queue
|
||||
r.peerMtx.Unlock()
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
r.peerMtx.Lock()
|
||||
delete(r.peerQueues, peerID)
|
||||
r.peerMtx.Unlock()
|
||||
queue.close()
|
||||
_ = conn.Close()
|
||||
r.store.Return(peerID)
|
||||
}()
|
||||
|
||||
r.routePeer(peerID, conn, queue)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// dialPeers maintains outbound connections to peers.
|
||||
func (r *Router) dialPeers() {
|
||||
for {
|
||||
select {
|
||||
case <-r.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
peer := r.store.Dispense()
|
||||
if peer == nil {
|
||||
r.logger.Debug("no eligible peers, sleeping")
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
continue
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer r.store.Return(peer.ID)
|
||||
conn, err := r.dialPeer(peer)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to dial peer, will retry", "peer", peer.ID)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
queue := newFIFOQueue()
|
||||
defer queue.close()
|
||||
r.peerMtx.Lock()
|
||||
r.peerQueues[peer.ID] = queue
|
||||
r.peerMtx.Unlock()
|
||||
|
||||
defer func() {
|
||||
r.peerMtx.Lock()
|
||||
delete(r.peerQueues, peer.ID)
|
||||
r.peerMtx.Unlock()
|
||||
}()
|
||||
|
||||
r.routePeer(peer.ID, conn, queue)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// dialPeer attempts to connect to a peer.
|
||||
func (r *Router) dialPeer(peer *peerInfo) (Connection, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
for _, address := range peer.Addresses {
|
||||
resolveCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
r.logger.Info("resolving peer address", "peer", peer.ID, "address", address)
|
||||
endpoints, err := address.Resolve(resolveCtx)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to resolve address", "address", address, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, endpoint := range endpoints {
|
||||
t, ok := r.transports[endpoint.Protocol]
|
||||
if !ok {
|
||||
r.logger.Error("no transport found for protocol", "protocol", endpoint.Protocol)
|
||||
continue
|
||||
}
|
||||
dialCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
conn, err := t.Dial(dialCtx, endpoint)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to dial endpoint", "endpoint", endpoint)
|
||||
} else {
|
||||
r.logger.Info("connected to peer", "peer", peer.ID, "endpoint", endpoint)
|
||||
return conn, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, errors.New("failed to connect to peer")
|
||||
}
|
||||
|
||||
// routePeer routes inbound messages from a peer to channels, and also sends
|
||||
// outbound queued messages to the peer. It will close the connection and send
|
||||
// queue, using this as a signal to coordinate the internal receivePeer() and
|
||||
// sendPeer() goroutines. It blocks until the peer is done, e.g. when the
|
||||
// connection or queue is closed.
|
||||
func (r *Router) routePeer(peerID NodeID, conn Connection, sendQueue queue) {
|
||||
// FIXME: Peer updates should probably be handled by the peer store.
|
||||
r.peerUpdatesCh <- PeerUpdate{
|
||||
PeerID: peerID,
|
||||
Status: PeerStatusUp,
|
||||
}
|
||||
defer func() {
|
||||
r.peerUpdatesCh <- PeerUpdate{
|
||||
PeerID: peerID,
|
||||
Status: PeerStatusDown,
|
||||
}
|
||||
}()
|
||||
|
||||
resultsCh := make(chan error, 2)
|
||||
go func() {
|
||||
resultsCh <- r.receivePeer(peerID, conn)
|
||||
}()
|
||||
go func() {
|
||||
resultsCh <- r.sendPeer(peerID, conn, sendQueue)
|
||||
}()
|
||||
|
||||
err := <-resultsCh
|
||||
_ = conn.Close()
|
||||
sendQueue.close()
|
||||
if e := <-resultsCh; err == nil {
|
||||
// The first err was nil, so we update it with the second result,
|
||||
// which may or may not be nil.
|
||||
err = e
|
||||
}
|
||||
switch err {
|
||||
case nil, io.EOF, ErrTransportClosed{}:
|
||||
r.logger.Info("peer disconnected", "peer", peerID)
|
||||
default:
|
||||
r.logger.Error("peer failure", "peer", peerID, "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// receivePeer receives inbound messages from a peer, deserializes them and
|
||||
// passes them on to the appropriate channel.
|
||||
func (r *Router) receivePeer(peerID NodeID, conn Connection) error {
|
||||
for {
|
||||
chID, bz, err := conn.ReceiveMessage()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.channelMtx.RLock()
|
||||
queue, ok := r.channelQueues[ChannelID(chID)]
|
||||
messageType := r.channelMessages[ChannelID(chID)]
|
||||
r.channelMtx.RUnlock()
|
||||
if !ok {
|
||||
r.logger.Error("dropping message for unknown channel", "peer", peerID, "channel", chID)
|
||||
continue
|
||||
}
|
||||
|
||||
msg := proto.Clone(messageType)
|
||||
if err := proto.Unmarshal(bz, msg); err != nil {
|
||||
r.logger.Error("message decoding failed, dropping message", "peer", peerID, "err", err)
|
||||
continue
|
||||
}
|
||||
if wrapper, ok := msg.(Wrapper); ok {
|
||||
msg, err = wrapper.Unwrap()
|
||||
if err != nil {
|
||||
r.logger.Error("failed to unwrap message", "err", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
// FIXME: ReceiveMessage() should return ChannelID.
|
||||
case queue.enqueue() <- Envelope{channelID: ChannelID(chID), From: peerID, Message: msg}:
|
||||
r.logger.Debug("received message", "peer", peerID, "message", msg)
|
||||
case <-queue.closed():
|
||||
r.logger.Error("channel closed, dropping message", "peer", peerID, "channel", chID)
|
||||
case <-r.stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendPeer sends queued messages to a peer.
|
||||
func (r *Router) sendPeer(peerID NodeID, conn Connection, queue queue) error {
|
||||
for {
|
||||
select {
|
||||
case envelope := <-queue.dequeue():
|
||||
bz, err := proto.Marshal(envelope.Message)
|
||||
if err != nil {
|
||||
r.logger.Error("failed to marshal message", "peer", peerID, "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// FIXME: SendMessage() should take ChannelID.
|
||||
_, err = conn.SendMessage(byte(envelope.channelID), bz)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.logger.Debug("sent message", "peer", envelope.To, "message", envelope.Message)
|
||||
|
||||
case <-queue.closed():
|
||||
return nil
|
||||
|
||||
case <-r.stopCh:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubscribePeerUpdates creates a new peer updates subscription. The caller must
|
||||
// consume the peer updates in a timely fashion, since delivery is guaranteed and
|
||||
// will block peer connection/disconnection otherwise.
|
||||
func (r *Router) SubscribePeerUpdates() (*PeerUpdatesCh, error) {
|
||||
// FIXME: We may want to use a size 1 buffer here. When the router
|
||||
// broadcasts a peer update it has to loop over all of the
|
||||
// subscriptions, and we want to avoid blocking and waiting for a
|
||||
// context switch before continuing to the next subscription. This also
|
||||
// prevents tail latencies from compounding across updates. We also want
|
||||
// to make sure the subscribers are reasonably in sync, so it should be
|
||||
// kept at 1. However, this should be benchmarked first.
|
||||
peerUpdates := NewPeerUpdates(make(chan PeerUpdate))
|
||||
r.peerUpdatesMtx.Lock()
|
||||
r.peerUpdatesSubs[peerUpdates] = peerUpdates
|
||||
r.peerUpdatesMtx.Unlock()
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-peerUpdates.Done():
|
||||
r.peerUpdatesMtx.Lock()
|
||||
delete(r.peerUpdatesSubs, peerUpdates)
|
||||
r.peerUpdatesMtx.Unlock()
|
||||
case <-r.stopCh:
|
||||
}
|
||||
}()
|
||||
return peerUpdates, nil
|
||||
}
|
||||
|
||||
// broadcastPeerUpdates broadcasts peer updates received from the router
|
||||
// to all subscriptions.
|
||||
func (r *Router) broadcastPeerUpdates() {
|
||||
for {
|
||||
select {
|
||||
case peerUpdate := <-r.peerUpdatesCh:
|
||||
subs := []*PeerUpdatesCh{}
|
||||
r.peerUpdatesMtx.RLock()
|
||||
for _, sub := range r.peerUpdatesSubs {
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
r.peerUpdatesMtx.RUnlock()
|
||||
|
||||
for _, sub := range subs {
|
||||
select {
|
||||
case sub.updatesCh <- peerUpdate:
|
||||
case <-sub.doneCh:
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
case <-r.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// OnStart implements service.Service.
|
||||
func (r *Router) OnStart() error {
|
||||
go r.broadcastPeerUpdates()
|
||||
go r.dialPeers()
|
||||
for _, transport := range r.transports {
|
||||
go r.acceptPeers(transport)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements service.Service.
|
||||
func (r *Router) OnStop() {
|
||||
// Collect all active queues, so we can wait for them to close.
|
||||
queues := []queue{}
|
||||
r.channelMtx.RLock()
|
||||
for _, q := range r.channelQueues {
|
||||
queues = append(queues, q)
|
||||
}
|
||||
r.channelMtx.RUnlock()
|
||||
r.peerMtx.RLock()
|
||||
for _, q := range r.peerQueues {
|
||||
queues = append(queues, q)
|
||||
}
|
||||
r.peerMtx.RUnlock()
|
||||
|
||||
// Signal router shutdown, and wait for queues (and thus goroutines)
|
||||
// to complete.
|
||||
close(r.stopCh)
|
||||
for _, q := range queues {
|
||||
<-q.closed()
|
||||
}
|
||||
}
|
||||
117
p2p/router_test.go
Normal file
117
p2p/router_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package p2p_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
gogotypes "github.com/gogo/protobuf/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
type TestMessage = gogotypes.StringValue
|
||||
|
||||
func echoReactor(channel *p2p.Channel) {
|
||||
for {
|
||||
select {
|
||||
case envelope := <-channel.In():
|
||||
channel.Out() <- p2p.Envelope{
|
||||
To: envelope.From,
|
||||
Message: &TestMessage{Value: envelope.Message.(*TestMessage).Value},
|
||||
}
|
||||
case <-channel.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter(t *testing.T) {
|
||||
logger := log.TestingLogger()
|
||||
network := p2p.NewMemoryNetwork(logger)
|
||||
transport := network.GenerateTransport()
|
||||
chID := p2p.ChannelID(1)
|
||||
|
||||
// Start some other in-memory network nodes to communicate with, running
|
||||
// a simple echo reactor that returns received messages.
|
||||
peers := []p2p.PeerAddress{}
|
||||
for i := 0; i < 3; i++ {
|
||||
peerTransport := network.GenerateTransport()
|
||||
peerRouter := p2p.NewRouter(logger.With("peerID", i), map[p2p.Protocol]p2p.Transport{
|
||||
p2p.MemoryProtocol: peerTransport,
|
||||
}, nil)
|
||||
peers = append(peers, peerTransport.Endpoints()[0].PeerAddress())
|
||||
|
||||
channel, err := peerRouter.OpenChannel(chID, &TestMessage{})
|
||||
require.NoError(t, err)
|
||||
defer channel.Close()
|
||||
go echoReactor(channel)
|
||||
|
||||
err = peerRouter.Start()
|
||||
require.NoError(t, err)
|
||||
defer func() { require.NoError(t, peerRouter.Stop()) }()
|
||||
}
|
||||
|
||||
// Start the main router and connect it to the peers above.
|
||||
router := p2p.NewRouter(logger, map[p2p.Protocol]p2p.Transport{
|
||||
p2p.MemoryProtocol: transport,
|
||||
}, peers)
|
||||
channel, err := router.OpenChannel(chID, &TestMessage{})
|
||||
require.NoError(t, err)
|
||||
peerUpdates, err := router.SubscribePeerUpdates()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = router.Start()
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
channel.Close()
|
||||
peerUpdates.Close()
|
||||
require.NoError(t, router.Stop())
|
||||
}()
|
||||
|
||||
// Wait for peers to come online, and ping them as they do.
|
||||
for i := 0; i < len(peers); i++ {
|
||||
peerUpdate := <-peerUpdates.Updates()
|
||||
peerID := peerUpdate.PeerID
|
||||
require.Equal(t, p2p.PeerUpdate{
|
||||
PeerID: peerID,
|
||||
Status: p2p.PeerStatusUp,
|
||||
}, peerUpdate)
|
||||
|
||||
channel.Out() <- p2p.Envelope{To: peerID, Message: &TestMessage{Value: "hi!"}}
|
||||
assert.Equal(t, p2p.Envelope{
|
||||
From: peerID,
|
||||
Message: &TestMessage{Value: "hi!"},
|
||||
}, (<-channel.In()).Strip())
|
||||
}
|
||||
|
||||
// We then submit an error for a peer, and watch it get disconnected.
|
||||
channel.Error() <- p2p.PeerError{
|
||||
PeerID: peers[0].NodeID(),
|
||||
Err: errors.New("test error"),
|
||||
Severity: p2p.PeerErrorSeverityCritical,
|
||||
}
|
||||
peerUpdate := <-peerUpdates.Updates()
|
||||
require.Equal(t, p2p.PeerUpdate{
|
||||
PeerID: peers[0].NodeID(),
|
||||
Status: p2p.PeerStatusDown,
|
||||
}, peerUpdate)
|
||||
|
||||
// We now broadcast a message, which we should receive back from only two peers.
|
||||
channel.Out() <- p2p.Envelope{
|
||||
Broadcast: true,
|
||||
Message: &TestMessage{Value: "broadcast"},
|
||||
}
|
||||
for i := 0; i < len(peers)-1; i++ {
|
||||
envelope := <-channel.In()
|
||||
require.NotEqual(t, peers[0].NodeID(), envelope.From)
|
||||
require.Equal(t, &TestMessage{Value: "broadcast"}, envelope.Message)
|
||||
}
|
||||
select {
|
||||
case envelope := <-channel.In():
|
||||
t.Errorf("unexpected message: %v", envelope)
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -11,9 +11,15 @@ import (
|
||||
"github.com/tendermint/tendermint/p2p/conn"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultProtocol Protocol = MConnProtocol
|
||||
)
|
||||
|
||||
// Transport is an arbitrary mechanism for exchanging bytes with a peer.
|
||||
type Transport interface {
|
||||
// Accept waits for the next inbound connection on a listening endpoint.
|
||||
// Accept waits for the next inbound connection on a listening endpoint. If
|
||||
// this returns io.EOF or ErrTransportClosed the transport should be
|
||||
// considered closed and further Accept() calls are futile.
|
||||
Accept(context.Context) (Connection, error)
|
||||
|
||||
// Dial creates an outbound connection to an endpoint.
|
||||
@@ -60,21 +66,27 @@ type Endpoint struct {
|
||||
Port uint16
|
||||
}
|
||||
|
||||
// String formats an endpoint as a URL string.
|
||||
func (e Endpoint) String() string {
|
||||
u := url.URL{Scheme: string(e.Protocol)}
|
||||
if e.PeerID != "" {
|
||||
u.User = url.User(string(e.PeerID))
|
||||
// PeerAddress converts the endpoint into a peer address URL.
|
||||
func (e Endpoint) PeerAddress() PeerAddress {
|
||||
u := &url.URL{
|
||||
Scheme: string(e.Protocol),
|
||||
User: url.User(string(e.PeerID)),
|
||||
}
|
||||
if len(e.IP) > 0 {
|
||||
if e.IP != nil {
|
||||
u.Host = e.IP.String()
|
||||
if e.Port > 0 {
|
||||
u.Host = net.JoinHostPort(u.Host, fmt.Sprintf("%v", e.Port))
|
||||
}
|
||||
} else if e.Path != "" {
|
||||
u.Path = e.Path
|
||||
} else {
|
||||
u.Opaque = e.Path
|
||||
}
|
||||
return u.String()
|
||||
return PeerAddress{URL: u}
|
||||
}
|
||||
|
||||
// String formats an endpoint as a URL string.
|
||||
func (e Endpoint) String() string {
|
||||
return e.PeerAddress().URL.String()
|
||||
}
|
||||
|
||||
// Validate validates an endpoint.
|
||||
|
||||
Reference in New Issue
Block a user