From becc6c3072d99551e2520afb68bb362d38e0c8c0 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Wed, 29 Sep 2021 16:47:39 -0400 Subject: [PATCH] ensure syncer is not closed before trying to send a snapshot discovery message --- internal/statesync/reactor.go | 5 ++++- internal/statesync/reactor_test.go | 1 - internal/statesync/syncer.go | 17 +++++++++++++---- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/statesync/reactor.go b/internal/statesync/reactor.go index 98f026e9a..cd20b5895 100644 --- a/internal/statesync/reactor.go +++ b/internal/statesync/reactor.go @@ -247,6 +247,10 @@ func (r *Reactor) OnStop() { // wait for any remaining requests to complete <-r.dispatcher.Done() + r.syncer.Close() + + <-r.dispatcher.Done() + // Close closeCh to signal to all spawned goroutines to gracefully exit. All // p2p Channels should execute Close(). close(r.closeCh) @@ -292,7 +296,6 @@ func (r *Reactor) Sync(ctx context.Context) (sm.State, error) { r.stateProvider, r.snapshotCh.Out, r.chunkCh.Out, - r.snapshotCh.Done(), r.tempDir, r.metrics, ) diff --git a/internal/statesync/reactor_test.go b/internal/statesync/reactor_test.go index b90e5fd78..55a5bbed0 100644 --- a/internal/statesync/reactor_test.go +++ b/internal/statesync/reactor_test.go @@ -171,7 +171,6 @@ func setup( stateProvider, rts.snapshotOutCh, rts.chunkOutCh, - rts.snapshotChannel.Done(), "", rts.reactor.metrics, ) diff --git a/internal/statesync/syncer.go b/internal/statesync/syncer.go index 68bec6880..5369024d0 100644 --- a/internal/statesync/syncer.go +++ b/internal/statesync/syncer.go @@ -70,7 +70,7 @@ type syncer struct { avgChunkTime int64 lastSyncedSnapshotHeight int64 processingSnapshot *snapshot - closeCh <-chan struct{} + closeCh chan struct{} } // newSyncer creates a new syncer. @@ -82,7 +82,6 @@ func newSyncer( stateProvider StateProvider, snapshotCh chan<- p2p.Envelope, chunkCh chan<- p2p.Envelope, - closeCh <-chan struct{}, tempDir string, metrics *Metrics, ) *syncer { @@ -98,7 +97,7 @@ func newSyncer( fetchers: cfg.Fetchers, retryTimeout: cfg.ChunkRequestTimeout, metrics: metrics, - closeCh: closeCh, + closeCh: make(chan struct{}), } } @@ -151,7 +150,8 @@ func (s *syncer) AddPeer(peerID types.NodeID) { select { case <-s.closeCh: - case s.snapshotCh <- msg: + default: + s.snapshotCh <- msg } } @@ -368,6 +368,15 @@ func (s *syncer) Sync(ctx context.Context, snapshot *snapshot, chunks *chunkQueu return state, commit, nil } +// Close signals to the syncer that it should stop processing any updates. +func (s *syncer) Close() { + close(s.closeCh) +} + +func (s *syncer) Done() <-chan struct{} { + return s.closeCh +} + // offerSnapshot offers a snapshot to the app. It returns various errors depending on the app's // response, or nil if the snapshot was accepted. func (s *syncer) offerSnapshot(ctx context.Context, snapshot *snapshot) error {