diff --git a/internal/statesync/block_queue_test.go b/internal/statesync/block_queue_test.go index 3a4c71e4e..dc5e2bc82 100644 --- a/internal/statesync/block_queue_test.go +++ b/internal/statesync/block_queue_test.go @@ -274,8 +274,10 @@ loop: } func mockLBResp(t *testing.T, peer types.NodeID, height int64, time time.Time) lightBlockResponse { + vals, pv := factory.RandValidatorSet(3, 10) + _, _, lb := mockLB(t, height, time, factory.MakeBlockID(), vals, pv) return lightBlockResponse{ - block: mockLB(t, height, time, factory.MakeBlockID()), + block: lb, peer: peer, } } diff --git a/internal/statesync/dispatcher.go b/internal/statesync/dispatcher.go index f8197118a..74752a505 100644 --- a/internal/statesync/dispatcher.go +++ b/internal/statesync/dispatcher.go @@ -124,6 +124,15 @@ func (d *dispatcher) lightBlock(ctx context.Context, height int64, peer types.No return nil, err } + defer func() { + d.mtx.Lock() + defer d.mtx.Unlock() + if call, ok := d.calls[peer]; ok { + close(call) + delete(d.calls, peer) + } + }() + // wait for a response, cancel or timeout select { case resp := <-callCh: @@ -231,10 +240,6 @@ func (d *dispatcher) dispatch(peer types.NodeID, height int64) (chan *types.Ligh func (d *dispatcher) release(peer types.NodeID) { d.mtx.Lock() defer d.mtx.Unlock() - if call, ok := d.calls[peer]; ok { - close(call) - delete(d.calls, peer) - } d.availablePeers.Append(peer) } diff --git a/internal/statesync/dispatcher_test.go b/internal/statesync/dispatcher_test.go index 66afcd4c0..8da29018e 100644 --- a/internal/statesync/dispatcher_test.go +++ b/internal/statesync/dispatcher_test.go @@ -25,7 +25,7 @@ func TestDispatcherBasic(t *testing.T) { defer close(closeCh) d := newDispatcher(ch, 1*time.Second) - + d.start() go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) @@ -58,6 +58,7 @@ func TestDispatcherReturnsNoBlock(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) d := newDispatcher(ch, 1*time.Second) + d.start() peerFromSet := createPeerSet(1)[0] d.addPeer(peerFromSet) doneCh := make(chan struct{}) @@ -80,6 +81,7 @@ func TestDispatcherErrorsWhenNoPeers(t *testing.T) { t.Cleanup(leaktest.Check(t)) ch := make(chan p2p.Envelope, 100) d := newDispatcher(ch, 1*time.Second) + d.start() lb, peerResult, err := d.LightBlock(context.Background(), 1) @@ -92,6 +94,8 @@ func TestDispatcherReturnsBlockOncePeerAvailable(t *testing.T) { t.Cleanup(leaktest.Check(t)) dispatcherRequestCh := make(chan p2p.Envelope, 100) d := newDispatcher(dispatcherRequestCh, 1*time.Second) + d.start() + peerFromSet := createPeerSet(1)[0] d.addPeer(peerFromSet) ctx := context.Background() @@ -138,7 +142,7 @@ func TestDispatcherProviders(t *testing.T) { defer close(closeCh) d := newDispatcher(ch, 5*time.Second) - + d.start() go handleRequests(t, d, ch, closeCh) peers := createPeerSet(5) @@ -322,6 +326,7 @@ func handleRequests(t *testing.T, d *dispatcher, ch chan p2p.Envelope, closeCh c block, _ := resp.block.ToProto() require.NoError(t, d.respond(block, resp.peer)) case <-closeCh: + d.stop() return } } diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index ea935f3b3..906a7a17f 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -190,6 +190,8 @@ func (r *Reactor) OnStart() error { go r.processBlockCh() + go r.processParamsCh() + go r.processPeerUpdates() r.dispatcher.start() @@ -681,7 +683,7 @@ func (r *Reactor) handleLightBlockMessage(envelope p2p.Envelope) error { func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error { switch msg := envelope.Message.(type) { case *ssproto.ParamsRequest: - r.Logger.Info("received light block request", "height", msg.Height) + r.Logger.Debug("received consensus params request", "height", msg.Height) cp, err := r.stateStore.LoadConsensusParams(int64(msg.Height)) if err != nil { r.Logger.Error("failed to fetch requested consensus params", "err", err, "height", msg.Height) @@ -700,6 +702,7 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error { case *ssproto.ParamsResponse: r.mtx.RLock() defer r.mtx.RUnlock() + r.Logger.Debug("received consensus params response", "height", msg.Height) if r.syncer == nil { r.Logger.Debug("received unexpected params response; no state sync in progress", "peer", envelope.From) @@ -709,6 +712,7 @@ func (r *Reactor) handleParamsMessage(envelope p2p.Envelope) error { cp := types.ConsensusParamsFromProto(msg.ConsensusParams) if sp, ok := r.syncer.stateProvider.(*stateProviderP2P); ok { + r.Logger.Debug("passing along message") select { case sp.paramsRecvCh <- cp: default: diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index cdc7b5491..36482872d 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -3,6 +3,7 @@ package statesync import ( "context" "fmt" + "strings" "sync" "testing" "time" @@ -92,6 +93,9 @@ func setup( blockInCh: make(chan p2p.Envelope, chBuf), blockOutCh: make(chan p2p.Envelope, chBuf), blockPeerErrCh: make(chan p2p.PeerError, chBuf), + paramsInCh: make(chan p2p.Envelope, chBuf), + paramsOutCh: make(chan p2p.Envelope, chBuf), + paramsPeerErrCh: make(chan p2p.PeerError, chBuf), conn: conn, connQuery: connQuery, stateProvider: stateProvider, @@ -405,7 +409,7 @@ func TestReactor_Dispatcher(t *testing.T) { dispatcher := rts.reactor.dispatcher providers := dispatcher.Providers(factory.DefaultTestChainID) require.Len(t, providers, 2) - require.Equal(t, 2, dispatcher.peerCount()) + require.Equal(t, 0, dispatcher.peerCount()) wg := sync.WaitGroup{} @@ -432,19 +436,35 @@ func TestReactor_Dispatcher(t *testing.T) { t.Fail() case <-ctx.Done(): } + require.Equal(t, 0, dispatcher.peerCount()) - t.Log(dispatcher.availablePeers.Peers()) - require.Equal(t, 2, dispatcher.peerCount()) +} +func TestReactor_P2P_Provider(t *testing.T) { + rts := setup(t, nil, nil, nil, 2) rts.peerUpdateCh <- p2p.PeerUpdate{ - NodeID: types.NodeID("cc"), + NodeID: types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength)), + Status: p2p.PeerStatusUp, + } + rts.peerUpdateCh <- p2p.PeerUpdate{ + NodeID: types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength)), Status: p2p.PeerStatusUp, } - require.Equal(t, 3, dispatcher.peerCount()) + // make syncer non nil else test won't think we are state syncing + rts.reactor.syncer = rts.syncer + + closeCh := make(chan struct{}) + defer close(closeCh) + + chain := buildLightBlockChain(t, 1, 10, time.Now()) + go handleLightBlockRequests(t, chain, rts.blockOutCh, rts.blockInCh, closeCh, 0) + go handleConsensusParamsRequest(t, rts.paramsOutCh, rts.paramsInCh, closeCh) // we now test the p2p state provider - lb, _, err := dispatcher.LightBlock(ctx, 2) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + lb, _, err := rts.reactor.dispatcher.LightBlock(ctx, 2) require.NoError(t, err) to := light.TrustOptions{ Period: 24 * time.Hour, @@ -452,22 +472,24 @@ func TestReactor_Dispatcher(t *testing.T) { Hash: lb.Hash(), } - // TODO: move into a separate test - p2pStateProvider, err := NewP2PStateProvider(ctx, "testchain", 1, rts.reactor.dispatcher, to, rts.paramsOutCh, log.TestingLogger()) + p2pStateProvider, err := NewP2PStateProvider(ctx, "test-chain", 1, rts.reactor.dispatcher, + to, rts.reactor.paramsCh.Out, log.TestingLogger()) require.NoError(t, err) + // set the state provider else the test won't think we are state syncing + rts.reactor.syncer = rts.syncer + rts.syncer.stateProvider = p2pStateProvider appHash, err := p2pStateProvider.AppHash(ctx, 5) require.NoError(t, err) - require.Len(t, appHash, 20) + require.Len(t, appHash, 32) - state, err := p2pStateProvider.State(ctx, 6) + state, err := p2pStateProvider.State(ctx, 5) require.NoError(t, err) require.Equal(t, appHash, state.AppHash) commit, err := p2pStateProvider.Commit(ctx, 5) require.NoError(t, err) require.Equal(t, commit.BlockID, state.LastBlockID) - } func TestReactor_Backfill(t *testing.T) { @@ -546,7 +568,6 @@ func retryUntil(t *testing.T, fn func() bool, timeout time.Duration) { if fn() { return } - require.NoError(t, ctx.Err()) } } @@ -575,7 +596,9 @@ func handleLightBlockRequests(t *testing.T, } else { switch errorCount % 3 { case 0: // send a different block - differntLB, err := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID()).ToProto() + vals, pv := factory.RandValidatorSet(3, 10) + _, _, lb := mockLB(t, int64(msg.Height), factory.DefaultTestTime, factory.MakeBlockID(), vals, pv) + differntLB, err := lb.ToProto() require.NoError(t, err) sending <- p2p.Envelope{ From: envelope.To, @@ -602,12 +625,36 @@ func handleLightBlockRequests(t *testing.T, } } +func handleConsensusParamsRequest(t *testing.T, receiving, sending chan p2p.Envelope, closeCh chan struct{}) { + params := types.DefaultConsensusParams() + paramsProto := params.ToProto() + for { + select { + case envelope := <-receiving: + t.Log("received consensus params request") + msg, ok := envelope.Message.(*ssproto.ParamsRequest) + require.True(t, ok) + sending <- p2p.Envelope{ + From: envelope.To, + Message: &ssproto.ParamsResponse{ + Height: msg.Height, + ConsensusParams: paramsProto, + }, + } + + case <-closeCh: + return + } + } +} + func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime time.Time) map[int64]*types.LightBlock { chain := make(map[int64]*types.LightBlock, toHeight-fromHeight) lastBlockID := factory.MakeBlockID() - blockTime := startTime.Add(-5 * time.Minute) + blockTime := startTime.Add(time.Duration(fromHeight-toHeight) * time.Minute) + vals, pv := factory.RandValidatorSet(3, 10) for height := fromHeight; height < toHeight; height++ { - chain[height] = mockLB(t, height, blockTime, lastBlockID) + vals, pv, chain[height] = mockLB(t, height, blockTime, lastBlockID, vals, pv) lastBlockID = factory.MakeBlockIDWithHash(chain[height].Header.Hash()) blockTime = blockTime.Add(1 * time.Minute) } @@ -615,24 +662,26 @@ func buildLightBlockChain(t *testing.T, fromHeight, toHeight int64, startTime ti } func mockLB(t *testing.T, height int64, time time.Time, - lastBlockID types.BlockID) *types.LightBlock { + lastBlockID types.BlockID, currentVals *types.ValidatorSet, currentPrivVals []types.PrivValidator) (*types.ValidatorSet, []types.PrivValidator, *types.LightBlock) { header, err := factory.MakeHeader(&types.Header{ Height: height, LastBlockID: lastBlockID, Time: time, }) require.NoError(t, err) - vals, pv := factory.RandValidatorSet(3, 10) - header.ValidatorsHash = vals.Hash() + nextVals, nextPrivVals := factory.RandValidatorSet(3, 10) + header.ValidatorsHash = currentVals.Hash() + header.NextValidatorsHash = nextVals.Hash() + header.ConsensusHash = types.DefaultConsensusParams().HashConsensusParams() lastBlockID = factory.MakeBlockIDWithHash(header.Hash()) - voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, vals) - commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, pv, time) + voteSet := types.NewVoteSet(factory.DefaultTestChainID, height, 0, tmproto.PrecommitType, currentVals) + commit, err := factory.MakeCommit(lastBlockID, height, 0, voteSet, currentPrivVals, time) require.NoError(t, err) - return &types.LightBlock{ + return nextVals, nextPrivVals, &types.LightBlock{ SignedHeader: &types.SignedHeader{ Header: header, Commit: commit, }, - ValidatorSet: vals, + ValidatorSet: currentVals, } } diff --git a/internal/statesync/stateprovider.go b/internal/statesync/stateprovider.go index b6a8d8b4c..b361db4b4 100644 --- a/internal/statesync/stateprovider.go +++ b/internal/statesync/stateprovider.go @@ -335,8 +335,8 @@ func (s *stateProviderP2P) State(ctx context.Context, height uint64) (sm.State, } func (s *stateProviderP2P) addPeer(peer types.NodeID) { - provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID()) if len(s.lc.Witnesses()) < 6 { + provider := s.dispatcher.CreateProvider(peer, s.lc.ChainID()) s.lc.AddProvider(provider) } } @@ -352,16 +352,24 @@ func (s *stateProviderP2P) consensusParams(ctx context.Context, height int64) (t if err != nil { return types.ConsensusParams{}, fmt.Errorf("invalid provider node id: %w", err) } - s.paramsSendCh <- p2p.Envelope{ + + select { + case s.paramsSendCh <- p2p.Envelope{ To: peer, Message: &ssproto.ParamsRequest{ Height: uint64(height), }, + }: + case <-ctx.Done(): + return types.ConsensusParams{}, ctx.Err() } + select { // if we get no response from this provider we move on to the next one case <-time.After(consensusParamsResponseTimeout): continue + case <-ctx.Done(): + return types.ConsensusParams{}, ctx.Err() case params := <-s.paramsRecvCh: return params, nil }