cr feedback

This commit is contained in:
tycho garen
2022-06-16 09:21:26 -04:00
parent f822f21ecd
commit 26a24ff175
4 changed files with 112 additions and 27 deletions

View File

@@ -41,6 +41,18 @@ type Metrics struct {
// Pending bytes to be sent to a given peer.
PeerPendingSendBytes metrics.Gauge
// Number of successful connection attempts
PeerConnectionSuccess metrics.Counter
// Number of failed connection attempts
PeerConnectionFailure metrics.Counter
// Number of peers connected as a result of the peer dialing
// this node (inbound connections).
PeersConnectedIncoming metrics.Gauge
// Number of peers connected as a result of this node dialing
// the peer (outbound connections).
PeersConnectedOutgoing metrics.Gauge
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
// before sending on the connection.
RouterPeerQueueRecv metrics.Histogram

View File

@@ -444,13 +444,13 @@ func (m *PeerManager) isConnected(peerID types.NodeID) bool {
return ok
}
type connectedInfo struct {
// connectionStats is the value returned by getConnectedInfo, which
// fills it in while iterating over the directions recorded for the
// currently connected peers.
type connectionStats struct {
// NOTE(review): presumably the count of peers whose recorded
// direction is peerConnectionIncoming — confirm.
incoming uint16
// NOTE(review): presumably the count of peers whose recorded
// direction is peerConnectionOutgoing — confirm.
outgoing uint16
}
func (m *PeerManager) getConnectedInfo() connectedInfo {
out := connectedInfo{}
func (m *PeerManager) getConnectedInfo() connectionStats {
out := connectionStats{}
for _, direction := range m.connected {
switch direction {
case peerConnectionIncoming:
@@ -598,6 +598,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
func (m *PeerManager) DialFailed(address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.metrics.PeerConnectionFailure.Add(1)
delete(m.dialing, address.NodeID)
for from, to := range m.upgrading {
@@ -662,6 +663,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.metrics.PeerConnectionSuccess.Add(1)
delete(m.dialing, address.NodeID)
var upgradeFromPeer types.NodeID
@@ -717,6 +720,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
}
m.evict[upgradeFromPeer] = true
}
m.metrics.PeersConnectedOutgoing.Add(1)
m.connected[peer.ID] = peerConnectionOutgoing
m.evictWaker.Wake()
@@ -784,6 +789,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
return err
}
m.metrics.PeersConnectedIncoming.Add(1)
m.connected[peerID] = peerConnectionIncoming
if upgradeFromPeer != "" {
m.evict[upgradeFromPeer] = true
@@ -871,6 +877,13 @@ func (m *PeerManager) Disconnected(peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
switch m.connected[peerID] {
case peerConnectionIncoming:
m.metrics.PeersConnectedIncoming.Add(-1)
case peerConnectionOutgoing:
m.metrics.PeersConnectedOutgoing.Add(-1)
}
ready := m.ready[peerID]
delete(m.connected, peerID)
@@ -927,15 +940,14 @@ func (m *PeerManager) Inactivate(peerID types.NodeID) error {
// Advertise returns a list of peer addresses to advertise to a peer.
//
// It finds twice as many peers as the limit specifies, shuffles this
// list, and then returns the limit. The goal of this is to gossip the
// "best" peers but also ensure that more fresh peers also have a
// chance.
// It sorts all peers in the peer store, and assembles a list of at
// most limit addresses that is most likely to include those of the
// highest-priority peers, and then shuffles that list before
// returning it.
func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress {
m.mtx.Lock()
defer m.mtx.Unlock()
addresses := make([]NodeAddress, 0, 2*limit)
addresses := make([]NodeAddress, 0, limit)
// advertise ourselves, to let everyone know how to dial us back
// and enable mutual address discovery
@@ -943,24 +955,81 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
addresses = append(addresses, m.options.SelfAddress)
}
OUTER:
for _, peer := range m.store.Ranked() {
var numAddresses int
var attempts int
ranked := m.store.Ranked()
seenAddresses := map[NodeAddress]struct{}{}
// get the total number of possible addresses
for _, peer := range ranked {
if peer.ID == peerID {
continue
}
if peer.Inactive {
continue
for addr := range peer.AddressInfo {
if _, ok := m.options.PrivatePeers[addr.NodeID]; !ok {
numAddresses++
}
}
}
RETRY:
for {
attempts++
addedLastIteration := false
ITERATION:
for idx, peer := range ranked {
if peer.ID == peerID {
continue ITERATION
}
if len(addresses) >= int(limit) {
break RETRY
}
PEER:
for nodeAddr, addressInfo := range peer.AddressInfo {
if len(addresses) >= int(limit) {
break RETRY
}
if _, ok := seenAddresses[addressInfo.Address]; ok {
continue PEER
}
// only add non-private NodeIDs
if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok {
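// add the address if the total number of
// ranked peers will fit within the limit;
// otherwise add it at random, with the
// intent that lower-priority peers are
// less likely to be picked.
// NOTE(review): idx+1*2 parses as idx+2,
// so the acceptance probability below is
// (idx+1)/(idx+2), which grows with idx;
// confirm whether (idx+1)*2 or another
// expression was intended.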
if len(ranked) <= int(limit) || rand.Intn(idx+1*2) <= idx {
seenAddresses[addressInfo.Address] = struct{}{}
addresses = append(addresses, addressInfo.Address)
addedLastIteration = true
}
} else {
// skip private peers in future iterations
seenAddresses[addressInfo.Address] = struct{}{}
}
}
}
for nodeAddr, addressInfo := range peer.AddressInfo {
if len(addresses) >= int(limit)*2 {
break OUTER
}
// only add non-private NodeIDs
if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok {
addresses = append(addresses, addressInfo.Address)
}
// once we have the required number of peers, we
// should stop.
if len(addresses) >= int(limit) {
break RETRY
}
// give up eventually
if attempts >= 32 && !addedLastIteration {
break RETRY
}
}
@@ -968,11 +1037,7 @@ OUTER:
addresses[i], addresses[j] = addresses[j], addresses[i]
})
if len(addresses) <= int(limit) {
return addresses
}
return addresses[:limit]
return addresses
}
// Subscribe subscribes to peer updates. The caller must consume the peer

View File

@@ -1766,6 +1766,7 @@ func TestPeerManager_Advertise(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.Len(t, peerManager.Advertise(dID, 100), 6)
// d should get all addresses.
require.ElementsMatch(t, []p2p.NodeAddress{
aTCP, aMem, bTCP, bMem, cTCP, cMem,

View File

@@ -554,8 +554,15 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
const (
maxDialerInterval = 3000
minDialerInterval = 250
)
// nolint:gosec // G404: Use of weak random number generator
timer := time.NewTimer(time.Duration(rand.Int63n(500)) * time.Millisecond)
dur := time.Duration(rand.Int63n(maxDialerInterval-minDialerInterval+1) + minDialerInterval)
timer := time.NewTimer(dur * time.Millisecond)
defer timer.Stop()
select {