p2p: remove dial sleep and provide disconnect cooldown (#8839)

Alternative proposal for #8826
This commit is contained in:
Sam Kleinman
2022-06-24 13:57:49 -04:00
committed by GitHub
parent c4d24eed7d
commit 52b6dc19ba
5 changed files with 44 additions and 57 deletions

View File

@@ -269,7 +269,7 @@ func (n *Network) MakeNode(ctx context.Context, t *testing.T, opts NodeOptions)
func() *types.NodeInfo { return &nodeInfo },
transport,
ep,
p2p.RouterOptions{DialSleep: func(_ context.Context) {}},
p2p.RouterOptions{},
)
require.NoError(t, err)

View File

@@ -144,6 +144,10 @@ type PeerManagerOptions struct {
// retry times, to avoid thundering herds. 0 disables jitter.
RetryTimeJitter time.Duration
// DisconnectCooldownPeriod is the amount of time after we
// disconnect from a peer before we'll consider dialing a new peer
DisconnectCooldownPeriod time.Duration
// PeerScores sets fixed scores for specific peers. It is mainly used
// for testing. A score of 0 is ignored.
PeerScores map[types.NodeID]PeerScore
@@ -549,6 +553,10 @@ func (m *PeerManager) TryDialNext() NodeAddress {
continue
}
if !peer.LastDisconnected.IsZero() && time.Since(peer.LastDisconnected) < m.options.DisconnectCooldownPeriod {
continue
}
for _, addressInfo := range peer.AddressInfo {
if time.Since(addressInfo.LastDialFailure) < m.retryDelay(addressInfo.DialFailures, peer.Persistent) {
continue
@@ -867,6 +875,22 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {
delete(m.evicting, peerID)
delete(m.ready, peerID)
if peer, ok := m.store.Get(peerID); ok {
peer.LastDisconnected = time.Now()
_ = m.store.Set(peer)
// launch a thread to ping the dialWaker when the
// disconnected peer can be dialed again.
go func() {
timer := time.NewTimer(m.options.DisconnectCooldownPeriod)
defer timer.Stop()
select {
case <-timer.C:
m.dialWaker.Wake()
case <-ctx.Done():
}
}()
}
if ready {
m.broadcast(ctx, PeerUpdate{
NodeID: peerID,
@@ -1447,9 +1471,10 @@ func (s *peerStore) Size() int {
// peerInfo contains peer information stored in a peerStore.
type peerInfo struct {
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
ID types.NodeID
AddressInfo map[NodeAddress]*peerAddressInfo
LastConnected time.Time
LastDisconnected time.Time
// These fields are ephemeral, i.e. not persisted to the database.
Persistent bool
@@ -1489,8 +1514,8 @@ func peerInfoFromProto(msg *p2pproto.PeerInfo) (*peerInfo, error) {
func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
msg := &p2pproto.PeerInfo{
ID: string(p.ID),
LastConnected: &p.LastConnected,
Inactive: p.Inactive,
LastConnected: &p.LastConnected,
}
for _, addressInfo := range p.AddressInfo {
msg.AddressInfo = append(msg.AddressInfo, addressInfo.ToProto())
@@ -1498,6 +1523,7 @@ func (p *peerInfo) ToProto() *p2pproto.PeerInfo {
if msg.LastConnected.IsZero() {
msg.LastConnected = nil
}
return msg
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"runtime"
"sync"
@@ -62,13 +61,7 @@ type RouterOptions struct {
// return an error to reject the peer.
FilterPeerByID func(context.Context, types.NodeID) error
// DialSleep controls the amount of time that the router
// sleeps between dialing peers. If not set, a default value
// is used that sleeps for a (random) amount of time up to 3
// seconds between submitting each peer to be dialed.
DialSleep func(context.Context)
// NumConcurrentDials controls how many parallel go routines
// NumConcrruentDials controls how many parallel go routines
// are used to dial peers. This defaults to the value of
// runtime.NumCPU.
NumConcurrentDials func() int
@@ -439,33 +432,6 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
return r.options.FilterPeerByID(ctx, id)
}
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
// the connTracker (on the other side) only rate
// limits peers for dialing more than once every 10ms,
// so these numbers are safe.
const (
maxDialerInterval = 500 // ms
minDialerInterval = 100 // ms
)
// nolint:gosec // G404: Use of weak random number generator
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
timer := time.NewTimer(dur * time.Millisecond)
defer timer.Stop()
select {
case <-ctx.Done():
case <-timer.C:
}
return
}
r.options.DialSleep(ctx)
}
// acceptPeers accepts inbound connections from peers on the given transport,
// and spawns goroutines that route messages to/from them.
func (r *Router) acceptPeers(ctx context.Context, transport Transport) {
@@ -590,11 +556,6 @@ LOOP:
select {
case addresses <- address:
// this jitters the frequency that we call
// DialNext and prevents us from attempting to
// create connections too quickly.
r.dialSleep(ctx)
continue LOOP
case <-ctx.Done():
close(addresses)

View File

@@ -715,7 +715,6 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {
mockTransport,
nil,
p2p.RouterOptions{
DialSleep: func(_ context.Context) {},
NumConcurrentDials: func() int {
ncpu := runtime.NumCPU()
if ncpu <= 3 {

View File

@@ -235,17 +235,18 @@ func createPeerManager(
maxUpgradeConns := uint16(4)
options := p2p.PeerManagerOptions{
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
SelfAddress: selfAddr,
MaxConnected: maxConns,
MaxOutgoingConnections: maxOutgoingConns,
MaxConnectedUpgrade: maxUpgradeConns,
DisconnectCooldownPeriod: 2 * time.Second,
MaxPeers: maxUpgradeConns + 4*maxConns,
MinRetryTime: 250 * time.Millisecond,
MaxRetryTime: 30 * time.Minute,
MaxRetryTimePersistent: 5 * time.Minute,
RetryTimeJitter: 5 * time.Second,
PrivatePeers: privatePeerIDs,
Metrics: metrics,
}
peers := []p2p.NodeAddress{}