pex tests passing

This commit is contained in:
William Banfield
2022-10-19 22:25:56 -04:00
parent d2fb6835c8
commit d677121ae0
2 changed files with 88 additions and 7 deletions

View File

@@ -238,6 +238,7 @@ func (r *Reactor) logErrAddrBook(err error) {
// Receive implements Reactor by handling incoming PEX messages.
func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
return
msg, err := decodeMsg(msgBytes)
if err != nil {
r.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
@@ -306,6 +307,76 @@ func (r *Reactor) Receive(chID byte, src Peer, msgBytes []byte) {
}
}
func (r *Reactor) NewReceive(e p2p.Envelope) {
// return
msg, err := msgFromProto(e.Message)
if err != nil {
r.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
r.Switch.StopPeerForError(e.Src, err)
return
}
r.Logger.Debug("Received message", "src", e.Src, "chId", e.ChannelID, "msg", msg)
switch msg := msg.(type) {
case *tmp2p.PexRequest:
// NOTE: this is a prime candidate for amplification attacks,
// so it's important we
// 1) restrict how frequently peers can request
// 2) limit the output size
// If we're a seed and this is an inbound peer,
// respond once and disconnect.
if r.config.SeedMode && !e.Src.IsOutbound() {
id := string(e.Src.ID())
v := r.lastReceivedRequests.Get(id)
if v != nil {
// FlushStop/StopPeer are already
// running in a go-routine.
return
}
r.lastReceivedRequests.Set(id, time.Now())
// Send addrs and disconnect
r.SendAddrs(e.Src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
go func() {
// In a go-routine so it doesn't block .Receive.
e.Src.FlushStop()
r.Switch.StopPeerGracefully(e.Src)
}()
} else {
// Check we're not receiving requests too frequently.
if err := r.receiveRequest(e.Src); err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
return
}
r.SendAddrs(e.Src, r.book.GetSelection())
}
case *tmp2p.PexAddrs:
// If we asked for addresses, add them to the book
addrs, err := p2p.NetAddressesFromProto(msg.Addrs)
if err != nil {
r.Switch.StopPeerForError(e.Src, err)
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
return
}
err = r.ReceiveAddrs(addrs, e.Src)
if err != nil {
r.Switch.StopPeerForError(e.Src, err)
if err == ErrUnsolicitedList {
r.book.MarkBad(e.Src.SocketAddr(), defaultBanTime)
}
return
}
default:
r.Logger.Error(fmt.Sprintf("Unknown message type %T", msg))
}
}
// enforces a minimum amount of time between requests
func (r *Reactor) receiveRequest(src Peer) error {
id := string(src.ID())
@@ -806,7 +877,11 @@ func decodeMsg(bz []byte) (proto.Message, error) {
if err != nil {
return nil, err
}
return msgFromProto(pb)
}
func msgFromProto(m proto.Message) (proto.Message, error) {
pb := m.(*tmp2p.Message)
switch msg := pb.Sum.(type) {
case *tmp2p.Message_PexRequest:
return msg.PexRequest, nil

View File

@@ -131,12 +131,13 @@ func TestPEXReactorReceive(t *testing.T) {
r.RequestAddrs(peer)
size := book.Size()
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
r.Receive(PexChannel, peer, msg)
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)})
assert.Equal(t, size+1, book.Size())
msg = mustEncode(&tmp2p.PexRequest{})
r.Receive(PexChannel, peer, msg) // should not panic.
r.Receive(PexChannel, peer, mustEncode(&tmp2p.PexRequest{})) // should not panic.
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})})
}
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
@@ -159,16 +160,19 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
// first time creates the entry
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// next time sets the last time value
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})})
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// third time is too many too soon - peer is removed
r.Receive(PexChannel, peer, msg)
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(&tmp2p.PexRequest{})})
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peerAddr))
@@ -192,15 +196,17 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
msg := mustEncode(&tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}})
msg := &tmp2p.PexAddrs{Addrs: []tmp2p.NetAddress{peer.SocketAddr().ToProto()}}
// receive some addrs. should clear the request
r.Receive(PexChannel, peer, msg)
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)})
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
// receiving more unsolicited addrs causes a disconnect and ban
r.Receive(PexChannel, peer, msg)
r.Receive(PexChannel, peer, mustEncode(msg))
r.NewReceive(p2p.Envelope{ChannelID: PexChannel, Src: peer, Message: toWrappedProto(msg)})
assert.False(t, sw.Peers().Has(peer.ID()))
assert.True(t, book.IsBanned(peer.SocketAddr()))
}