From 26a24ff17589ef5c9a472ba105dcd2a35ee65fbd Mon Sep 17 00:00:00 2001 From: tycho garen Date: Thu, 16 Jun 2022 09:21:26 -0400 Subject: [PATCH] cr feedback --- internal/p2p/metrics.go | 12 ++++ internal/p2p/peermanager.go | 117 ++++++++++++++++++++++++------- internal/p2p/peermanager_test.go | 1 + internal/p2p/router.go | 9 ++- 4 files changed, 112 insertions(+), 27 deletions(-) diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index 054a59470..d0e9a5cee 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -41,6 +41,18 @@ type Metrics struct { // Pending bytes to be sent to a given peer. PeerPendingSendBytes metrics.Gauge + // Number of successful connection attempts + PeerConnectionSuccess metrics.Counter + // Number of failed connection attempts + PeerConnectionFailure metrics.Counter + + // Number of peers connected as a result of the peer dialing + // this node. + PeersConnectedIncoming metrics.Gauge + // Number of peers connected as a result of dialing the + // peer. + PeersConnectedOutgoing metrics.Gauge + // RouterPeerQueueRecv defines the time taken to read off of a peer's queue // before sending on the connection. 
RouterPeerQueueRecv metrics.Histogram diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index b2f20840c..4c6fea653 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -444,13 +444,13 @@ func (m *PeerManager) isConnected(peerID types.NodeID) bool { return ok } -type connectedInfo struct { +type connectionStats struct { incoming uint16 outgoing uint16 } -func (m *PeerManager) getConnectedInfo() connectedInfo { - out := connectedInfo{} +func (m *PeerManager) getConnectedInfo() connectionStats { + out := connectionStats{} for _, direction := range m.connected { switch direction { case peerConnectionIncoming: @@ -598,6 +598,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { func (m *PeerManager) DialFailed(address NodeAddress) error { m.mtx.Lock() defer m.mtx.Unlock() + m.metrics.PeerConnectionFailure.Add(1) delete(m.dialing, address.NodeID) for from, to := range m.upgrading { @@ -662,6 +663,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { m.mtx.Lock() defer m.mtx.Unlock() + m.metrics.PeerConnectionSuccess.Add(1) + delete(m.dialing, address.NodeID) var upgradeFromPeer types.NodeID @@ -717,6 +720,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } m.evict[upgradeFromPeer] = true } + + m.metrics.PeersConnectedOutgoing.Add(1) m.connected[peer.ID] = peerConnectionOutgoing m.evictWaker.Wake() @@ -784,6 +789,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return err } + m.metrics.PeersConnectedIncoming.Add(1) m.connected[peerID] = peerConnectionIncoming if upgradeFromPeer != "" { m.evict[upgradeFromPeer] = true @@ -871,6 +877,13 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) { m.mtx.Lock() defer m.mtx.Unlock() + switch m.connected[peerID] { + case peerConnectionIncoming: + m.metrics.PeersConnectedIncoming.Add(-1) + case peerConnectionOutgoing: + m.metrics.PeersConnectedOutgoing.Add(-1) + } + ready := m.ready[peerID] delete(m.connected, peerID) @@ -927,15 +940,14 
@@ func (m *PeerManager) Inactivate(peerID types.NodeID) error { // Advertise returns a list of peer addresses to advertise to a peer. // -// It finds twice as many peers as the limit specifies, shuffles this -// list, and then returns the limit. The goal of this is to gossip the -// "best" peers but also ensure that more fresh peers also have a -// chance. +// It sorts all peers in the peer store, and assembles a list of peers +// that is most likely to include the highest-priority peers, and +// then shuffles that list before returning it. func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress { m.mtx.Lock() defer m.mtx.Unlock() - addresses := make([]NodeAddress, 0, 2*limit) + addresses := make([]NodeAddress, 0, limit) // advertise ourselves, to let everyone know how to dial us back // and enable mutual address discovery @@ -943,24 +955,81 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress addresses = append(addresses, m.options.SelfAddress) } -OUTER: - for _, peer := range m.store.Ranked() { + var numAddresses int + var attempts int + + ranked := m.store.Ranked() + seenAddresses := map[NodeAddress]struct{}{} + + // get the total number of possible addresses + for _, peer := range ranked { if peer.ID == peerID { continue } - if peer.Inactive { - continue + + for addr := range peer.AddressInfo { + if _, ok := m.options.PrivatePeers[addr.NodeID]; !ok { + numAddresses++ + } + } + } + +RETRY: + for { + attempts++ + addedLastIteration := false + ITERATION: + for idx, peer := range ranked { + if peer.ID == peerID { + continue ITERATION + } + + if len(addresses) >= int(limit) { + break RETRY + } + + PEER: + for nodeAddr, addressInfo := range peer.AddressInfo { + if len(addresses) >= int(limit) { + break RETRY + } + + if _, ok := seenAddresses[addressInfo.Address]; ok { + continue PEER + } + + // only add non-private NodeIDs + if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok { + // add the peer if the total + // number of ranked peers + // 
will fit within the limit, + // and then with decreasing + // likelihood the + // lower the priority of the + // peer is. + + if len(ranked) <= int(limit) || rand.Intn(idx+1*2) <= idx { + seenAddresses[addressInfo.Address] = struct{}{} + addresses = append(addresses, addressInfo.Address) + addedLastIteration = true + } + + } else { + // skip private peers in future iterations + seenAddresses[addressInfo.Address] = struct{}{} + } + + } } - for nodeAddr, addressInfo := range peer.AddressInfo { - if len(addresses) >= int(limit)*2 { - break OUTER - } - - // only add non-private NodeIDs - if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok { - addresses = append(addresses, addressInfo.Address) - } + // once we have the required number of peers, we + // should stop. + if len(addresses) >= int(limit) { + break RETRY + } + // give up eventually + if attempts >= 32 && !addedLastIteration { + break RETRY } } @@ -968,11 +1037,7 @@ OUTER: addresses[i], addresses[j] = addresses[j], addresses[i] }) - if len(addresses) <= int(limit) { - return addresses - } - - return addresses[:limit] + return addresses } // Subscribe subscribes to peer updates. The caller must consume the peer diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index a307549e8..085d9ee81 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -1766,6 +1766,7 @@ func TestPeerManager_Advertise(t *testing.T) { require.NoError(t, err) require.True(t, added) + require.Len(t, peerManager.Advertise(dID, 100), 6) // d should get all addresses. 
require.ElementsMatch(t, []p2p.NodeAddress{ aTCP, aMem, bTCP, bMem, cTCP, cMem, diff --git a/internal/p2p/router.go b/internal/p2p/router.go index 653e19b46..552c2d763 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -554,8 +554,15 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error { func (r *Router) dialSleep(ctx context.Context) { if r.options.DialSleep == nil { + const ( + maxDialerInterval = 3000 + minDialerInterval = 250 + ) + // nolint:gosec // G404: Use of weak random number generator - timer := time.NewTimer(time.Duration(rand.Int63n(500)) * time.Millisecond) + dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval) + + timer := time.NewTimer(dur * time.Millisecond) defer timer.Stop() select {