diff --git a/internal/p2p/metrics.gen.go b/internal/p2p/metrics.gen.go index cb215f2b6..9cffbc46b 100644 --- a/internal/p2p/metrics.gen.go +++ b/internal/p2p/metrics.gen.go @@ -74,6 +74,12 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "peers_connected_outgoing", Help: "Number of peers connected as a result of the peer dialing this node.", }, labels).With(labelsAndValues...), + PeersEvicted: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "peers_evicted", + Help: "Number of peers evicted by this node.", + }, labels).With(labelsAndValues...), RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{ Namespace: namespace, Subsystem: MetricsSubsystem, @@ -119,6 +125,7 @@ func NopMetrics() *Metrics { PeersConnectedFailure: discard.NewCounter(), PeersConnectedIncoming: discard.NewGauge(), PeersConnectedOutgoing: discard.NewGauge(), + PeersEvicted: discard.NewCounter(), RouterPeerQueueRecv: discard.NewHistogram(), RouterPeerQueueSend: discard.NewHistogram(), RouterChannelQueueSend: discard.NewHistogram(), diff --git a/internal/p2p/metrics.go b/internal/p2p/metrics.go index a88aaa3f2..bc233f691 100644 --- a/internal/p2p/metrics.go +++ b/internal/p2p/metrics.go @@ -51,6 +51,9 @@ type Metrics struct { // this node. PeersConnectedOutgoing metrics.Gauge + // Number of peers evicted by this node. + PeersEvicted metrics.Counter + // RouterPeerQueueRecv defines the time taken to read off of a peer's queue // before sending on the connection. //metrics:The time taken to read off of a peer's queue before sending on the connection. diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index 210c34e2a..f2f74007b 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -513,12 +513,13 @@ func (m *PeerManager) HasDialedMaxPeers() bool { // returned peer. func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) { for { - address, err := m.TryDialNext() - if err != nil || (address != NodeAddress{}) { - return address, err + if address := m.TryDialNext(); (address != NodeAddress{}) { + return address, nil } + select { case <-m.dialWaker.Sleep(): + continue case <-ctx.Done(): return NodeAddress{}, ctx.Err() } @@ -527,7 +528,7 @@ func (m *PeerManager) DialNext(ctx context.Context) (NodeAddress, error) { // TryDialNext is equivalent to DialNext(), but immediately returns an empty // address if no peers or connection slots are available. -func (m *PeerManager) TryDialNext() (NodeAddress, error) { +func (m *PeerManager) TryDialNext() NodeAddress { m.mtx.Lock() defer m.mtx.Unlock() @@ -535,12 +536,12 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { // MaxConnectedUpgrade allows us to probe additional peers that have a // higher score than any other peers, and if successful evict it. if m.options.MaxConnected > 0 && len(m.connected)+len(m.dialing) >= int(m.options.MaxConnected)+int(m.options.MaxConnectedUpgrade) { - return NodeAddress{}, nil + return NodeAddress{} } cinfo := m.getConnectedInfo() if m.options.MaxOutgoingConnections > 0 && cinfo.outgoing >= m.options.MaxOutgoingConnections { - return NodeAddress{}, nil + return NodeAddress{} } for _, peer := range m.store.Ranked() { @@ -563,16 +564,16 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) { if m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { upgradeFromPeer := m.findUpgradeCandidate(peer.ID, peer.Score()) if upgradeFromPeer == "" { - return NodeAddress{}, nil + return NodeAddress{} } m.upgrading[upgradeFromPeer] = peer.ID } m.dialing[peer.ID] = true - return addressInfo.Address, nil + return addressInfo.Address } } - return NodeAddress{}, nil + return NodeAddress{} } // DialFailed reports a failed dial attempt. This will make the peer available @@ -680,8 +681,7 @@ func (m *PeerManager) Dialed(address NodeAddress) error { return err } - if upgradeFromPeer != "" && m.options.MaxConnected > 0 && - len(m.connected) >= int(m.options.MaxConnected) { + if upgradeFromPeer != "" && m.options.MaxConnected > 0 && len(m.connected) >= int(m.options.MaxConnected) { // Look for an even lower-scored peer that may have appeared since we // started the upgrade. if p, ok := m.store.Get(upgradeFromPeer); ok { @@ -690,11 +690,11 @@ func (m *PeerManager) Dialed(address NodeAddress) error { } } m.evict[upgradeFromPeer] = true + m.evictWaker.Wake() } m.metrics.PeersConnectedOutgoing.Add(1) m.connected[peer.ID] = peerConnectionOutgoing - m.evictWaker.Wake() return nil } diff --git a/internal/p2p/peermanager_test.go b/internal/p2p/peermanager_test.go index 5e9c3a8a4..a1543bf18 100644 --- a/internal/p2p/peermanager_test.go +++ b/internal/p2p/peermanager_test.go @@ -384,16 +384,14 @@ func TestPeerManager_DialNext_WakeOnDialFailed(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Add b. We shouldn't be able to dial it, due to MaxConnected. added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Spawn a goroutine to fail a's dial attempt. @@ -427,8 +425,7 @@ func TestPeerManager_DialNext_WakeOnDialFailedRetry(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(ctx, dial)) failed := time.Now() @@ -458,8 +455,7 @@ func TestPeerManager_DialNext_WakeOnDisconnected(t *testing.T) { err = peerManager.Accepted(a.NodeID) require.NoError(t, err) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Zero(t, dial) dctx, dcancel := context.WithTimeout(ctx, 300*time.Millisecond) @@ -490,8 +486,7 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -499,16 +494,14 @@ func TestPeerManager_TryDialNext_MaxConnected(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // At this point, adding c will not allow dialing it. added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -540,7 +533,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -549,8 +542,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Even though we are at capacity, we should be allowed to dial c for an @@ -558,8 +550,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, c, dial) // However, since we're using all upgrade slots now, we can't add and dial @@ -567,16 +558,14 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(d) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // We go through with c's upgrade. require.NoError(t, peerManager.Dialed(c)) // Still can't dial d. - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Now, if we disconnect a, we should be allowed to dial d because we have a @@ -592,8 +581,7 @@ func TestPeerManager_TryDialNext_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(e) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -613,8 +601,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -622,8 +609,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Adding c and dialing it will fail, because a is the only connected @@ -631,8 +617,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) } @@ -653,22 +638,19 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Adding a's TCP address will not dispense a, since it's already dialing. added, err = peerManager.Add(aTCP) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Marking a as dialed will still not dispense it. require.NoError(t, peerManager.Dialed(a)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) // Adding b and accepting a connection from it will not dispense it either. @@ -676,8 +658,7 @@ func TestPeerManager_TryDialNext_DialingConnected(t *testing.T) { require.NoError(t, err) require.True(t, added) require.NoError(t, peerManager.Accepted(bID)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -706,16 +687,14 @@ func TestPeerManager_TryDialNext_Multiple(t *testing.T) { // All addresses should be dispensed as long as dialing them has failed. dial := []p2p.NodeAddress{} for range addresses { - address, err := peerManager.TryDialNext() - require.NoError(t, err) + address := peerManager.TryDialNext() require.NotZero(t, address) require.NoError(t, peerManager.DialFailed(ctx, address)) dial = append(dial, address) } require.ElementsMatch(t, dial, addresses) - address, err := peerManager.TryDialNext() - require.NoError(t, err) + address := peerManager.TryDialNext() require.Zero(t, address) } @@ -740,15 +719,14 @@ func TestPeerManager_DialFailed(t *testing.T) { // Dialing and then calling DialFailed with a different address (same // NodeID) should unmark as dialing and allow us to dial the other address // again, but not register the failed address. - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(ctx, p2p.NodeAddress{ Protocol: "tcp", NodeID: aID, Hostname: "localhost"})) require.Equal(t, []p2p.NodeAddress{a}, peerManager.Addresses(aID)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, a, dial) // Calling DialFailed on same address twice should be fine. @@ -782,8 +760,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -791,8 +768,7 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) // Adding c and dialing it will fail, even though it could upgrade a and we @@ -801,14 +777,12 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) // Failing b's dial will now make c available for dialing. require.NoError(t, peerManager.DialFailed(ctx, b)) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, c, dial) } @@ -823,8 +797,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.Dialed(a)) @@ -834,8 +807,7 @@ func TestPeerManager_Dialed_Connected(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Accepted(b.NodeID)) @@ -864,8 +836,7 @@ func TestPeerManager_Dialed_MaxConnected(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) // Marking b as dialed in the meanwhile (even without TryDialNext) @@ -907,8 +878,7 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) require.NoError(t, peerManager.Dialed(c)) @@ -952,8 +922,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Dialed(b)) @@ -962,8 +931,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Empty(t, dial) // a should now be evicted. @@ -1009,8 +977,7 @@ func TestPeerManager_Dialed_UpgradeEvenLower(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) // In the meanwhile, a disconnects and d connects. d is even lower-scored @@ -1063,7 +1030,7 @@ func TestPeerManager_Dialed_UpgradeNoEvict(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() + dial := peerManager.TryDialNext() require.NoError(t, err) require.Equal(t, c, dial) @@ -1109,8 +1076,7 @@ func TestPeerManager_Accepted(t *testing.T) { added, err = peerManager.Add(c) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, c, dial) require.NoError(t, peerManager.Accepted(c.NodeID)) require.Error(t, peerManager.Dialed(c)) @@ -1119,8 +1085,7 @@ func TestPeerManager_Accepted(t *testing.T) { added, err = peerManager.Add(d) require.NoError(t, err) require.True(t, added) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, d, dial) require.NoError(t, peerManager.Dialed(d)) require.Error(t, peerManager.Accepted(d.NodeID)) @@ -1271,8 +1236,7 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) { added, err = peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) // a has already been claimed as an upgrade of a, so accepting @@ -1446,8 +1410,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) { added, err := peerManager.Add(b) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, b, dial) require.NoError(t, peerManager.Dialed(b)) }() @@ -1581,13 +1544,11 @@ func TestPeerManager_Disconnected(t *testing.T) { // Disconnecting a dialing peer does not unmark it as dialing, to avoid // dialing it multiple times in parallel. - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) peerManager.Disconnected(ctx, a.NodeID) - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Zero(t, dial) } @@ -1660,8 +1621,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) // Outbound connection with peer error and eviction. - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.Empty(t, sub.Updates()) @@ -1684,8 +1644,7 @@ func TestPeerManager_Subscribe(t *testing.T) { require.Equal(t, p2p.PeerUpdate{NodeID: a.NodeID, Status: p2p.PeerStatusDown}, <-sub.Updates()) // Outbound connection with dial failure. - dial, err = peerManager.TryDialNext() - require.NoError(t, err) + dial = peerManager.TryDialNext() require.Equal(t, a, dial) require.Empty(t, sub.Updates()) @@ -1790,8 +1749,7 @@ func TestPeerManager_Close(t *testing.T) { added, err := peerManager.Add(a) require.NoError(t, err) require.True(t, added) - dial, err := peerManager.TryDialNext() - require.NoError(t, err) + dial := peerManager.TryDialNext() require.Equal(t, a, dial) require.NoError(t, peerManager.DialFailed(ctx, a)) } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index ec225f8e3..ff90e8c21 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -310,11 +310,7 @@ func (r *Router) routeChannel( ) { for { select { - case envelope, ok := <-outCh: - if !ok { - return - } - + case envelope := <-outCh: // Mark the envelope with the channel ID to allow sendPeer() to pass // it on to Transport.SendMessage(). envelope.ChannelID = chID @@ -391,20 +387,22 @@ func (r *Router) routeChannel( } } - case peerError, ok := <-errCh: - if !ok { - return - } - - shouldEvict := peerError.Fatal || r.peerManager.HasMaxPeerCapacity() + case peerError := <-errCh: + maxPeerCapacity := r.peerManager.HasMaxPeerCapacity() r.logger.Error("peer error", "peer", peerError.NodeID, "err", peerError.Err, - "evicting", shouldEvict, + "disconnecting", peerError.Fatal || maxPeerCapacity, ) - if shouldEvict { + + if peerError.Fatal || maxPeerCapacity { + // if the error is fatal or all peer + // slots are in use, we can error + // (disconnect) from the peer. r.peerManager.Errored(peerError.NodeID, peerError.Err) } else { + // this just decrements the peer + // score. r.peerManager.processPeerEvent(ctx, PeerUpdate{ NodeID: peerError.NodeID, Status: PeerStatusBad, @@ -466,11 +464,6 @@ func (r *Router) dialSleep(ctx context.Context) { } r.options.DialSleep(ctx) - - if !r.peerManager.HasDialedMaxPeers() { - r.peerManager.dialWaker.Wake() - } - } // acceptPeers accepts inbound connections from peers on the given transport, @@ -591,9 +584,8 @@ LOOP: switch { case errors.Is(err, context.Canceled): break LOOP - case err != nil: - r.logger.Error("failed to find next peer to dial", "err", err) - break LOOP + case address == NodeAddress{}: + continue LOOP } select { @@ -603,7 +595,7 @@ LOOP: // create connections too quickly. r.dialSleep(ctx) - continue + continue LOOP case <-ctx.Done(): close(addresses) break LOOP @@ -642,6 +634,7 @@ func (r *Router) connectPeer(ctx context.Context, address NodeAddress) { if err := r.runWithPeerMutex(func() error { return r.peerManager.Dialed(address) }); err != nil { r.logger.Error("failed to dial peer", "op", "outgoing/dialing", "peer", address.NodeID, "err", err) + r.peerManager.dialWaker.Wake() conn.Close() return } @@ -937,6 +930,8 @@ func (r *Router) evictPeers(ctx context.Context) { queue, ok := r.peerQueues[peerID] r.peerMtx.RUnlock() + r.metrics.PeersEvicted.Add(1) + if ok { queue.close() }