ensure syncer is not closed before trying to send a snapshot discovery message

This commit is contained in:
William Banfield
2021-09-29 16:47:39 -04:00
parent 41384c6899
commit becc6c3072
3 changed files with 17 additions and 6 deletions

View File

@@ -247,6 +247,10 @@ func (r *Reactor) OnStop() {
// wait for any remaining requests to complete
<-r.dispatcher.Done()
r.syncer.Close()
<-r.dispatcher.Done()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
@@ -292,7 +296,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) {
r.stateProvider,
r.snapshotCh.Out,
r.chunkCh.Out,
r.snapshotCh.Done(),
r.tempDir,
r.metrics,
)

View File

@@ -171,7 +171,6 @@ func setup(
stateProvider,
rts.snapshotOutCh,
rts.chunkOutCh,
rts.snapshotChannel.Done(),
"",
rts.reactor.metrics,
)

View File

@@ -70,7 +70,7 @@ type syncer struct {
avgChunkTime int64
lastSyncedSnapshotHeight int64
processingSnapshot *snapshot
closeCh <-chan struct{}
closeCh chan struct{}
}
// newSyncer creates a new syncer.
@@ -82,7 +82,6 @@ func newSyncer(
stateProvider StateProvider,
snapshotCh chan<- p2p.Envelope,
chunkCh chan<- p2p.Envelope,
closeCh <-chan struct{},
tempDir string,
metrics *Metrics,
) *syncer {
@@ -98,7 +97,7 @@ func newSyncer(
fetchers: cfg.Fetchers,
retryTimeout: cfg.ChunkRequestTimeout,
metrics: metrics,
closeCh: closeCh,
closeCh: make(chan struct{}),
}
}
@@ -151,7 +150,8 @@ func (s *syncer) AddPeer(peerID types.NodeID) {
select {
case <-s.closeCh:
case s.snapshotCh <- msg:
default:
s.snapshotCh <- msg
}
}
@@ -368,6 +368,15 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu
return state, commit, nil
}
// Close signals to the syncer that it should stop processing any updates.
func (s *syncer) Close() {
close(s.closeCh)
}
func (s *syncer) Done() <-chan struct{} {
return s.closeCh
}
// offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's
// response, or nil if the snapshot was accepted.
func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {