From c529a902ebf5db135cc174b7fa88a162399cb9ce Mon Sep 17 00:00:00 2001
From: William Banfield
Date: Wed, 19 Oct 2022 14:57:37 -0400
Subject: [PATCH] sync any passes

---
 statesync/reactor_test.go | 14 +++++++++++---
 statesync/syncer_test.go  | 28 +++++++++++++++++++++-------
 2 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/statesync/reactor_test.go b/statesync/reactor_test.go
index fa970d66a..c175ee9e4 100644
--- a/statesync/reactor_test.go
+++ b/statesync/reactor_test.go
@@ -140,10 +140,18 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
 			peer := &p2pmocks.Peer{}
 			if len(tc.expectResponses) > 0 {
 				peer.On("ID").Return(p2p.ID("id"))
-				peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) {
-					msg, err := decodeMsg(args[1].([]byte))
+				peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
+					e, ok := i.(p2p.Envelope)
+					return ok && e.ChannelID == SnapshotChannel
+				})).Run(func(args mock.Arguments) {
+					e := args[0].(p2p.Envelope)
+
+					// Marshal to simulate a wire roundtrip.
+					bz, err := proto.Marshal(e.Message)
 					require.NoError(t, err)
-					responses = append(responses, msg.(*ssproto.SnapshotsResponse))
+					err = proto.Unmarshal(bz, e.Message)
+					require.NoError(t, err)
+					responses = append(responses, e.Message.(*ssproto.Message).GetSnapshotsResponse())
 				}).Return(true)
 			}
 
diff --git a/statesync/syncer_test.go b/statesync/syncer_test.go
index 4dabe7288..2e2902d92 100644
--- a/statesync/syncer_test.go
+++ b/statesync/syncer_test.go
@@ -98,13 +98,21 @@ func TestSyncer_SyncAny(t *testing.T) {
 	// Adding a couple of peers should trigger snapshot discovery messages
 	peerA := &p2pmocks.Peer{}
 	peerA.On("ID").Return(p2p.ID("a"))
-	peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
+	peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
+		return ok && e.ChannelID == SnapshotChannel && req != nil
+	})).Return(true)
 	syncer.AddPeer(peerA)
 	peerA.AssertExpectations(t)
 
 	peerB := &p2pmocks.Peer{}
 	peerB.On("ID").Return(p2p.ID("b"))
-	peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
+	peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
+		return ok && e.ChannelID == SnapshotChannel && req != nil
+	})).Return(true)
 	syncer.AddPeer(peerB)
 	peerB.AssertExpectations(t)
 
@@ -147,9 +155,9 @@ func TestSyncer_SyncAny(t *testing.T) {
 	chunkRequests := make(map[uint32]int)
 	chunkRequestsMtx := tmsync.Mutex{}
 	onChunkRequest := func(args mock.Arguments) {
-		pb, err := decodeMsg(args[1].([]byte))
-		require.NoError(t, err)
-		msg := pb.(*ssproto.ChunkRequest)
+		e, ok := args[0].(p2p.Envelope)
+		require.True(t, ok)
+		msg := e.Message.(*ssproto.Message).GetChunkRequest()
 		require.EqualValues(t, 1, msg.Height)
 		require.EqualValues(t, 1, msg.Format)
 		require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
@@ -162,8 +170,14 @@ func TestSyncer_SyncAny(t *testing.T) {
 		chunkRequests[msg.Index]++
 		chunkRequestsMtx.Unlock()
 	}
-	peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
-	peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
+	peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		return ok && e.ChannelID == ChunkChannel
+	})).Maybe().Run(onChunkRequest).Return(true)
+	peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
+		e, ok := i.(p2p.Envelope)
+		return ok && e.ChannelID == ChunkChannel
+	})).Maybe().Run(onChunkRequest).Return(true)
 
 	// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
 	// which should cause it to keep the existing chunk 0 and 2, and restart restoration from
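
Note: the MatchedBy predicates above compute a value from e.Message before checking ok; that only stays panic-free because the mocked Send is always called with a p2p.Envelope, so the zero-value branch never fires. The repeated matcher could also be factored into a shared helper that guards the type assertion first. A minimal sketch, assuming the p2p.Envelope fields and the ssproto.Message wrapper used in the diff; the envelopeMatcher name and its accept callback are hypothetical, not part of this patch:

package statesync

import (
	"github.com/stretchr/testify/mock"

	"github.com/tendermint/tendermint/p2p"
	ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
)

// envelopeMatcher (hypothetical helper) builds a testify matcher for
// Envelope-based Send expectations. It checks the type assertion before
// touching e.Message, then the channel ID, then an optional predicate on
// the decoded wrapper message.
func envelopeMatcher(channelID byte, accept func(*ssproto.Message) bool) interface{} {
	return mock.MatchedBy(func(i interface{}) bool {
		e, ok := i.(p2p.Envelope)
		if !ok {
			// Guard first: e is the zero value here, so asserting on
			// e.Message below would panic on the nil interface.
			return false
		}
		if e.ChannelID != channelID {
			return false
		}
		if accept == nil {
			return true
		}
		msg, ok := e.Message.(*ssproto.Message)
		return ok && accept(msg)
	})
}

With such a helper, the peerA expectation from the first syncer_test.go hunk could be written as:

	peerA.On("Send", envelopeMatcher(SnapshotChannel, func(m *ssproto.Message) bool {
		return m.GetSnapshotsRequest() != nil
	})).Return(true)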