Compare commits

...

18 Commits

Author SHA1 Message Date
William Banfield
8b326810f5 Merge remote-tracking branch 'origin/wb/statesync-init-deadlock' into wb/statesync-init-deadlock 2021-09-30 15:06:43 -04:00
William Banfield
3185eff8c6 add comment to cleanup case 2021-09-30 15:05:42 -04:00
mergify[bot]
51193fdb75 Merge branch 'master' into wb/statesync-init-deadlock 2021-09-30 18:29:20 +00:00
William Banfield
8a0cd32ecf lint++ 2021-09-30 13:57:59 -04:00
William Banfield
6ccc2aef9e lint++ 2021-09-30 13:53:08 -04:00
William Banfield
530814991f Update internal/statesync/syncer.go
Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
2021-09-30 13:46:16 -04:00
William Banfield
93719f4494 minor 2021-09-30 13:43:20 -04:00
William Banfield
28eacd75c2 revert close ch order change 2021-09-30 13:42:54 -04:00
William Banfield
e28bf9b659 Merge remote-tracking branch 'origin/wb/statesync-init-deadlock' into wb/statesync-init-deadlock 2021-09-30 13:41:00 -04:00
William Banfield
0d72e89ac4 recover on send on closed channel in statesync 2021-09-30 13:40:21 -04:00
Callum Waters
3da97821a1 Merge branch 'master' into wb/statesync-init-deadlock 2021-09-30 11:33:38 +02:00
William Banfield
db44aeb62b fix test 2021-09-29 17:55:21 -04:00
William Banfield
a070c42777 nest the close channel cases 2021-09-29 17:54:30 -04:00
William Banfield
f8cebbc5af remove syncer done 2021-09-29 16:56:06 -04:00
William Banfield
022148613e fix 2021-09-29 16:54:22 -04:00
William Banfield
becc6c3072 ensure syncer is not closed before trying to send a snapshot discovery message 2021-09-29 16:47:39 -04:00
William Banfield
41384c6899 fix defer 2021-09-29 14:24:18 -04:00
William Banfield
7679cc5015 statesync: remove deadlock on init fail 2021-09-29 14:16:50 -04:00
3 changed files with 23 additions and 6 deletions

View File

@@ -280,6 +280,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
}
if err := r.initStateProvider(ctx, r.chainID, r.initialHeight); err != nil {
r.mtx.Unlock()
return sm.State{}, err
}
@@ -889,17 +890,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
}
r.mtx.Lock()
defer r.mtx.Unlock()
if r.syncer == nil {
r.mtx.Unlock()
return
}
defer r.mtx.Unlock()
switch peerUpdate.Status {
case p2p.PeerStatusUp:
newProvider := NewBlockProvider(peerUpdate.NodeID, r.chainID, r.dispatcher)
r.providers[peerUpdate.NodeID] = newProvider
r.syncer.AddPeer(peerUpdate.NodeID)
err := r.syncer.AddPeer(peerUpdate.NodeID)
if err != nil {
r.Logger.Error("error adding peer to syncer", "error", err)
return
}
if sp, ok := r.stateProvider.(*stateProviderP2P); ok {
// we do this in a separate routine to not block whilst waiting for the light client to finish
// whatever call it's currently executing

View File

@@ -141,7 +141,17 @@ func (s *syncer) AddSnapshot(peerID types.NodeID, snapshot *snapshot) (bool, err
// AddPeer adds a peer to the pool. For now we just keep it simple and send a
// single request to discover snapshots, later we may want to do retries and stuff.
func (s *syncer) AddPeer(peerID types.NodeID) {
func (s *syncer) AddPeer(peerID types.NodeID) (err error) {
defer func() {
// TODO: remove panic recover once AddPeer can no longer accientally send on
// closed channel.
// This recover was added to protect against the p2p message being sent
// to the snapshot channel after the snapshot channel was closed.
if r := recover(); r != nil {
err = fmt.Errorf("panic sending peer snapshot request: %v", r)
}
}()
s.logger.Debug("Requesting snapshots from peer", "peer", peerID)
msg := p2p.Envelope{
@@ -153,6 +163,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
case <-s.closeCh:
case s.snapshotCh <- msg:
}
return err
}
// RemovePeer removes a peer from the pool.

View File

@@ -77,12 +77,14 @@ func TestSyncer_SyncAny(t *testing.T) {
require.Error(t, err)
// Adding a couple of peers should trigger snapshot discovery messages
rts.syncer.AddPeer(peerAID)
err = rts.syncer.AddPeer(peerAID)
require.NoError(t, err)
e := <-rts.snapshotOutCh
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
require.Equal(t, peerAID, e.To)
rts.syncer.AddPeer(peerBID)
err = rts.syncer.AddPeer(peerBID)
require.NoError(t, err)
e = <-rts.snapshotOutCh
require.Equal(t, &ssproto.SnapshotsRequest{}, e.Message)
require.Equal(t, peerBID, e.To)