diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 1445b24ae..063776cb1 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -2,6 +2,7 @@ package v0 import ( "fmt" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -89,6 +90,11 @@ type Reactor struct { requestsCh <-chan BlockRequest errorsCh <-chan peerError + + // poolWG is used to synchronize the graceful shutdown of the poolRoutine and + // requestRoutine spawned goroutines when stopping the reactor and before + // stopping the p2p Channel(s). + poolWG sync.WaitGroup } // NewReactor returns new reactor instance. @@ -163,6 +169,9 @@ func (r *Reactor) OnStop() { } } + // wait for the poolRoutine and requestRoutine goroutines to gracefully exit + r.poolWG.Wait() + // Close closeCh to signal to all spawned goroutines to gracefully exit. All // p2p Channels should execute Close(). close(r.closeCh) @@ -362,17 +371,67 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error { return nil } +func (r *Reactor) broadcastStatusRequest() { + envelope := p2p.Envelope{ + Broadcast: true, + Message: &bcproto.StatusRequest{}, + } + + r.poolWG.Add(1) + defer r.poolWG.Done() + + select { + case <-r.Quit(): + return + + case <-r.pool.Quit(): + return + + case r.blockchainCh.Out() <- envelope: + } +} + +func (r *Reactor) requestRoutine() { + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + defer statusUpdateTicker.Stop() + + r.poolWG.Add(1) + defer r.poolWG.Done() + + for { + select { + case <-r.Quit(): + return + + case <-r.pool.Quit(): + return + + case request := <-r.requestsCh: + r.blockchainCh.Out() <- p2p.Envelope{ + To: request.PeerID, + Message: &bcproto.BlockRequest{Height: request.Height}, + } + + case pErr := <-r.errorsCh: + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: pErr.peerID, + Err: pErr.err, + Severity: p2p.PeerErrorSeverityLow, + } + + case <-statusUpdateTicker.C: + go r.broadcastStatusRequest() + } + } +} + // poolRoutine handles messages from the poolReactor telling the reactor what to // do. // // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -// -// TODO: Ensure this works nicely with stopping reactor and that no race -// conditions or deadlocks exist. func (r *Reactor) poolRoutine(stateSynced bool) { var ( trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) - statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second) switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second) blocksSynced = uint64(0) @@ -387,42 +446,12 @@ func (r *Reactor) poolRoutine(stateSynced bool) { ) defer trySyncTicker.Stop() - defer statusUpdateTicker.Stop() defer switchToConsensusTicker.Stop() - go func() { - for { - select { + go r.requestRoutine() - case <-r.Quit(): - return - - case <-r.pool.Quit(): - return - - case request := <-r.requestsCh: - r.blockchainCh.Out() <- p2p.Envelope{ - To: request.PeerID, - Message: &bcproto.BlockRequest{Height: request.Height}, - } - - case pErr := <-r.errorsCh: - r.blockchainCh.Error() <- p2p.PeerError{ - PeerID: pErr.peerID, - Err: pErr.err, - Severity: p2p.PeerErrorSeverityLow, - } - - case <-statusUpdateTicker.C: - go func() { - r.blockchainCh.Out() <- p2p.Envelope{ - Broadcast: true, - Message: &bcproto.StatusRequest{}, - } - }() - } - } - }() + r.poolWG.Add(1) + defer r.poolWG.Done() FOR_LOOP: for { diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 150b6a01a..5ea40eaca 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -386,11 +386,13 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) { "expected new node to be fully synced", ) - require.Eventually( + require.Eventuallyf( t, func() bool { return len(newSuite.reactor.pool.peers) == len(testSuites)-3 }, - time.Second, + 10*time.Second, 10*time.Millisecond, - "invalid number of peers", + "invalid number of peers; expected: %d, got: %d", + len(testSuites)-3, + len(newSuite.reactor.pool.peers), ) }