diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index c60ac5e83..055e0de5e 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -533,10 +533,7 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error { if attempts > 0 { jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns) backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second) - if r.Switch.IsPeerPersistent(addr) && r.config.PersistentPeersMaxDialPeriod > 0 && - backoffDuration > r.config.PersistentPeersMaxDialPeriod { - backoffDuration = r.config.PersistentPeersMaxDialPeriod - } + backoffDuration = r.maxBackoffDurationForPeer(addr, backoffDuration) sinceLastDialed := time.Since(lastDialed) if sinceLastDialed < backoffDuration { return errTooEarlyToDial{backoffDuration, lastDialed} @@ -565,6 +562,16 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error { return nil } +// maxBackoffDurationForPeer caps the backoff duration for persistent peers. +func (r *PEXReactor) maxBackoffDurationForPeer(addr *p2p.NetAddress, planned time.Duration) time.Duration { + if r.config.PersistentPeersMaxDialPeriod > 0 && + planned > r.config.PersistentPeersMaxDialPeriod && + r.Switch.IsPeerPersistent(addr) { + return r.config.PersistentPeersMaxDialPeriod + } + return planned +} + // checkSeeds checks that addresses are well formed. // Returns number of seeds we can connect to, along with all seeds addrs. // return err if user provided any badly formatted seed addresses. diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 8fc0d4767..786d41022 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -530,33 +530,37 @@ func TestSwitchFullConnectivity(t *testing.T) { func TestSwitchAcceptRoutine(t *testing.T) { cfg.MaxNumInboundPeers = 5 - unconditionalNodeCnt := 2 - remoteUnconditionalPeers := make([]*remotePeer, 0) - var unconditionalNodeIds []string - for i := 0; i < unconditionalNodeCnt; i++ { - rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup) - rup.Start() - unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID())) + // Create some unconditional peers. + const unconditionalPeersNum = 2 + var ( + unconditionalPeers = make([]*remotePeer, unconditionalPeersNum) + unconditionalPeerIDs = make([]string, unconditionalPeersNum) + ) + for i := 0; i < unconditionalPeersNum; i++ { + peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + peer.Start() + unconditionalPeers[i] = peer + unconditionalPeerIDs[i] = string(peer.ID()) } // make switch sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - sw.AddUnconditionalPeerIDs(unconditionalNodeIds) + sw.AddUnconditionalPeerIDs(unconditionalPeerIDs) err := sw.Start() require.NoError(t, err) defer sw.Stop() - remotePeers := make([]*remotePeer, 0) + // 0. check there are no peers assert.Equal(t, 0, sw.Peers().Size()) // 1. check we connect up to MaxNumInboundPeers + peers := make([]*remotePeer, 0) for i := 0; i < cfg.MaxNumInboundPeers; i++ { - rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - remotePeers = append(remotePeers, rp) - rp.Start() - c, err := rp.Dial(sw.NetAddress()) + peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + peers = append(peers, peer) + peer.Start() + c, err := peer.Dial(sw.NetAddress()) require.NoError(t, err) // spawn a reading routine to prevent connection from closing go func(c net.Conn) { @@ -573,9 +577,9 @@ func TestSwitchAcceptRoutine(t *testing.T) { assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size()) // 2. check we close new connections if we already have MaxNumInboundPeers peers - rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - rp.Start() - conn, err := rp.Dial(sw.NetAddress()) + peer := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} + peer.Start() + conn, err := peer.Dial(sw.NetAddress()) require.NoError(t, err) // check conn is closed one := make([]byte, 1) @@ -583,10 +587,11 @@ func TestSwitchAcceptRoutine(t *testing.T) { _, err = conn.Read(one) assert.Equal(t, io.EOF, err) assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size()) - rp.Stop() + peer.Stop() - for _, rup := range remoteUnconditionalPeers { - c, err := rup.Dial(sw.NetAddress()) + // 3. check we connect to unconditional peers despite the limit. + for _, peer := range unconditionalPeers { + c, err := peer.Dial(sw.NetAddress()) require.NoError(t, err) // spawn a reading routine to prevent connection from closing go func(c net.Conn) { @@ -600,101 +605,13 @@ func TestSwitchAcceptRoutine(t *testing.T) { }(c) } time.Sleep(10 * time.Millisecond) - assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) + assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalPeersNum, sw.Peers().Size()) - // stop remote peers - for _, rp := range remotePeers { - rp.Stop() + for _, peer := range peers { + peer.Stop() } - // stop remote unconditional peers - for _, rup := range remoteUnconditionalPeers { - rup.Stop() - } -} - -func TestSwitchAcceptRoutineUnconditionalPeersFirst(t *testing.T) { - cfg.MaxNumInboundPeers = 5 - unconditionalNodeCnt := 7 - remoteUnconditionalPeers := make([]*remotePeer, 0) - var unconditionalNodeIds []string - - for i := 0; i < unconditionalNodeCnt; i++ { - rup := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - remoteUnconditionalPeers = append(remoteUnconditionalPeers, rup) - rup.Start() - unconditionalNodeIds = append(unconditionalNodeIds, string(rup.ID())) - } - - // make switch - sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc) - sw.AddUnconditionalPeerIDs(unconditionalNodeIds) - err := sw.Start() - require.NoError(t, err) - defer sw.Stop() - - remotePeers := make([]*remotePeer, 0) - assert.Equal(t, 0, sw.Peers().Size()) - - for _, rup := range remoteUnconditionalPeers { - c, err := rup.Dial(sw.NetAddress()) - require.NoError(t, err) - // spawn a reading routine to prevent connection from closing - go func(c net.Conn) { - for { - one := make([]byte, 1) - _, err := c.Read(one) - if err != nil { - return - } - } - }(c) - } - - time.Sleep(10 * time.Millisecond) - assert.Equal(t, unconditionalNodeCnt, sw.Peers().Size()) - assert.True(t, sw.Peers().Size() > cfg.MaxNumInboundPeers) - - // check we connect up to MaxNumInboundPeers - for i := 0; i < cfg.MaxNumInboundPeers; i++ { - rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - remotePeers = append(remotePeers, rp) - rp.Start() - c, err := rp.Dial(sw.NetAddress()) - require.NoError(t, err) - // spawn a reading routine to prevent connection from closing - go func(c net.Conn) { - for { - one := make([]byte, 1) - _, err := c.Read(one) - if err != nil { - return - } - } - }(c) - } - time.Sleep(10 * time.Millisecond) - assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) - - // check we close new connections if we already have MaxNumInboundPeers peers - rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg} - rp.Start() - conn, err := rp.Dial(sw.NetAddress()) - require.NoError(t, err) - // check conn is closed - one := make([]byte, 1) - conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)) - _, err = conn.Read(one) - assert.Equal(t, io.EOF, err) - assert.Equal(t, cfg.MaxNumInboundPeers+unconditionalNodeCnt, sw.Peers().Size()) - rp.Stop() - - // stop remote peers - for _, rp := range remotePeers { - rp.Stop() - } - // stop remote unconditional peers - for _, rup := range remoteUnconditionalPeers { - rup.Stop() + for _, peer := range unconditionalPeers { + peer.Stop() } }