blockchain v0: refactor Receive

This commit is contained in:
Aleksandr Bezobchuk
2021-01-04 16:33:17 -05:00
parent a7f935c93a
commit bbf126a79d

View File

@@ -2,9 +2,9 @@ package v0
import (
"fmt"
"reflect"
"time"
"github.com/gogo/protobuf/proto"
bc "github.com/tendermint/tendermint/blockchain"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
@@ -125,7 +125,7 @@ func NewReactor(
closeCh: make(chan struct{}),
}
r.BaseService = *service.NewBaseService(logger, "BlockchainReactor", r)
r.BaseService = *service.NewBaseService(logger, "Blockchain", r)
return r
}
@@ -171,8 +171,85 @@ func (r *Reactor) OnStop() {
<-r.peerUpdates.Done()
}
// respondToPeer loads a block and sends it to the requesting peer, if we have it.
// Otherwise, we'll respond saying we do not have it.
func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) {
block := r.store.LoadBlock(msg.Height)
if block != nil {
blockProto, err := block.ToProto()
if err != nil {
r.Logger.Error("failed to convert msg to protobuf", "err", err)
return
}
r.blockchainCh.Out() <- p2p.Envelope{
To: peerID,
Message: &bcproto.BlockResponse{Block: blockProto},
}
return
}
r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height)
r.blockchainCh.Out() <- p2p.Envelope{
To: peerID,
Message: &bcproto.NoBlockResponse{Height: msg.Height},
}
}
// handleBlockchainMessage handles enevelopes sent from peers on the
// BlockchainChannel. It returns an error only if the Envelope.Message is unknown
// for this channel. This should never be called outside of handleMessage.
func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error {
panic("IMPLEMENT ME!")
switch msg := envelope.Message.(type) {
case *bcproto.BlockRequest:
r.respondToPeer(msg, envelope.From)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
r.Logger.Error("failed to convert block from proto", "err", err)
return err
}
protoMsg := new(bcproto.Message)
if err := protoMsg.Wrap(msg); err != nil {
r.Logger.Error("failed to wrap proto message", "err", err)
return nil
}
bz, err := proto.Marshal(protoMsg)
if err != nil {
r.Logger.Error("failed to proto encode message", "err", err)
return nil
}
r.pool.AddBlock(envelope.From, block, len(bz))
case *bcproto.StatusRequest:
// send peer our state
r.blockchainCh.Out() <- p2p.Envelope{
To: envelope.From,
Message: &bcproto.StatusResponse{
Height: r.store.Height(),
Base: r.store.Base(),
},
}
case *bcproto.StatusResponse:
// received an unverified peer status
r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
r.Logger.Debug("peer does not have the requested block", "height", msg.Height)
default:
r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From)
return fmt.Errorf("received unknown message: %T", msg)
}
return nil
}
// handleMessage handles an Envelope sent from a peer on a specific p2p Channel.
@@ -209,6 +286,7 @@ func (r *Reactor) processBlockchainCh() {
select {
case envelope := <-r.blockchainCh.In():
if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil {
r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err)
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: envelope.From,
Err: err,
@@ -237,6 +315,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) {
switch peerUpdate.Status {
case p2p.PeerStatusNew, p2p.PeerStatusUp:
// send a status update the newly added peer
r.blockchainCh.Out() <- p2p.Envelope{
To: peerUpdate.PeerID,
Message: &bcproto.StatusResponse{
@@ -288,92 +367,6 @@ func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error {
return nil
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
bl, err := block.ToProto()
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
return src.TrySend(BlockchainChannel, msgBytes)
}
// Receive implements Reactor by handling 4 types of messages (look below).
// XXX: do not call any methods that can block or incur heavy processing.
// https://github.com/tendermint/tendermint/issues/2888
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
logger := bcR.Logger.With("src", src, "chId", chID)
msg, err := bc.DecodeMsg(msgBytes)
if err != nil {
logger.Error("Error decoding message", "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = bc.ValidateMsg(msg); err != nil {
logger.Error("Peer sent us invalid msg", "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
logger.Debug("Receive", "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
logger.Error("Block content is invalid", "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
logger.Error("could not convert msg to protobut", "err", err)
return
}
src.TrySend(BlockchainChannel, msgBytes)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
logger.Debug("Peer does not have requested block", "height", msg.Height)
default:
logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (r *Reactor) poolRoutine(stateSynced bool) {