From f5b25e020b9698fe9200288533ebe1f75ebc854a Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 1 Dec 2020 17:08:33 +0400 Subject: [PATCH] blockchain/v1: fix deadlock (#5711) I introduced a new variable, syncEnded, which is now used to prevent sending new events to channels (which would otherwise block) once the reactor has finished syncing. Closes #4591 --- CHANGELOG_PENDING.md | 1 + blockchain/v1/reactor.go | 64 ++++++++++++++++++++++++----------- blockchain/v1/reactor_test.go | 8 ++--- 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 7ad305442..572005d35 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -22,3 +22,4 @@ program](https://hackerone.com/tendermint). ### BUG FIXES: - [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes) +- [blockchain/v1] \#5711 Fix deadlock (@melekes) diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index b35932330..e9e657789 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "reflect" + "sync/atomic" "time" amino "github.com/tendermint/go-amino" @@ -73,6 +74,9 @@ type BlockchainReactor struct { eventsFromFSMCh chan bcFsmMessage swReporter *behaviour.SwitchReporter + + // Atomic integer (0 - sync in progress, 1 - finished syncing) + syncEnded int32 } // NewBlockchainReactor returns new reactor instance. @@ -144,13 +148,22 @@ func (bcR *BlockchainReactor) OnStart() error { bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch) if bcR.fastSync { go bcR.poolRoutine() + } else { // if we're not fast syncing, mark it as finished + bcR.setSyncEnded() } return nil } // OnStop implements cmn.Service. 
func (bcR *BlockchainReactor) OnStop() { - _ = bcR.Stop() +} + +func (bcR *BlockchainReactor) isSyncEnded() bool { + return atomic.LoadInt32(&(bcR.syncEnded)) != 0 +} + +func (bcR *BlockchainReactor) setSyncEnded() { + atomic.StoreInt32(&(bcR.syncEnded), 1) } // GetChannels implements Reactor @@ -201,6 +214,10 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + if bcR.isSyncEnded() { + return + } + msgData := bcReactorMessage{ event: peerRemoveEv, data: bReactorEventData{ @@ -244,6 +261,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } case *bcBlockResponseMessage: + if bcR.isSyncEnded() { + return + } + msgForFSM := bcReactorMessage{ event: blockResponseEv, data: bReactorEventData{ @@ -257,6 +278,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) bcR.messagesForFSMCh <- msgForFSM case *bcNoBlockResponseMessage: + if bcR.isSyncEnded() { + return + } + msgForFSM := bcReactorMessage{ event: noBlockResponseEv, data: bReactorEventData{ @@ -268,6 +293,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) bcR.messagesForFSMCh <- msgForFSM case *bcStatusResponseMessage: + if bcR.isSyncEnded() { + return + } + // Got a peer status. Unverified. 
msgForFSM := bcReactorMessage{ event: statusResponseEv, @@ -278,7 +307,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) }, } bcR.messagesForFSMCh <- msgForFSM - default: bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } @@ -286,16 +314,20 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) // processBlocksRoutine processes blocks until signlaed to stop over the stopProcessing channel func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) { - processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - doProcessBlockCh := make(chan struct{}, 1) + defer processReceivedBlockTicker.Stop() - lastHundred := time.Now() - lastRate := 0.0 + var ( + doProcessBlockCh = make(chan struct{}, 1) + lastHundred = time.Now() + lastRate = 0.0 + ) ForLoop: for { select { + case <-bcR.Quit(): + break ForLoop case <-stopProcessing: bcR.Logger.Info("finishing block execution") break ForLoop @@ -338,12 +370,14 @@ ForLoop: // poolRoutine receives and handles messages from the Receive() routine and from the FSM. func (bcR *BlockchainReactor) poolRoutine() { - bcR.fsm.Start() sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + defer sendBlockRequestTicker.Stop() + // NOTE: statusUpdateTicker can continue to run + stopProcessing := make(chan struct{}, 1) go bcR.processBlocksRoutine(stopProcessing) @@ -376,12 +410,10 @@ ForLoop: case msg := <-bcR.eventsFromFSMCh: switch msg.event { - case syncFinishedEv: + case syncFinishedEv: // Sent from the FSM when it enters finished state. stopProcessing <- struct{}{} - // Sent from the FSM when it enters finished state. 
- break ForLoop - case peerErrorEv: - // Sent from the FSM when it detects peer error + bcR.setSyncEnded() + case peerErrorEv: // Sent from the FSM when it detects peer error bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) if msg.data.err == errNoPeerResponse { // Sent from the peer timeout handler routine @@ -448,14 +480,12 @@ func (bcR *BlockchainReactor) processBlock() error { return nil } -// Implements bcRNotifier // sendStatusRequest broadcasts `BlockStore` height. func (bcR *BlockchainReactor) sendStatusRequest() { msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()}) bcR.Switch.Broadcast(BlockchainChannel, msgBytes) } -// Implements bcRNotifier // BlockRequest sends `BlockRequest` height. func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { peer := bcR.Switch.Peers().Get(peerID) @@ -471,19 +501,14 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro return nil } -// Implements bcRNotifier func (bcR *BlockchainReactor) switchToConsensus() { conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) if ok { conR.SwitchToConsensus(bcR.state, bcR.blocksSynced) bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv} } - // else { - // Should only happen during testing. 
- // } } -// Implements bcRNotifier // Called by FSM and pool: // - pool calls when it detects slow peer or when peer times out // - FSM calls when: @@ -501,7 +526,6 @@ func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { bcR.eventsFromFSMCh <- msgData } -// Implements bcRNotifier func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { if timer == nil { panic("nil timer pointer parameter") diff --git a/blockchain/v1/reactor_test.go b/blockchain/v1/reactor_test.go index d9cc99af6..02243f5d3 100644 --- a/blockchain/v1/reactor_test.go +++ b/blockchain/v1/reactor_test.go @@ -226,11 +226,9 @@ func TestFastSyncNoBlockResponse(t *testing.T) { } } -// NOTE: This is too hard to test without -// an easy way to add test peer to switch -// or without significant refactoring of the module. -// Alternatively we could actually dial a TCP conn but -// that seems extreme. +// NOTE: This is too hard to test without an easy way to add test peer to +// switch or without significant refactoring of the module. Alternatively we +// could actually dial a TCP conn but that seems extreme. func TestFastSyncBadBlockStopsPeer(t *testing.T) { numNodes := 4 maxBlockHeight := int64(148)