From 86cf8ee3f96fbff39ebf79d58e186201a2bd2222 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Fri, 10 May 2019 19:45:50 +0200 Subject: [PATCH] p2p: PeerBehaviour implementation (#3539) (#3552) * p2p: initial implementation of peer behaviour * [p2p] re-use newMockPeer * p2p: add inline docs for peer_behaviour interface * [p2p] Align PeerBehaviour interface (#3558) * [p2p] make switchedPeerHebaviour private * [p2p] make storePeerHebaviour private * [p2p] Add CHANGELOG_PENDING entry for PeerBehaviour * [p2p] Adjustment naming for PeerBehaviour * [p2p] Add coarse lock around storedPeerBehaviour * [p2p]: Fix non-pointer methods in storedPeerBehaviour + Structs with embeded locks must specify all methods with pointer receivers to avoid creating a copy of the embeded lock. * [p2p] Thorough refactoring based on comments in #3552 + Decouple PeerBehaviour interface from Peer by parametrizing methods with `p2p.ID` instead of `p2p.Peer` + Setter methods wrapped in a write lock + Getter methods wrapped in a read lock + Getter methods on storedPeerBehaviour now take a `p2p.ID` + Getter methods return a copy of underlying stored behaviours + Add doc strings to public types and methods * [p2p] make structs public * [p2p] Test empty StoredPeerBehaviour * [p2p] typo fix * [p2p] add TestStoredPeerBehaviourConcurrency + Add a test which uses StoredPeerBehaviour in multiple goroutines to ensure thread-safety. * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour_test.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * Update p2p/peer_behaviour.go Co-Authored-By: brapse * [p2p] field ordering convention * p2p: peer behaviour refactor + Change naming of reporting behaviour to `Report` + Remove the responsibility of distinguishing between the categories of good and bad behaviour and instead focus on reporting behaviour. * p2p: rename PeerReporter -> PeerBehaviourReporter --- CHANGELOG_PENDING.md | 2 + p2p/peer_behaviour.go | 92 +++++++++++++++++++++++++++++ p2p/peer_behaviour_test.go | 115 +++++++++++++++++++++++++++++++++++++ 3 files changed, 209 insertions(+) create mode 100644 p2p/peer_behaviour.go create mode 100644 p2p/peer_behaviour_test.go diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 0067c8c15..fbb87eef1 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -31,6 +31,8 @@ - [node] \#2659 Add `node.Mempool()` method, which allows you to access mempool ### IMPROVEMENTS: +- [p2p] [\#3463](https://github.com/tendermint/tendermint/pull/3463) Do not log "Can't add peer's address to addrbook" error for a private peer +- [p2p] [\#3552](https://github.com/tendermint/tendermint/pull/3552) Add PeerBehaviour Interface (@brapse) - [rpc] [\#3534](https://github.com/tendermint/tendermint/pull/3534) Add support for batched requests/responses in JSON RPC - [cli] \#3585 Add option to not clear address book with unsafe reset (@climber73) - [cli] [\#3160](https://github.com/tendermint/tendermint/issues/3160) Add `-config=` option to `testnet` cmd (@gregdhill) diff --git a/p2p/peer_behaviour.go b/p2p/peer_behaviour.go new file mode 100644 index 000000000..8d27805c3 --- /dev/null +++ b/p2p/peer_behaviour.go @@ -0,0 +1,92 @@ +package p2p + +import ( + "errors" + "sync" +) + +// PeerBehaviour are types of reportable behaviours about peers. +type PeerBehaviour int + +const ( + PeerBehaviourBadMessage = iota + PeerBehaviourMessageOutOfOrder + PeerBehaviourVote + PeerBehaviourBlockPart +) + +// PeerBehaviourReporter provides an interface for reactors to report the behaviour +// of peers synchronously to other components. +type PeerBehaviourReporter interface { + Report(peerID ID, behaviour PeerBehaviour) error +} + +// SwitchPeerBehaviouReporter reports peer behaviour to an internal Switch +type SwitchPeerBehaviourReporter struct { + sw *Switch +} + +// Return a new SwitchPeerBehaviourReporter instance which wraps the Switch. +func NewSwitchPeerBehaviourReporter(sw *Switch) *SwitchPeerBehaviourReporter { + return &SwitchPeerBehaviourReporter{ + sw: sw, + } +} + +// Report reports the behaviour of a peer to the Switch +func (spbr *SwitchPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) error { + peer := spbr.sw.Peers().Get(peerID) + if peer == nil { + return errors.New("Peer not found") + } + + switch behaviour { + case PeerBehaviourVote, PeerBehaviourBlockPart: + spbr.sw.MarkPeerAsGood(peer) + case PeerBehaviourBadMessage: + spbr.sw.StopPeerForError(peer, "Bad message") + case PeerBehaviourMessageOutOfOrder: + spbr.sw.StopPeerForError(peer, "Message out of order") + default: + return errors.New("Unknown behaviour") + } + + return nil +} + +// MockPeerBehaviourReporter serves a mock concrete implementation of the +// PeerBehaviourReporter interface used in reactor tests to ensure reactors +// report the correct behaviour in manufactured scenarios. +type MockPeerBehaviourReporter struct { + mtx sync.RWMutex + pb map[ID][]PeerBehaviour +} + +// NewMockPeerBehaviourReporter returns a PeerBehaviourReporter which records all reported +// behaviours in memory. +func NewMockPeerBehaviourReporter() *MockPeerBehaviourReporter { + return &MockPeerBehaviourReporter{ + pb: map[ID][]PeerBehaviour{}, + } +} + +// Report stores the PeerBehaviour produced by the peer identified by peerID. +func (mpbr *MockPeerBehaviourReporter) Report(peerID ID, behaviour PeerBehaviour) { + mpbr.mtx.Lock() + defer mpbr.mtx.Unlock() + mpbr.pb[peerID] = append(mpbr.pb[peerID], behaviour) +} + +// GetBehaviours returns all behaviours reported on the peer identified by peerID. +func (mpbr *MockPeerBehaviourReporter) GetBehaviours(peerID ID) []PeerBehaviour { + mpbr.mtx.RLock() + defer mpbr.mtx.RUnlock() + if items, ok := mpbr.pb[peerID]; ok { + result := make([]PeerBehaviour, len(items)) + copy(result, items) + + return result + } else { + return []PeerBehaviour{} + } +} diff --git a/p2p/peer_behaviour_test.go b/p2p/peer_behaviour_test.go new file mode 100644 index 000000000..03ea4969f --- /dev/null +++ b/p2p/peer_behaviour_test.go @@ -0,0 +1,115 @@ +package p2p + +import ( + "net" + "sync" + "testing" +) + +// TestMockPeerBehaviour tests the MockPeerBehaviour' ability to store reported +// peer behaviour in memory indexed by the peerID +func TestMockPeerBehaviourReporter(t *testing.T) { + peer := newMockPeer(net.IP{127, 0, 0, 1}) + pr := NewMockPeerBehaviourReporter() + + behaviours := pr.GetBehaviours(peer.ID()) + if len(behaviours) != 0 { + t.Errorf("Expected to have no behaviours reported") + } + + pr.Report(peer.ID(), PeerBehaviourBadMessage) + behaviours = pr.GetBehaviours(peer.ID()) + if len(behaviours) != 1 { + t.Errorf("Expected the peer have one reported behaviour") + } + + if behaviours[0] != PeerBehaviourBadMessage { + t.Errorf("Expected PeerBehaviourBadMessage to have been reported") + } +} + +type scriptedBehaviours struct { + PeerID ID + Behaviours []PeerBehaviour +} + +type scriptItem struct { + PeerID ID + Behaviour PeerBehaviour +} + +func equalBehaviours(a []PeerBehaviour, b []PeerBehaviour) bool { + if len(a) != len(b) { + return false + } + same := make([]PeerBehaviour, len(a)) + + for i, aBehaviour := range a { + for _, bBehaviour := range b { + if aBehaviour == bBehaviour { + same[i] = aBehaviour + } + } + } + + return len(same) == len(a) +} + +// TestPeerBehaviourConcurrency constructs a scenario in which +// multiple goroutines are using the same MockPeerBehaviourReporter instance. +// This test reproduces the conditions in which MockPeerBehaviourReporter will +// be used within a Reactor Receive method tests to ensure thread safety. +func TestMockPeerBehaviourReporterConcurrency(t *testing.T) { + behaviourScript := []scriptedBehaviours{ + {"1", []PeerBehaviour{PeerBehaviourVote}}, + {"2", []PeerBehaviour{PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote}}, + {"3", []PeerBehaviour{PeerBehaviourBlockPart, PeerBehaviourVote, PeerBehaviourBlockPart, PeerBehaviourVote}}, + {"4", []PeerBehaviour{PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote, PeerBehaviourVote}}, + {"5", []PeerBehaviour{PeerBehaviourBlockPart, PeerBehaviourVote, PeerBehaviourBlockPart, PeerBehaviourVote}}, + } + + var receiveWg sync.WaitGroup + pr := NewMockPeerBehaviourReporter() + scriptItems := make(chan scriptItem) + done := make(chan int) + numConsumers := 3 + for i := 0; i < numConsumers; i++ { + receiveWg.Add(1) + go func() { + defer receiveWg.Done() + for { + select { + case pb := <-scriptItems: + pr.Report(pb.PeerID, pb.Behaviour) + case <-done: + return + } + } + }() + } + + var sendingWg sync.WaitGroup + sendingWg.Add(1) + go func() { + defer sendingWg.Done() + for _, item := range behaviourScript { + for _, reason := range item.Behaviours { + scriptItems <- scriptItem{item.PeerID, reason} + } + } + }() + + sendingWg.Wait() + + for i := 0; i < numConsumers; i++ { + done <- 1 + } + + receiveWg.Wait() + + for _, items := range behaviourScript { + if !equalBehaviours(pr.GetBehaviours(items.PeerID), items.Behaviours) { + t.Errorf("Expected peer %s to have behaved \n", items.PeerID) + } + } +}