blockchain v0: add poolWG and refactor pool routines

This commit is contained in:
Aleksandr Bezobchuk
2021-01-08 14:20:34 -05:00
parent 3ac6a6921e
commit 95b08bb775
2 changed files with 71 additions and 40 deletions

View File

@@ -2,6 +2,7 @@ package v0
import (
"fmt"
"sync"
"time"
"github.com/gogo/protobuf/proto"
@@ -89,6 +90,11 @@ type Reactor struct {
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
// poolWG is used to synchronize the graceful shutdown of the poolRoutine and
// requestRoutine spawned goroutines when stopping the reactor and before
// stopping the p2p Channel(s).
poolWG sync.WaitGroup
}
// NewReactor returns new reactor instance.
@@ -163,6 +169,9 @@ func (r *Reactor) OnStop() {
}
}
// wait for the poolRoutine and requestRoutine goroutines to gracefully exit
r.poolWG.Wait()
// Close closeCh to signal to all spawned goroutines to gracefully exit. All
// p2p Channels should execute Close().
close(r.closeCh)
@@ -362,17 +371,67 @@ func (r *Reactor) SwitchToFastSync(state sm.State) error {
return nil
}
func (r *Reactor) broadcastStatusRequest() {
envelope := p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}
r.poolWG.Add(1)
defer r.poolWG.Done()
select {
case <-r.Quit():
return
case <-r.pool.Quit():
return
case r.blockchainCh.Out() <- envelope:
}
}
func (r *Reactor) requestRoutine() {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
r.poolWG.Add(1)
defer r.poolWG.Done()
for {
select {
case <-r.Quit():
return
case <-r.pool.Quit():
return
case request := <-r.requestsCh:
r.blockchainCh.Out() <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: pErr.peerID,
Err: pErr.err,
Severity: p2p.PeerErrorSeverityLow,
}
case <-statusUpdateTicker.C:
go r.broadcastStatusRequest()
}
}
}
// poolRoutine handles messages from the poolReactor telling the reactor what to
// do.
//
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
//
// TODO: Ensure this works nicely with stopping reactor and that no race
// conditions or deadlocks exist.
func (r *Reactor) poolRoutine(stateSynced bool) {
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker = time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
blocksSynced = uint64(0)
@@ -387,42 +446,12 @@ func (r *Reactor) poolRoutine(stateSynced bool) {
)
defer trySyncTicker.Stop()
defer statusUpdateTicker.Stop()
defer switchToConsensusTicker.Stop()
go func() {
for {
select {
go r.requestRoutine()
case <-r.Quit():
return
case <-r.pool.Quit():
return
case request := <-r.requestsCh:
r.blockchainCh.Out() <- p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}
case pErr := <-r.errorsCh:
r.blockchainCh.Error() <- p2p.PeerError{
PeerID: pErr.peerID,
Err: pErr.err,
Severity: p2p.PeerErrorSeverityLow,
}
case <-statusUpdateTicker.C:
go func() {
r.blockchainCh.Out() <- p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}
}()
}
}
}()
r.poolWG.Add(1)
defer r.poolWG.Done()
FOR_LOOP:
for {

View File

@@ -386,11 +386,13 @@ func TestReactor_BadBlockStopsPeer(t *testing.T) {
"expected new node to be fully synced",
)
require.Eventually(
require.Eventuallyf(
t,
func() bool { return len(newSuite.reactor.pool.peers) == len(testSuites)-3 },
time.Second,
10*time.Second,
10*time.Millisecond,
"invalid number of peers",
"invalid number of peers; expected: %d, got: %d",
len(testSuites)-3,
len(newSuite.reactor.pool.peers),
)
}