blockchain v0: cleanup

This commit is contained in:
Aleksandr Bezobchuk
2021-01-06 10:15:35 -05:00
parent 537464d26a
commit 952cfaa174

View File

@@ -230,7 +230,6 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
r.pool.AddBlock(envelope.From, block, len(bz))
case *bcproto.StatusRequest:
// send peer our state
r.blockchainCh.Out() <- p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
@@ -240,11 +239,14 @@ func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
}
case *bcproto.StatusResponse:
// received an unverified peer status
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
r.Logger.Debug("peer does not have the requested block", "height", msg.Height)
r.Logger.Debug(
"peer does not have the requested block",
"height", msg.Height,
"peer", envelope.From,
)
default:
r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From)
@@ -306,14 +308,7 @@ func (r *Reactor) processBlockchainCh() {
// processPeerUpdate processes a PeerUpdate, returning an error upon failing to
// handle the PeerUpdate or if a panic is recovered.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("panic in processing peer update: %v", e)
r.Logger.Error("recovering from processing peer update panic", "err", err)
}
}()
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
r.Logger.Debug("received peer update", "peer", peerUpdate.PeerID, "status", peerUpdate.Status)
switch peerUpdate.Status {
@@ -330,8 +325,6 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
case p2p.PeerStatusDown, p2p.PeerStatusRemoved, p2p.PeerStatusBanned:
r.pool.RemovePeer(peerUpdate.PeerID)
}
return err
}
// processPeerUpdates initiates a blocking process where we listen for and handle
@@ -343,7 +336,7 @@ func (r *Reactor) processPeerUpdates() {
for {
select {
case peerUpdate := <-r.peerUpdates.Updates():
_ = r.processPeerUpdate(peerUpdate)
r.processPeerUpdate(peerUpdate)
case <-r.closeCh:
r.Logger.Debug("stopped listening on peer updates channel; closing...")
@@ -377,6 +370,7 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error {
// poolRoutine handles messages from the poolReactor telling the reactor what to
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(stateSynced bool) {
var (
@@ -432,7 +426,6 @@ func (r *Reactor) poolRoutine(stateSynced bool) {
FOR_LOOP:
for {
select {
case <-switchToConsensusTicker.C:
var (
height, numPending, lenRequesters = r.pool.GetStatus()
@@ -473,13 +466,14 @@ FOR_LOOP:
}
case <-didProcessCh:
// NOTE: It is a subtle mistake to process more than a single block
// at a time (e.g. 10) here, because we only TrySend 1 request per
// loop. The ratio mismatch can result in starving of blocks, a
// sudden burst of requests and responses, and repeat.
// Consequently, it is better to split these routines rather than
// coupling them as it's written here. TODO uncouple from request
// routine.
// NOTE: It is a subtle mistake to process more than a single block at a
// time (e.g. 10) here, because we only send one BlockRequest per loop
// iteration. The ratio mismatch can result in starving of blocks, i.e. a
// sudden burst of requests and responses, and repeat. Consequently, it is
// better to split these routines rather than coupling them as it is
// written here.
//
// TODO: uncouple from request routine.
// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
@@ -497,8 +491,9 @@ FOR_LOOP:
firstID = types.BlockID{Hash: first.Hash(), PartSetHeader: firstPartSetHeader}
)
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// Finally, verify the first block using the second's commit.
//
// NOTE: We can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommitLight(chainID, firstID, first.Height, second.LastCommit)
@@ -511,7 +506,7 @@ FOR_LOOP:
"height", first.Height,
)
// NOTE: we've already removed the peer's request, but we still need
// NOTE: We've already removed the peer's request, but we still need
// to clean up the rest.
peerID := r.pool.RedoRequest(first.Height)
r.blockchainCh.Error() <- p2p.PeerError{
@@ -521,7 +516,6 @@ FOR_LOOP:
}
peerID2 := r.pool.RedoRequest(second.Height)
// TODO: Use !peerID.Equal(peerID2)
if peerID2 != peerID {
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: peerID2,
@@ -544,7 +538,7 @@ FOR_LOOP:
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO: This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++