diff --git a/p2p/peer.go b/p2p/peer.go index 19790e276..13ddc3cae 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -493,32 +493,43 @@ func (m *PeerManager) Add(address PeerAddress) error { return nil } -// Advertise returns a list of peer addresses to advertise to a peer. +// Advertise returns a list of peer endpoints to advertise to a peer. // -// FIXME: We currently just pass all addresses we have, which is very naïve. We -// should e.g. only send addresses that the peer can actually reach (by -// resolving the addresses into endpoints and making sure any private IP -// addresses are on the same network as the remote peer endpoint). However, this -// would require resolving endpoints either here (too slow) or generally in the -// peer manager -- maybe it should keep track of endpoints internally instead of -// leaving that to the router when dialing? -func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []PeerAddress { +// FIXME: The current PEX protocol only supports IP/port endpoints, so we +// returns endpoints here. The PEX protocol should exchange addresses (URLs) +// instead, so it can support multiple protocols and allow operators to +// change their IP addresses. +// +// FIXME: We currently just resolve and pass all addresses we have, which is +// very naïve. We should e.g. only send addresses that the peer can actually +// reach, by making sure any private IP addresses are on the same network as the +// remote peer endpoint. We should also resolve endpoints when addresses are +// added to the peer manager, and periodically as appropriate. +func (m *PeerManager) Advertise(peerID NodeID, limit uint16) []Endpoint { m.mtx.Lock() defer m.mtx.Unlock() - addresses := []PeerAddress{} + endpoints := make([]Endpoint, 0, limit) for _, peer := range m.store.Ranked() { - switch { - case len(addresses) >= int(limit): - break - case peer.ID == peerID: - default: - for _, addressInfo := range peer.AddressInfo { - addresses = append(addresses, addressInfo.Address) + if peer.ID == peerID { + continue + } + for _, addressInfo := range peer.AddressInfo { + addressEndpoints, err := addressInfo.Address.Resolve(context.Background()) + if err != nil { + continue + } + for _, endpoint := range addressEndpoints { + if len(endpoints) >= int(limit) { + return endpoints + } + if endpoint.IP != nil { + endpoints = append(endpoints, endpoint) + } } } } - return addresses + return endpoints } // makePeerInfo creates a peerInfo for a new peer. diff --git a/p2p/pex/reactor.go b/p2p/pex/reactor.go new file mode 100644 index 000000000..56a5222e8 --- /dev/null +++ b/p2p/pex/reactor.go @@ -0,0 +1,189 @@ +package pex + +import ( + "fmt" + + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/libs/service" + "github.com/tendermint/tendermint/p2p" + protop2p "github.com/tendermint/tendermint/proto/tendermint/p2p" +) + +var ( + _ service.Service = (*ReactorV2)(nil) + _ p2p.Wrapper = (*protop2p.Message)(nil) +) + +const ( + maxAddresses uint16 = 100 +) + +// ReactorV2 is a PEX reactor for the new P2P stack. The legacy reactor +// is Reactor. +// +// FIXME: Rename this when Reactor is removed, and consider moving to p2p/. +type ReactorV2 struct { + service.BaseService + + peerManager *p2p.PeerManager + pexCh *p2p.Channel + peerUpdates *p2p.PeerUpdatesCh + closeCh chan struct{} +} + +// NewReactor returns a reference to a new reactor. +func NewReactorV2( + logger log.Logger, + peerManager *p2p.PeerManager, + pexCh *p2p.Channel, + peerUpdates *p2p.PeerUpdatesCh, +) *ReactorV2 { + r := &ReactorV2{ + peerManager: peerManager, + pexCh: pexCh, + peerUpdates: peerUpdates, + closeCh: make(chan struct{}), + } + + r.BaseService = *service.NewBaseService(logger, "PEX", r) + return r +} + +// OnStart starts separate go routines for each p2p Channel and listens for +// envelopes on each. In addition, it also listens for peer updates and handles +// messages on that p2p channel accordingly. The caller must be sure to execute +// OnStop to ensure the outbound p2p Channels are closed. +func (r *ReactorV2) OnStart() error { + go r.processPexCh() + go r.processPeerUpdates() + return nil +} + +// OnStop stops the reactor by signaling to all spawned goroutines to exit and +// blocking until they all exit. +func (r *ReactorV2) OnStop() { + // Close closeCh to signal to all spawned goroutines to gracefully exit. All + // p2p Channels should execute Close(). + close(r.closeCh) + + // Wait for all p2p Channels to be closed before returning. This ensures we + // can easily reason about synchronization of all p2p Channels and ensure no + // panics will occur. + <-r.pexCh.Done() + <-r.peerUpdates.Done() +} + +// handlePexMessage handles envelopes sent from peers on the PexChannel. +func (r *ReactorV2) handlePexMessage(envelope p2p.Envelope) error { + logger := r.Logger.With("peer", envelope.From) + + // FIXME: We may want to add DoS protection here, by rate limiting peers and + // only processing addresses we actually requested. + switch msg := envelope.Message.(type) { + case *protop2p.PexRequest: + endpoints := r.peerManager.Advertise(envelope.From, maxAddresses) + resp := &protop2p.PexAddrs{Addrs: make([]protop2p.NetAddress, 0, len(endpoints))} + for _, endpoint := range endpoints { + // FIXME: This shouldn't rely on NetAddress. + resp.Addrs = append(resp.Addrs, endpoint.NetAddress().ToProto()) + } + r.pexCh.Out() <- p2p.Envelope{To: envelope.From, Message: resp} + + case *protop2p.PexAddrs: + for _, pbAddr := range msg.Addrs { + // FIXME: This shouldn't rely on NetAddress. + netaddr, err := p2p.NetAddressFromProto(pbAddr) + if err != nil { + logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) + continue + } + if err = r.peerManager.Add(netaddr.Endpoint().PeerAddress()); err != nil { + logger.Debug("received invalid PEX address", "addr", netaddr, "err", err) + continue + } + } + + default: + return fmt.Errorf("received unknown message: %T", msg) + } + + return nil +} + +// handleMessage handles an Envelope sent from a peer on a specific p2p Channel. +// It will handle errors and any possible panics gracefully. A caller can handle +// any error returned by sending a PeerError on the respective channel. +func (r *ReactorV2) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err error) { + defer func() { + if e := recover(); e != nil { + err = fmt.Errorf("panic in processing message: %v", e) + } + }() + + r.Logger.Debug("received message", "peer", envelope.From) + + switch chID { + case p2p.ChannelID(PexChannel): + err = r.handlePexMessage(envelope) + + default: + err = fmt.Errorf("unknown channel ID (%d) for envelope (%v)", chID, envelope) + } + + return err +} + +// processPexCh implements a blocking event loop where we listen for p2p +// Envelope messages from the pexCh. +func (r *ReactorV2) processPexCh() { + defer r.pexCh.Close() + + for { + select { + case envelope := <-r.pexCh.In(): + if err := r.handleMessage(r.pexCh.ID(), envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.pexCh.ID(), "envelope", envelope, "err", err) + r.pexCh.Error() <- p2p.PeerError{ + PeerID: envelope.From, + Err: err, + Severity: p2p.PeerErrorSeverityLow, + } + } + + case <-r.closeCh: + r.Logger.Debug("stopped listening on PEX channel; closing...") + return + } + } +} + +// processPeerUpdate processes a PeerUpdate. For added peers, PeerStatusUp, we +// send a request for addresses. +func (r *ReactorV2) processPeerUpdate(peerUpdate p2p.PeerUpdate) { + r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status) + + if peerUpdate.Status == p2p.PeerStatusUp { + r.pexCh.Out() <- p2p.Envelope{ + To: peerUpdate.PeerID, + Message: &protop2p.PexRequest{}, + } + } +} + +// processPeerUpdates initiates a blocking process where we listen for and handle +// PeerUpdate messages. When the reactor is stopped, we will catch the signal and +// close the p2p PeerUpdatesCh gracefully. +func (r *ReactorV2) processPeerUpdates() { + defer r.peerUpdates.Close() + + for { + select { + case peerUpdate := <-r.peerUpdates.Updates(): + r.processPeerUpdate(peerUpdate) + + case <-r.closeCh: + r.Logger.Debug("stopped listening on peer updates channel; closing...") + return + } + } +} diff --git a/proto/tendermint/p2p/pex.go b/proto/tendermint/p2p/pex.go new file mode 100644 index 000000000..fecb056d5 --- /dev/null +++ b/proto/tendermint/p2p/pex.go @@ -0,0 +1,33 @@ +package p2p + +import ( + fmt "fmt" + + proto "github.com/gogo/protobuf/proto" +) + +// Wrap implements the p2p Wrapper interface and wraps a PEX message. +func (m *Message) Wrap(pb proto.Message) error { + switch msg := pb.(type) { + case *PexRequest: + m.Sum = &Message_PexRequest{PexRequest: msg} + case *PexAddrs: + m.Sum = &Message_PexAddrs{PexAddrs: msg} + default: + return fmt.Errorf("unknown message: %T", msg) + } + return nil +} + +// Unwrap implements the p2p Wrapper interface and unwraps a wrapped PEX +// message. +func (m *Message) Unwrap() (proto.Message, error) { + switch msg := m.Sum.(type) { + case *Message_PexRequest: + return msg.PexRequest, nil + case *Message_PexAddrs: + return msg.PexAddrs, nil + default: + return nil, fmt.Errorf("unknown message: %T", msg) + } +}