update switch test to use new receive

This commit is contained in:
William Banfield
2022-10-21 14:04:36 -04:00
parent 27a64acc9a
commit 1ab868180c

View File

@@ -39,9 +39,8 @@ func init() {
}
type PeerMessage struct {
PeerID ID
Bytes []byte
Counter int
Contents proto.Message
Counter int
}
type TestReactor struct {
@@ -73,12 +72,12 @@ func (tr *TestReactor) AddPeer(peer Peer) {}
func (tr *TestReactor) RemovePeer(peer Peer, reason interface{}) {}
func (tr *TestReactor) Receive(chID byte, peer Peer, msgBytes []byte) {
func (tr *TestReactor) Receive(e Envelope) {
if tr.logMessages {
tr.mtx.Lock()
defer tr.mtx.Unlock()
// fmt.Printf("Received: %X, %X\n", chID, msgBytes)
tr.msgsReceived[chID] = append(tr.msgsReceived[chID], PeerMessage{peer.ID(), msgBytes, tr.msgsCounter})
fmt.Printf("Received: %X, %X\n", e.ChannelID, e.Message)
tr.msgsReceived[e.ChannelID] = append(tr.msgsReceived[e.ChannelID], PeerMessage{Contents: e.Message, Counter: tr.msgsCounter})
tr.msgsCounter++
}
}
@@ -106,12 +105,12 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
{ID: byte(0x00), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x01), Priority: 10, MessageType: &p2pproto.Message{}},
}, true))
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
{ID: byte(0x02), Priority: 10, MessageType: &p2pproto.Message{}},
{ID: byte(0x03), Priority: 10, MessageType: &p2pproto.Message{}},
}, true))
return sw
@@ -138,66 +137,47 @@ func TestSwitches(t *testing.T) {
}
// Lets send some messages
ch0Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "0",
},
},
ch0Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch1Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
ch1Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "1",
},
},
}
ch2Msg := &p2pproto.Message{
Sum: &p2pproto.Message_PexAddrs{
PexAddrs: &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
ch2Msg := &p2pproto.PexAddrs{
Addrs: []p2p.NetAddress{
{
ID: "2",
},
},
}
s1.Broadcast(Envelope{ChannelID: byte(0x00), Message: ch0Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x01), Message: ch1Msg})
s1.Broadcast(Envelope{ChannelID: byte(0x02), Message: ch2Msg})
msgBytes, err := proto.Marshal(ch0Msg)
require.NoError(t, err)
assertMsgReceivedWithTimeout(t,
msgBytes,
ch0Msg,
byte(0x00),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch1Msg)
require.NoError(t, err)
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
msgBytes,
ch1Msg,
byte(0x01),
s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
msgBytes, err = proto.Marshal(ch2Msg)
require.NoError(t, err)
s2.Reactor("foo").(*TestReactor), 200*time.Millisecond, 5*time.Second)
assertMsgReceivedWithTimeout(t,
msgBytes,
ch2Msg,
byte(0x02),
s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
s2.Reactor("bar").(*TestReactor), 200*time.Millisecond, 5*time.Second)
}
func assertMsgReceivedWithTimeout(
t *testing.T,
msgBytes []byte,
msg proto.Message,
channel byte,
reactor *TestReactor,
checkPeriod,
@@ -208,9 +188,13 @@ func assertMsgReceivedWithTimeout(
select {
case <-ticker.C:
msgs := reactor.getMsgs(channel)
expectedBytes, err := proto.Marshal(msgs[0].Contents)
require.NoError(t, err)
gotBytes, err := proto.Marshal(msg)
require.NoError(t, err)
if len(msgs) > 0 {
if !bytes.Equal(msgs[0].Bytes, msgBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msgBytes, msgs[0].Bytes)
if !bytes.Equal(expectedBytes, gotBytes) {
t.Fatalf("Unexpected message bytes. Wanted: %X, Got: %X", msg, msgs[0].Counter)
}
return
}