From 1d20f378104af5eb46ef419d669866870bb8feb1 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Fri, 28 Oct 2022 17:06:36 -0400 Subject: [PATCH] add proto message unmarshal to reactors --- blockchain/v1/reactor.go | 8 ++++++++ blockchain/v2/reactor.go | 8 ++++++++ blocksync/reactor.go | 10 +++++++++- consensus/reactor.go | 7 +++++++ evidence/reactor.go | 6 ++++++ mempool/v1/reactor.go | 8 ++++++++ p2p/mock/reactor.go | 5 +---- p2p/pex/pex_reactor.go | 8 ++++++++ p2p/switch_test.go | 11 ++++++++--- statesync/reactor.go | 8 ++++++++ test/maverick/consensus/reactor.go | 6 ++++++ 11 files changed, 77 insertions(+), 8 deletions(-) diff --git a/blockchain/v1/reactor.go b/blockchain/v1/reactor.go index cd03e96de..765a05904 100644 --- a/blockchain/v1/reactor.go +++ b/blockchain/v1/reactor.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/cosmos/gogoproto/proto" + "github.com/tendermint/tendermint/behaviour" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" @@ -307,9 +309,15 @@ func (bcR *BlockchainReactor) NewReceive(e p2p.Envelope) { } func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *bcproto.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } bcR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index c102fe236..062841dba 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/cosmos/gogoproto/proto" + "github.com/tendermint/tendermint/behaviour" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/libs/log" @@ -514,9 +516,15 @@ func (r *BlockchainReactor) NewReceive(e p2p.Envelope) { } func (r *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *bcproto.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } r.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 6e1e29003..32ca8eb33 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -5,6 +5,8 @@ import ( "reflect" "time" + "github.com/cosmos/gogoproto/proto" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" bcproto "github.com/tendermint/tendermint/proto/tendermint/blocksync" @@ -233,10 +235,16 @@ func (bcR *Reactor) NewReceive(e p2p.Envelope) { } } -func (bcR *BlockchainReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { +func (bcR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *bcproto.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } bcR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/consensus/reactor.go b/consensus/reactor.go index ef04e0365..17b8dc1ed 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/gogo/protobuf/proto" cstypes "github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/libs/bits" tmevents "github.com/tendermint/tendermint/libs/events" @@ -388,9 +389,15 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *tmcons.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } conR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/evidence/reactor.go b/evidence/reactor.go index 4773e4e09..45a8d4f27 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -93,9 +93,15 @@ func (evR *Reactor) NewReceive(e p2p.Envelope) { } func (evR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *tmproto.EvidenceList + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } evR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 0cf17f0f1..e761c54eb 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/cosmos/gogoproto/proto" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/clist" "github.com/tendermint/tendermint/libs/log" @@ -188,9 +190,15 @@ func (memR *Reactor) NewReceive(e p2p.Envelope) { } func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *protomem.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } memR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/p2p/mock/reactor.go b/p2p/mock/reactor.go index 878a5755b..893ed2032 100644 --- a/p2p/mock/reactor.go +++ b/p2p/mock/reactor.go @@ -22,8 +22,5 @@ func NewReactor() *Reactor { func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels } func (r *Reactor) AddPeer(peer p2p.Peer) {} func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {} -func (r *Reactor) NewReceive(e p2p.Envelope) -ChannelID: chID, -Src: peer, -{} +func (r *Reactor) NewReceive(e p2p.Envelope) {} func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {} diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index ca46b8579..ae1695cda 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/cosmos/gogoproto/proto" + "github.com/tendermint/tendermint/libs/cmap" tmmath "github.com/tendermint/tendermint/libs/math" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -299,9 +301,15 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *tmp2p.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } r.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index cfe4f2252..b577d6562 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -15,7 +15,6 @@ import ( "time" "github.com/cosmos/gogoproto/proto" - "github.com/ethereum/go-ethereum/p2p" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -82,10 +81,16 @@ func (tr *TestReactor) NewReceive(e Envelope) { } } -func (tr *TestReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - tr.NewReceive(p2p.Envelope{ +func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) { + var msg *p2pproto.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } + tr.NewReceive(Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/statesync/reactor.go b/statesync/reactor.go index 2f08c1d91..e5f83b835 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -5,6 +5,8 @@ import ( "sort" "time" + "github.com/cosmos/gogoproto/proto" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/config" tmsync "github.com/tendermint/tendermint/libs/sync" @@ -224,9 +226,15 @@ func (r *Reactor) NewReceive(e p2p.Envelope) { } func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *ssproto.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } r.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) } diff --git a/test/maverick/consensus/reactor.go b/test/maverick/consensus/reactor.go index 08e7a009f..f8f936843 100644 --- a/test/maverick/consensus/reactor.go +++ b/test/maverick/consensus/reactor.go @@ -388,9 +388,15 @@ func (conR *Reactor) NewReceive(e p2p.Envelope) { } func (conR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { + var msg *tmcons.Message + err := proto.Unmarshal(msgBytes, msg) + if err != nil { + panic(err) + } conR.NewReceive(p2p.Envelope{ ChannelID: chID, Src: peer, + Message: msg, }) }