From 89cfedeb734106ad3213b74eb027cb7bf104bbad Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Thu, 16 Apr 2015 17:46:27 -0700 Subject: [PATCH] Make all messages be registered as pointer receivers, Warn on unknown messages. --- blockchain/reactor.go | 27 ++++++++++++++------------- consensus/reactor.go | 25 +++++++++++++------------ mempool/reactor.go | 7 ++++--- p2p/pex_reactor.go | 11 ++++++----- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 4c0a03e72..952241af1 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "reflect" "sync/atomic" "time" @@ -105,7 +106,7 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) { // Send peer our state. - peer.Send(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) + peer.Send(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) } // Implements Reactor @@ -125,11 +126,11 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) log.Info("Received message", "msg", msg) switch msg := msg.(type) { - case bcBlockRequestMessage: + case *bcBlockRequestMessage: // Got a request for a block. Respond with block if we have it. block := bcR.store.LoadBlock(msg.Height) if block != nil { - msg := bcBlockResponseMessage{Block: block} + msg := &bcBlockResponseMessage{Block: block} queued := src.TrySend(BlockchainChannel, msg) if !queued { // queue is full, just ignore. @@ -137,14 +138,14 @@ func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) } else { // TODO peer is asking for things we don't have. } - case bcBlockResponseMessage: + case *bcBlockResponseMessage: // Got a block. bcR.pool.AddBlock(msg.Block, src.Key) - case bcPeerStatusMessage: + case *bcPeerStatusMessage: // Got a peer status. bcR.pool.SetPeerHeight(src.Key, msg.Height) default: - // Ignore unknown message + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } } @@ -162,7 +163,7 @@ FOR_LOOP: // We can't fulfill the request. continue FOR_LOOP } - msg := bcBlockRequestMessage{request.Height} + msg := &bcBlockRequestMessage{request.Height} queued := peer.TrySend(BlockchainChannel, msg) if !queued { // We couldn't queue the request. @@ -238,7 +239,7 @@ FOR_LOOP: } func (bcR *BlockchainReactor) BroadcastStatus() error { - bcR.sw.Broadcast(BlockchainChannel, bcPeerStatusMessage{bcR.store.Height()}) + bcR.sw.Broadcast(BlockchainChannel, &bcPeerStatusMessage{bcR.store.Height()}) return nil } @@ -260,9 +261,9 @@ type BlockchainMessage interface{} var _ = binary.RegisterInterface( struct{ BlockchainMessage }{}, - binary.ConcreteType{bcBlockRequestMessage{}, msgTypeBlockRequest}, - binary.ConcreteType{bcBlockResponseMessage{}, msgTypeBlockResponse}, - binary.ConcreteType{bcPeerStatusMessage{}, msgTypePeerStatus}, + binary.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest}, + binary.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse}, + binary.ConcreteType{&bcPeerStatusMessage{}, msgTypePeerStatus}, ) func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { @@ -279,7 +280,7 @@ type bcBlockRequestMessage struct { Height uint } -func (m bcBlockRequestMessage) String() string { +func (m *bcBlockRequestMessage) String() string { return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) } @@ -289,7 +290,7 @@ type bcBlockResponseMessage struct { Block *types.Block } -func (m bcBlockResponseMessage) String() string { +func (m *bcBlockResponseMessage) String() string { return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 4ca4a7a18..08a648059 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "reflect" "sync" "sync/atomic" "time" @@ -147,7 +148,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte case *HasVoteMessage: ps.ApplyHasVoteMessage(msg) default: - // Ignore unknown message + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } case DataChannel: @@ -164,11 +165,11 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte ps.SetHasProposalPOLPart(msg.Height, msg.Round, msg.Part.Index) _, err = conR.conS.AddProposalPOLPart(msg.Height, msg.Round, msg.Part) } else { - // Ignore unknown part type + log.Warn("Unknown part type %v", msg.Type) } default: - // Ignore unknown message + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } case VoteChannel: @@ -212,10 +213,10 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte } default: - // Ignore unknown message + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } default: - // Ignore unknown channel + log.Warn("Unknown channel %X", chId) } if err != nil { @@ -767,12 +768,12 @@ type ConsensusMessage interface{} var _ = binary.RegisterInterface( struct{ ConsensusMessage }{}, - binary.ConcreteType{NewRoundStepMessage{}, msgTypeNewRoundStep}, - binary.ConcreteType{CommitStepMessage{}, msgTypeCommitStep}, - binary.ConcreteType{ProposalMessage{}, msgTypeProposal}, - binary.ConcreteType{PartMessage{}, msgTypePart}, - binary.ConcreteType{VoteMessage{}, msgTypeVote}, - binary.ConcreteType{HasVoteMessage{}, msgTypeHasVote}, + binary.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep}, + binary.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep}, + binary.ConcreteType{&ProposalMessage{}, msgTypeProposal}, + binary.ConcreteType{&PartMessage{}, msgTypePart}, + binary.ConcreteType{&VoteMessage{}, msgTypeVote}, + binary.ConcreteType{&HasVoteMessage{}, msgTypeHasVote}, ) // TODO: check for unnecessary extra bytes at the end. @@ -780,7 +781,7 @@ func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { msgType = bz[0] n := new(int64) r := bytes.NewReader(bz) - msg = binary.ReadBinary(&msg, r, n, &err) + msg = binary.ReadBinary(struct{ ConsensusMessage }{}, r, n, &err).(struct{ ConsensusMessage }).ConsensusMessage return } diff --git a/mempool/reactor.go b/mempool/reactor.go index 389380627..6d10112b4 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -3,6 +3,7 @@ package mempool import ( "bytes" "fmt" + "reflect" "sync/atomic" "github.com/tendermint/tendermint/binary" @@ -99,7 +100,7 @@ func (memR *MempoolReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) { } default: - // Ignore unknown message + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } } @@ -129,14 +130,14 @@ type MempoolMessage interface{} var _ = binary.RegisterInterface( struct{ MempoolMessage }{}, - binary.ConcreteType{TxMessage{}, msgTypeTx}, + binary.ConcreteType{&TxMessage{}, msgTypeTx}, ) func DecodeMessage(bz []byte) (msgType byte, msg MempoolMessage, err error) { msgType = bz[0] n := new(int64) r := bytes.NewReader(bz) - msg = binary.ReadBinary(&msg, r, n, &err) + msg = binary.ReadBinary(struct{ MempoolMessage }{}, r, n, &err).(struct{ MempoolMessage }).MempoolMessage return } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index a8bb98ba4..5a9da8ef9 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -4,6 +4,7 @@ import ( "bytes" "errors" "fmt" + "reflect" "sync/atomic" "time" @@ -119,7 +120,7 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { pexR.book.AddAddress(addr, srcAddr) } default: - // Ignore unknown message. + log.Warn("Unknown message type %v", reflect.TypeOf(msg)) } } @@ -228,16 +229,16 @@ type PexMessage interface{} var _ = binary.RegisterInterface( struct{ PexMessage }{}, - binary.ConcreteType{pexHandshakeMessage{}, msgTypeHandshake}, - binary.ConcreteType{pexRequestMessage{}, msgTypeRequest}, - binary.ConcreteType{pexAddrsMessage{}, msgTypeAddrs}, + binary.ConcreteType{&pexHandshakeMessage{}, msgTypeHandshake}, + binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest}, + binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, ) func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { msgType = bz[0] n := new(int64) r := bytes.NewReader(bz) - msg = binary.ReadBinary(&msg, r, n, &err) + msg = binary.ReadBinary(struct{ PexMessage }{}, r, n, &err).(struct{ PexMessage }).PexMessage return }