mirror of
https://github.com/tendermint/tendermint.git
synced 2026-02-07 12:30:45 +00:00
* Add processor prototype
* Change processor API
+ expose a simple `handle` function which mutates internal state
* schedule event handling
* rename schedule -> scheduler
* fill in handle function
* processor tests
* fix gofmt and ohter golangci issues
* scopelint var on range scope
* add check for short block received
* small test reorg
* ci fix changes
* go.mod revert
* some cleanup and review comments
* scheduler fixes and unit tests, also small processor changes.
changed scPeerPruned to include a list of pruned peers
touchPeer to check peer state and remove the blocks from blockStates if the peer removal causes the max peer height to be lower.
remove the block at sc.initHeight
changed peersInactiveSince, peersSlowerThan, getPeersAtHeight check peer state
prunablePeers to return a sorted list of peers
lastRate in markReceived() attempted to divide by 0, temp fix.
fixed allBlocksProcessed conditions
maxHeight() and minHeight() to return sc.initHeight if no ready peers present
make selectPeer() deterministic.
added handleBlockProcessError()
added termination cond. (sc.allBlocksProcessed()) to handleTryPrunePeer() and others.
changed pcBlockVerificationFailure to include peer of H+2 block along with the one for H+1
changed the processor to call purgePeer on block verification failure.
fixed processor tests
added scheduler tests.
* typo and ci fixes
* remove height from scBlockRequest, golangci fixes
* limit on blockState map, updated tests
* remove unused
* separate test for maxHeight(), used for sched. validation
* use Math.Min
* fix golangci
* Document the semantics of blockStates in the scheduler
* better docs
* distinguish between unknown and invalid blockstate
* Standardize peer filtering methods
* feedback
* s/getPeersAtHeight/getPeersAtHeightOrAbove
* small notes
* Update blockchain/v2/scheduler.go
Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>
* Update comments based on feedback
* Add enum offset
* panic on nil block in processor
* remove unused max height calculation
* format shorter line
135 lines
3.3 KiB
Go
135 lines
3.3 KiB
Go
package v2
|
|
|
|
import (
|
|
"fmt"
|
|
"sync/atomic"
|
|
|
|
"github.com/Workiva/go-datastructures/queue"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
)
|
|
|
|
type handleFunc = func(event Event) (Event, error)
|
|
|
|
// Routines are a structure which model a finite state machine as serialized
|
|
// stream of events processed by a handle function. This Routine structure
|
|
// handles the concurrency and messaging guarantees. Events are sent via
|
|
// `send` are handled by the `handle` function to produce an iterator
|
|
// `next()`. Calling `stop()` on a routine will conclude processing of all
|
|
// sent events and produce `final()` event representing the terminal state.
|
|
type Routine struct {
|
|
name string
|
|
handle handleFunc
|
|
queue *queue.PriorityQueue
|
|
out chan Event
|
|
fin chan error
|
|
rdy chan struct{}
|
|
running *uint32
|
|
logger log.Logger
|
|
metrics *Metrics
|
|
}
|
|
|
|
func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine {
|
|
return &Routine{
|
|
name: name,
|
|
handle: handleFunc,
|
|
queue: queue.NewPriorityQueue(bufferSize, true),
|
|
out: make(chan Event, bufferSize),
|
|
rdy: make(chan struct{}, 1),
|
|
fin: make(chan error, 1),
|
|
running: new(uint32),
|
|
logger: log.NewNopLogger(),
|
|
metrics: NopMetrics(),
|
|
}
|
|
}
|
|
|
|
// nolint: unused
|
|
func (rt *Routine) setLogger(logger log.Logger) {
|
|
rt.logger = logger
|
|
}
|
|
|
|
// nolint:unused
|
|
func (rt *Routine) setMetrics(metrics *Metrics) {
|
|
rt.metrics = metrics
|
|
}
|
|
|
|
func (rt *Routine) start() {
|
|
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
|
|
running := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
|
|
if !running {
|
|
panic(fmt.Sprintf("%s is already running", rt.name))
|
|
}
|
|
close(rt.rdy)
|
|
defer func() {
|
|
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
|
|
if !stopped {
|
|
panic(fmt.Sprintf("%s is failed to stop", rt.name))
|
|
}
|
|
}()
|
|
|
|
for {
|
|
events, err := rt.queue.Get(1)
|
|
if err != nil {
|
|
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
|
|
rt.terminate(fmt.Errorf("stopped"))
|
|
return
|
|
}
|
|
oEvent, err := rt.handle(events[0].(Event))
|
|
rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
|
|
if err != nil {
|
|
rt.terminate(err)
|
|
return
|
|
}
|
|
rt.metrics.EventsOut.With("routine", rt.name).Add(1)
|
|
rt.logger.Debug(fmt.Sprintf("%s produced %T %+v\n", rt.name, oEvent, oEvent))
|
|
|
|
rt.out <- oEvent
|
|
}
|
|
}
|
|
|
|
// XXX: look into returning OpError in the net package
|
|
func (rt *Routine) send(event Event) bool {
|
|
rt.logger.Debug(fmt.Sprintf("%s: received %T %+v", rt.name, event, event))
|
|
if !rt.isRunning() {
|
|
return false
|
|
}
|
|
err := rt.queue.Put(event)
|
|
if err != nil {
|
|
rt.metrics.EventsShed.With("routine", rt.name).Add(1)
|
|
rt.logger.Info(fmt.Sprintf("%s: send failed, queue was full/stopped \n", rt.name))
|
|
return false
|
|
}
|
|
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
|
|
return true
|
|
}
|
|
|
|
func (rt *Routine) isRunning() bool {
|
|
return atomic.LoadUint32(rt.running) == 1
|
|
}
|
|
|
|
func (rt *Routine) next() chan Event {
|
|
return rt.out
|
|
}
|
|
|
|
func (rt *Routine) ready() chan struct{} {
|
|
return rt.rdy
|
|
}
|
|
|
|
func (rt *Routine) stop() {
|
|
if !rt.isRunning() { // XXX: this should check rt.queue.Disposed()
|
|
return
|
|
}
|
|
|
|
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
|
|
rt.queue.Dispose() // this should block until all queue items are free?
|
|
}
|
|
|
|
func (rt *Routine) final() chan error {
|
|
return rt.fin
|
|
}
|
|
|
|
// XXX: Maybe get rid of this
|
|
func (rt *Routine) terminate(reason error) {
|
|
close(rt.out)
|
|
rt.fin <- reason
|
|
}
|