diff --git a/internal/p2p/p2ptest/network.go b/internal/p2p/p2ptest/network.go index 5c1b0a218..c040a08a8 100644 --- a/internal/p2p/p2ptest/network.go +++ b/internal/p2p/p2ptest/network.go @@ -269,7 +269,7 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions) func() *types.NodeInfo { return &nodeInfo }, transport, ep, - p2p.RouterOptions{DialSleep: func(_ context.Context) {}}, + p2p.RouterOptions{}, ) require.NoError(t, err) diff --git a/internal/p2p/peermanager.go b/internal/p2p/peermanager.go index b30c5dbfb..ac2e28225 100644 --- a/internal/p2p/peermanager.go +++ b/internal/p2p/peermanager.go @@ -144,6 +144,10 @@ type PeerManagerOptions struct { // retry times, to avoid thundering herds. 0 disables jitter. RetryTimeJitter time.Duration + // DisconnectCooldownPeriod is the amount of time after we + // disconnect from a peer before we'll consider dialing a new peer + DisconnectCooldownPeriod time.Duration + // PeerScores sets fixed scores for specific peers. It is mainly used // for testing. A score of 0 is ignored. PeerScores map[types.NodeID]PeerScore @@ -549,6 +553,10 @@ func (m *PeerManager) TryDialNext() NodeAddress { continue } + if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod { + continue + } + for _, addressInfo := range peer.AddressInfo { if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) { continue @@ -867,6 +875,22 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) { delete(m.evicting, peerID) delete(m.ready, peerID) + if peer, ok := m.store.Get(peerID); ok { + peer.LastDisconnected = time.Now() + _ = m.store.Set(peer) + // launch a thread to ping the dialWaker when the + // disconnected peer can be dialed again. + go func() { + timer := time.NewTimer(m.options.DisconnectCooldownPeriod) + defer timer.Stop() + select { + case <-timer.C: + m.dialWaker.Wake() + case <-ctx.Done(): + } + }() + } + if ready { m.broadcast(ctx, PeerUpdate{ NodeID: peerID, @@ -1447,9 +1471,10 @@ func (s *peerStore) Size() int { // peerInfo contains peer information stored in a peerStore. type peerInfo struct { - ID types.NodeID - AddressInfo map[NodeAddress]*peerAddressInfo - LastConnected time.Time + ID types.NodeID + AddressInfo map[NodeAddress]*peerAddressInfo + LastConnected time.Time + LastDisconnected time.Time // These fields are ephemeral, i.e. not persisted to the database. Persistent bool @@ -1489,8 +1514,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) { func (p *peerInfo) ToProto() *p2pproto.PeerInfo { msg := &p2pproto.PeerInfo{ ID: string(p.ID), - LastConnected: &p.LastConnected, Inactive: p.Inactive, + LastConnected: &p.LastConnected, } for _, addressInfo := range p.AddressInfo { msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto()) @@ -1498,6 +1523,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo { if msg.LastConnected.IsZero() { msg.LastConnected = nil } + return msg } diff --git a/internal/p2p/router.go b/internal/p2p/router.go index d5edd42be..8b7db9a03 100644 --- a/internal/p2p/router.go +++ b/internal/p2p/router.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math/rand" "net" "runtime" "sync" @@ -62,13 +61,7 @@ type RouterOptions struct { // return an error to reject the peer. FilterPeerByID func(context.Context, types.NodeID) error - // DialSleep controls the amount of time that the router - // sleeps between dialing peers. If not set, a default value - // is used that sleeps for a (random) amount of time up to 3 - // seconds between submitting each peer to be dialed. - DialSleep func(context.Context) - - // NumConcurrentDials controls how many parallel go routines + // NumConcrruentDials controls how many parallel go routines // are used to dial peers. This defaults to the value of // runtime.NumCPU. NumConcurrentDials func() int @@ -439,33 +432,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error { return r.options.FilterPeerByID(ctx, id) } -func (r *Router) dialSleep(ctx context.Context) { - if r.options.DialSleep == nil { - // the connTracker (on the other side) only rate - // limits peers for dialing more than once every 10ms, - // so these numbers are safe. - const ( - maxDialerInterval = 500 // ms - minDialerInterval = 100 // ms - ) - - // nolint:gosec // G404: Use of weak random number generator - dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval) - - timer := time.NewTimer(dur * time.Millisecond) - defer timer.Stop() - - select { - case <-ctx.Done(): - case <-timer.C: - } - - return - } - - r.options.DialSleep(ctx) -} - // acceptPeers accepts inbound connections from peers on the given transport, // and spawns goroutines that route messages to/from them. func (r *Router) acceptPeers(ctx context.Context, transport Transport) { @@ -590,11 +556,6 @@ LOOP: select { case addresses <- address: - // this jitters the frequency that we call - // DialNext and prevents us from attempting to - // create connections too quickly. - - r.dialSleep(ctx) continue LOOP case <-ctx.Done(): close(addresses) diff --git a/internal/p2p/router_test.go b/internal/p2p/router_test.go index 8a58146fd..92f56f768 100644 --- a/internal/p2p/router_test.go +++ b/internal/p2p/router_test.go @@ -715,7 +715,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) { mockTransport, nil, p2p.RouterOptions{ - DialSleep: func(_ context.Context) {}, NumConcurrentDials: func() int { ncpu := runtime.NumCPU() if ncpu <= 3 { diff --git a/node/setup.go b/node/setup.go index 345489f8d..df9d19195 100644 --- a/node/setup.go +++ b/node/setup.go @@ -235,17 +235,18 @@ func createPeerManager( maxUpgradeConns := uint16(4) options := p2p.PeerManagerOptions{ - SelfAddress: selfAddr, - MaxConnected: maxConns, - MaxOutgoingConnections: maxOutgoingConns, - MaxConnectedUpgrade: maxUpgradeConns, - MaxPeers: maxUpgradeConns + 4*maxConns, - MinRetryTime: 250 * time.Millisecond, - MaxRetryTime: 30 * time.Minute, - MaxRetryTimePersistent: 5 * time.Minute, - RetryTimeJitter: 5 * time.Second, - PrivatePeers: privatePeerIDs, - Metrics: metrics, + SelfAddress: selfAddr, + MaxConnected: maxConns, + MaxOutgoingConnections: maxOutgoingConns, + MaxConnectedUpgrade: maxUpgradeConns, + DisconnectCooldownPeriod: 2 * time.Second, + MaxPeers: maxUpgradeConns + 4*maxConns, + MinRetryTime: 250 * time.Millisecond, + MaxRetryTime: 30 * time.Minute, + MaxRetryTimePersistent: 5 * time.Minute, + RetryTimeJitter: 5 * time.Second, + PrivatePeers: privatePeerIDs, + Metrics: metrics, } peers := []p2p.NodeAddress{}