blockchain/v1: fix deadlock (#5711)

I introduced a new variable, syncEnded, which is now used to prevent
sending new events to channels (where the send would otherwise block)
once the reactor has finished syncing.

Closes #4591
This commit is contained in:
Anton Kaliaev
2020-12-01 17:08:33 +04:00
parent bf873365e7
commit f5b25e020b
3 changed files with 48 additions and 25 deletions

View File

@@ -22,3 +22,4 @@ program](https://hackerone.com/tendermint).
### BUG FIXES:
- [blockchain/v1] [\#5701](https://github.com/tendermint/tendermint/pull/5701) Handle peers without blocks (@melekes)
- [blockchain/v1] \#5711 Fix deadlock (@melekes)

View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"
amino "github.com/tendermint/go-amino"
@@ -73,6 +74,9 @@ type BlockchainReactor struct {
eventsFromFSMCh chan bcFsmMessage
swReporter *behaviour.SwitchReporter
// Atomic integer (0 - sync in progress, 1 - finished syncing)
syncEnded int32
}
// NewBlockchainReactor returns new reactor instance.
@@ -144,13 +148,22 @@ func (bcR *BlockchainReactor) OnStart() error {
bcR.swReporter = behaviour.NewSwitcReporter(bcR.BaseReactor.Switch)
if bcR.fastSync {
go bcR.poolRoutine()
} else { // if we're not fast syncing, mark it as finished
bcR.setSyncEnded()
}
return nil
}
// OnStop implements cmn.Service.
// It calls Stop on the reactor and explicitly discards the result.
func (bcR *BlockchainReactor) OnStop() {
_ = bcR.Stop()
}
// isSyncEnded reports whether the reactor's syncEnded flag is set.
// The flag is read atomically; any non-zero value counts as "sync ended".
func (bcR *BlockchainReactor) isSyncEnded() bool {
	const inProgress int32 = 0
	flag := atomic.LoadInt32(&bcR.syncEnded)
	return flag != inProgress
}
// setSyncEnded atomically marks the reactor as finished syncing by storing 1
// into the syncEnded flag. Calling it more than once leaves the flag at 1.
func (bcR *BlockchainReactor) setSyncEnded() {
	const finished int32 = 1
	atomic.StoreInt32(&bcR.syncEnded, finished)
}
// GetChannels implements Reactor
@@ -201,6 +214,10 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
if bcR.isSyncEnded() {
return
}
msgData := bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
@@ -244,6 +261,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
}
case *bcBlockResponseMessage:
if bcR.isSyncEnded() {
return
}
msgForFSM := bcReactorMessage{
event: blockResponseEv,
data: bReactorEventData{
@@ -257,6 +278,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.messagesForFSMCh <- msgForFSM
case *bcNoBlockResponseMessage:
if bcR.isSyncEnded() {
return
}
msgForFSM := bcReactorMessage{
event: noBlockResponseEv,
data: bReactorEventData{
@@ -268,6 +293,10 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
bcR.messagesForFSMCh <- msgForFSM
case *bcStatusResponseMessage:
if bcR.isSyncEnded() {
return
}
// Got a peer status. Unverified.
msgForFSM := bcReactorMessage{
event: statusResponseEv,
@@ -278,7 +307,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
},
}
bcR.messagesForFSMCh <- msgForFSM
default:
bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
}
@@ -286,16 +314,20 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// processBlocksRoutine processes blocks until signaled to stop over the stopProcessing channel
func (bcR *BlockchainReactor) processBlocksRoutine(stopProcessing chan struct{}) {
processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
doProcessBlockCh := make(chan struct{}, 1)
defer processReceivedBlockTicker.Stop()
lastHundred := time.Now()
lastRate := 0.0
var (
doProcessBlockCh = make(chan struct{}, 1)
lastHundred = time.Now()
lastRate = 0.0
)
ForLoop:
for {
select {
case <-bcR.Quit():
break ForLoop
case <-stopProcessing:
bcR.Logger.Info("finishing block execution")
break ForLoop
@@ -338,12 +370,14 @@ ForLoop:
// poolRoutine receives and handles messages from the Receive() routine and from the FSM.
func (bcR *BlockchainReactor) poolRoutine() {
bcR.fsm.Start()
sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer sendBlockRequestTicker.Stop()
// NOTE: statusUpdateTicker can continue to run
stopProcessing := make(chan struct{}, 1)
go bcR.processBlocksRoutine(stopProcessing)
@@ -376,12 +410,10 @@ ForLoop:
case msg := <-bcR.eventsFromFSMCh:
switch msg.event {
case syncFinishedEv:
case syncFinishedEv: // Sent from the FSM when it enters finished state.
stopProcessing <- struct{}{}
// Sent from the FSM when it enters finished state.
break ForLoop
case peerErrorEv:
// Sent from the FSM when it detects peer error
bcR.setSyncEnded()
case peerErrorEv: // Sent from the FSM when it detects peer error
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
if msg.data.err == errNoPeerResponse {
// Sent from the peer timeout handler routine
@@ -448,14 +480,12 @@ func (bcR *BlockchainReactor) processBlock() error {
return nil
}
// sendStatusRequest broadcasts a status request carrying the current
// `BlockStore` height on the blockchain channel.
// Implements bcRNotifier.
func (bcR *BlockchainReactor) sendStatusRequest() {
	height := bcR.store.Height()
	statusReq := &bcStatusRequestMessage{height}
	encoded := cdc.MustMarshalBinaryBare(statusReq)
	bcR.Switch.Broadcast(BlockchainChannel, encoded)
}
// Implements bcRNotifier
// sendBlockRequest sends a `BlockRequest` for the given height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID)
@@ -471,19 +501,14 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro
return nil
}
// switchToConsensus hands the current state and the number of synced blocks
// over to the reactor registered under "CONSENSUS", then emits syncFinishedEv
// on eventsFromFSMCh. It does nothing when that reactor does not satisfy
// consensusReactor.
// Implements bcRNotifier.
func (bcR *BlockchainReactor) switchToConsensus() {
	conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
	if !ok {
		// Should only happen during testing.
		return
	}

	conR.SwitchToConsensus(bcR.state, bcR.blocksSynced)

	finished := bcFsmMessage{event: syncFinishedEv}
	bcR.eventsFromFSMCh <- finished
}
// Implements bcRNotifier
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
@@ -501,7 +526,6 @@ func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.eventsFromFSMCh <- msgData
}
// Implements bcRNotifier
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
if timer == nil {
panic("nil timer pointer parameter")

View File

@@ -226,11 +226,9 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
}
}
// NOTE: This is too hard to test without
// an easy way to add test peer to switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
// NOTE: This is too hard to test without an easy way to add test peer to
// switch or without significant refactoring of the module. Alternatively we
// could actually dial a TCP conn but that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
numNodes := 4
maxBlockHeight := int64(148)