mirror of
https://github.com/tendermint/tendermint.git
synced 2026-01-06 21:36:26 +00:00
* p2p: add a per-message type send and receive metric (#9622) * p2p: ressurrect the p2p envelope and use to calculate message metric Add new SendEnvelope, TrySendEnvelope, BroadcastEnvelope, and ReceiveEnvelope methods in the p2p package to work with the new envelope type. Care was taken to ensure this was performed in a non-breaking manner. Co-authored-by: William Banfield <4561443+williambanfield@users.noreply.github.com> Co-authored-by: William Banfield <wbanfield@gmail.com>
186 lines
6.5 KiB
Go
186 lines
6.5 KiB
Go
package statesync
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
"github.com/tendermint/tendermint/config"
|
|
"github.com/tendermint/tendermint/p2p"
|
|
p2pmocks "github.com/tendermint/tendermint/p2p/mocks"
|
|
ssproto "github.com/tendermint/tendermint/proto/tendermint/statesync"
|
|
proxymocks "github.com/tendermint/tendermint/proxy/mocks"
|
|
)
|
|
|
|
func TestReactor_Receive_ChunkRequest(t *testing.T) {
|
|
testcases := map[string]struct {
|
|
request *ssproto.ChunkRequest
|
|
chunk []byte
|
|
expectResponse *ssproto.ChunkResponse
|
|
}{
|
|
"chunk is returned": {
|
|
&ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
|
|
[]byte{1, 2, 3},
|
|
&ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: []byte{1, 2, 3}}},
|
|
"empty chunk is returned, as nil": {
|
|
&ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
|
|
[]byte{},
|
|
&ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Chunk: nil}},
|
|
"nil (missing) chunk is returned as missing": {
|
|
&ssproto.ChunkRequest{Height: 1, Format: 1, Index: 1},
|
|
nil,
|
|
&ssproto.ChunkResponse{Height: 1, Format: 1, Index: 1, Missing: true},
|
|
},
|
|
}
|
|
|
|
for name, tc := range testcases {
|
|
tc := tc
|
|
t.Run(name, func(t *testing.T) {
|
|
// Mock ABCI connection to return local snapshots
|
|
conn := &proxymocks.AppConnSnapshot{}
|
|
conn.On("LoadSnapshotChunkSync", abci.RequestLoadSnapshotChunk{
|
|
Height: tc.request.Height,
|
|
Format: tc.request.Format,
|
|
Chunk: tc.request.Index,
|
|
}).Return(&abci.ResponseLoadSnapshotChunk{Chunk: tc.chunk}, nil)
|
|
|
|
// Mock peer to store response, if found
|
|
peer := &p2pmocks.Peer{}
|
|
peer.On("ID").Return(p2p.ID("id"))
|
|
var response *ssproto.ChunkResponse
|
|
if tc.expectResponse != nil {
|
|
peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
|
e, ok := i.(p2p.Envelope)
|
|
return ok && e.ChannelID == ChunkChannel
|
|
})).Run(func(args mock.Arguments) {
|
|
e := args[0].(p2p.Envelope)
|
|
|
|
// Marshal to simulate a wire roundtrip.
|
|
bz, err := proto.Marshal(e.Message)
|
|
require.NoError(t, err)
|
|
err = proto.Unmarshal(bz, e.Message)
|
|
require.NoError(t, err)
|
|
response = e.Message.(*ssproto.ChunkResponse)
|
|
}).Return(true)
|
|
}
|
|
|
|
// Start a reactor and send a ssproto.ChunkRequest, then wait for and check response
|
|
cfg := config.DefaultStateSyncConfig()
|
|
r := NewReactor(*cfg, conn, nil, "")
|
|
err := r.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := r.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
r.ReceiveEnvelope(p2p.Envelope{
|
|
ChannelID: ChunkChannel,
|
|
Src: peer,
|
|
Message: tc.request,
|
|
})
|
|
time.Sleep(100 * time.Millisecond)
|
|
assert.Equal(t, tc.expectResponse, response)
|
|
|
|
conn.AssertExpectations(t)
|
|
peer.AssertExpectations(t)
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestReactor_Receive_SnapshotsRequest(t *testing.T) {
|
|
testcases := map[string]struct {
|
|
snapshots []*abci.Snapshot
|
|
expectResponses []*ssproto.SnapshotsResponse
|
|
}{
|
|
"no snapshots": {nil, []*ssproto.SnapshotsResponse{}},
|
|
">10 unordered snapshots": {
|
|
[]*abci.Snapshot{
|
|
{Height: 1, Format: 2, Chunks: 7, Hash: []byte{1, 2}, Metadata: []byte{1}},
|
|
{Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
|
|
{Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
|
|
{Height: 1, Format: 1, Chunks: 7, Hash: []byte{1, 1}, Metadata: []byte{4}},
|
|
{Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
|
|
{Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
|
|
{Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
|
|
{Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
|
|
{Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
|
|
{Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
|
|
{Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
|
|
{Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
|
|
},
|
|
[]*ssproto.SnapshotsResponse{
|
|
{Height: 3, Format: 4, Chunks: 7, Hash: []byte{3, 4}, Metadata: []byte{9}},
|
|
{Height: 3, Format: 3, Chunks: 7, Hash: []byte{3, 3}, Metadata: []byte{12}},
|
|
{Height: 3, Format: 2, Chunks: 7, Hash: []byte{3, 2}, Metadata: []byte{3}},
|
|
{Height: 3, Format: 1, Chunks: 7, Hash: []byte{3, 1}, Metadata: []byte{6}},
|
|
{Height: 2, Format: 4, Chunks: 7, Hash: []byte{2, 4}, Metadata: []byte{8}},
|
|
{Height: 2, Format: 3, Chunks: 7, Hash: []byte{2, 3}, Metadata: []byte{11}},
|
|
{Height: 2, Format: 2, Chunks: 7, Hash: []byte{2, 2}, Metadata: []byte{2}},
|
|
{Height: 2, Format: 1, Chunks: 7, Hash: []byte{2, 1}, Metadata: []byte{5}},
|
|
{Height: 1, Format: 4, Chunks: 7, Hash: []byte{1, 4}, Metadata: []byte{7}},
|
|
{Height: 1, Format: 3, Chunks: 7, Hash: []byte{1, 3}, Metadata: []byte{10}},
|
|
},
|
|
},
|
|
}
|
|
|
|
for name, tc := range testcases {
|
|
tc := tc
|
|
t.Run(name, func(t *testing.T) {
|
|
// Mock ABCI connection to return local snapshots
|
|
conn := &proxymocks.AppConnSnapshot{}
|
|
conn.On("ListSnapshotsSync", abci.RequestListSnapshots{}).Return(&abci.ResponseListSnapshots{
|
|
Snapshots: tc.snapshots,
|
|
}, nil)
|
|
|
|
// Mock peer to catch responses and store them in a slice
|
|
responses := []*ssproto.SnapshotsResponse{}
|
|
peer := &p2pmocks.Peer{}
|
|
if len(tc.expectResponses) > 0 {
|
|
peer.On("ID").Return(p2p.ID("id"))
|
|
peer.On("SendEnvelope", mock.MatchedBy(func(i interface{}) bool {
|
|
e, ok := i.(p2p.Envelope)
|
|
return ok && e.ChannelID == SnapshotChannel
|
|
})).Run(func(args mock.Arguments) {
|
|
e := args[0].(p2p.Envelope)
|
|
|
|
// Marshal to simulate a wire roundtrip.
|
|
bz, err := proto.Marshal(e.Message)
|
|
require.NoError(t, err)
|
|
err = proto.Unmarshal(bz, e.Message)
|
|
require.NoError(t, err)
|
|
responses = append(responses, e.Message.(*ssproto.SnapshotsResponse))
|
|
}).Return(true)
|
|
}
|
|
|
|
// Start a reactor and send a SnapshotsRequestMessage, then wait for and check responses
|
|
cfg := config.DefaultStateSyncConfig()
|
|
r := NewReactor(*cfg, conn, nil, "")
|
|
err := r.Start()
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
if err := r.Stop(); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
|
|
r.ReceiveEnvelope(p2p.Envelope{
|
|
ChannelID: SnapshotChannel,
|
|
Src: peer,
|
|
Message: &ssproto.SnapshotsRequest{},
|
|
})
|
|
time.Sleep(100 * time.Millisecond)
|
|
assert.Equal(t, tc.expectResponses, responses)
|
|
|
|
conn.AssertExpectations(t)
|
|
peer.AssertExpectations(t)
|
|
})
|
|
}
|
|
}
|