pex: remove legacy proto messages (#7147)

This PR implements the proto changes made in https://github.com/tendermint/spec/pull/352, removing the legacy messages that were used in the pex reactor.
This commit is contained in:
Callum Waters
2021-10-22 14:07:46 +02:00
committed by GitHub
parent 88bdd328ed
commit 68ca65f5d7
28 changed files with 669 additions and 1199 deletions

View File

@@ -1,7 +1,6 @@
package pex
import (
"context"
"fmt"
"runtime/debug"
"sync"
@@ -21,8 +20,6 @@ var (
_ p2p.Wrapper = (*protop2p.PexMessage)(nil)
)
// TODO: Consolidate with params file.
// See https://github.com/tendermint/tendermint/issues/6371
const (
// PexChannel is a channel for PEX messages
PexChannel = 0x00
@@ -46,9 +43,6 @@ const (
// the maximum amount of addresses that can be included in a response
maxAddresses uint16 = 100
// allocated time to resolve a node address into a set of endpoints
resolveTimeout = 3 * time.Second
// How long to wait when there are no peers available before trying again
noAvailablePeersWaitPeriod = 1 * time.Second
@@ -217,16 +211,22 @@ func (r *Reactor) handlePexMessage(envelope p2p.Envelope) error {
logger := r.Logger.With("peer", envelope.From)
switch msg := envelope.Message.(type) {
case *protop2p.PexRequest:
// Check if the peer hasn't sent a prior request too close to this one
// in time.
// check if the peer hasn't sent a prior request too close to this one
// in time
if err := r.markPeerRequest(envelope.From); err != nil {
return err
}
// parse and send the legacy PEX addresses
pexAddresses := r.resolve(r.peerManager.Advertise(envelope.From, maxAddresses))
// request peers from the peer manager and parse the NodeAddresses into
// URL strings
nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
pexAddresses := make([]protop2p.PexAddress, len(nodeAddresses))
for idx, addr := range nodeAddresses {
pexAddresses[idx] = protop2p.PexAddress{
URL: addr.String(),
}
}
r.pexCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponse{Addresses: pexAddresses},
@@ -247,9 +247,7 @@ func (r *Reactor) handlePexMessage(envelope p2p.Envelope) error {
}
for _, pexAddress := range msg.Addresses {
// no protocol is prefixed so we assume the default (mconn)
peerAddress, err := p2p.ParseNodeAddress(
fmt.Sprintf("%s@%s:%d", pexAddress.ID, pexAddress.IP, pexAddress.Port))
peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
if err != nil {
continue
}
@@ -264,58 +262,6 @@ func (r *Reactor) handlePexMessage(envelope p2p.Envelope) error {
r.totalPeers++
}
// V2 PEX MESSAGES
case *protop2p.PexRequestV2:
// check if the peer hasn't sent a prior request too close to this one
// in time
if err := r.markPeerRequest(envelope.From); err != nil {
return err
}
// request peers from the peer manager and parse the NodeAddresses into
// URL strings
nodeAddresses := r.peerManager.Advertise(envelope.From, maxAddresses)
pexAddressesV2 := make([]protop2p.PexAddressV2, len(nodeAddresses))
for idx, addr := range nodeAddresses {
pexAddressesV2[idx] = protop2p.PexAddressV2{
URL: addr.String(),
}
}
r.pexCh.Out <- p2p.Envelope{
To: envelope.From,
Message: &protop2p.PexResponseV2{Addresses: pexAddressesV2},
}
case *protop2p.PexResponseV2:
// check if the response matches a request that was made to that peer
if err := r.markPeerResponse(envelope.From); err != nil {
return err
}
// check the size of the response
if len(msg.Addresses) > int(maxAddresses) {
return fmt.Errorf("peer sent too many addresses (max: %d, got: %d)",
maxAddresses,
len(msg.Addresses),
)
}
for _, pexAddress := range msg.Addresses {
peerAddress, err := p2p.ParseNodeAddress(pexAddress.URL)
if err != nil {
continue
}
added, err := r.peerManager.Add(peerAddress)
if err != nil {
logger.Error("failed to add V2 PEX address", "address", peerAddress, "err", err)
}
if added {
r.newPeers++
logger.Debug("added V2 PEX address", "address", peerAddress)
}
r.totalPeers++
}
default:
return fmt.Errorf("received unknown message: %T", msg)
}
@@ -323,55 +269,6 @@ func (r *Reactor) handlePexMessage(envelope p2p.Envelope) error {
return nil
}
// resolve resolves a set of peer addresses into PEX addresses.
//
// FIXME: This is necessary because the current PEX protocol only supports
// IP/port pairs, while the P2P stack uses NodeAddress URLs. The PEX protocol
// should really use URLs too, to exchange DNS names instead of IPs and allow
// different transport protocols (e.g. QUIC and MemoryTransport).
//
// FIXME: We may want to cache and parallelize this, but for now we'll just rely
// on the operating system to cache it for us.
func (r *Reactor) resolve(addresses []p2p.NodeAddress) []protop2p.PexAddress {
limit := len(addresses)
pexAddresses := make([]protop2p.PexAddress, 0, limit)
for _, address := range addresses {
ctx, cancel := context.WithTimeout(context.Background(), resolveTimeout)
endpoints, err := address.Resolve(ctx)
r.Logger.Debug("resolved node address", "endpoints", endpoints)
cancel()
if err != nil {
r.Logger.Debug("failed to resolve address", "address", address, "err", err)
continue
}
for _, endpoint := range endpoints {
r.Logger.Debug("checking endpint", "IP", endpoint.IP, "Port", endpoint.Port)
if len(pexAddresses) >= limit {
return pexAddresses
} else if endpoint.IP != nil {
r.Logger.Debug("appending pex address")
// PEX currently only supports IP-networked transports (as
// opposed to e.g. p2p.MemoryTransport).
//
// FIXME: as the PEX address contains no information about the
// protocol, we jam this into the ID. We won't need to this once
// we support URLs
pexAddresses = append(pexAddresses, protop2p.PexAddress{
ID: string(address.NodeID),
IP: endpoint.IP.String(),
Port: uint32(endpoint.Port),
})
}
}
}
return pexAddresses
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
// It will handle errors and any possible panics gracefully. A caller can handle
// any error returned by sending a PeerError on the respective channel.
@@ -444,17 +341,10 @@ func (r *Reactor) sendRequestForPeers() {
break
}
// The node accommodates for both pex systems
if r.isLegacyPeer(peerID) {
r.pexCh.Out <- p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}
} else {
r.pexCh.Out <- p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequestV2{},
}
// send out the pex request
r.pexCh.Out <- p2p.Envelope{
To: peerID,
Message: &protop2p.PexRequest{},
}
// remove the peer from the abvailable peers list and mark it in the requestsSent map
@@ -538,14 +428,3 @@ func (r *Reactor) markPeerResponse(peer types.NodeID) error {
r.availablePeers[peer] = struct{}{}
return nil
}
// all addresses must use a MCONN protocol for the peer to be considered part of the
// legacy p2p pex system
func (r *Reactor) isLegacyPeer(peer types.NodeID) bool {
for _, addr := range r.peerManager.Addresses(peer) {
if addr.Protocol != p2p.MConnProtocol {
return false
}
}
return true
}

View File

@@ -1,7 +1,6 @@
package pex_test
import (
"context"
"strings"
"testing"
"time"
@@ -27,7 +26,6 @@ const (
firstNode = 0
secondNode = 1
thirdNode = 2
fourthNode = 3
)
func TestReactorBasic(t *testing.T) {
@@ -44,8 +42,8 @@ func TestReactorBasic(t *testing.T) {
// assert that when a mock node sends a request it receives a response (and
// the correct one)
testNet.sendRequest(t, firstNode, secondNode, true)
testNet.listenForResponse(t, secondNode, firstNode, shortWait, []p2pproto.PexAddressV2(nil))
testNet.sendRequest(t, firstNode, secondNode)
testNet.listenForResponse(t, secondNode, firstNode, shortWait, []p2pproto.PexAddress(nil))
}
func TestReactorConnectFullNetwork(t *testing.T) {
@@ -71,17 +69,17 @@ func TestReactorSendsRequestsTooOften(t *testing.T) {
r.pexInCh <- p2p.Envelope{
From: badNode,
Message: &p2pproto.PexRequestV2{},
Message: &p2pproto.PexRequest{},
}
resp := <-r.pexOutCh
msg, ok := resp.Message.(*p2pproto.PexResponseV2)
msg, ok := resp.Message.(*p2pproto.PexResponse)
require.True(t, ok)
require.Empty(t, msg.Addresses)
r.pexInCh <- p2p.Envelope{
From: badNode,
Message: &p2pproto.PexRequestV2{},
Message: &p2pproto.PexRequest{},
}
peerErr := <-r.pexErrCh
@@ -102,8 +100,8 @@ func TestReactorSendsResponseWithoutRequest(t *testing.T) {
// firstNode sends the secondNode an unrequested response
// NOTE: secondNode will send a request by default during startup so we send
// two responses to counter that.
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode}, true)
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode})
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode})
// secondNode should evict the firstNode
testNet.listenForPeerUpdate(t, secondNode, firstNode, p2p.PeerStatusDown, shortWait)
@@ -136,10 +134,10 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
addresses := make([]p2pproto.PexAddressV2, 101)
addresses := make([]p2pproto.PexAddress, 101)
for i := 0; i < len(addresses); i++ {
nodeAddress := p2p.NodeAddress{Protocol: p2p.MemoryProtocol, NodeID: randomNodeID(t)}
addresses[i] = p2pproto.PexAddressV2{
addresses[i] = p2pproto.PexAddress{
URL: nodeAddress.String(),
}
}
@@ -152,12 +150,12 @@ func TestReactorErrorsOnReceivingTooManyPeers(t *testing.T) {
select {
// wait for a request and then send a response with too many addresses
case req := <-r.pexOutCh:
if _, ok := req.Message.(*p2pproto.PexRequestV2); !ok {
if _, ok := req.Message.(*p2pproto.PexRequest); !ok {
t.Fatal("expected v2 pex request")
}
r.pexInCh <- p2p.Envelope{
From: peer.NodeID,
Message: &p2pproto.PexResponseV2{
Message: &p2pproto.PexResponse{
Addresses: addresses,
},
}
@@ -239,38 +237,6 @@ func TestReactorWithNetworkGrowth(t *testing.T) {
}
}
func TestReactorIntegrationWithLegacyHandleRequest(t *testing.T) {
testNet := setupNetwork(t, testOptions{
MockNodes: 1,
TotalNodes: 3,
})
testNet.connectAll(t)
testNet.start(t)
t.Log(testNet.nodes)
// mock node sends a V1 Pex message to the second node
testNet.sendRequest(t, firstNode, secondNode, false)
addrs := testNet.getAddressesFor(t, []int{thirdNode})
testNet.listenForLegacyResponse(t, secondNode, firstNode, shortWait, addrs)
}
func TestReactorIntegrationWithLegacyHandleResponse(t *testing.T) {
testNet := setupNetwork(t, testOptions{
MockNodes: 1,
TotalNodes: 4,
BufferSize: 4,
})
testNet.connectPeers(t, firstNode, secondNode)
testNet.connectPeers(t, firstNode, thirdNode)
testNet.connectPeers(t, firstNode, fourthNode)
testNet.start(t)
testNet.listenForRequest(t, secondNode, firstNode, shortWait)
// send a v1 response instead
testNet.sendResponse(t, firstNode, secondNode, []int{thirdNode, fourthNode}, false)
testNet.requireNumberOfPeers(t, secondNode, len(testNet.nodes)-1, shortWait)
}
type singleTestReactor struct {
reactor *pex.Reactor
pexInCh chan p2p.Envelope
@@ -484,11 +450,11 @@ func (r *reactorTestSuite) listenForRequest(t *testing.T, fromNode, toNode int,
r.logger.Info("Listening for request", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexRequestV2)
_, ok := msg.Message.(*p2pproto.PexRequest)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
require.Equal(t, &p2pproto.PexRequestV2{}, msg.Message)
require.Equal(t, &p2pproto.PexRequest{}, msg.Message)
return true
}
r.listenFor(t, to, conditional, assertion, waitPeriod)
@@ -503,11 +469,11 @@ func (r *reactorTestSuite) pingAndlistenForNAddresses(
r.logger.Info("Listening for addresses", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexResponseV2)
_, ok := msg.Message.(*p2pproto.PexResponse)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
m, ok := msg.Message.(*p2pproto.PexResponseV2)
m, ok := msg.Message.(*p2pproto.PexResponse)
if !ok {
require.Fail(t, "expected pex response v2")
return true
@@ -519,34 +485,14 @@ func (r *reactorTestSuite) pingAndlistenForNAddresses(
// if we didn't get the right length, we wait and send the
// request again
time.Sleep(300 * time.Millisecond)
r.sendRequest(t, toNode, fromNode, true)
r.sendRequest(t, toNode, fromNode)
return false
}
r.sendRequest(t, toNode, fromNode, true)
r.sendRequest(t, toNode, fromNode)
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) listenForResponse(
t *testing.T,
fromNode, toNode int,
waitPeriod time.Duration,
addresses []p2pproto.PexAddressV2,
) {
r.logger.Info("Listening for response", "from", fromNode, "to", toNode)
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexResponseV2)
r.logger.Info("message", msg, "ok", ok)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
require.Equal(t, &p2pproto.PexResponseV2{Addresses: addresses}, msg.Message)
return true
}
r.listenFor(t, to, conditional, assertion, waitPeriod)
}
func (r *reactorTestSuite) listenForLegacyResponse(
t *testing.T,
fromNode, toNode int,
waitPeriod time.Duration,
@@ -556,6 +502,7 @@ func (r *reactorTestSuite) listenForLegacyResponse(
to, from := r.checkNodePair(t, toNode, fromNode)
conditional := func(msg p2p.Envelope) bool {
_, ok := msg.Message.(*p2pproto.PexResponse)
r.logger.Info("message", msg, "ok", ok)
return ok && msg.From == from
}
assertion := func(t *testing.T, msg p2p.Envelope) bool {
@@ -591,46 +538,22 @@ func (r *reactorTestSuite) listenForPeerUpdate(
}
}
func (r *reactorTestSuite) getV2AddressesFor(nodes []int) []p2pproto.PexAddressV2 {
addresses := make([]p2pproto.PexAddressV2, len(nodes))
func (r *reactorTestSuite) getAddressesFor(nodes []int) []p2pproto.PexAddress {
addresses := make([]p2pproto.PexAddress, len(nodes))
for idx, node := range nodes {
nodeID := r.nodes[node]
addresses[idx] = p2pproto.PexAddressV2{
addresses[idx] = p2pproto.PexAddress{
URL: r.network.Nodes[nodeID].NodeAddress.String(),
}
}
return addresses
}
func (r *reactorTestSuite) getAddressesFor(t *testing.T, nodes []int) []p2pproto.PexAddress {
addresses := make([]p2pproto.PexAddress, len(nodes))
for idx, node := range nodes {
nodeID := r.nodes[node]
nodeAddrs := r.network.Nodes[nodeID].NodeAddress
endpoints, err := nodeAddrs.Resolve(context.Background())
require.NoError(t, err)
require.Len(t, endpoints, 1)
addresses[idx] = p2pproto.PexAddress{
ID: string(nodeAddrs.NodeID),
IP: endpoints[0].IP.String(),
Port: uint32(endpoints[0].Port),
}
}
return addresses
}
func (r *reactorTestSuite) sendRequest(t *testing.T, fromNode, toNode int, v2 bool) {
func (r *reactorTestSuite) sendRequest(t *testing.T, fromNode, toNode int) {
to, from := r.checkNodePair(t, toNode, fromNode)
if v2 {
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexRequestV2{},
}
} else {
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexRequest{},
}
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexRequest{},
}
}
@@ -638,25 +561,14 @@ func (r *reactorTestSuite) sendResponse(
t *testing.T,
fromNode, toNode int,
withNodes []int,
v2 bool,
) {
from, to := r.checkNodePair(t, fromNode, toNode)
if v2 {
addrs := r.getV2AddressesFor(withNodes)
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexResponseV2{
Addresses: addrs,
},
}
} else {
addrs := r.getAddressesFor(t, withNodes)
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexResponse{
Addresses: addrs,
},
}
addrs := r.getAddressesFor(withNodes)
r.pexChannels[from].Out <- p2p.Envelope{
To: to,
Message: &p2pproto.PexResponse{
Addresses: addrs,
},
}
}
@@ -759,32 +671,6 @@ func (r *reactorTestSuite) connectPeers(t *testing.T, sourceNode, targetNode int
require.True(t, added)
}
// nolint: unused
func (r *reactorTestSuite) pexAddresses(t *testing.T, nodeIndices []int) []p2pproto.PexAddress {
var addresses []p2pproto.PexAddress
for _, i := range nodeIndices {
if i < len(r.nodes) {
require.Fail(t, "index for pex address is greater than number of nodes")
}
nodeAddrs := r.network.Nodes[r.nodes[i]].NodeAddress
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
endpoints, err := nodeAddrs.Resolve(ctx)
cancel()
require.NoError(t, err)
for _, endpoint := range endpoints {
if endpoint.IP != nil {
addresses = append(addresses, p2pproto.PexAddress{
ID: string(nodeAddrs.NodeID),
IP: endpoint.IP.String(),
Port: uint32(endpoint.Port),
})
}
}
}
return addresses
}
func (r *reactorTestSuite) checkNodePair(t *testing.T, first, second int) (types.NodeID, types.NodeID) {
require.NotEqual(t, first, second)
require.Less(t, first, r.total)