From faa0e09f174a2be4ab4a06b1dcb27d0dcd7c34e3 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Tue, 5 Jan 2021 11:13:14 -0500 Subject: [PATCH] blockchain v0: update poolRoutine --- blockchain/v0/reactor.go | 169 +++++++++++++++++++++------------------ 1 file changed, 89 insertions(+), 80 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index a597bdd0c..ac1e5cfd5 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -56,8 +56,8 @@ const ( ) type consensusReactor interface { - // for when we switch from blockchain reactor and fast sync to - // the consensus machine + // For when we switch from blockchain reactor and fast sync to the consensus + // machine. SwitchToConsensus(state sm.State, skipWAL bool) } @@ -77,10 +77,11 @@ type Reactor struct { // immutable initialState sm.State - blockExec *sm.BlockExecutor - store *store.BlockStore - pool *BlockPool - fastSync bool + blockExec *sm.BlockExecutor + store *store.BlockStore + pool *BlockPool + consReactor consensusReactor + fastSync bool blockchainCh *p2p.Channel peerUpdates *p2p.PeerUpdatesCh @@ -96,6 +97,7 @@ func NewReactor( state sm.State, blockExec *sm.BlockExecutor, store *store.BlockStore, + consReactor consensusReactor, blockchainCh *p2p.Channel, peerUpdates *p2p.PeerUpdatesCh, fastSync bool, @@ -117,6 +119,7 @@ func NewReactor( blockExec: blockExec, store: store, pool: NewBlockPool(startHeight, requestsCh, errorsCh), + consReactor: consReactor, fastSync: fastSync, requestsCh: requestsCh, errorsCh: errorsCh, @@ -357,7 +360,8 @@ func (r *Reactor) BroadcastStatusRequest() { } } -// SwitchToFastSync is called by the state sync reactor when switching to fast sync. +// SwitchToFastSync is called by the state sync reactor when switching to fast +// sync. func (r *Reactor) SwitchToFastSync(state sm.State) error { r.fastSync = true r.initialState = state @@ -371,11 +375,8 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error { return nil } -// ============================================================================ -// ============================================================================ -// ============================================================================ - -// Handle messages from the poolReactor telling the reactor what to do. +// poolRoutine handles messages from the poolReactor telling the reactor what to +// do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (r *Reactor) poolRoutine(stateSynced bool) { var ( @@ -385,8 +386,8 @@ func (r *Reactor) poolRoutine(stateSynced bool) { blocksSynced = uint64(0) - chainID = bcR.initialState.ChainID - state = bcR.initialState + chainID = r.initialState.ChainID + state = r.initialState lastHundred = time.Now() lastRate = 0.0 @@ -394,42 +395,36 @@ func (r *Reactor) poolRoutine(stateSynced bool) { didProcessCh = make(chan struct{}, 1) ) + defer trySyncTicker.Stop() + defer statusUpdateTicker.Stop() + defer switchToConsensusTicker.Stop() + go func() { for { select { - case <-bcR.Quit(): + case <-r.Quit(): return - case <-bcR.pool.Quit(): + case <-r.pool.Quit(): return - case request := <-bcR.requestsCh: - peer := bcR.Switch.Peers().Get(request.PeerID) - if peer == nil { - bcR.Logger.Debug("Can't send request: no peer", "peer_id", request.PeerID) - continue - } - msgBytes, err := bc.EncodeMsg(&bcproto.BlockRequest{Height: request.Height}) - if err != nil { - bcR.Logger.Error("could not convert BlockRequest to proto", "err", err) - continue + case request := <-r.requestsCh: + r.blockchainCh.Out() <- p2p.Envelope{ + To: request.PeerID, + Message: &bcproto.BlockRequest{Height: request.Height}, } - queued := peer.TrySend(BlockchainChannel, msgBytes) - if !queued { - bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height) - } - - case err := <-bcR.errorsCh: - peer := bcR.Switch.Peers().Get(err.peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, err) + case pErr := <-r.errorsCh: + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: pErr.peerID, + Err: pErr.err, + Severity: p2p.PeerErrorSeverityLow, } case <-statusUpdateTicker.C: // ask for status updates - go bcR.BroadcastStatusRequest() + go r.BroadcastStatusRequest() } } }() @@ -440,36 +435,35 @@ FOR_LOOP: case <-switchToConsensusTicker.C: var ( - height, numPending, lenRequesters = bcR.pool.GetStatus() - outbound, inbound, _ = bcR.Switch.NumPeers() - lastAdvance = bcR.pool.LastAdvance() + height, numPending, lenRequesters = r.pool.GetStatus() + lastAdvance = r.pool.LastAdvance() ) - bcR.Logger.Debug("Consensus ticker", - "numPending", numPending, - "total", lenRequesters) + r.Logger.Debug("consensus ticker", "num_pending", numPending, "total", lenRequesters) switch { - case bcR.pool.IsCaughtUp(): - bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) + case r.pool.IsCaughtUp(): + r.Logger.Info("switching to consensus reactor", "height", height) + case time.Since(lastAdvance) > syncTimeout: - bcR.Logger.Error(fmt.Sprintf("No progress since last advance: %v", lastAdvance)) + r.Logger.Error("no progress since last advance", "last_advance", lastAdvance) + default: - bcR.Logger.Info("Not caught up yet", - "height", height, "max_peer_height", bcR.pool.MaxPeerHeight(), - "num_peers", outbound+inbound, - "timeout_in", syncTimeout-time.Since(lastAdvance)) + r.Logger.Info( + "not caught up yet", + "height", height, + "max_peer_height", r.pool.MaxPeerHeight(), + "timeout_in", syncTimeout-time.Since(lastAdvance), + ) continue } - if err := bcR.pool.Stop(); err != nil { - bcR.Logger.Error("Error stopping pool", "err", err) - } - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - if ok { - conR.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) + if err := r.pool.Stop(); err != nil { + r.Logger.Error("failed to stop pool", "err", err) } + r.consReactor.SwitchToConsensus(state, blocksSynced > 0 || stateSynced) + break FOR_LOOP case <-trySyncTicker.C: // chan time @@ -487,14 +481,13 @@ FOR_LOOP: // coupling them as it's written here. TODO uncouple from request // routine. - // See if there are any blocks to sync. - first, second := bcR.pool.PeekTwoBlocks() - // bcR.Logger.Info("TrySync peeked", "first", first, "second", second) + // see if there are any blocks to sync + first, second := r.pool.PeekTwoBlocks() if first == nil || second == nil { - // We need both to sync the first block. + // we need both to sync the first block continue FOR_LOOP } else { - // Try again quickly next loop. + // try again quickly next loop didProcessCh <- struct{}{} } @@ -511,51 +504,67 @@ FOR_LOOP: err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit) if err != nil { err = fmt.Errorf("invalid last commit: %w", err) - bcR.Logger.Error(err.Error(), - "last_commit", second.LastCommit, "block_id", firstID, "height", first.Height) + r.Logger.Error( + err.Error(), + "last_commit", second.LastCommit, + "block_id", firstID, + "height", first.Height, + ) - peerID := bcR.pool.RedoRequest(first.Height) - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - // NOTE: we've already removed the peer's request, but we still need - // to clean up the rest. - bcR.Switch.StopPeerForError(peer, err) + // NOTE: we've already removed the peer's request, but we still need + // to clean up the rest. + peerID := r.pool.RedoRequest(first.Height) + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: peerID, + Err: err, + Severity: p2p.PeerErrorSeverityLow, } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := r.pool.RedoRequest(second.Height) + // TODO: Use !peerID.Equal(peerID2) if peerID2 != peerID { - if peer2 := bcR.Switch.Peers().Get(peerID2); peer2 != nil { - bcR.Switch.StopPeerForError(peer2, err) + r.blockchainCh.Error() <- p2p.PeerError{ + PeerID: peerID2, + Err: err, + Severity: p2p.PeerErrorSeverityLow, } } continue FOR_LOOP } else { - bcR.pool.PopRequest() + r.pool.PopRequest() - // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) + // TODO: batch saves so we do not persist to disk every block + r.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: same thing for app - but we would need a way to get the hash + // TODO: Same thing for app - but we would need a way to get the hash // without persisting the state. var err error - state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) + + state, _, err = r.blockExec.ApplyBlock(state, firstID, first) if err != nil { - // TODO This is bad, are we zombie? + // TODO: This is bad, are we zombie? panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } + blocksSynced++ if blocksSynced%100 == 0 { lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", - "height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) + r.Logger.Info( + "fast sync rate", + "height", r.pool.height, + "max_peer_height", r.pool.MaxPeerHeight(), + "blocks/s", lastRate, + ) + lastHundred = time.Now() } } + continue FOR_LOOP - case <-bcR.Quit(): + case <-r.Quit(): break FOR_LOOP } }