From 73d0f8116a07a9827a6763dd3aeccfa1fcc6b319 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 19 Oct 2022 13:05:05 -0400 Subject: [PATCH] compiles after NewBroadcast change --- blocksync/msgs.go | 21 ++++++++++++++++ blocksync/reactor.go | 46 ++++++++++++++++++++++----------- p2p/peer.go | 3 --- p2p/switch.go | 3 ++- p2p/switch_test.go | 60 ++++++++++++++++++++++++++++++++++++-------- statesync/reactor.go | 6 ++++- 6 files changed, 109 insertions(+), 30 deletions(-) diff --git a/blocksync/msgs.go b/blocksync/msgs.go index e3d6e551c..cc72e7a10 100644 --- a/blocksync/msgs.go +++ b/blocksync/msgs.go @@ -46,6 +46,27 @@ func EncodeMsg(pb proto.Message) ([]byte, error) { return bz, nil } +func toWrappedMessage(pb proto.Message) (proto.Message, error) { + msg := bcproto.Message{} + + switch pb := pb.(type) { + case *bcproto.BlockRequest: + msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb} + case *bcproto.BlockResponse: + msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb} + case *bcproto.NoBlockResponse: + msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb} + case *bcproto.StatusRequest: + msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb} + case *bcproto.StatusResponse: + msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb} + default: + return nil, fmt.Errorf("unknown message type %T", pb) + } + + return &msg, nil +} + // DecodeMsg decodes a Protobuf message. func DecodeMsg(bz []byte) (proto.Message, error) { pb := &bcproto.Message{} diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 858935e0f..bad048663 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -181,24 +181,32 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, return false } - msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl}) + wm, err := toWrappedMessage(&bcproto.BlockResponse{Block: bl}) if err != nil { bcR.Logger.Error("could not marshal msg", "err", err) return false } + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: wm, + } - return src.TrySend(BlocksyncChannel, msgBytes) + return src.NewTrySend(e) } bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) - msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height}) + wm, err := toWrappedMessage(&bcproto.NoBlockResponse{Height: msg.Height}) if err != nil { bcR.Logger.Error("could not convert msg to protobuf", "err", err) return false } + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: wm, + } - return src.TrySend(BlocksyncChannel, msgBytes) + return src.NewTrySend(e) } // Receive implements Reactor by handling 4 types of messages (look below). @@ -230,15 +238,19 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { bcR.pool.AddBlock(src.ID(), bi, len(msgBytes)) case *bcproto.StatusRequest: // Send peer our state. - msgBytes, err := EncodeMsg(&bcproto.StatusResponse{ + wm, err := toWrappedMessage(&bcproto.StatusResponse{ Height: bcR.store.Height(), Base: bcR.store.Base(), }) if err != nil { - bcR.Logger.Error("could not convert msg to protobut", "err", err) + bcR.Logger.Error("could not convert msg to wrapped protobuf", "err", err) return } - src.TrySend(BlocksyncChannel, msgBytes) + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: wm, + } + src.NewTrySend(e) case *bcproto.StatusResponse: // Got a peer status. Unverified. bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height) @@ -284,13 +296,16 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) { if peer == nil { continue } - msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height}) + wm, err := toWrappedMessage(&bcproto.BlockRequest{Height: request.Height}) if err != nil { - bcR.Logger.Error("could not convert msg to proto", "err", err) + bcR.Logger.Error("could not convert msg to wrapped proto", "err", err) continue } - - queued := peer.TrySend(BlocksyncChannel, msgBytes) + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: wm, + } + queued := peer.NewTrySend(e) if !queued { bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height) } @@ -429,13 +444,16 @@ FOR_LOOP: // BroadcastStatusRequest broadcasts `BlockStore` base and height. func (bcR *Reactor) BroadcastStatusRequest() error { - bm, err := EncodeMsg(&bcproto.StatusRequest{}) + wm, err := toWrappedMessage(&bcproto.StatusRequest{}) if err != nil { bcR.Logger.Error("could not convert msg to proto", "err", err) return fmt.Errorf("could not convert msg to proto: %w", err) } - - bcR.Switch.Broadcast(BlocksyncChannel, bm) + e := p2p.Envelope{ + ChannelID: BlocksyncChannel, + Message: wm, + } + bcR.Switch.NewBroadcast(e) return nil } diff --git a/p2p/peer.go b/p2p/peer.go index d3dfc593f..752a321cd 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,9 +39,6 @@ type Peer interface { NewSend(Envelope) bool NewTrySend(Envelope) bool - Send(byte, []byte) bool - TrySend(byte, []byte) bool - Set(string, interface{}) Get(string) interface{} diff --git a/p2p/switch.go b/p2p/switch.go index ba99708e0..f43324e3f 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -8,7 +8,6 @@ import ( "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/cmap" - "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/p2p/conn" @@ -261,6 +260,7 @@ func (sw *Switch) OnStop() { // closed once msg bytes are sent to all peers (or time out). // // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +/* func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes)) @@ -284,6 +284,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool { return successChan } +*/ // NewBroadcast runs a go routine for each attempted send, which will block trying // to send for defaultSendTimeoutSeconds. Returns a channel which receives diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 3e1215e2e..436b801e9 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/golang/protobuf/proto" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -23,6 +24,8 @@ import ( "github.com/tendermint/tendermint/libs/log" tmsync "github.com/tendermint/tendermint/libs/sync" "github.com/tendermint/tendermint/p2p/conn" + "github.com/tendermint/tendermint/proto/tendermint/p2p" + p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p" ) var ( @@ -135,24 +138,59 @@ func TestSwitches(t *testing.T) { } // Lets send some messages - ch0Msg := []byte("channel zero") - ch1Msg := []byte("channel foo") - ch2Msg := []byte("channel bar") - - s1.Broadcast(byte(0x00), ch0Msg) - s1.Broadcast(byte(0x01), ch1Msg) - s1.Broadcast(byte(0x02), ch2Msg) + ch0Msg := &p2pproto.Message{ + Sum: &p2pproto.Message_PexAddrs{ + PexAddrs: &p2pproto.PexAddrs{ + Addrs: []p2p.NetAddress{ + { + ID: "0", + }, + }, + }, + }, + } + ch1Msg := &p2pproto.Message{ + Sum: &p2pproto.Message_PexAddrs{ + PexAddrs: &p2pproto.PexAddrs{ + Addrs: []p2p.NetAddress{ + { + ID: "1", + }, + }, + }, + }, + } + ch2Msg := &p2pproto.Message{ + Sum: &p2pproto.Message_PexAddrs{ + PexAddrs: &p2pproto.PexAddrs{ + Addrs: []p2p.NetAddress{ + { + ID: "2", + }, + }, + }, + }, + } + s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg}) + s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg}) + s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg}) + msgBytes, err := proto.Marshal(ch0Msg) + require.NoError(t, err) assertMsgReceivedWithTimeout(t, - ch0Msg, + msgBytes, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) + msgBytes, err = proto.Marshal(ch1Msg) + require.NoError(t, err) assertMsgReceivedWithTimeout(t, - ch1Msg, + msgBytes, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second) + msgBytes, err = proto.Marshal(ch2Msg) + require.NoError(t, err) assertMsgReceivedWithTimeout(t, - ch2Msg, + msgBytes, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second) } @@ -827,7 +865,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) { // Send random message from foo channel to another for i := 0; i < b.N; i++ { chID := byte(i % 4) - successChan := s1.Broadcast(chID, []byte("test data")) + successChan := s1.NewBroadcast(Envelope{ChannelID: chID}) for s := range successChan { if s { numSuccess++ diff --git a/statesync/reactor.go b/statesync/reactor.go index 236322979..7223d4063 100644 --- a/statesync/reactor.go +++ b/statesync/reactor.go @@ -277,7 +277,11 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration) hook := func() { r.Logger.Debug("Requesting snapshots from known peers") // Request snapshots from all currently connected peers - r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})) + e := p2p.Envelope{ + ChannelID: SnapshotChannel, + Message: toWrappedProto(&ssproto.SnapshotsRequest{}), + } + r.Switch.NewBroadcast(e) } hook()