mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-04 12:13:57 +00:00
When a peer is stopped due to some network issue, the Reactor calls scheduler#handleRemovePeer, which removes the peer from the scheduler. BUT the peer stays in the processor, which sometimes could lead to "duplicate block enqueued by processor" panic WHEN the same block is requested by the scheduler again from a different peer. The solution is to return scPeerError, which will be propagated to the processor. The processor will clean up the blocks associated with the peer in purgePeer. Closes #5513, #5517
193 lines
5.0 KiB
Go
193 lines
5.0 KiB
Go
package v2
|
|
|
|
import (
|
|
"fmt"
|
|
|
|
"github.com/tendermint/tendermint/p2p"
|
|
tmState "github.com/tendermint/tendermint/state"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
// Events generated by the processor:
|
|
// block execution failure, event will indicate the peer(s) that caused the error
|
|
type pcBlockVerificationFailure struct {
|
|
priorityNormal
|
|
height int64
|
|
firstPeerID p2p.ID
|
|
secondPeerID p2p.ID
|
|
}
|
|
|
|
func (e pcBlockVerificationFailure) String() string {
|
|
return fmt.Sprintf("pcBlockVerificationFailure{%d 1st peer: %v, 2nd peer: %v}",
|
|
e.height, e.firstPeerID, e.secondPeerID)
|
|
}
|
|
|
|
// successful block execution
|
|
type pcBlockProcessed struct {
|
|
priorityNormal
|
|
height int64
|
|
peerID p2p.ID
|
|
}
|
|
|
|
func (e pcBlockProcessed) String() string {
|
|
return fmt.Sprintf("pcBlockProcessed{%d peer: %v}", e.height, e.peerID)
|
|
}
|
|
|
|
// processor has finished
|
|
type pcFinished struct {
|
|
priorityNormal
|
|
blocksSynced int
|
|
tmState tmState.State
|
|
}
|
|
|
|
func (p pcFinished) Error() string {
|
|
return "finished"
|
|
}
|
|
|
|
type queueItem struct {
|
|
block *types.Block
|
|
peerID p2p.ID
|
|
}
|
|
|
|
type blockQueue map[int64]queueItem
|
|
|
|
type pcState struct {
|
|
// blocks waiting to be processed
|
|
queue blockQueue
|
|
|
|
// draining indicates that the next rProcessBlock event with a queue miss constitutes completion
|
|
draining bool
|
|
|
|
// the number of blocks successfully synced by the processor
|
|
blocksSynced int
|
|
|
|
// the processorContext which contains the processor dependencies
|
|
context processorContext
|
|
}
|
|
|
|
func (state *pcState) String() string {
|
|
return fmt.Sprintf("height: %d queue length: %d draining: %v blocks synced: %d",
|
|
state.height(), len(state.queue), state.draining, state.blocksSynced)
|
|
}
|
|
|
|
// newPcState returns a pcState initialized with the last verified block enqueued
|
|
func newPcState(context processorContext) *pcState {
|
|
return &pcState{
|
|
queue: blockQueue{},
|
|
draining: false,
|
|
blocksSynced: 0,
|
|
context: context,
|
|
}
|
|
}
|
|
|
|
// nextTwo returns the next two unverified blocks
|
|
func (state *pcState) nextTwo() (queueItem, queueItem, error) {
|
|
if first, ok := state.queue[state.height()+1]; ok {
|
|
if second, ok := state.queue[state.height()+2]; ok {
|
|
return first, second, nil
|
|
}
|
|
}
|
|
return queueItem{}, queueItem{}, fmt.Errorf("not found")
|
|
}
|
|
|
|
// synced returns true when at most the last verified block remains in the queue
|
|
func (state *pcState) synced() bool {
|
|
return len(state.queue) <= 1
|
|
}
|
|
|
|
func (state *pcState) enqueue(peerID p2p.ID, block *types.Block, height int64) {
|
|
if item, ok := state.queue[height]; ok {
|
|
panic(fmt.Sprintf(
|
|
"duplicate block %d (%X) enqueued by processor (sent by %v; existing block %X from %v)",
|
|
height, block.Hash(), peerID, item.block.Hash(), item.peerID))
|
|
}
|
|
|
|
state.queue[height] = queueItem{block: block, peerID: peerID}
|
|
}
|
|
|
|
func (state *pcState) height() int64 {
|
|
return state.context.tmState().LastBlockHeight
|
|
}
|
|
|
|
// purgePeer moves all unprocessed blocks from the queue
|
|
func (state *pcState) purgePeer(peerID p2p.ID) {
|
|
// what if height is less than state.height?
|
|
for height, item := range state.queue {
|
|
if item.peerID == peerID {
|
|
delete(state.queue, height)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handle processes FSM events
|
|
func (state *pcState) handle(event Event) (Event, error) {
|
|
switch event := event.(type) {
|
|
case bcResetState:
|
|
state.context.setState(event.state)
|
|
return noOp, nil
|
|
|
|
case scFinishedEv:
|
|
if state.synced() {
|
|
return pcFinished{tmState: state.context.tmState(), blocksSynced: state.blocksSynced}, nil
|
|
}
|
|
state.draining = true
|
|
return noOp, nil
|
|
|
|
case scPeerError:
|
|
state.purgePeer(event.peerID)
|
|
return noOp, nil
|
|
|
|
case scBlockReceived:
|
|
if event.block == nil {
|
|
return noOp, nil
|
|
}
|
|
|
|
// enqueue block if height is higher than state height, else ignore it
|
|
if event.block.Height > state.height() {
|
|
state.enqueue(event.peerID, event.block, event.block.Height)
|
|
}
|
|
return noOp, nil
|
|
|
|
case rProcessBlock:
|
|
tmState := state.context.tmState()
|
|
firstItem, secondItem, err := state.nextTwo()
|
|
if err != nil {
|
|
if state.draining {
|
|
return pcFinished{tmState: tmState, blocksSynced: state.blocksSynced}, nil
|
|
}
|
|
return noOp, nil
|
|
}
|
|
|
|
var (
|
|
first, second = firstItem.block, secondItem.block
|
|
firstParts = first.MakePartSet(types.BlockPartSizeBytes)
|
|
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstParts.Header()}
|
|
)
|
|
|
|
// verify if +second+ last commit "confirms" +first+ block
|
|
err = state.context.verifyCommit(tmState.ChainID, firstID, first.Height, second.LastCommit)
|
|
if err != nil {
|
|
state.purgePeer(firstItem.peerID)
|
|
if firstItem.peerID != secondItem.peerID {
|
|
state.purgePeer(secondItem.peerID)
|
|
}
|
|
return pcBlockVerificationFailure{
|
|
height: first.Height, firstPeerID: firstItem.peerID, secondPeerID: secondItem.peerID},
|
|
nil
|
|
}
|
|
|
|
state.context.saveBlock(first, firstParts, second.LastCommit)
|
|
|
|
if err := state.context.applyBlock(firstID, first); err != nil {
|
|
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
|
}
|
|
|
|
delete(state.queue, first.Height)
|
|
state.blocksSynced++
|
|
|
|
return pcBlockProcessed{height: first.Height, peerID: firstItem.peerID}, nil
|
|
}
|
|
|
|
return noOp, nil
|
|
}
|