diff --git a/blocksync/msgs.go b/blocksync/msgs.go index d68d5f0d5..fe1c87d7f 100644 --- a/blocksync/msgs.go +++ b/blocksync/msgs.go @@ -48,7 +48,10 @@ func DecodeMsg(bz []byte) (proto.Message, error) { if err != nil { return nil, err } + return UnwrapMessage(pb) +} +func UnwrapMessage(pb *bcproto.Message) (proto.Message, error) { switch msg := pb.Sum.(type) { case *bcproto.Message_BlockRequest: return msg.BlockRequest, nil diff --git a/blocksync/reactor.go b/blocksync/reactor.go index c690bded7..5e3082d12 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -214,32 +214,32 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, } // Receive implements Reactor by handling 4 types of messages (look below). -func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := DecodeMsg(msgBytes) +func (bcR *Reactor) Receive(e p2p.Envelope) { + msg, err := UnwrapMessage(e.Message.(*bcproto.Message)) if err != nil { - bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err) - bcR.Switch.StopPeerForError(src, err) + bcR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err) + bcR.Switch.StopPeerForError(e.Src, err) return } if err = ValidateMsg(msg); err != nil { - bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - bcR.Switch.StopPeerForError(src, err) + bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) + bcR.Switch.StopPeerForError(e.Src, err) return } - bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) + bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", msg) switch msg := msg.(type) { case *bcproto.BlockRequest: - bcR.respondToPeer(msg, src) + bcR.respondToPeer(msg, e.Src) case *bcproto.BlockResponse: bi, err := types.BlockFromProto(msg.Block) if err != nil { bcR.Logger.Error("Block content is invalid", "err", err) return } - bcR.pool.AddBlock(src.ID(), bi, len(msgBytes)) + bcR.pool.AddBlock(e.Src.ID(), bi, 0) // TODO: fix block size calculation case *bcproto.StatusRequest: // Send peer our state. wm, err := wrapMsg(&bcproto.StatusResponse{ @@ -251,15 +251,15 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { return } - src.TrySend(p2p.Envelope{ + e.Src.TrySend(p2p.Envelope{ ChannelID: BlocksyncChannel, Message: wm, }) case *bcproto.StatusResponse: // Got a peer status. Unverified. - bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) + bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: - bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height) + bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) default: bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) }