diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index bef979853..8375249f3 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -50,6 +50,30 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peer_pending_send_bytes", Help: "Number of bytes pending being sent to a given peer.", }, append(labels, "peer_id")).With(labelsAndValues...), + PeerConnectionSuccess: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_connection_success", + Help: "Number of successful connection attempts", + }, labels).With(labelsAndValues...), + PeerConnectionFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peer_connection_failure", + Help: "Number of failed connection attempts", + }, labels).With(labelsAndValues...), + PeersConnectedIncoming: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peers_connected_incoming", + Help: "Number of peers connected as a result of the peer dialing this node.", + }, labels).With(labelsAndValues...), + PeersConnectedOutgoing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peers_connected_outgoing", + Help: "Number of peers connected as a result of dialing the peer.", + }, labels).With(labelsAndValues...), RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -91,6 +115,10 @@ func NopMetrics() *Metrics { PeerReceiveBytesTotal: discard.NewCounter(), PeerSendBytesTotal: discard.NewCounter(), PeerPendingSendBytes: discard.NewGauge(), + PeerConnectionSuccess: discard.NewCounter(), + PeerConnectionFailure: discard.NewCounter(), + PeersConnectedIncoming: discard.NewGauge(), + PeersConnectedOutgoing: discard.NewGauge(), 
RouterPeerQueueRecv: discard.NewHistogram(), RouterPeerQueueSend: discard.NewHistogram(), RouterChannelQueueSend: discard.NewHistogram(), diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index a267f12a0..f7da47fb3 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -39,6 +39,18 @@ type Metrics struct { // Number of bytes pending being sent to a given peer. PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"` + // Number of successful connection attempts + PeerConnectionSuccess metrics.Counter + // Number of failed connection attempts + PeerConnectionFailure metrics.Counter + + // Number of peers connected as a result of the peer dialing + // this node. + PeersConnectedIncoming metrics.Gauge + // Number of peers connected as a result of dialing the + // peer. + PeersConnectedOutgoing metrics.Gauge + // RouterPeerQueueRecv defines the time taken to read off of a peer's queue // before sending on the connection. //metrics:The time taken to read off of a peer's queue before sending on the connection. 
diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 6fde1ff79..ed340f1c8 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -422,13 +422,13 @@ func (m *PeerManager) isConnected(peerID types.NodeID) bool { return ok } -type connectedInfo struct { +type connectionStats struct { incoming uint16 outgoing uint16 } -func (m *PeerManager) getConnectedInfo() connectedInfo { - out := connectedInfo{} +func (m *PeerManager) getConnectedInfo() connectionStats { + out := connectionStats{} for _, direction := range m.connected { switch direction { case peerConnectionIncoming: @@ -575,6 +575,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error { m.mtx.Lock() defer m.mtx.Unlock() + m.metrics.PeerConnectionFailure.Add(1) delete(m.dialing, address.NodeID) for from, to := range m.upgrading { @@ -639,6 +640,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { m.mtx.Lock() defer m.mtx.Unlock() + m.metrics.PeerConnectionSuccess.Add(1) + delete(m.dialing, address.NodeID) var upgradeFromPeer types.NodeID @@ -694,6 +697,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } m.evict[upgradeFromPeer] = true } + + m.metrics.PeersConnectedOutgoing.Add(1) m.connected[peer.ID] = peerConnectionOutgoing m.evictWaker.Wake() @@ -761,6 +766,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error { return err } + m.metrics.PeersConnectedIncoming.Add(1) m.connected[peerID] = peerConnectionIncoming if upgradeFromPeer != "" { m.evict[upgradeFromPeer] = true @@ -848,6 +854,13 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) { m.mtx.Lock() defer m.mtx.Unlock() + switch m.connected[peerID] { + case peerConnectionIncoming: + m.metrics.PeersConnectedIncoming.Add(-1) + case peerConnectionOutgoing: + m.metrics.PeersConnectedOutgoing.Add(-1) + } + ready := m.ready[peerID] delete(m.connected, peerID) @@ -904,15 
+917,14 @@ func (m *PeerManager) Inactivate(peerID types.NodeID) error { // Advertise returns a list of peer addresses to advertise to a peer. // -// It finds twice as many peers as the limit specifies, shuffles this -// list, and then returns the limit. The goal of this is to gossip the -// "best" peers but also ensure that more fresh peers also have a -// chance. +// It sorts all peers in the peer store, and assembles a list of +// addresses biased toward the highest-ranked peers, while still +// giving lower-ranked peers a chance to be advertised. func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress { m.mtx.Lock() defer m.mtx.Unlock() - addresses := make([]NodeAddress, 0, 2*limit) + addresses := make([]NodeAddress, 0, limit) // advertise ourselves, to let everyone know how to dial us back // and enable mutual address discovery @@ -920,24 +932,81 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress addresses = append(addresses, m.options.SelfAddress) } -OUTER: - for _, peer := range m.store.Ranked() { + var numAddresses int + var attempts int + + ranked := m.store.Ranked() + seenAddresses := map[NodeAddress]struct{}{} + + // get the total number of possible addresses + for _, peer := range ranked { if peer.ID == peerID { continue } - if peer.Inactive { - continue + + for addr := range peer.AddressInfo { + if _, ok := m.options.PrivatePeers[addr.NodeID]; !ok { + numAddresses++ + } + } + } + +RETRY: + for { + attempts++ + addedLastIteration := false + ITERATION: + for idx, peer := range ranked { + if peer.ID == peerID { + continue ITERATION + } + + if len(addresses) >= int(limit) { + break RETRY + } + + PEER: + for nodeAddr, addressInfo := range peer.AddressInfo { + if len(addresses) >= int(limit) { + break RETRY + } + + if _, ok := seenAddresses[addressInfo.Address]; ok { + continue PEER + } + + // only add non-private NodeIDs + if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok { + // add the address if the total + // number of ranked peers will + // fit within the limit, and + // otherwise with a probability + // that decreases the lower the + // peer's rank is. + + if len(ranked) <= int(limit) || rand.Intn(idx+1*2) <= idx { + seenAddresses[addressInfo.Address] = struct{}{} + addresses = append(addresses, addressInfo.Address) + addedLastIteration = true + } + + } else { + // skip private peers in future iterations + seenAddresses[addressInfo.Address] = struct{}{} + } + + } } - for nodeAddr, addressInfo := range peer.AddressInfo { - if len(addresses) >= int(limit)*2 { - break OUTER - } - - // only add non-private NodeIDs - if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok { - addresses = append(addresses, addressInfo.Address) - } + // once we have the required number of peers, we + // should stop. + if len(addresses) >= int(limit) { + break RETRY + } + // give up eventually + if attempts >= 32 && !addedLastIteration { + break RETRY } } @@ -945,11 +1014,7 @@ OUTER: addresses[i], addresses[j] = addresses[j], addresses[i] }) - if len(addresses) <= int(limit) { - return addresses - } - - return addresses[:limit] + return addresses } // PeerEventSubscriber describes the type of the subscription method, to assist diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index 35c4853fe..5e9c3a8a4 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -1836,6 +1836,7 @@ func TestPeerManager_Advertise(t *testing.T) { require.NoError(t, err) require.True(t, added) + require.Len(t, peerManager.Advertise(dID, 100), 6) // d should get all addresses. 
require.ElementsMatch(t, []p2p.NodeAddress{ aTCP, aMem, bTCP, bMem, cTCP, cMem, diff --git a/internal/p2p/router.go b/internal/p2p/router.go index fc2bc3c07..d7236d472 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -443,6 +443,9 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error { func (r *Router) dialSleep(ctx context.Context) { if r.options.DialSleep == nil { + // the connTracker (on the other side) only rate + // limits peers for dialing more than once every 10ms, + // so these numbers are safe. const ( maxDialerInterval = 500 // ms minDialerInterval = 100 // ms