From 2794e400718f36482504ea9bf9b2e99f82ac8a02 Mon Sep 17 00:00:00 2001 From: William Banfield Date: Mon, 4 Oct 2021 17:16:42 -0400 Subject: [PATCH] consensus: wait until peerUpdates channel is closed to close remaining peers --- internal/consensus/reactor.go | 45 +++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/consensus/reactor.go b/internal/consensus/reactor.go index ca3f5d353..e4112af14 100644 --- a/internal/consensus/reactor.go +++ b/internal/consensus/reactor.go @@ -229,8 +229,34 @@ func (r *Reactor) OnStop() { r.state.Wait() } + // Close closeCh to signal to all spawned goroutines to gracefully exit. All + // p2p Channels should execute Close(). + close(r.closeCh) + + // Close the StateChannel goroutine separately since it uses its own channel + // to signal closure. + close(r.stateCloseCh) + <-r.stateCh.Done() + + // Wait for all p2p Channels to be closed before returning. This ensures we + // can easily reason about synchronization of all p2p Channels and ensure no + // panics will occur. + <-r.voteSetBitsCh.Done() + <-r.dataCh.Done() + <-r.voteCh.Done() + <-r.peerUpdates.Done() + + peers := make(map[types.NodeID]*PeerState) r.mtx.Lock() - peers := r.peers + + // Here, we make a copy of the map of peer states. + // The code below proceeds to access entries from the map outside of a lock. + // Goroutines launched by other functions may be deleting from the map but should + // to it since we waited for the peerUpdates channel to complete. + // Each of the methods below is safe to perform concurrently on the peers. + for id, state := range r.peers { + peers[id] = state + } r.mtx.Unlock() // wait for all spawned peer goroutines to gracefully exit @@ -240,23 +266,6 @@ func (r *Reactor) OnStop() { for _, ps := range peers { ps.broadcastWG.Wait() } - - // Close the StateChannel goroutine separately since it uses its own channel - // to signal closure. - close(r.stateCloseCh) - <-r.stateCh.Done() - - // Close closeCh to signal to all spawned goroutines to gracefully exit. All - // p2p Channels should execute Close(). - close(r.closeCh) - - // Wait for all p2p Channels to be closed before returning. This ensures we - // can easily reason about synchronization of all p2p Channels and ensure no - // panics will occur. - <-r.voteSetBitsCh.Done() - <-r.dataCh.Done() - <-r.voteCh.Done() - <-r.peerUpdates.Done() } // SetEventBus sets the reactor's event bus.