blockchain v0: update poolRoutine

This commit is contained in:
Aleksandr Bezobchuk
2021-01-05 11:13:14 -05:00
parent 7a639e5297
commit faa0e09f17

View File

@@ -56,8 +56,8 @@ const (
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
// For when we switch from blockchain reactor and fast sync to the consensus
// machine.
SwitchToConsensus(state sm.State, skipWAL bool)
}
@@ -77,10 +77,11 @@ type Reactor struct {
// immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
fastSync bool
blockExec *sm.BlockExecutor
store *store.BlockStore
pool *BlockPool
consReactor consensusReactor
fastSync bool
blockchainCh *p2p.Channel
peerUpdates *p2p.PeerUpdatesCh
@@ -96,6 +97,7 @@ func NewReactor(
state sm.State,
blockExec *sm.BlockExecutor,
store *store.BlockStore,
consReactor consensusReactor,
blockchainCh *p2p.Channel,
peerUpdates *p2p.PeerUpdatesCh,
fastSync bool,
@@ -117,6 +119,7 @@ func NewReactor(
blockExec: blockExec,
store: store,
pool: NewBlockPool(startHeight, requestsCh, errorsCh),
consReactor: consReactor,
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
@@ -357,7 +360,8 @@ func (r *Reactor) BroadcastStatusRequest() {
}
}
// SwitchToFastSync is called by the state sync reactor when switching to fast sync.
// SwitchToFastSync is called by the state sync reactor when switching to fast
// sync.
func (r *Reactor) SwitchToFastSync(state sm.State) error {
r.fastSync = true
r.initialState = state
@@ -371,11 +375,8 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error {
return nil
}
// ============================================================================
// ============================================================================
// ============================================================================
// Handle messages from the poolReactor telling the reactor what to do.
// poolRoutine handles messages from the poolReactor telling the reactor what to
// do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(stateSynced bool) {
var (
@@ -385,8 +386,8 @@ func (r *Reactor) poolRoutine(stateSynced bool) {
blocksSynced = uint64(0)
chainID = bcR.initialState.ChainID
state = bcR.initialState
chainID = r.initialState.ChainID
state = r.initialState
lastHundred = time.Now()
lastRate = 0.0
@@ -394,42 +395,36 @@ func (r *Reactor) poolRoutine(stateSynced bool) {
didProcessCh = make(chan struct{}, 1)
)
defer trySyncTicker.Stop()
defer statusUpdateTicker.Stop()
defer switchToConsensusTicker.Stop()
go func() {
for {
select {
case <-bcR.Quit():
case <-r.Quit():
return
case <-bcR.pool.Quit():
case <-r.pool.Quit():
return
case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil {
bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID)
continue
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert BlockRequest to proto", "err", err)
continue
case request := <-r.requestsCh:
r.blockchainCh.Out() <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
case pErr := <-r.errorsCh:
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: pErr.peerID,
Err: pErr.err,
Severity: p2p.PeerErrorSeverityLow,
}
case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest()
go r.BroadcastStatusRequest()
}
}
}()
@@ -440,36 +435,35 @@ FOR_LOOP:
case <-switchToConsensusTicker.C:
var (
height, numPending, lenRequesters = bcR.pool.GetStatus()
outbound, inbound, _ = bcR.Switch.NumPeers()
lastAdvance = bcR.pool.LastAdvance()
height, numPending, lenRequesters = r.pool.GetStatus()
lastAdvance = r.pool.LastAdvance()
)
bcR.Logger.Debug("Consensus ticker",
"numPending", numPending,
"total", lenRequesters)
r.Logger.Debug("consensus ticker", "num_pending", numPending, "total", lenRequesters)
switch {
case bcR.pool.IsCaughtUp():
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
case r.pool.IsCaughtUp():
r.Logger.Info("switching to consensus reactor", "height", height)
case time.Since(lastAdvance) > syncTimeout:
bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance))
r.Logger.Error("no progress since last advance", "last_advance", lastAdvance)
default:
bcR.Logger.Info("Not caught up yet",
"height", height, "max_peer_height", bcR.pool.MaxPeerHeight(),
"num_peers", outbound+inbound,
"timeout_in", syncTimeout-time.Since(lastAdvance))
r.Logger.Info(
"not caught up yet",
"height", height,
"max_peer_height", r.pool.MaxPeerHeight(),
"timeout_in", syncTimeout-time.Since(lastAdvance),
)
continue
}
if err := bcR.pool.Stop(); err != nil {
bcR.Logger.Error("Error stopping pool", "err", err)
}
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
if err := r.pool.Stop(); err != nil {
r.Logger.Error("failed to stop pool", "err", err)
}
r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced)
break FOR_LOOP
case <-trySyncTicker.C: // chan time
@@ -487,14 +481,13 @@ FOR_LOOP:
// coupling them as it's written here. TODO uncouple from request
// routine.
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
// bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
if first == nil || second == nil {
// We need both to sync the first block.
// we need both to sync the first block
continue FOR_LOOP
} else {
// Try again quickly next loop.
// try again quickly next loop
didProcessCh <- struct{}{}
}
@@ -511,51 +504,67 @@ FOR_LOOP:
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
if err != nil {
err = fmt.Errorf("invalid last commit: %w", err)
bcR.Logger.Error(err.Error(),
"last_commit", second.LastCommit, "block_id", firstID, "height", first.Height)
r.Logger.Error(
err.Error(),
"last_commit", second.LastCommit,
"block_id", firstID,
"height", first.Height,
)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we still need
// to clean up the rest.
bcR.Switch.StopPeerForError(peer, err)
// NOTE: we've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
peerID2 := bcR.pool.RedoRequest(second.Height)
peerID2 := r.pool.RedoRequest(second.Height)
// TODO: Use !peerID.Equal(peerID2)
if peerID2 != peerID {
if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil {
bcR.Switch.StopPeerForError(peer2, err)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID2,
Err: err,
Severity: p2p.PeerErrorSeverityLow,
}
}
continue FOR_LOOP
} else {
bcR.pool.PopRequest()
r.pool.PopRequest()
// TODO: batch saves so we dont persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: same thing for app - but we would need a way to get the hash
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
// TODO: This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate",
"height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
r.Logger.Info(
"fast sync rate",
"height", r.pool.height,
"max_peer_height", r.pool.MaxPeerHeight(),
"blocks/s", lastRate,
)
lastHundred = time.Now()
}
}
continue FOR_LOOP
case <-bcR.Quit():
case <-r.Quit():
break FOR_LOOP
}
}