sync any passes

This commit is contained in:
William Banfield
2022-10-19 14:57:37 -04:00
parent adbcd0c450
commit c529a902eb
2 changed files with 32 additions and 10 deletions

View File

@@ -140,10 +140,18 @@ func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
peer := &p2pmocks.Peer{}
if len(tc.expectResponses) > 0 {
peer.On("ID").Return(p2p.ID("id"))
peer.On("Send", SnapshotChannel, mock.Anything).Run(func(args mock.Arguments) {
msg, err := decodeMsg(args[1].([]byte))
peer.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == SnapshotChannel
})).Run(func(args mock.Arguments) {
e := args[0].(p2p.Envelope)
// Marshal to simulate a wire roundtrip.
bz, err := proto.Marshal(e.Message)
require.NoError(t, err)
responses = append(responses, msg.(*ssproto.SnapshotsResponse))
err = proto.Unmarshal(bz, e.Message)
require.NoError(t, err)
responses = append(responses, e.Message.(*ssproto.Message).GetSnapshotsResponse())
}).Return(true)
}

View File

@@ -98,13 +98,21 @@ func TestSyncer_SyncAny(t *testing.T) {
// Adding a couple of peers should trigger snapshot discovery messages
peerA := &p2pmocks.Peer{}
peerA.On("ID").Return(p2p.ID("a"))
peerA.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerA)
peerA.AssertExpectations(t)
peerB := &p2pmocks.Peer{}
peerB.On("ID").Return(p2p.ID("b"))
peerB.On("Send", SnapshotChannel, mustEncodeMsg(&ssproto.SnapshotsRequest{})).Return(true)
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
req := e.Message.(*ssproto.Message).GetSnapshotsRequest()
return ok && e.ChannelID == SnapshotChannel && req != nil
})).Return(true)
syncer.AddPeer(peerB)
peerB.AssertExpectations(t)
@@ -147,9 +155,9 @@ func TestSyncer_SyncAny(t *testing.T) {
chunkRequests := make(map[uint32]int)
chunkRequestsMtx := tmsync.Mutex{}
onChunkRequest := func(args mock.Arguments) {
pb, err := decodeMsg(args[1].([]byte))
require.NoError(t, err)
msg := pb.(*ssproto.ChunkRequest)
e, ok := args[0].(p2p.Envelope)
require.True(t, ok)
msg := e.Message.(*ssproto.Message).GetChunkRequest()
require.EqualValues(t, 1, msg.Height)
require.EqualValues(t, 1, msg.Format)
require.LessOrEqual(t, msg.Index, uint32(len(chunks)))
@@ -162,8 +170,14 @@ func TestSyncer_SyncAny(t *testing.T) {
chunkRequests[msg.Index]++
chunkRequestsMtx.Unlock()
}
peerA.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
peerB.On("Send", ChunkChannel, mock.Anything).Maybe().Run(onChunkRequest).Return(true)
peerA.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == ChunkChannel
})).Maybe().Run(onChunkRequest).Return(true)
peerB.On("Send", mock.MatchedBy(func(i interface{}) bool {
e, ok := i.(p2p.Envelope)
return ok && e.ChannelID == ChunkChannel
})).Maybe().Run(onChunkRequest).Return(true)
// The first time we're applying chunk 2 we tell it to retry the snapshot and discard chunk 1,
// which should cause it to keep the existing chunk 0 and 2, and restart restoration from