diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index bb86caff4..41c694dd2 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -44,4 +44,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES: +- [blockchain/v2] [\#4761](https://github.com/tendermint/tendermint/pull/4761) Fix excessive CPU usage caused by spinning on closed channels (@erikgrinaker) - [light] [\#4741](https://github.com/tendermint/tendermint/pull/4741) Correctly return `ErrSignedHeaderNotFound` and `ErrValidatorSetNotFound` on corresponding RPC errors (@erikgrinaker) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 1fb8ada1a..0bf345237 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -130,7 +130,6 @@ type BlockchainReactor struct { p2p.BaseReactor events chan Event // XXX: Rename eventsFromPeers - stopDemux chan struct{} scheduler *Routine processor *Routine logger log.Logger @@ -166,7 +165,6 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter return &BlockchainReactor{ events: make(chan Event, bufferSize), - stopDemux: make(chan struct{}), scheduler: newRoutine("scheduler", scheduler.handle, bufferSize), processor: newRoutine("processor", processor.handle, bufferSize), store: store, @@ -306,19 +304,29 @@ func (r *BlockchainReactor) demux() { processBlockFreq = 20 * time.Millisecond doProcessBlockCh = make(chan struct{}, 1) doProcessBlockTk = time.NewTicker(processBlockFreq) + ) + defer doProcessBlockTk.Stop() + var ( prunePeerFreq = 1 * time.Second doPrunePeerCh = make(chan struct{}, 1) doPrunePeerTk = time.NewTicker(prunePeerFreq) + ) + defer doPrunePeerTk.Stop() + var ( scheduleFreq = 20 * time.Millisecond doScheduleCh = make(chan struct{}, 1) doScheduleTk = time.NewTicker(scheduleFreq) + ) + defer doScheduleTk.Stop() + var ( statusFreq = 10 * time.Second doStatusCh = make(chan struct{}, 1) doStatusTk = time.NewTicker(statusFreq) ) + defer doStatusTk.Stop() // XXX: Extract timers to make testing atemporal for { @@ -355,14 +363,20 @@ func (r *BlockchainReactor) demux() { case <-doStatusCh: r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight()) - // Events from peers - case event := <-r.events: + // Events from peers. Closing the channel signals event loop termination. + case event, ok := <-r.events: + if !ok { + r.logger.Info("Stopping event processing") + return + } switch event := event.(type) { case bcStatusResponse: r.setMaxPeerHeight(event.height) r.scheduler.send(event) case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse: r.scheduler.send(event) + default: + r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event)) } // Incremental events form scheduler @@ -378,6 +392,9 @@ func (r *BlockchainReactor) demux() { case scFinishedEv: r.processor.send(event) r.scheduler.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event)) } // Incremental events from processor @@ -397,20 +414,28 @@ func (r *BlockchainReactor) demux() { case pcFinished: r.io.trySwitchToConsensus(event.tmState, event.blocksSynced > 0) r.processor.stop() + case noOpEvent: + default: + r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event)) } - // Terminal events from scheduler + // Terminal event from scheduler case err := <-r.scheduler.final(): - r.logger.Info(fmt.Sprintf("scheduler final %s", err)) - // send the processor stop? + switch err { + case nil: + r.logger.Info("Scheduler stopped") + default: + r.logger.Error("Scheduler aborted with error", "err", err) + } // Terminal event from processor - case event := <-r.processor.final(): - r.logger.Info(fmt.Sprintf("processor final %s", event)) - - case <-r.stopDemux: - r.logger.Info("demuxing stopped") - return + case err := <-r.processor.final(): + switch err { + case nil: + r.logger.Info("Processor stopped") + default: + r.logger.Error("Processor aborted with error", "err", err) + } } } } @@ -421,7 +446,6 @@ func (r *BlockchainReactor) Stop() error { r.scheduler.stop() r.processor.stop() - close(r.stopDemux) close(r.events) r.logger.Info("reactor stopped") diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index ff12bfebc..ad32e3e82 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -69,9 +69,11 @@ func (rt *Routine) start() { for { events, err := rt.queue.Get(1) - if err != nil { - rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - rt.terminate(fmt.Errorf("stopped")) + if err == queue.ErrDisposed { + rt.terminate(nil) + return + } else if err != nil { + rt.terminate(err) return } oEvent, err := rt.handle(events[0].(Event)) @@ -131,6 +133,7 @@ func (rt *Routine) final() chan error { // XXX: Maybe get rid of this func (rt *Routine) terminate(reason error) { - close(rt.out) + // We don't close the rt.out channel here, to avoid spinning on the closed channel + // in the event loop. rt.fin <- reason }