diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 09dd2ef90..858935e0f 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -149,15 +149,14 @@ func (bcR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor by sending our state to peer. func (bcR *Reactor) AddPeer(peer p2p.Peer) { - msgBytes, err := EncodeMsg(&bcproto.StatusResponse{ - Base: bcR.store.Base(), - Height: bcR.store.Height()}) - if err != nil { - bcR.Logger.Error("could not convert msg to protobuf", "err", err) - return + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: &bcproto.StatusResponse{ + Base: bcR.store.Base(), + Height: bcR.store.Height(), + }, } - - peer.Send(BlocksyncChannel, msgBytes) + peer.NewSend(e) // it's OK if send fails. will try later in poolRoutine // peer is added to the pool once we receive the first diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index fe0c36a14..d3085ce17 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -165,10 +165,22 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { for i, peer := range peerList { if i < len(peerList)/2 { bcs.Logger.Info("Signed and pushed vote", "vote", prevote1, "peer", peer) - peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote1})) + p, err := MsgToProto(&VoteMessage{prevote1}) + require.NoError(t, err) + e := p2p.Envelope{ + Message: p, + ChannelID: VoteChannel, + } + peer.NewSend(e) } else { bcs.Logger.Info("Signed and pushed vote", "vote", prevote2, "peer", peer) - peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote2})) + p, err := MsgToProto(&VoteMessage{prevote2}) + require.NoError(t, err) + e := p2p.Envelope{ + Message: p, + ChannelID: VoteChannel, + } + peer.NewSend(e) } } } else { @@ -521,7 +533,15 @@ func sendProposalAndParts( ) { // proposal msg := &ProposalMessage{Proposal: proposal} - peer.Send(DataChannel, MustEncode(msg)) + p, err := MsgToProto(msg) + if err != nil { + panic(err) + } + e := p2p.Envelope{ + ChannelID: DataChannel, + Message: p, + } + peer.NewSend(e) // parts for i := 0; i < int(parts.Total()); i++ { @@ -531,7 +551,15 @@ func sendProposalAndParts( Round: round, // This tells peer that this part applies to us. Part: part, } - peer.Send(DataChannel, MustEncode(msg)) + p, err := MsgToProto(msg) + if err != nil { + panic(err) + } + e := p2p.Envelope{ + ChannelID: DataChannel, + Message: p, + } + peer.NewSend(e) } // votes @@ -539,9 +567,24 @@ func sendProposalAndParts( prevote, _ := cs.signVote(tmproto.PrevoteType, blockHash, parts.Header()) precommit, _ := cs.signVote(tmproto.PrecommitType, blockHash, parts.Header()) cs.mtx.Unlock() - - peer.Send(VoteChannel, MustEncode(&VoteMessage{prevote})) - peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit})) + p, err = MsgToProto(&VoteMessage{prevote}) + if err != nil { + panic(err) + } + e = p2p.Envelope{ + ChannelID: VoteChannel, + Message: p, + } + peer.NewSend(e) + p, err = MsgToProto(&VoteMessage{precommit}) + if err != nil { + panic(err) + } + e = p2p.Envelope{ + ChannelID: VoteChannel, + Message: p, + } + peer.NewSend(e) } //---------------------------------------- diff --git a/consensus/invalid_test.go b/consensus/invalid_test.go index f96018157..2f0209b2a 100644 --- a/consensus/invalid_test.go +++ b/consensus/invalid_test.go @@ -3,6 +3,7 @@ package consensus import ( "testing" + "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/bytes" "github.com/tendermint/tendermint/libs/log" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -94,7 +95,13 @@ func invalidDoPrevoteFunc(t *testing.T, height int64, round int32, cs *State, sw peers := sw.Peers().List() for _, peer := range peers { cs.Logger.Info("Sending bad vote", "block", blockHash, "peer", peer) - peer.Send(VoteChannel, MustEncode(&VoteMessage{precommit})) + p, err := MsgToProto(&VoteMessage{precommit}) + require.NoError(t, err) + e := p2p.Envelope{ + Message: p, + ChannelID: VoteChannel, + } + peer.NewSend(e) } }() } diff --git a/evidence/reactor.go b/evidence/reactor.go index 2a136dbfb..10d3d9817 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -126,11 +126,15 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) { evis := evR.prepareEvidenceMessage(peer, ev) if len(evis) > 0 { evR.Logger.Debug("Gossiping evidence to peer", "ev", ev, "peer", peer) - msgBytes, err := encodeMsg(evis) + evp, err := evidenceListToProto(evis) if err != nil { panic(err) } - success := peer.Send(EvidenceChannel, msgBytes) + e := p2p.Envelope{ + ChannelID: EvidenceChannel, + Message: evp, + } + success := peer.NewSend(e) if !success { time.Sleep(peerRetryMessageIntervalMS * time.Millisecond) continue @@ -226,6 +230,23 @@ func encodeMsg(evis []types.Evidence) ([]byte, error) { return epl.Marshal() } +// encodemsg takes a array of evidence +// returns the byte encoding of the List Message +func evidenceListToProto(evis []types.Evidence) (*tmproto.EvidenceList, error) { + evi := make([]tmproto.Evidence, len(evis)) + for i := 0; i < len(evis); i++ { + ev, err := types.EvidenceToProto(evis[i]) + if err != nil { + return nil, err + } + evi[i] = *ev + } + epl := tmproto.EvidenceList{ + Evidence: evi, + } + return &epl, nil +} + // decodemsg takes an array of bytes // returns an array of evidence func decodeMsg(bz []byte) (evis []types.Evidence, err error) { diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 3fc850641..58bd5ffb5 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -234,18 +234,15 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // https://github.com/tendermint/tendermint/issues/5796 if _, ok := memTx.senders.Load(peerID); !ok { - msg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + e := p2p.Envelope{ + ChannelID: mempool.MempoolChannel, + Message: &protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + }, }, } - - bz, err := msg.Marshal() - if err != nil { - panic(err) - } - - success := peer.Send(mempool.MempoolChannel, bz) + success := peer.NewSend(e) if !success { time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) continue diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 4da51bab8..0480f71b3 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -233,18 +233,17 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { // NOTE: Transaction batching was disabled due to // https://github.com/tendermint/tendermint/issues/5796 if !memTx.HasPeer(peerID) { - msg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + + e := p2p.Envelope{ + ChannelID: mempool.MempoolChannel, + Message: &protomem.Message{ + Sum: &protomem.Message_Txs{ + Txs: &protomem.Txs{Txs: [][]byte{memTx.tx}}, + }, }, } - bz, err := msg.Marshal() - if err != nil { - panic(err) - } - - success := peer.Send(mempool.MempoolChannel, bz) + success := peer.NewSend(e) if !success { time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) continue diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index db3d9261e..57d8648bc 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -21,6 +21,8 @@ type mockPeer struct { func (mp *mockPeer) FlushStop() { mp.Stop() } //nolint:errcheck // ignore error func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } +func (mp *mockPeer) NewTrySend(e Envelope) bool { return true } +func (mp *mockPeer) NewSend(e Envelope) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } func (mp *mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } func (mp *mockPeer) ID() ID { return mp.id } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 006f89cd7..034a005b3 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -348,7 +348,11 @@ func (r *Reactor) RequestAddrs(p Peer) { } r.Logger.Debug("Request addrs", "from", p) r.requestsSent.Set(id, struct{}{}) - p.Send(PexChannel, mustEncode(&tmp2p.PexRequest{})) + e := p2p.Envelope{ + ChannelID: PexChannel, + Message: &tmp2p.PexRequest{}, + } + p.NewSend(e) } // ReceiveAddrs adds the given addrs to the addrbook if theres an open @@ -406,7 +410,11 @@ func (r *Reactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error { // SendAddrs sends addrs to the peer. func (r *Reactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) { - p.Send(PexChannel, mustEncode(&tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)})) + e := p2p.Envelope{ + ChannelID: PexChannel, + Message: &tmp2p.PexAddrs{Addrs: p2p.NetAddressesToProto(netAddrs)}, + } + p.NewSend(e) } // SetEnsurePeersPeriod sets period to ensure peers connected. diff --git a/p2p/switch.go b/p2p/switch.go index d4ed091d1..ba99708e0 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -6,7 +6,6 @@ import ( "sync" "time" - "github.com/cosmos/gogoproto/proto" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/cmap" "github.com/tendermint/tendermint/libs/log" @@ -293,11 +292,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { // // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. func (sw *Switch) NewBroadcast(e Envelope) chan bool { - msgBytes, err := proto.Marshal(e.Message) - if err != nil { - panic(err) - } - sw.Logger.Debug("Broadcast", "channel", e.ChannelID, "msgBytes", log.NewLazySprintf("%X", msgBytes)) + sw.Logger.Debug("Broadcast", "channel", e.ChannelID) peers := sw.peers.List() var wg sync.WaitGroup @@ -307,7 +302,7 @@ func (sw *Switch) NewBroadcast(e Envelope) chan bool { for _, peer := range peers { go func(p Peer) { defer wg.Done() - success := p.Send(e.ChannelID, msgBytes) + success := p.NewSend(e) successChan <- success }(peer) } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 9d5466df7..3e1215e2e 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -429,7 +429,10 @@ func TestSwitchStopPeerForError(t *testing.T) { // send messages to the peer from sw1 p := sw1.Peers().List()[0] - p.Send(0x1, []byte("here's a message to send")) + e := Envelope{ + ChannelID: 0x1, + } + p.NewSend(e) // stop sw2. this should cause the p to fail, // which results in calling StopPeerForError internally diff --git a/statesync/messages.go b/statesync/messages.go index 901036a7a..3e4640f91 100644 --- a/statesync/messages.go +++ b/statesync/messages.go @@ -38,6 +38,23 @@ func mustEncodeMsg(pb proto.Message) []byte { return bz } +func toWrappedProto(pb proto.Message) proto.Message { + msg := ssproto.Message{} + switch pb := pb.(type) { + case *ssproto.ChunkRequest: + msg.Sum = &ssproto.Message_ChunkRequest{ChunkRequest: pb} + case *ssproto.ChunkResponse: + msg.Sum = &ssproto.Message_ChunkResponse{ChunkResponse: pb} + case *ssproto.SnapshotsRequest: + msg.Sum = &ssproto.Message_SnapshotsRequest{SnapshotsRequest: pb} + case *ssproto.SnapshotsResponse: + msg.Sum = &ssproto.Message_SnapshotsResponse{SnapshotsResponse: pb} + default: + panic(fmt.Errorf("unknown message type %T", pb)) + } + return &msg +} + // decodeMsg decodes a Protobuf message. func decodeMsg(bz []byte) (proto.Message, error) { pb := &ssproto.Message{} diff --git a/statesync/reactor.go b/statesync/reactor.go index 8434b6adf..236322979 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -130,13 +130,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { for _, snapshot := range snapshots { r.Logger.Debug("Advertising snapshot", "height", snapshot.Height, "format", snapshot.Format, "peer", src.ID()) - src.Send(chID, mustEncodeMsg(&ssproto.SnapshotsResponse{ - Height: snapshot.Height, - Format: snapshot.Format, - Chunks: snapshot.Chunks, - Hash: snapshot.Hash, - Metadata: snapshot.Metadata, - })) + e := p2p.Envelope{ + ChannelID: chID, + Message: &ssproto.SnapshotsResponse{ + Height: snapshot.Height, + Format: snapshot.Format, + Chunks: snapshot.Chunks, + Hash: snapshot.Hash, + Metadata: snapshot.Metadata, + }, + } + src.NewSend(e) } case *ssproto.SnapshotsResponse: @@ -182,13 +186,17 @@ func (r *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { } r.Logger.Debug("Sending chunk", "height", msg.Height, "format", msg.Format, "chunk", msg.Index, "peer", src.ID()) - src.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkResponse{ - Height: msg.Height, - Format: msg.Format, - Index: msg.Index, - Chunk: resp.Chunk, - Missing: resp.Chunk == nil, - })) + e := p2p.Envelope{ + ChannelID: ChunkChannel, + Message: toWrappedProto(&ssproto.ChunkResponse{ + Height: msg.Height, + Format: msg.Format, + Index: msg.Index, + Chunk: resp.Chunk, + Missing: resp.Chunk == nil, + }), + } + src.NewSend(e) case *ssproto.ChunkResponse: r.mtx.RLock() diff --git a/statesync/syncer.go b/statesync/syncer.go index 7cb9f2946..9fbda7ad5 100644 --- a/statesync/syncer.go +++ b/statesync/syncer.go @@ -126,7 +126,11 @@ func (s *syncer) AddSnapshot(peer p2p.Peer, snapshot *snapshot) (bool, error) { // to discover snapshots, later we may want to do retries and stuff. func (s *syncer) AddPeer(peer p2p.Peer) { s.logger.Debug("Requesting snapshots from peer", "peer", peer.ID()) - peer.Send(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})) + e := p2p.Envelope{ + ChannelID: SnapshotChannel, + Message: toWrappedProto(&ssproto.SnapshotsRequest{}), + } + peer.NewSend(e) } // RemovePeer removes a peer from the pool. @@ -467,11 +471,15 @@ func (s *syncer) requestChunk(snapshot *snapshot, chunk uint32) { } s.logger.Debug("Requesting snapshot chunk", "height", snapshot.Height, "format", snapshot.Format, "chunk", chunk, "peer", peer.ID()) - peer.Send(ChunkChannel, mustEncodeMsg(&ssproto.ChunkRequest{ - Height: snapshot.Height, - Format: snapshot.Format, - Index: chunk, - })) + e := p2p.Envelope{ + ChannelID: ChunkChannel, + Message: &ssproto.ChunkRequest{ + Height: snapshot.Height, + Format: snapshot.Format, + Index: chunk, + }, + } + peer.NewSend(e) } // verifyApp verifies the sync, checking the app hash, last block height and app version