From 91506bf25d385050f531982b20b98d2c09f8c2d2 Mon Sep 17 00:00:00 2001
From: Sam Kleinman
Date: Mon, 29 Mar 2021 16:12:23 -0400
Subject: [PATCH] p2p: simple peer scoring (#6277)

---
 blockchain/v0/reactor_test.go |  2 +-
 consensus/reactor.go          | 17 ++++----
 evidence/reactor_test.go      |  2 +-
 mempool/reactor_test.go       |  2 +-
 p2p/peermanager.go            | 78 +++++++++++++++++++++++++++++------
 p2p/peermanager_test.go       | 36 ++++++++++++++++
 p2p/shim.go                   |  6 +--
 statesync/reactor_test.go     |  2 +-
 8 files changed, 118 insertions(+), 27 deletions(-)

diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go
index a92d6fad1..64a66a6a0 100644
--- a/blockchain/v0/reactor_test.go
+++ b/blockchain/v0/reactor_test.go
@@ -152,7 +152,7 @@ func (rts *reactorTestSuite) addNode(t *testing.T,
 	}
 
 	rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+	rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
 	rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
 	rts.reactors[nodeID], err = NewReactor(
 		rts.logger.With("nodeID", nodeID),
diff --git a/consensus/reactor.go b/consensus/reactor.go
index 8185f4202..cf3f3e42d 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -1377,18 +1377,21 @@ func (r *Reactor) peerStatsRoutine() {
 
 			switch msg.Msg.(type) {
 			case *VoteMessage:
-				if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { // nolint: staticcheck
-					// TODO: Handle peer quality via the peer manager.
-					// r.Switch.MarkPeerAsGood(peer)
+				if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
+					r.peerUpdates.SendUpdate(p2p.PeerUpdate{
+						NodeID: msg.PeerID,
+						Status: p2p.PeerStatusGood,
+					})
 				}
 
 			case *BlockPartMessage:
-				if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 { // nolint: staticcheck
-					// TODO: Handle peer quality via the peer manager.
-					// r.Switch.MarkPeerAsGood(peer)
+				if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
+					r.peerUpdates.SendUpdate(p2p.PeerUpdate{
+						NodeID: msg.PeerID,
+						Status: p2p.PeerStatusGood,
+					})
 				}
 			}
-
 		case <-r.closeCh:
 			return
 		}
diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go
index 2170d7589..684a9f406 100644
--- a/evidence/reactor_test.go
+++ b/evidence/reactor_test.go
@@ -86,7 +86,7 @@ func setup(t *testing.T, stateStores []sm.Store, chBuf uint) *reactorTestSuite {
 		require.NoError(t, err)
 
 		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
 		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
 
 		rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go
index d1d41209e..e287302f0 100644
--- a/mempool/reactor_test.go
+++ b/mempool/reactor_test.go
@@ -60,7 +60,7 @@ func setup(t *testing.T, cfg *cfg.MempoolConfig, numNodes int, chBuf uint) *reac
 		rts.mempools[nodeID] = mempool
 
 		rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
-		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID])
+		rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
 		rts.network.Nodes[nodeID].PeerManager.Register(rts.peerUpdates[nodeID])
 
 		rts.reactors[nodeID] = NewReactor(
diff --git a/p2p/peermanager.go b/p2p/peermanager.go
index 3f80bf9d6..bbb1e31f9 100644
--- a/p2p/peermanager.go
+++ b/p2p/peermanager.go
@@ -33,13 +33,15 @@ type PeerStatus string
 const (
 	PeerStatusUp   PeerStatus = "up"   // connected and ready
 	PeerStatusDown PeerStatus = "down" // disconnected
+	PeerStatusGood PeerStatus = "good" // peer observed as good
+	PeerStatusBad  PeerStatus = "bad"  // peer observed as bad
 )
 
 // PeerScore is a numeric score assigned to a peer (higher is better).
 type PeerScore uint8
 
 const (
-	PeerScorePersistent PeerScore = 100 // persistent peers
+	PeerScorePersistent PeerScore = math.MaxUint8 // persistent peers
 )
 
 // PeerUpdate is a peer update event sent via PeerUpdates.
@@ -51,24 +53,35 @@ type PeerUpdate struct {
 
 // PeerUpdates is a peer update subscription with notifications about peer
 // events (currently just status changes).
 type PeerUpdates struct {
-	updatesCh chan PeerUpdate
-	closeCh   chan struct{}
-	closeOnce sync.Once
+	routerUpdatesCh  chan PeerUpdate
+	reactorUpdatesCh chan PeerUpdate
+	closeCh          chan struct{}
+	closeOnce        sync.Once
 }
 
 // NewPeerUpdates creates a new PeerUpdates subscription. It is primarily for
 // internal use, callers should typically use PeerManager.Subscribe(). The
 // subscriber must call Close() when done.
-func NewPeerUpdates(updatesCh chan PeerUpdate) *PeerUpdates {
+func NewPeerUpdates(updatesCh chan PeerUpdate, buf int) *PeerUpdates {
 	return &PeerUpdates{
-		updatesCh: updatesCh,
-		closeCh:   make(chan struct{}),
+		reactorUpdatesCh: updatesCh,
+		routerUpdatesCh:  make(chan PeerUpdate, buf),
+		closeCh:          make(chan struct{}),
 	}
 }
 
 // Updates returns a channel for consuming peer updates.
 func (pu *PeerUpdates) Updates() <-chan PeerUpdate {
-	return pu.updatesCh
+	return pu.reactorUpdatesCh
+}
+
+// SendUpdate pushes information about a peer into the routing layer,
+// typically from a reactor reporting its observations of a peer's behavior.
+func (pu *PeerUpdates) SendUpdate(update PeerUpdate) {
+	select {
+	case <-pu.closeCh:
+	case pu.routerUpdatesCh <- update:
+	}
 }
 
 // Close closes the peer updates subscription.
@@ -791,7 +804,7 @@ func (m *PeerManager) Subscribe() *PeerUpdates {
 	// to the next subscriptions. This also prevents tail latencies from
 	// compounding. Limiting it to 1 means that the subscribers are still
 	// reasonably in sync. However, this should probably be benchmarked.
-	peerUpdates := NewPeerUpdates(make(chan PeerUpdate, 1))
+	peerUpdates := NewPeerUpdates(make(chan PeerUpdate, 1), 1)
 	m.Register(peerUpdates)
 	return peerUpdates
 }
@@ -809,6 +822,19 @@ func (m *PeerManager) Register(peerUpdates *PeerUpdates) {
 	m.subscriptions[peerUpdates] = peerUpdates
 	m.mtx.Unlock()
 
+	go func() {
+		for {
+			select {
+			case <-peerUpdates.closeCh:
+				return
+			case <-m.closeCh:
+				return
+			case pu := <-peerUpdates.routerUpdatesCh:
+				m.processPeerEvent(pu)
+			}
+		}
+	}()
+
 	go func() {
 		select {
 		case <-peerUpdates.Done():
@@ -820,6 +846,22 @@ func (m *PeerManager) Register(peerUpdates *PeerUpdates) {
 	}()
 }
 
+func (m *PeerManager) processPeerEvent(pu PeerUpdate) {
+	m.mtx.Lock()
+	defer m.mtx.Unlock()
+
+	if _, ok := m.store.peers[pu.NodeID]; !ok {
+		m.store.peers[pu.NodeID] = &peerInfo{}
+	}
+
+	switch pu.Status {
+	case PeerStatusBad:
+		m.store.peers[pu.NodeID].MutableScore--
+	case PeerStatusGood:
+		m.store.peers[pu.NodeID].MutableScore++
+	}
+}
+
 // broadcast broadcasts a peer update to all subscriptions. The caller must
 // already hold the mutex lock, to make sure updates are sent in the same order
 // as the PeerManager processes them, but this means subscribers must be
@@ -837,7 +879,7 @@ func (m *PeerManager) broadcast(peerUpdate PeerUpdate) {
 		default:
 		}
 		select {
-		case sub.updatesCh <- peerUpdate:
+		case sub.reactorUpdatesCh <- peerUpdate:
 		case <-sub.closeCh:
 		}
 	}
@@ -1149,6 +1191,8 @@ type peerInfo struct {
 	Persistent bool
 	Height     int64
 	FixedScore PeerScore // mainly for tests
+
+	MutableScore int64 // updated by router
 }
 
 // peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,
@@ -1205,14 +1249,22 @@ func (p *peerInfo) Copy() peerInfo {
 // Score calculates a score for the peer. Higher-scored peers will be
 // preferred over lower scores.
 func (p *peerInfo) Score() PeerScore {
-	var score PeerScore
-
 	if p.FixedScore > 0 {
 		return p.FixedScore
 	}
 
 	if p.Persistent {
-		score += PeerScorePersistent
+		return PeerScorePersistent
 	}
 
-	return score
+	if p.MutableScore <= 0 {
+		return 0
+	}
+
+	if p.MutableScore >= math.MaxUint8 {
+		return PeerScore(math.MaxUint8)
+	}
+
+	return PeerScore(p.MutableScore)
 }
 
 // Validate validates the peer info.
diff --git a/p2p/peermanager_test.go b/p2p/peermanager_test.go
index d15d9f6d6..0c0e4b9bc 100644
--- a/p2p/peermanager_test.go
+++ b/p2p/peermanager_test.go
@@ -1615,3 +1615,39 @@ func TestPeerManager_SetHeight_GetHeight(t *testing.T) {
 	require.Zero(t, peerManager.GetHeight(a.NodeID))
 	require.Zero(t, peerManager.GetHeight(b.NodeID))
 }
+
+func TestPeerScoring(t *testing.T) {
+	// create a mock peer manager
+	db := dbm.NewMemDB()
+	peerManager, err := p2p.NewPeerManager(selfID, db, p2p.PeerManagerOptions{})
+	require.NoError(t, err)
+	defer peerManager.Close()
+
+	// create a fake node
+	id := p2p.NodeID(strings.Repeat("a1", 20))
+	require.NoError(t, peerManager.Add(p2p.NodeAddress{NodeID: id, Protocol: "memory"}))
+
+	// subscribe to updates and check that the initial score is zero
+	pu := peerManager.Subscribe()
+	require.EqualValues(t, 0, peerManager.Scores()[id])
+
+	// add a bunch of good status updates and watch things increase.
+	for i := 1; i < 10; i++ {
+		pu.SendUpdate(p2p.PeerUpdate{
+			NodeID: id,
+			Status: p2p.PeerStatusGood,
+		})
+		time.Sleep(time.Millisecond) // force a context switch
+		require.EqualValues(t, i, peerManager.Scores()[id])
+	}
+
+	// send a matching run of bad status updates and watch the score decrease.
+	for i := 8; i >= 0; i-- {
+		pu.SendUpdate(p2p.PeerUpdate{
+			NodeID: id,
+			Status: p2p.PeerStatusBad,
+		})
+		time.Sleep(time.Millisecond) // force a context switch
+		require.EqualValues(t, i, peerManager.Scores()[id])
+	}
+}
diff --git a/p2p/shim.go b/p2p/shim.go
index 5f20d06dd..5f5e8cede 100644
--- a/p2p/shim.go
+++ b/p2p/shim.go
@@ -64,7 +64,7 @@ func NewReactorShim(logger log.Logger, name string, descriptors map[ChannelID]*C
 
 	rs := &ReactorShim{
 		Name:        name,
-		PeerUpdates: NewPeerUpdates(make(chan PeerUpdate)),
+		PeerUpdates: NewPeerUpdates(make(chan PeerUpdate), 0),
 		Channels:    channels,
 	}
 
@@ -230,7 +230,7 @@ func (rs *ReactorShim) GetChannels() []*ChannelDescriptor {
 // handle adding a peer.
 func (rs *ReactorShim) AddPeer(peer Peer) {
 	select {
-	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
+	case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusUp}:
 		rs.Logger.Debug("sent peer update", "reactor", rs.Name, "peer", peer.ID(),
 			"status", PeerStatusUp)
 
 	case <-rs.PeerUpdates.Done():
@@ -249,7 +249,7 @@ func (rs *ReactorShim) AddPeer(peer Peer) {
 // handle removing a peer.
 func (rs *ReactorShim) RemovePeer(peer Peer, reason interface{}) {
 	select {
-	case rs.PeerUpdates.updatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
+	case rs.PeerUpdates.reactorUpdatesCh <- PeerUpdate{NodeID: peer.ID(), Status: PeerStatusDown}:
 		rs.Logger.Debug(
 			"sent peer update",
 			"reactor", rs.Name,
diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go
index 0760c1e54..14ffa9509 100644
--- a/statesync/reactor_test.go
+++ b/statesync/reactor_test.go
@@ -62,7 +62,7 @@ func setup(
 		chunkInCh:      make(chan p2p.Envelope, chBuf),
 		chunkOutCh:     make(chan p2p.Envelope, chBuf),
 		chunkPeerErrCh: make(chan p2p.PeerError, chBuf),
-		peerUpdates:    p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)),
+		peerUpdates:    p2p.NewPeerUpdates(make(chan p2p.PeerUpdate), int(chBuf)),
 		conn:           conn,
 		connQuery:      connQuery,
 		stateProvider:  stateProvider,
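
The clamping behavior introduced in peerInfo.Score() can be illustrated with a
minimal, self-contained sketch. The score helper below is a hypothetical
stand-in for the patched method, collapsing the FixedScore, Persistent, and
MutableScore branches into one free function; it is not part of the patch
itself.

package main

import (
	"fmt"
	"math"
)

// score is an illustrative stand-in for peerInfo.Score() above: a positive
// FixedScore wins outright, persistent peers pin to the maximum, and
// otherwise the mutable counter is clamped into the uint8 PeerScore range.
func score(fixed uint8, persistent bool, mutable int64) uint8 {
	if fixed > 0 {
		return fixed
	}
	if persistent {
		return math.MaxUint8
	}
	if mutable <= 0 {
		return 0
	}
	if mutable >= math.MaxUint8 {
		return math.MaxUint8
	}
	return uint8(mutable)
}

func main() {
	fmt.Println(score(0, true, -5))   // 255: persistent peers always score max
	fmt.Println(score(0, false, -3))  // 0: negative counters clamp to zero
	fmt.Println(score(0, false, 42))  // 42: in-range counters pass through
	fmt.Println(score(0, false, 999)) // 255: large counters clamp to MaxUint8
}

The clamp matters because MutableScore is an unbounded int64 that every
PeerStatusGood update increments, while PeerScore is a uint8; without the
bounds checks the conversion could wrap around for long-lived, well-behaved
peers.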