compiles after NewBroadcast change

This commit is contained in:
William Banfield
2022-10-19 13:05:05 -04:00
parent 01db5dc019
commit 73d0f8116a
6 changed files with 109 additions and 30 deletions

View File

@@ -46,6 +46,27 @@ func EncodeMsg(pb proto.Message) ([]byte, error) {
return bz, nil
}
func toWrappedMessage(pb proto.Message) (proto.Message, error) {
msg := bcproto.Message{}
switch pb := pb.(type) {
case *bcproto.BlockRequest:
msg.Sum = &bcproto.Message_BlockRequest{BlockRequest: pb}
case *bcproto.BlockResponse:
msg.Sum = &bcproto.Message_BlockResponse{BlockResponse: pb}
case *bcproto.NoBlockResponse:
msg.Sum = &bcproto.Message_NoBlockResponse{NoBlockResponse: pb}
case *bcproto.StatusRequest:
msg.Sum = &bcproto.Message_StatusRequest{StatusRequest: pb}
case *bcproto.StatusResponse:
msg.Sum = &bcproto.Message_StatusResponse{StatusResponse: pb}
default:
return nil, fmt.Errorf("unknown message type %T", pb)
}
return &msg, nil
}
// DecodeMsg decodes a Protobuf message.
func DecodeMsg(bz []byte) (proto.Message, error) {
pb := &bcproto.Message{}

View File

@@ -181,24 +181,32 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest,
return false
}
msgBytes, err := EncodeMsg(&bcproto.BlockResponse{Block: bl})
wm, err := toWrappedMessage(&bcproto.BlockResponse{Block: bl})
if err != nil {
bcR.Logger.Error("could not marshal msg", "err", err)
return false
}
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
}
return src.TrySend(BlocksyncChannel, msgBytes)
return src.NewTrySend(e)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes, err := EncodeMsg(&bcproto.NoBlockResponse{Height: msg.Height})
wm, err := toWrappedMessage(&bcproto.NoBlockResponse{Height: msg.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to protobuf", "err", err)
return false
}
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
}
return src.TrySend(BlocksyncChannel, msgBytes)
return src.NewTrySend(e)
}
// Receive implements Reactor by handling 4 types of messages (look below).
@@ -230,15 +238,19 @@ func (bcR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
bcR.pool.AddBlock(src.ID(), bi, len(msgBytes))
case *bcproto.StatusRequest:
// Send peer our state.
msgBytes, err := EncodeMsg(&bcproto.StatusResponse{
wm, err := toWrappedMessage(&bcproto.StatusResponse{
Height: bcR.store.Height(),
Base: bcR.store.Base(),
})
if err != nil {
bcR.Logger.Error("could not convert msg to protobut", "err", err)
bcR.Logger.Error("could not convert msg to wrapped protobuf", "err", err)
return
}
src.TrySend(BlocksyncChannel, msgBytes)
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
}
src.NewTrySend(e)
case *bcproto.StatusResponse:
// Got a peer status. Unverified.
bcR.pool.SetPeerRange(src.ID(), msg.Base, msg.Height)
@@ -284,13 +296,16 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
if peer == nil {
continue
}
msgBytes, err := EncodeMsg(&bcproto.BlockRequest{Height: request.Height})
wm, err := toWrappedMessage(&bcproto.BlockRequest{Height: request.Height})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
bcR.Logger.Error("could not convert msg to wrapped proto", "err", err)
continue
}
queued := peer.TrySend(BlocksyncChannel, msgBytes)
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
}
queued := peer.NewTrySend(e)
if !queued {
bcR.Logger.Debug("Send queue is full, drop block request", "peer", peer.ID(), "height", request.Height)
}
@@ -429,13 +444,16 @@ FOR_LOOP:
// BroadcastStatusRequest broadcasts `BlockStore` base and height.
func (bcR *Reactor) BroadcastStatusRequest() error {
bm, err := EncodeMsg(&bcproto.StatusRequest{})
wm, err := toWrappedMessage(&bcproto.StatusRequest{})
if err != nil {
bcR.Logger.Error("could not convert msg to proto", "err", err)
return fmt.Errorf("could not convert msg to proto: %w", err)
}
bcR.Switch.Broadcast(BlocksyncChannel, bm)
e := p2p.Envelope{
ChannelID: BlocksyncChannel,
Message: wm,
}
bcR.Switch.NewBroadcast(e)
return nil
}

View File

@@ -39,9 +39,6 @@ type Peer interface {
NewSend(Envelope) bool
NewTrySend(Envelope) bool
Send(byte, []byte) bool
TrySend(byte, []byte) bool
Set(string, interface{})
Get(string) interface{}

View File

@@ -8,7 +8,6 @@ import (
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cmap"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
@@ -261,6 +260,7 @@ func (sw *Switch) OnStop() {
// closed once msg bytes are sent to all peers (or time out).
//
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
/*
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", log.NewLazySprintf("%X", msgBytes))
@@ -284,6 +284,7 @@ func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
return successChan
}
*/
// NewBroadcast runs a go routine for each attempted send, which will block trying
// to send for defaultSendTimeoutSeconds. Returns a channel which receives

View File

@@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -23,6 +24,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/proto/tendermint/p2p"
p2pproto "github.com/tendermint/tendermint/proto/tendermint/p2p"
)
var (
@@ -135,24 +138,59 @@ func TestSwitches(t *testing.T) {
}
// Lets send some messages
ch0Msg := []byte("channel zero")
ch1Msg := []byte("channel foo")
ch2Msg := []byte("channel bar")
s1.Broadcast(byte(0x00), ch0Msg)
s1.Broadcast(byte(0x01), ch1Msg)
s1.Broadcast(byte(0x02), ch2Msg)
ch0Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "0",
},
},
},
},
}
ch1Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
},
},
}
ch2Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
},
},
}
s1.NewBroadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.NewBroadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
msgBytes, err := proto.Marshal(ch0Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch0Msg,
msgBytes,
byte(0x00),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch1Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch1Msg,
msgBytes,
byte(0x01),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch2Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
ch2Msg,
msgBytes,
byte(0x02),
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
}
@@ -827,7 +865,7 @@ func BenchmarkSwitchBroadcast(b *testing.B) {
// Send random message from foo channel to another
for i := 0; i < b.N; i++ {
chID := byte(i % 4)
successChan := s1.Broadcast(chID, []byte("test data"))
successChan := s1.NewBroadcast(Envelope{ChannelID: chID})
for s := range successChan {
if s {
numSuccess++

View File

@@ -277,7 +277,11 @@ func (r *Reactor) Sync(stateProvider StateProvider, discoveryTime time.Duration)
hook := func() {
r.Logger.Debug("Requesting snapshots from known peers")
// Request snapshots from all currently connected peers
r.Switch.Broadcast(SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{}))
e := p2p.Envelope{
ChannelID: SnapshotChannel,
Message: toWrappedProto(&ssproto.SnapshotsRequest{}),
}
r.Switch.NewBroadcast(e)
}
hook()