diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 2171d682f..2d31b9c7e 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -238,6 +238,7 @@ func (r *Reactor) logErrAddrBook(err error) { // Receive implements Reactor by handling incoming PEX messages. func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) { + return msg, err := decodeMsg(msgBytes) if err != nil { r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) @@ -306,6 +307,76 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) { } } +func (r *Reactor) NewReceive(e p2p.Envelope) { + // return + msg, err := msgFromProto(e.Message) + if err != nil { + r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + r.Switch.StopPeerForError(e.Src, err) + return + } + r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", msg) + + switch msg := msg.(type) { + case *tmp2p.PexRequest: + + // NOTE: this is a prime candidate for amplification attacks, + // so it's important we + // 1) restrict how frequently peers can request + // 2) limit the output size + + // If we're a seed and this is an inbound peer, + // respond once and disconnect. + if r.config.SeedMode && !e.Src.IsOutbound() { + id := string(e.Src.ID()) + v := r.lastReceivedRequests.Get(id) + if v != nil { + // FlushStop/StopPeer are already + // running in a go-routine. + return + } + r.lastReceivedRequests.Set(id, time.Now()) + + // Send addrs and disconnect + r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) + go func() { + // In a go-routine so it doesn't block .Receive. + e.Src.FlushStop() + r.Switch.StopPeerGracefully(e.Src) + }() + + } else { + // Check we're not receiving requests too frequently. + if err := r.receiveRequest(e.Src); err != nil { + r.Switch.StopPeerForError(e.Src, err) + r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime) + return + } + r.SendAddrs(e.Src, r.book.GetSelection()) + } + + case *tmp2p.PexAddrs: + // If we asked for addresses, add them to the book + addrs, err := p2p.NetAddressesFromProto(msg.Addrs) + if err != nil { + r.Switch.StopPeerForError(e.Src, err) + r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime) + return + } + err = r.ReceiveAddrs(addrs, e.Src) + if err != nil { + r.Switch.StopPeerForError(e.Src, err) + if err == ErrUnsolicitedList { + r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime) + } + return + } + + default: + r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg)) + } +} + // enforces a minimum amount of time between requests func (r *Reactor) receiveRequest(src Peer) error { id := string(src.ID()) @@ -806,7 +877,11 @@ func decodeMsg(bz []byte) (proto.Message, error) { if err != nil { return nil, err } + return msgFromProto(pb) +} +func msgFromProto(m proto.Message) (proto.Message, error) { + pb := m.(*tmp2p.Message) switch msg := pb.Sum.(type) { case *tmp2p.Message_PexRequest: return msg.PexRequest, nil diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index d5e052e91..252322606 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -131,12 +131,13 @@ func TestPEXReactorReceive(t *testing.T) { r.RequestAddrs(peer) size := book.Size() - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) - r.Receive(PexChannel, peer, msg) + msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}} + r.Receive(PexChannel, peer, mustEncode(msg)) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)}) assert.Equal(t, size+1, book.Size()) - msg = mustEncode(&tmp2p.PexRequest{}) - r.Receive(PexChannel, peer, msg) // should not panic. + r.Receive(PexChannel, peer, mustEncode(&tmp2p.PexRequest{})) // should not panic. + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})}) } func TestPEXReactorRequestMessageAbuse(t *testing.T) { @@ -159,16 +160,19 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { // first time creates the entry r.Receive(PexChannel, peer, msg) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})}) assert.True(t, r.lastReceivedRequests.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // next time sets the last time value r.Receive(PexChannel, peer, msg) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})}) assert.True(t, r.lastReceivedRequests.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // third time is too many too soon - peer is removed r.Receive(PexChannel, peer, msg) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})}) assert.False(t, r.lastReceivedRequests.Has(id)) assert.False(t, sw.Peers().Has(peer.ID())) assert.True(t, book.IsBanned(peerAddr)) @@ -192,15 +196,17 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { assert.True(t, r.requestsSent.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) - msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}) + msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}} // receive some addrs. should clear the request - r.Receive(PexChannel, peer, msg) + r.Receive(PexChannel, peer, mustEncode(msg)) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)}) assert.False(t, r.requestsSent.Has(id)) assert.True(t, sw.Peers().Has(peer.ID())) // receiving more unsolicited addrs causes a disconnect and ban - r.Receive(PexChannel, peer, msg) + r.Receive(PexChannel, peer, mustEncode(msg)) + r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)}) assert.False(t, sw.Peers().Has(peer.ID())) assert.True(t, book.IsBanned(peer.SocketAddr())) }