From bbf126a79d6f82e367fda4f21bf8a2703478091f Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 4 Jan 2021 16:33:17 -0500 Subject: [PATCH] blockchain v0: refactor Receive --- blockchain/v0/reactor.go | 171 +++++++++++++++++++-------------------- 1 file changed, 82 insertions(+), 89 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index d257b6f1b..591dd3a37 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -2,9 +2,9 @@ package v0 import ( "fmt" - "reflect" "time" + "github.com/gogo/protobuf/proto" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/service" @@ -125,7 +125,7 @@ func NewReactor( closeCh: make(chan struct{}), } - r.BaseService = *service.NewBaseService(logger, "BlockchainReactor", r) + r.BaseService = *service.NewBaseService(logger, "Blockchain", r) return r } @@ -171,8 +171,85 @@ func (r *Reactor) OnStop() { <-r.peerUpdates.Done() } +// respondToPeer loads a block and sends it to the requesting peer, if we have it. +// Otherwise, we'll respond saying we do not have it. +func (r *Reactor) respondToPeer(msg *bcproto.BlockRequest, peerID p2p.NodeID) { + block := r.store.LoadBlock(msg.Height) + if block != nil { + blockProto, err := block.ToProto() + if err != nil { + r.Logger.Error("failed to convert msg to protobuf", "err", err) + return + } + + r.blockchainCh.Out() <- p2p.Envelope{ + To: peerID, + Message: &bcproto.BlockResponse{Block: blockProto}, + } + + return + } + + r.Logger.Info("peer requesting a block we do not have", "peer", peerID, "height", msg.Height) + r.blockchainCh.Out() <- p2p.Envelope{ + To: peerID, + Message: &bcproto.NoBlockResponse{Height: msg.Height}, + } +} + +// handleBlockchainMessage handles enevelopes sent from peers on the +// BlockchainChannel. It returns an error only if the Envelope.Message is unknown +// for this channel. This should never be called outside of handleMessage. func (r *Reactor) handleBlockchainMessage(envelope p2p.Envelope) error { - panic("IMPLEMENT ME!") + switch msg := envelope.Message.(type) { + case *bcproto.BlockRequest: + r.respondToPeer(msg, envelope.From) + + case *bcproto.BlockResponse: + block, err := types.BlockFromProto(msg.Block) + if err != nil { + r.Logger.Error("failed to convert block from proto", "err", err) + return err + } + + protoMsg := new(bcproto.Message) + if err := protoMsg.Wrap(msg); err != nil { + r.Logger.Error("failed to wrap proto message", "err", err) + return nil + } + + bz, err := proto.Marshal(protoMsg) + if err != nil { + r.Logger.Error("failed to proto encode message", "err", err) + return nil + } + + r.pool.AddBlock(envelope.From, block, len(bz)) + + case *bcproto.StatusRequest: + // send peer our state + r.blockchainCh.Out() <- p2p.Envelope{ + To: envelope.From, + Message: &bcproto.StatusResponse{ + Height: r.store.Height(), + Base: r.store.Base(), + }, + } + + case *bcproto.StatusResponse: + // received an unverified peer status + r.pool.SetPeerRange(envelope.From, msg.Base, msg.Height) + + case *bcproto.NoBlockResponse: + r.Logger.Debug("peer does not have the requested block", "height", msg.Height) + + default: + r.Logger.Error("received unknown message", "msg", msg, "peer", envelope.From) + return fmt.Errorf("received unknown message: %T", msg) + + } + + return nil } // handleMessage handles an Envelope sent from a peer on a specific p2p Channel. @@ -209,6 +286,7 @@ func (r *Reactor) processBlockchainCh() { select { case envelope := <-r.blockchainCh.In(): if err := r.handleMessage(r.blockchainCh.ID(), envelope); err != nil { + r.Logger.Error("failed to process message", "ch_id", r.blockchainCh.ID(), "envelope", envelope, "err", err) r.blockchainCh.Error() <- p2p.PeerError{ PeerID: envelope.From, Err: err, @@ -237,6 +315,7 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) (err error) { switch peerUpdate.Status { case p2p.PeerStatusNew, p2p.PeerStatusUp: + // send a status update the newly added peer r.blockchainCh.Out() <- p2p.Envelope{ To: peerUpdate.PeerID, Message: &bcproto.StatusResponse{ @@ -288,92 +367,6 @@ func (bcR *BlockchainReactor) SwitchToFastSync(state sm.State) error { return nil } -// respondToPeer loads a block and sends it to the requesting peer, -// if we have it. Otherwise, we'll respond saying we don't have it. -func (bcR *BlockchainReactor) respondToPeer(msg *bcproto.BlockRequest, - src p2p.Peer) (queued bool) { - - block := bcR.store.LoadBlock(msg.Height) - if block != nil { - bl, err := block.ToProto() - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return false - } - - msgBytes, err := bc.EncodeMsg(&bcproto.BlockResponse{Block: bl}) - if err != nil { - bcR.Logger.Error("could not marshal msg", "err", err) - return false - } - - return src.TrySend(BlockchainChannel, msgBytes) - } - - bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) - - msgBytes, err := bc.EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height}) - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return false - } - - return src.TrySend(BlockchainChannel, msgBytes) -} - -// Receive implements Reactor by handling 4 types of messages (look below). -// XXX: do not call any methods that can block or incur heavy processing. -// https://github.com/tendermint/tendermint/issues/2888 -func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - logger := bcR.Logger.With("src", src, "chId", chID) - - msg, err := bc.DecodeMsg(msgBytes) - if err != nil { - logger.Error("Error decoding message", "err", err) - bcR.Switch.StopPeerForError(src, err) - return - } - - if err = bc.ValidateMsg(msg); err != nil { - logger.Error("Peer sent us invalid msg", "msg", msg, "err", err) - bcR.Switch.StopPeerForError(src, err) - return - } - - logger.Debug("Receive", "msg", msg) - - switch msg := msg.(type) { - case *bcproto.BlockRequest: - bcR.respondToPeer(msg, src) - case *bcproto.BlockResponse: - bi, err := types.BlockFromProto(msg.Block) - if err != nil { - logger.Error("Block content is invalid", "err", err) - bcR.Switch.StopPeerForError(src, err) - return - } - bcR.pool.AddBlock(src.ID(), bi, len(msgBytes)) - case *bcproto.StatusRequest: - // Send peer our state. - msgBytes, err := bc.EncodeMsg(&bcproto.StatusResponse{ - Height: bcR.store.Height(), - Base: bcR.store.Base(), - }) - if err != nil { - logger.Error("could not convert msg to protobut", "err", err) - return - } - src.TrySend(BlockchainChannel, msgBytes) - case *bcproto.StatusResponse: - // Got a peer status. Unverified. - bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) - case *bcproto.NoBlockResponse: - logger.Debug("Peer does not have requested block", "height", msg.Height) - default: - logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } -} - // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! func (r *Reactor) poolRoutine(stateSynced bool) {