clean up tests

Callum Waters
2021-08-09 15:38:40 +02:00
parent 39c54f5f34
commit f0ccd0fcd1
6 changed files with 105 additions and 32 deletions

View File

@@ -274,8 +274,10 @@ loop:
}
func mockLBResp(t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse {
vals, pv := factory.RandValidatorSet(3, 10)
_, _, lb := mockLB(t, height, time, factory.MakeBlockID(), vals, pv)
return lightBlockResponse{
block: mockLB(t, height, time, factory.MakeBlockID()),
block: lb,
peer: peer,
}
}

View File

@@ -124,6 +124,15 @@ func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.No
return nil, err
}
defer func() {
d.mtx.Lock()
defer d.mtx.Unlock()
if call, ok := d.calls[peer]; ok {
close(call)
delete(d.calls, peer)
}
}()
// wait for a response, cancel or timeout
select {
case resp := <-callCh:
@@ -231,10 +240,6 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh
func (d *dispatcher) release(peer types.NodeID) {
d.mtx.Lock()
defer d.mtx.Unlock()
if call, ok := d.calls[peer]; ok {
close(call)
delete(d.calls, peer)
}
d.availablePeers.Append(peer)
}
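
With this hunk, cleanup of a pending call is owned by the goroutine that made it: lightBlock defers the close-and-delete of its entry in d.calls, while release only returns the peer to the available pool. Below is a self-contained Go sketch of that caller-owned-cleanup pattern; the pending type, its field names, and the string peer keys are placeholders for illustration, not the dispatcher's real API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type result struct{ height int64 }

// pending tracks at most one in-flight request per peer, much like d.calls.
type pending struct {
	mtx   sync.Mutex
	calls map[string]chan *result
}

// request registers a call channel, waits for a reply or cancellation, and
// always tears down its own entry, so no other code path has to.
func (p *pending) request(ctx context.Context, peer string) (*result, error) {
	ch := make(chan *result, 1)
	p.mtx.Lock()
	p.calls[peer] = ch
	p.mtx.Unlock()

	defer func() {
		p.mtx.Lock()
		defer p.mtx.Unlock()
		if call, ok := p.calls[peer]; ok {
			close(call)
			delete(p.calls, peer)
		}
	}()

	select {
	case resp := <-ch:
		return resp, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

// respond delivers a reply to the waiting requester, if one is still
// registered. The buffered channel plus the shared mutex mean this never
// blocks and never races with the requester's close.
func (p *pending) respond(peer string, r *result) {
	p.mtx.Lock()
	defer p.mtx.Unlock()
	if call, ok := p.calls[peer]; ok {
		call <- r
	}
}

func main() {
	p := &pending{calls: make(map[string]chan *result)}
	go func() {
		time.Sleep(10 * time.Millisecond)
		p.respond("peer1", &result{height: 7})
	}()
	res, err := p.request(context.Background(), "peer1")
	fmt.Println(res.height, err)
}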

View File

@@ -25,7 +25,7 @@ func TestDispatcherBasic(t *testing.T) {
defer close(closeCh)
d := newDispatcher(ch, 1*time.Second)
d.start()
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
@@ -58,6 +58,7 @@ func TestDispatcherReturnsNoBlock(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
d.start()
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
doneCh := make(chan struct{})
@@ -80,6 +81,7 @@ func TestDispatcherErrorsWhenNoPeers(t *testing.T) {
t.Cleanup(leaktest.Check(t))
ch := make(chan p2p.Envelope, 100)
d := newDispatcher(ch, 1*time.Second)
d.start()
lb, peerResult, err := d.LightBlock(context.Background(), 1)
@@ -92,6 +94,8 @@ func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) {
t.Cleanup(leaktest.Check(t))
dispatcherRequestCh := make(chan p2p.Envelope, 100)
d := newDispatcher(dispatcherRequestCh, 1*time.Second)
d.start()
peerFromSet := createPeerSet(1)[0]
d.addPeer(peerFromSet)
ctx := context.Background()
@@ -138,7 +142,7 @@ func TestDispatcherProviders(t *testing.T) {
defer close(closeCh)
d := newDispatcher(ch, 5*time.Second)
d.start()
go handleRequests(t, d, ch, closeCh)
peers := createPeerSet(5)
@@ -322,6 +326,7 @@ func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh c
block, _ := resp.block.ToProto()
require.NoError(t, d.respond(block, resp.peer))
case <-closeCh:
d.stop()
return
}
}
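
The dispatcher's lifecycle is now explicit in these tests: each test calls d.start() after newDispatcher, and the shared request handler calls d.stop() once the test's close channel fires, so leaktest sees no leftover goroutines. A minimal generic sketch of that start/stop-under-leaktest pattern follows; the worker type and its channels are illustrative stand-ins, not the dispatcher itself.

// worker_test.go (sketch)
package worker

import (
	"testing"

	"github.com/fortytw2/leaktest"
)

// worker stands in for a component with an explicit start/stop lifecycle,
// like the dispatcher in the tests above.
type worker struct {
	requests chan string
	done     chan struct{}
}

func newWorker() *worker {
	return &worker{requests: make(chan string, 1), done: make(chan struct{})}
}

func (w *worker) start() {
	go func() {
		for {
			select {
			case <-w.requests:
				// service the request
			case <-w.done:
				return
			}
		}
	}()
}

func (w *worker) stop() { close(w.done) }

func TestWorkerLifecycle(t *testing.T) {
	// leaktest fails the test if the worker goroutine outlives it, which is
	// why start and stop are explicit rather than hidden in a constructor.
	t.Cleanup(leaktest.Check(t))

	w := newWorker()
	w.start()

	closeCh := make(chan struct{})
	go func() {
		<-closeCh // mirrors the handler goroutine stopping the dispatcher
		w.stop()
	}()

	w.requests <- "light block request, height 1"
	close(closeCh)
}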

View File

@@ -190,6 +190,8 @@ func (r *Reactor) OnStart() error {
go r.processBlockCh()
go r.processParamsCh()
go r.processPeerUpdates()
r.dispatcher.start()
@@ -681,7 +683,7 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error {
func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
switch msg := envelope.Message.(type) {
case *ssproto.ParamsRequest:
r.Logger.Info("received light block request", "height", msg.Height)
r.Logger.Debug("received consensus params request", "height", msg.Height)
cp, err := r.stateStore.LoadConsensusParams(int64(msg.Height))
if err != nil {
r.Logger.Error("failed to fetch requested consensus params", "err", err, "height", msg.Height)
@@ -700,6 +702,7 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
case *ssproto.ParamsResponse:
r.mtx.RLock()
defer r.mtx.RUnlock()
r.Logger.Debug("received consensus params response", "height", msg.Height)
if r.syncer == nil {
r.Logger.Debug("received unexpected params response; no state sync in progress", "peer", envelope.From)
@@ -709,6 +712,7 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error {
cp := types.ConsensusParamsFromProto(msg.ConsensusParams)
if sp, ok := r.syncer.stateProvider.(*stateProviderP2P); ok {
r.Logger.Debug("passing along message")
select {
case sp.paramsRecvCh <- cp:
default:
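
The ParamsResponse branch hands the decoded consensus params to the P2P state provider with a non-blocking send, so a response that arrives while nothing is waiting on paramsRecvCh is dropped instead of stalling the reactor. The sketch below shows that select-with-default pattern in isolation; the consensusParams struct and the channel here are placeholders, not the reactor's real types.

package main

import "fmt"

// consensusParams is a placeholder for the params carried by a ParamsResponse.
type consensusParams struct{ maxBytes int64 }

// deliver hands a response to an optional waiter without ever blocking:
// if nothing can receive from recvCh right now, the value is dropped.
func deliver(recvCh chan consensusParams, cp consensusParams) bool {
	select {
	case recvCh <- cp:
		return true
	default:
		return false
	}
}

func main() {
	recvCh := make(chan consensusParams, 1)

	// A response arriving while the consumer still has room is delivered...
	fmt.Println(deliver(recvCh, consensusParams{maxBytes: 1})) // true
	fmt.Println(<-recvCh)

	// ...but once nothing is draining the channel, further responses are
	// dropped instead of blocking the reactor's message loop.
	deliver(recvCh, consensusParams{maxBytes: 2})
	fmt.Println(deliver(recvCh, consensusParams{maxBytes: 3})) // false
}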

View File

@@ -3,6 +3,7 @@ package statesync
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
@@ -92,6 +93,9 @@ func setup(
blockInCh: make(chan p2p.Envelope, chBuf),
blockOutCh: make(chan p2p.Envelope, chBuf),
blockPeerErrCh: make(chan p2p.PeerError, chBuf),
paramsInCh: make(chan p2p.Envelope, chBuf),
paramsOutCh: make(chan p2p.Envelope, chBuf),
paramsPeerErrCh: make(chan p2p.PeerError, chBuf),
conn: conn,
connQuery: connQuery,
stateProvider: stateProvider,
@@ -405,7 +409,7 @@ func TestReactor_Dispatcher(t *testing.T) {
dispatcher := rts.reactor.dispatcher
providers := dispatcher.Providers(factory.DefaultTestChainID)
require.Len(t, providers, 2)
require.Equal(t, 2, dispatcher.peerCount())
require.Equal(t, 0, dispatcher.peerCount())
wg := sync.WaitGroup{}
@@ -432,19 +436,35 @@ func TestReactor_Dispatcher(t *testing.T) {
t.Fail()
case <-ctx.Done():
}
require.Equal(t, 0, dispatcher.peerCount())
t.Log(dispatcher.availablePeers.Peers())
require.Equal(t, 2, dispatcher.peerCount())
}
func TestReactor_P2P_Provider(t *testing.T) {
rts := setup(t, nil, nil, nil, 2)
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID("cc"),
NodeID: types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)),
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength)),
Status: p2p.PeerStatusUp,
}
require.Equal(t, 3, dispatcher.peerCount())
// make syncer non nil else test won't think we are state syncing
rts.reactor.syncer = rts.syncer
closeCh := make(chan struct{})
defer close(closeCh)
chain := buildLightBlockChain(t, 1, 10, time.Now())
go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0)
go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh)
// we now test the p2p state provider
lb, _, err := dispatcher.LightBlock(ctx, 2)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
lb, _, err := rts.reactor.dispatcher.LightBlock(ctx, 2)
require.NoError(t, err)
to := light.TrustOptions{
Period: 24 * time.Hour,
@@ -452,22 +472,24 @@ func TestReactor_Dispatcher(t *testing.T) {
Hash: lb.Hash(),
}
// TODO: move into a separate test
p2pStateProvider, err := NewP2PStateProvider(ctx, "testchain", 1, rts.reactor.dispatcher, to, rts.paramsOutCh, log.TestingLogger())
p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher,
to, rts.reactor.paramsCh.Out, log.TestingLogger())
require.NoError(t, err)
// set the state provider else the test won't think we are state syncing
rts.reactor.syncer = rts.syncer
rts.syncer.stateProvider = p2pStateProvider
appHash, err := p2pStateProvider.AppHash(ctx, 5)
require.NoError(t, err)
require.Len(t, appHash, 20)
require.Len(t, appHash, 32)
state, err := p2pStateProvider.State(ctx, 6)
state, err := p2pStateProvider.State(ctx, 5)
require.NoError(t, err)
require.Equal(t, appHash, state.AppHash)
commit, err := p2pStateProvider.Commit(ctx, 5)
require.NoError(t, err)
require.Equal(t, commit.BlockID, state.LastBlockID)
}
func TestReactor_Backfill(t *testing.T) {
@@ -546,7 +568,6 @@ func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) {
if fn() {
return
}
require.NoError(t, ctx.Err())
}
}
@@ -575,7 +596,9 @@ func handleLightBlockRequests(t *testing.T,
} else {
switch errorCount % 3 {
case 0: // send a different block
differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto()
vals, pv := factory.RandValidatorSet(3, 10)
_, _, lb := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv)
differntLB, err := lb.ToProto()
require.NoError(t, err)
sending <- p2p.Envelope{
From: envelope.To,
@@ -602,12 +625,36 @@ func handleLightBlockRequests(t *testing.T,
}
}
func handleConsensusParamsRequest(t *testing.T, receiving, sending chan p2p.Envelope, closeCh chan struct{}) {
params := types.DefaultConsensusParams()
paramsProto := params.ToProto()
for {
select {
case envelope := <-receiving:
t.Log("received consensus params request")
msg, ok := envelope.Message.(*ssproto.ParamsRequest)
require.True(t, ok)
sending <- p2p.Envelope{
From: envelope.To,
Message: &ssproto.ParamsResponse{
Height: msg.Height,
ConsensusParams: paramsProto,
},
}
case <-closeCh:
return
}
}
}
func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock {
chain := make(map[int64]*types.LightBlock, toHeight-fromHeight)
lastBlockID := factory.MakeBlockID()
blockTime := startTime.Add(-5 * time.Minute)
blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute)
vals, pv := factory.RandValidatorSet(3, 10)
for height := fromHeight; height < toHeight; height++ {
chain[height] = mockLB(t, height, blockTime, lastBlockID)
vals, pv, chain[height] = mockLB(t, height, blockTime, lastBlockID, vals, pv)
lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash())
blockTime = blockTime.Add(1 * time.Minute)
}
@@ -615,24 +662,26 @@ func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime ti
}
func mockLB(t *testing.T, height int64, time time.Time,
lastBlockID types.BlockID) *types.LightBlock {
lastBlockID types.BlockID, currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) {
header, err := factory.MakeHeader(&types.Header{
Height: height,
LastBlockID: lastBlockID,
Time: time,
})
require.NoError(t, err)
vals, pv := factory.RandValidatorSet(3, 10)
header.ValidatorsHash = vals.Hash()
nextVals, nextPrivVals := factory.RandValidatorSet(3, 10)
header.ValidatorsHash = currentVals.Hash()
header.NextValidatorsHash = nextVals.Hash()
header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams()
lastBlockID = factory.MakeBlockIDWithHash(header.Hash())
voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals)
commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time)
voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals)
commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, currentPrivVals, time)
require.NoError(t, err)
return &types.LightBlock{
return nextVals, nextPrivVals, &types.LightBlock{
SignedHeader: &types.SignedHeader{
Header: header,
Commit: commit,
},
ValidatorSet: vals,
ValidatorSet: currentVals,
}
}
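
mockLB now takes the validator set (and private validators) that sign the block at the current height and returns the set generated for the next height, which lets buildLightBlockChain thread them through its loop so that NextValidatorsHash at height H lines up with ValidatorsHash at H+1. Here is a stripped-down sketch of that threading, with placeholder types standing in for tendermint's validator sets and light blocks.

package main

import "fmt"

// valSet stands in for a validator set; only its identity matters here.
type valSet struct{ id int }

// block stands in for a light block: it records which set signed it and
// which set it announces for the next height.
type block struct {
	height   int64
	vals     valSet // what ValidatorsHash commits to
	nextVals valSet // what NextValidatorsHash commits to
}

var lastID int

func newValSet() valSet { lastID++; return valSet{id: lastID} }

// mockBlock mirrors the reworked mockLB: it consumes the current set and
// hands back the next one so the caller can chain heights together.
func mockBlock(height int64, current valSet) (valSet, block) {
	next := newValSet()
	return next, block{height: height, vals: current, nextVals: next}
}

// buildChain mirrors buildLightBlockChain's loop: the set returned for
// height h becomes the signing set for height h+1.
func buildChain(from, to int64) map[int64]block {
	chain := make(map[int64]block, to-from)
	current := newValSet()
	for h := from; h < to; h++ {
		current, chain[h] = mockBlock(h, current)
	}
	return chain
}

func main() {
	chain := buildChain(1, 5)
	for h := int64(1); h < 4; h++ {
		// NextValidatorsHash at h must match ValidatorsHash at h+1.
		fmt.Println(h, chain[h].nextVals == chain[h+1].vals)
	}
}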

View File

@@ -335,8 +335,8 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State,
}
func (s *stateProviderP2P) addPeer(peer types.NodeID) {
provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID())
if len(s.lc.Witnesses()) < 6 {
provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID())
s.lc.AddProvider(provider)
}
}
@@ -352,16 +352,24 @@ func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (t
if err != nil {
return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w", err)
}
s.paramsSendCh <- p2p.Envelope{
select {
case s.paramsSendCh <- p2p.Envelope{
To: peer,
Message: &ssproto.ParamsRequest{
Height: uint64(height),
},
}:
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
}
select {
// if we get no response from this provider we move on to the next one
case <-time.After(consensusParamsResponseTimeout):
continue
case <-ctx.Done():
return types.ConsensusParams{}, ctx.Err()
case params := <-s.paramsRecvCh:
return params, nil
}
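
consensusParams now makes every blocking step cancellable: the request send races against ctx.Done(), and the wait for a reply races against both ctx.Done() and a per-provider timeout before falling through to the next provider. A self-contained sketch of that ask-each-provider-in-turn loop follows; the request/response types, the channel-per-provider layout, and the timeout value are illustrative assumptions rather than the state provider's actual wiring.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type request struct{ height int64 }
type response struct{ maxBytes int64 }

var errNoResponse = errors.New("no provider responded")

// ask sends the request to each provider in turn and returns the first
// reply. Every blocking step honours ctx, and a silent provider only costs
// perProviderTimeout before we move on to the next one.
func ask(ctx context.Context, providers []chan request, recvCh chan response,
	height int64, perProviderTimeout time.Duration) (response, error) {

	for _, sendCh := range providers {
		// The send itself can be cancelled, mirroring the new select
		// around paramsSendCh.
		select {
		case sendCh <- request{height: height}:
		case <-ctx.Done():
			return response{}, ctx.Err()
		}

		// Wait for a reply, a cancellation, or this provider's timeout.
		select {
		case <-time.After(perProviderTimeout):
			continue // no answer from this provider; try the next one
		case <-ctx.Done():
			return response{}, ctx.Err()
		case resp := <-recvCh:
			return resp, nil
		}
	}
	return response{}, errNoResponse
}

func main() {
	recvCh := make(chan response, 1)
	silent := make(chan request, 1)  // accepts the request but never answers
	helpful := make(chan request, 1) // answers whatever it is asked

	go func() {
		<-helpful
		recvCh <- response{maxBytes: 22020096}
	}()

	resp, err := ask(context.Background(), []chan request{silent, helpful},
		recvCh, 5, 50*time.Millisecond)
	fmt.Println(resp, err)
}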