update blocksync receive method

This commit is contained in:
William Banfield
2022-10-20 13:56:25 -04:00
parent e18106eeda
commit d0bfcef698
2 changed files with 15 additions and 12 deletions

View File

@@ -48,7 +48,10 @@ func DecodeMsg(bz []byte) (proto.Message, error) {
if err != nil {
return nil, err
}
return UnwrapMessage(pb)
}
func UnwrapMessage(pb *bcproto.Message) (proto.Message, error) {
switch msg := pb.Sum.(type) {
case *bcproto.Message_BlockRequest:
return msg.BlockRequest, nil

View File

@@ -214,32 +214,32 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := DecodeMsg(msgBytes)
func (bcR *Reactor) Receive(e p2p.Envelope) {
msg, err := UnwrapMessage(e.Message.(*bcproto.Message))
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "err", err)
bcR.Switch.StopPeerForError(src, err)
bcR.Logger.Error("Error decoding message", "src", e.Src, "chId", e.ChannelID, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
if err = ValidateMsg(msg); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
bcR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(e.Src, err)
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
bcR.Logger.Debug("Receive", "e.Src", e.Src, "chID", e.ChannelID, "msg", msg)
switch msg := msg.(type) {
case *bcproto.BlockRequest:
bcR.respondToPeer(msg, src)
bcR.respondToPeer(msg, e.Src)
case *bcproto.BlockResponse:
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
bcR.Logger.Error("Block content is invalid", "err", err)
return
}
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
bcR.pool.AddBlock(e.Src.ID(), bi, 0) // TODO: fix block size calculation
case *bcproto.StatusRequest:
// Send peer our state.
wm, err := wrapMsg(&bcproto.StatusResponse{
@@ -251,15 +251,15 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
return
}
src.TrySend(p2p.Envelope{
e.Src.TrySend(p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
})
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height)
case *bcproto.NoBlockResponse:
bcR.Logger.Debug("Peer does not have requested block", "peer", src, "height", msg.Height)
bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}