diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 6a1b484cb..73b9e0af1 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -247,10 +247,6 @@ func (r *Reactor) OnStop() { // wait for any remaining requests to complete <-r.dispatcher.Done() - if r.syncer != nil { - r.syncer.Close() - } - // Close closeCh to signal to all spawned goroutines to gracefully exit. All // p2p Channels should execute Close(). close(r.closeCh) @@ -296,6 +292,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.stateProvider, r.snapshotCh.Out, r.chunkCh.Out, + r.snapshotCh.Done(), r.tempDir, r.metrics, ) @@ -872,10 +869,13 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) { Err: err, } } - - case <-r.closeCh: - r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName)) - return + default: + select { + case <-r.closeCh: + r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName)) + return + default: + } } } } diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 3f77b14a4..68bec6880 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -70,7 +70,7 @@ type syncer struct { avgChunkTime int64 lastSyncedSnapshotHeight int64 processingSnapshot *snapshot - closeCh chan struct{} + closeCh <-chan struct{} } // newSyncer creates a new syncer. @@ -82,6 +82,7 @@ func newSyncer( stateProvider StateProvider, snapshotCh chan<- p2p.Envelope, chunkCh chan<- p2p.Envelope, + closeCh <-chan struct{}, tempDir string, metrics *Metrics, ) *syncer { @@ -97,7 +98,7 @@ func newSyncer( fetchers: cfg.Fetchers, retryTimeout: cfg.ChunkRequestTimeout, metrics: metrics, - closeCh: make(chan struct{}), + closeCh: closeCh, } } @@ -150,8 +151,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) { select { case <-s.closeCh: - default: - s.snapshotCh <- msg + case s.snapshotCh <- msg: } } @@ -368,11 +368,6 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu return state, commit, nil } -// Close signals to the syncer that it should stop processing any updates. -func (s *syncer) Close() { - close(s.closeCh) -} - // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's // response, or nil if the snapshot was accepted. func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {