cr feedback

This commit is contained in:
tycho garen
2022-06-16 09:21:26 -04:00
parent c411a83ccc
commit db39cfbfb2
5 changed files with 135 additions and 26 deletions

View File

@@ -50,6 +50,30 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "peer_pending_send_bytes",
Help: "Number of bytes pending being sent to a given peer.",
}, append(labels, "peer_id")).With(labelsAndValues...),
PeerConnectionSuccess: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_connection_success",
Help: "Number of successful connection attempts",
}, labels).With(labelsAndValues...),
PeerConnectionFailure: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_connection_failure",
Help: "Number failed connection attempts",
}, labels).With(labelsAndValues...),
PeersConnectedIncoming: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peers_connected_incoming",
Help: "Number of peers connected as a result of dialing the peer.",
}, labels).With(labelsAndValues...),
PeersConnectedOutgoing: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peers_connected_outgoing",
Help: "Number of peers connected as a result of the peer dialing this node.",
}, labels).With(labelsAndValues...),
RouterPeerQueueRecv: prometheus.NewHistogramFrom(stdprometheus.HistogramOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
@@ -91,6 +115,10 @@ func NopMetrics() *Metrics {
PeerReceiveBytesTotal: discard.NewCounter(),
PeerSendBytesTotal: discard.NewCounter(),
PeerPendingSendBytes: discard.NewGauge(),
PeerConnectionSuccess: discard.NewCounter(),
PeerConnectionFailure: discard.NewCounter(),
PeersConnectedIncoming: discard.NewGauge(),
PeersConnectedOutgoing: discard.NewGauge(),
RouterPeerQueueRecv: discard.NewHistogram(),
RouterPeerQueueSend: discard.NewHistogram(),
RouterChannelQueueSend: discard.NewHistogram(),

View File

@@ -39,6 +39,18 @@ type Metrics struct {
// Number of bytes pending being sent to a given peer.
PeerPendingSendBytes metrics.Gauge `metrics_labels:"peer_id"`
// Number of successful connection attempts
PeerConnectionSuccess metrics.Counter
// Number failed connection attempts
PeerConnectionFailure metrics.Counter
// Number of peers connected as a result of dialing the
// peer.
PeersConnectedIncoming metrics.Gauge
// Number of peers connected as a result of the peer dialing
// this node.
PeersConnectedOutgoing metrics.Gauge
// RouterPeerQueueRecv defines the time taken to read off of a peer's queue
// before sending on the connection.
//metrics:The time taken to read off of a peer's queue before sending on the connection.

View File

@@ -422,13 +422,13 @@ func (m *PeerManager) isConnected(peerID types.NodeID) bool {
return ok
}
type connectedInfo struct {
type connectionStats struct {
incoming uint16
outgoing uint16
}
func (m *PeerManager) getConnectedInfo() connectedInfo {
out := connectedInfo{}
func (m *PeerManager) getConnectedInfo() connectionStats {
out := connectionStats{}
for _, direction := range m.connected {
switch direction {
case peerConnectionIncoming:
@@ -575,6 +575,7 @@ func (m *PeerManager) TryDialNext() (NodeAddress, error) {
func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.metrics.PeerConnectionFailure.Add(1)
delete(m.dialing, address.NodeID)
for from, to := range m.upgrading {
@@ -639,6 +640,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.metrics.PeerConnectionSuccess.Add(1)
delete(m.dialing, address.NodeID)
var upgradeFromPeer types.NodeID
@@ -694,6 +697,8 @@ func (m *PeerManager) Dialed(address NodeAddress) error {
}
m.evict[upgradeFromPeer] = true
}
m.metrics.PeersConnectedOutgoing.Add(1)
m.connected[peer.ID] = peerConnectionOutgoing
m.evictWaker.Wake()
@@ -761,6 +766,7 @@ func (m *PeerManager) Accepted(peerID types.NodeID) error {
return err
}
m.metrics.PeersConnectedIncoming.Add(1)
m.connected[peerID] = peerConnectionIncoming
if upgradeFromPeer != "" {
m.evict[upgradeFromPeer] = true
@@ -848,6 +854,13 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()
switch m.connected[peerID] {
case peerConnectionIncoming:
m.metrics.PeersConnectedIncoming.Add(-1)
case peerConnectionOutgoing:
m.metrics.PeersConnectedOutgoing.Add(-1)
}
ready := m.ready[peerID]
delete(m.connected, peerID)
@@ -904,15 +917,14 @@ func (m *PeerManager) Inactivate(peerID types.NodeID) error {
// Advertise returns a list of peer addresses to advertise to a peer.
//
// It finds twice as many peers as the limit specifies, shuffles this
// list, and then returns the limit. The goal of this is to gossip the
// "best" peers but also ensure that more fresh peers also have a
// chance.
// It sorts all peers in the peer store, and assembles a list of peers
// that is most likely to include the highest priority of peers and
// then
func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress {
m.mtx.Lock()
defer m.mtx.Unlock()
addresses := make([]NodeAddress, 0, 2*limit)
addresses := make([]NodeAddress, 0, limit)
// advertise ourselves, to let everyone know how to dial us back
// and enable mutual address discovery
@@ -920,24 +932,81 @@ func (m *PeerManager) Advertise(peerID types.NodeID, limit uint16) []NodeAddress
addresses = append(addresses, m.options.SelfAddress)
}
OUTER:
for _, peer := range m.store.Ranked() {
var numAddresses int
var attempts int
ranked := m.store.Ranked()
seenAddresses := map[NodeAddress]struct{}{}
// get the total number of possible addresses
for _, peer := range ranked {
if peer.ID == peerID {
continue
}
if peer.Inactive {
continue
for addr := range peer.AddressInfo {
if _, ok := m.options.PrivatePeers[addr.NodeID]; !ok {
numAddresses++
}
}
}
RETRY:
for {
attempts++
addedLastIteration := false
ITERATION:
for idx, peer := range ranked {
if peer.ID == peerID {
continue ITERATION
}
if len(addresses) >= int(limit) {
break RETRY
}
PEER:
for nodeAddr, addressInfo := range peer.AddressInfo {
if len(addresses) >= int(limit) {
break RETRY
}
if _, ok := seenAddresses[addressInfo.Address]; ok {
continue PEER
}
// only add non-private NodeIDs
if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok {
// add the peer if the total
// number of ranked peers is
// will fit within the limit,
// and then with decreasing
// liklihood if the number the
// lower the priority of the
// peers is.
if len(ranked) <= int(limit) || rand.Intn(idx+1*2) <= idx {
seenAddresses[addressInfo.Address] = struct{}{}
addresses = append(addresses, addressInfo.Address)
addedLastIteration = true
}
} else {
// skip private peers in future iterations
seenAddresses[addressInfo.Address] = struct{}{}
}
}
}
for nodeAddr, addressInfo := range peer.AddressInfo {
if len(addresses) >= int(limit)*2 {
break OUTER
}
// only add non-private NodeIDs
if _, ok := m.options.PrivatePeers[nodeAddr.NodeID]; !ok {
addresses = append(addresses, addressInfo.Address)
}
// once we have the required number of peers, we
// should stop.
if len(addresses) >= int(limit) {
break RETRY
}
// give up eventually
if attempts >= 32 && !addedLastIteration {
break RETRY
}
}
@@ -945,11 +1014,7 @@ OUTER:
addresses[i], addresses[j] = addresses[j], addresses[i]
})
if len(addresses) <= int(limit) {
return addresses
}
return addresses[:limit]
return addresses
}
// PeerEventSubscriber describes the type of the subscription method, to assist

View File

@@ -1836,6 +1836,7 @@ func TestPeerManager_Advertise(t *testing.T) {
require.NoError(t, err)
require.True(t, added)
require.Len(t, peerManager.Advertise(dID, 100), 6)
// d should get all addresses.
require.ElementsMatch(t, []p2p.NodeAddress{
aTCP, aMem, bTCP, bMem, cTCP, cMem,

View File

@@ -443,6 +443,9 @@ func (r *Router) filterPeersID(ctx context.Context, id types.NodeID) error {
func (r *Router) dialSleep(ctx context.Context) {
if r.options.DialSleep == nil {
// the connTracker (on the other side) only rate
// limits peers for dialing more than once every 10ms,
// so these numbers are safe.
const (
maxDialerInterval = 500 // ms
minDialerInterval = 100 // ms