nest the close channel cases

This commit is contained in:
William Banfield
2021-09-29 17:54:30 -04:00
parent f8cebbc5af
commit a070c42777
2 changed files with 12 additions and 17 deletions

View File

@@ -247,10 +247,6 @@ func (r *Reactor) OnStop() {
// wait for any remaining requests to complete
<-r.dispatcher.Done()
if r.syncer != nil {
r.syncer.Close()
}
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
@@ -296,6 +292,7 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.stateProvider,
r.snapshotCh.Out,
r.chunkCh.Out,
r.snapshotCh.Done(),
r.tempDir,
r.metrics,
)
@@ -872,10 +869,13 @@ func (r *Reactor) processCh(ch *p2p.Channel, chName string) {
Err: err,
}
}
case <-r.closeCh:
r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName))
return
default:
select {
case <-r.closeCh:
r.Logger.Debug(fmt.Sprintf("stopped listening on %s channel; closing...", chName))
return
default:
}
}
}
}

View File

@@ -70,7 +70,7 @@ type syncer struct {
avgChunkTime int64
lastSyncedSnapshotHeight int64
processingSnapshot *snapshot
closeCh chan struct{}
closeCh <-chan struct{}
}
// newSyncer creates a new syncer.
@@ -82,6 +82,7 @@ func newSyncer(
stateProvider StateProvider,
snapshotCh chan<- p2p.Envelope,
chunkCh chan<- p2p.Envelope,
closeCh <-chan struct{},
tempDir string,
metrics *Metrics,
) *syncer {
@@ -97,7 +98,7 @@ func newSyncer(
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: metrics,
closeCh: make(chan struct{}),
closeCh: closeCh,
}
}
@@ -150,8 +151,7 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
select {
case <-s.closeCh:
default:
s.snapshotCh <- msg
case s.snapshotCh <- msg:
}
}
@@ -368,11 +368,6 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
return state, commit, nil
}
// Close signals to the syncer that it should stop processing any updates.
func (s *syncer) Close() {
close(s.closeCh)
}
// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
// response, or nil if the snapshot was accepted.
func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {