Make all messages be registered as pointer receivers,

Warn on unknown messages.
This commit is contained in:
Jae Kwon
2015-04-16 17:46:27 -07:00
parent 22fdb6ca2f
commit 89cfedeb73
4 changed files with 37 additions and 33 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"
@@ -105,7 +106,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
// Implements Reactor
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
// Send peer our state.
peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
}
// Implements Reactor
@@ -125,11 +126,11 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
log.Info("Received message", "msg", msg)
switch msg := msg.(type) {
case bcBlockRequestMessage:
case *bcBlockRequestMessage:
// Got a request for a block. Respond with block if we have it.
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
msg := bcBlockResponseMessage{Block: block}
msg := &bcBlockResponseMessage{Block: block}
queued := src.TrySend(BlockchainChannel, msg)
if !queued {
// queue is full, just ignore.
@@ -137,14 +138,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte)
} else {
// TODO peer is asking for things we don't have.
}
case bcBlockResponseMessage:
case *bcBlockResponseMessage:
// Got a block.
bcR.pool.AddBlock(msg.Block, src.Key)
case bcPeerStatusMessage:
case *bcPeerStatusMessage:
// Got a peer status.
bcR.pool.SetPeerHeight(src.Key, msg.Height)
default:
// Ignore unknown message
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
}
@@ -162,7 +163,7 @@ FOR_LOOP:
// We can't fulfill the request.
continue FOR_LOOP
}
msg := bcBlockRequestMessage{request.Height}
msg := &bcBlockRequestMessage{request.Height}
queued := peer.TrySend(BlockchainChannel, msg)
if !queued {
// We couldn't queue the request.
@@ -238,7 +239,7 @@ FOR_LOOP:
}
func (bcR *BlockchainReactor) BroadcastStatus() error {
bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()})
bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()})
return nil
}
@@ -260,9 +261,9 @@ type BlockchainMessage interface{}
var _ = binary.RegisterInterface(
struct{ BlockchainMessage }{},
binary.ConcreteType{bcBlockRequestMessage{}, msgTypeBlockRequest},
binary.ConcreteType{bcBlockResponseMessage{}, msgTypeBlockResponse},
binary.ConcreteType{bcPeerStatusMessage{}, msgTypePeerStatus},
binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus},
)
func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
@@ -279,7 +280,7 @@ type bcBlockRequestMessage struct {
Height uint
}
func (m bcBlockRequestMessage) String() string {
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
@@ -289,7 +290,7 @@ type bcBlockResponseMessage struct {
Block *types.Block
}
func (m bcBlockResponseMessage) String() string {
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
@@ -147,7 +148,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
default:
// Ignore unknown message
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
case DataChannel:
@@ -164,11 +165,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index)
_, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part)
} else {
// Ignore unknown part type
log.Warn("Unknown part type %v", msg.Type)
}
default:
// Ignore unknown message
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
case VoteChannel:
@@ -212,10 +213,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
}
default:
// Ignore unknown message
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
default:
// Ignore unknown channel
log.Warn("Unknown channel %X", chId)
}
if err != nil {
@@ -767,12 +768,12 @@ type ConsensusMessage interface{}
var _ = binary.RegisterInterface(
struct{ ConsensusMessage }{},
binary.ConcreteType{NewRoundStepMessage{}, msgTypeNewRoundStep},
binary.ConcreteType{CommitStepMessage{}, msgTypeCommitStep},
binary.ConcreteType{ProposalMessage{}, msgTypeProposal},
binary.ConcreteType{PartMessage{}, msgTypePart},
binary.ConcreteType{VoteMessage{}, msgTypeVote},
binary.ConcreteType{HasVoteMessage{}, msgTypeHasVote},
binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep},
binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep},
binary.ConcreteType{&ProposalMessage{}, msgTypeProposal},
binary.ConcreteType{&PartMessage{}, msgTypePart},
binary.ConcreteType{&VoteMessage{}, msgTypeVote},
binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote},
)
// TODO: check for unnecessary extra bytes at the end.
@@ -780,7 +781,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
msgType = bz[0]
n := new(int64)
r := bytes.NewReader(bz)
msg = binary.ReadBinary(&msg, r, n, &err)
msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage
return
}

View File

@@ -3,6 +3,7 @@ package mempool
import (
"bytes"
"fmt"
"reflect"
"sync/atomic"
"github.com/tendermint/tendermint/binary"
@@ -99,7 +100,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
}
default:
// Ignore unknown message
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
}
@@ -129,14 +130,14 @@ type MempoolMessage interface{}
var _ = binary.RegisterInterface(
struct{ MempoolMessage }{},
binary.ConcreteType{TxMessage{}, msgTypeTx},
binary.ConcreteType{&TxMessage{}, msgTypeTx},
)
func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) {
msgType = bz[0]
n := new(int64)
r := bytes.NewReader(bz)
msg = binary.ReadBinary(&msg, r, n, &err)
msg = binary.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage
return
}

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"
@@ -119,7 +120,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
pexR.book.AddAddress(addr, srcAddr)
}
default:
// Ignore unknown message.
log.Warn("Unknown message type %v", reflect.TypeOf(msg))
}
}
@@ -228,16 +229,16 @@ type PexMessage interface{}
var _ = binary.RegisterInterface(
struct{ PexMessage }{},
binary.ConcreteType{pexHandshakeMessage{}, msgTypeHandshake},
binary.ConcreteType{pexRequestMessage{}, msgTypeRequest},
binary.ConcreteType{pexAddrsMessage{}, msgTypeAddrs},
binary.ConcreteType{&pexHandshakeMessage{}, msgTypeHandshake},
binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
)
func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
msgType = bz[0]
n := new(int64)
r := bytes.NewReader(bz)
msg = binary.ReadBinary(&msg, r, n, &err)
msg = binary.ReadBinary(struct{ PexMessage }{}, r, n, &err).(struct{ PexMessage }).PexMessage
return
}