mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-10 06:57:24 +00:00
blockchain/v2: backport fixes (#4887)
* blockchain/v2: fix excessive CPU usage due to spinning on closed channels (#4761) The event loop uses a `select` on multiple channels. However, reading from a closed channel in Go always yields the channel's zero value. The processor and scheduler close their channels when done, and since these channels are always ready to receive, the event loop keeps spinning on them. This changes `routine.terminate()` to not close the channel, and also removes `stopDemux` and instead uses `events` channel closure to signal event loop termination. Fixes #4687. * blockchain/v2: respect fast_sync option (#4772) Not thoroughly tested, but seems to work. Will do further testing as this is integrated with state sync. Fixes #4688.
This commit is contained in:
@@ -35,4 +35,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [blockchain/v2] [\#4761](https://github.com/tendermint/tendermint/pull/4761) Fix excessive CPU usage caused by spinning on closed channels (@erikgrinaker)
|
||||
- [blockchain/v2] Respect `fast_sync` option (@erikgrinaker)
|
||||
- [light] [\#4741](https://github.com/tendermint/tendermint/pull/4741) Correctly return `ErrSignedHeaderNotFound` and `ErrValidatorSetNotFound` on corresponding RPC errors (@erikgrinaker)
|
||||
|
||||
@@ -129,8 +129,8 @@ type blockStore interface {
|
||||
type BlockchainReactor struct {
|
||||
p2p.BaseReactor
|
||||
|
||||
fastSync bool // if true, enable fast sync on start
|
||||
events chan Event // XXX: Rename eventsFromPeers
|
||||
stopDemux chan struct{}
|
||||
scheduler *Routine
|
||||
processor *Routine
|
||||
logger log.Logger
|
||||
@@ -157,7 +157,7 @@ type blockApplier interface {
|
||||
// XXX: unify naming in this package around tmState
|
||||
// XXX: V1 stores a copy of state as initialState, which is never mutated. Is that necessary?
|
||||
func newReactor(state state.State, store blockStore, reporter behaviour.Reporter,
|
||||
blockApplier blockApplier, bufferSize int) *BlockchainReactor {
|
||||
blockApplier blockApplier, bufferSize int, fastSync bool) *BlockchainReactor {
|
||||
scheduler := newScheduler(state.LastBlockHeight, time.Now())
|
||||
pContext := newProcessorContext(store, blockApplier, state)
|
||||
// TODO: Fix naming to just newProcessor
|
||||
@@ -166,12 +166,12 @@ func newReactor(state state.State, store blockStore, reporter behaviour.Reporter
|
||||
|
||||
return &BlockchainReactor{
|
||||
events: make(chan Event, bufferSize),
|
||||
stopDemux: make(chan struct{}),
|
||||
scheduler: newRoutine("scheduler", scheduler.handle, bufferSize),
|
||||
processor: newRoutine("processor", processor.handle, bufferSize),
|
||||
store: store,
|
||||
reporter: reporter,
|
||||
logger: log.NewNopLogger(),
|
||||
fastSync: fastSync,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,7 +182,7 @@ func NewBlockchainReactor(
|
||||
store blockStore,
|
||||
fastSync bool) *BlockchainReactor {
|
||||
reporter := behaviour.NewMockReporter()
|
||||
return newReactor(state, store, reporter, blockApplier, 1000)
|
||||
return newReactor(state, store, reporter, blockApplier, 1000, fastSync)
|
||||
}
|
||||
|
||||
// SetSwitch implements Reactor interface.
|
||||
@@ -226,9 +226,11 @@ func (r *BlockchainReactor) SetLogger(logger log.Logger) {
|
||||
// Start implements cmn.Service interface
|
||||
func (r *BlockchainReactor) Start() error {
|
||||
r.reporter = behaviour.NewSwitchReporter(r.BaseReactor.Switch)
|
||||
go r.scheduler.start()
|
||||
go r.processor.start()
|
||||
go r.demux()
|
||||
if r.fastSync {
|
||||
go r.scheduler.start()
|
||||
go r.processor.start()
|
||||
go r.demux()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -306,19 +308,29 @@ func (r *BlockchainReactor) demux() {
|
||||
processBlockFreq = 20 * time.Millisecond
|
||||
doProcessBlockCh = make(chan struct{}, 1)
|
||||
doProcessBlockTk = time.NewTicker(processBlockFreq)
|
||||
)
|
||||
defer doProcessBlockTk.Stop()
|
||||
|
||||
var (
|
||||
prunePeerFreq = 1 * time.Second
|
||||
doPrunePeerCh = make(chan struct{}, 1)
|
||||
doPrunePeerTk = time.NewTicker(prunePeerFreq)
|
||||
)
|
||||
defer doPrunePeerTk.Stop()
|
||||
|
||||
var (
|
||||
scheduleFreq = 20 * time.Millisecond
|
||||
doScheduleCh = make(chan struct{}, 1)
|
||||
doScheduleTk = time.NewTicker(scheduleFreq)
|
||||
)
|
||||
defer doScheduleTk.Stop()
|
||||
|
||||
var (
|
||||
statusFreq = 10 * time.Second
|
||||
doStatusCh = make(chan struct{}, 1)
|
||||
doStatusTk = time.NewTicker(statusFreq)
|
||||
)
|
||||
defer doStatusTk.Stop()
|
||||
|
||||
// XXX: Extract timers to make testing atemporal
|
||||
for {
|
||||
@@ -355,14 +367,20 @@ func (r *BlockchainReactor) demux() {
|
||||
case <-doStatusCh:
|
||||
r.io.broadcastStatusRequest(r.store.Base(), r.SyncHeight())
|
||||
|
||||
// Events from peers
|
||||
case event := <-r.events:
|
||||
// Events from peers. Closing the channel signals event loop termination.
|
||||
case event, ok := <-r.events:
|
||||
if !ok {
|
||||
r.logger.Info("Stopping event processing")
|
||||
return
|
||||
}
|
||||
switch event := event.(type) {
|
||||
case bcStatusResponse:
|
||||
r.setMaxPeerHeight(event.height)
|
||||
r.scheduler.send(event)
|
||||
case bcAddNewPeer, bcRemovePeer, bcBlockResponse, bcNoBlockResponse:
|
||||
r.scheduler.send(event)
|
||||
default:
|
||||
r.logger.Error("Received unknown event", "event", fmt.Sprintf("%T", event))
|
||||
}
|
||||
|
||||
// Incremental events from scheduler
|
||||
@@ -378,6 +396,9 @@ func (r *BlockchainReactor) demux() {
|
||||
case scFinishedEv:
|
||||
r.processor.send(event)
|
||||
r.scheduler.stop()
|
||||
case noOpEvent:
|
||||
default:
|
||||
r.logger.Error("Received unknown scheduler event", "event", fmt.Sprintf("%T", event))
|
||||
}
|
||||
|
||||
// Incremental events from processor
|
||||
@@ -397,20 +418,28 @@ func (r *BlockchainReactor) demux() {
|
||||
case pcFinished:
|
||||
r.io.trySwitchToConsensus(event.tmState, event.blocksSynced)
|
||||
r.processor.stop()
|
||||
case noOpEvent:
|
||||
default:
|
||||
r.logger.Error("Received unknown processor event", "event", fmt.Sprintf("%T", event))
|
||||
}
|
||||
|
||||
// Terminal events from scheduler
|
||||
// Terminal event from scheduler
|
||||
case err := <-r.scheduler.final():
|
||||
r.logger.Info(fmt.Sprintf("scheduler final %s", err))
|
||||
// send the processor stop?
|
||||
switch err {
|
||||
case nil:
|
||||
r.logger.Info("Scheduler stopped")
|
||||
default:
|
||||
r.logger.Error("Scheduler aborted with error", "err", err)
|
||||
}
|
||||
|
||||
// Terminal event from processor
|
||||
case event := <-r.processor.final():
|
||||
r.logger.Info(fmt.Sprintf("processor final %s", event))
|
||||
|
||||
case <-r.stopDemux:
|
||||
r.logger.Info("demuxing stopped")
|
||||
return
|
||||
case err := <-r.processor.final():
|
||||
switch err {
|
||||
case nil:
|
||||
r.logger.Info("Processor stopped")
|
||||
default:
|
||||
r.logger.Error("Processor aborted with error", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -421,7 +450,6 @@ func (r *BlockchainReactor) Stop() error {
|
||||
|
||||
r.scheduler.stop()
|
||||
r.processor.stop()
|
||||
close(r.stopDemux)
|
||||
close(r.events)
|
||||
|
||||
r.logger.Info("reactor stopped")
|
||||
|
||||
@@ -156,7 +156,7 @@ func newTestReactor(p testReactorParams) *BlockchainReactor {
|
||||
sm.SaveState(db, state)
|
||||
}
|
||||
|
||||
r := newReactor(state, store, reporter, appl, p.bufferSize)
|
||||
r := newReactor(state, store, reporter, appl, p.bufferSize, true)
|
||||
logger := log.TestingLogger()
|
||||
r.SetLogger(logger.With("module", "blockchain"))
|
||||
|
||||
|
||||
@@ -69,9 +69,11 @@ func (rt *Routine) start() {
|
||||
|
||||
for {
|
||||
events, err := rt.queue.Get(1)
|
||||
if err != nil {
|
||||
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
|
||||
rt.terminate(fmt.Errorf("stopped"))
|
||||
if err == queue.ErrDisposed {
|
||||
rt.terminate(nil)
|
||||
return
|
||||
} else if err != nil {
|
||||
rt.terminate(err)
|
||||
return
|
||||
}
|
||||
oEvent, err := rt.handle(events[0].(Event))
|
||||
@@ -131,6 +133,7 @@ func (rt *Routine) final() chan error {
|
||||
|
||||
// XXX: Maybe get rid of this
|
||||
func (rt *Routine) terminate(reason error) {
|
||||
close(rt.out)
|
||||
// We don't close the rt.out channel here, to avoid spinning on the closed channel
|
||||
// in the event loop.
|
||||
rt.fin <- reason
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user