Merge branch 'master' into wb/epoch-fixes-forward-port-master

William Banfield, 2022-03-18 12:19:13 -04:00 (committed by GitHub)
11 changed files with 84 additions and 136 deletions

View File

@@ -80,15 +80,7 @@ type Reactor struct {
blockSync *atomicBool
blockSyncCh *p2p.Channel
// blockSyncOutBridgeCh defines a channel that acts as a bridge between sending Envelope
// messages that the reactor will consume in processBlockSyncCh and receiving messages
// from the peer updates channel and other goroutines. We do this instead of directly
// sending on blockSyncCh.Out to avoid race conditions in the case where other goroutines
// send Envelopes directly to the blockSyncCh.Out channel, since processBlockSyncCh
// may close the blockSyncCh.Out channel at the same time that other goroutines send to
// blockSyncCh.Out.
blockSyncOutBridgeCh chan p2p.Envelope
peerUpdates *p2p.PeerUpdates
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
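The comment removed above documents why the reactor previously funnelled all outbound envelopes through a dedicated bridge goroutine. A minimal sketch of that pattern, using simplified stand-in types rather than the real p2p API:

package main

import (
    "context"
    "fmt"
)

// Envelope stands in for p2p.Envelope; the real type carries more fields.
type Envelope struct{ Msg string }

// bridge forwards envelopes from in to out until ctx is cancelled. Because
// only this goroutine ever writes to out, the consumer that owns out can
// close it without racing other senders, which is the property the removed
// comment describes.
func bridge(ctx context.Context, in <-chan Envelope, out chan<- Envelope) {
    for {
        select {
        case <-ctx.Done():
            return
        case e := <-in:
            select {
            case <-ctx.Done():
                return
            case out <- e:
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    in := make(chan Envelope)  // producers (peer updates, request routine) write here
    out := make(chan Envelope) // the consumer reads, and may eventually close, this
    go bridge(ctx, in, out)

    go func() { in <- Envelope{Msg: "status request"} }()
    fmt.Println((<-out).Msg)
}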
@@ -119,17 +111,16 @@ func NewReactor(
}
r := &Reactor{
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
blockSyncOutBridgeCh: make(chan p2p.Envelope),
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
logger: logger,
stateStore: stateStore,
blockExec: blockExec,
store: store,
consReactor: consReactor,
blockSync: newAtomicBool(blockSync),
blockSyncCh: blockSyncCh,
peerUpdates: peerUpdates,
metrics: metrics,
eventBus: eventBus,
}
r.BaseService = *service.NewBaseService(logger, "BlockSync", r)
@@ -175,7 +166,6 @@ func (r *Reactor) OnStart(ctx context.Context) error {
}
go r.processBlockSyncCh(ctx)
go r.processBlockSyncBridge(ctx)
go r.processPeerUpdates(ctx)
return nil
@@ -306,21 +296,8 @@ func (r *Reactor) processBlockSyncCh(ctx context.Context) {
}
}
func (r *Reactor) processBlockSyncBridge(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case envelope := <-r.blockSyncOutBridgeCh:
if err := r.blockSyncCh.Send(ctx, envelope); err != nil {
return
}
}
}
}
// processPeerUpdate processes a PeerUpdate.
func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpdate) {
r.logger.Debug("received peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
// XXX: Pool#RedoRequest can sometimes give us an empty peer.
@@ -331,12 +308,20 @@ func (r *Reactor) processPeerUpdate(peerUpdate p2p.PeerUpdate) {
switch peerUpdate.Status {
case p2p.PeerStatusUp:
// send a status update to the newly added peer
r.blockSyncOutBridgeCh <- p2p.Envelope{
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
To: peerUpdate.NodeID,
Message: &bcproto.StatusResponse{
Base: r.store.Base(),
Height: r.store.Height(),
},
}); err != nil {
r.pool.RemovePeer(peerUpdate.NodeID)
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: peerUpdate.NodeID,
Err: err,
}); err != nil {
return
}
}
case p2p.PeerStatusDown:
@@ -353,7 +338,7 @@ func (r *Reactor) processPeerUpdates(ctx context.Context) {
case <-ctx.Done():
return
case peerUpdate := <-r.peerUpdates.Updates():
r.processPeerUpdate(peerUpdate)
r.processPeerUpdate(ctx, peerUpdate)
}
}
}
@@ -372,7 +357,6 @@ func (r *Reactor) SwitchToBlockSync(ctx context.Context, state sm.State) error {
r.syncStartTime = time.Now()
go r.requestRoutine(ctx)
go r.poolRoutine(ctx, true)
return nil
@@ -387,15 +371,17 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
case <-ctx.Done():
return
case request := <-r.requestsCh:
select {
case <-ctx.Done():
return
case r.blockSyncOutBridgeCh <- p2p.Envelope{
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
To: request.PeerID,
Message: &bcproto.BlockRequest{Height: request.Height},
}:
}); err != nil {
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: request.PeerID,
Err: err,
}); err != nil {
return
}
}
case pErr := <-r.errorsCh:
if err := r.blockSyncCh.SendError(ctx, p2p.PeerError{
NodeID: pErr.peerID,
@@ -404,16 +390,12 @@ func (r *Reactor) requestRoutine(ctx context.Context) {
return
}
case <-statusUpdateTicker.C:
go func() {
select {
case <-ctx.Done():
return
case r.blockSyncOutBridgeCh <- p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}:
}
}()
if err := r.blockSyncCh.Send(ctx, p2p.Envelope{
Broadcast: true,
Message: &bcproto.StatusRequest{},
}); err != nil {
return
}
}
}
}
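Throughout this file the bridge is replaced by calling blockSyncCh.Send directly with the caller's context and, on failure, reporting the peer through SendError. A rough sketch of why a context-aware Send makes the bridge unnecessary, with a hypothetical Channel type rather than the real p2p.Channel implementation:

package main

import (
    "context"
    "fmt"
)

type Envelope struct{ Msg string }

// Channel is a hypothetical stand-in for p2p.Channel: Send blocks until the
// envelope is accepted or the context is cancelled, so callers never write
// to a raw channel that some other goroutine might close.
type Channel struct{ outCh chan Envelope }

func (c *Channel) Send(ctx context.Context, e Envelope) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case c.outCh <- e:
        return nil
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    ch := &Channel{outCh: make(chan Envelope, 1)}
    if err := ch.Send(ctx, Envelope{Msg: "block request"}); err != nil {
        // in the reactor this is where the peer would be removed from the
        // pool and the error reported via SendError
        fmt.Println("send failed:", err)
        return
    }
    fmt.Println("sent:", (<-ch.outCh).Msg)
}

Because Send returns as soon as the context is cancelled, every goroutine can send on its own and simply exit on error, so no single owner has to serialize the writes.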

View File

@@ -885,8 +885,11 @@ func randConsensusNetWithPeers(
app := appFunc(logger, filepath.Join(cfg.DBDir(), fmt.Sprintf("%s_%d", testName, i)))
vals := types.TM2PB.ValidatorUpdates(state.Validators)
if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok {
// simulate handshake, receive app version. If we don't do this, the replay test will fail
switch app.(type) {
// simulate handshake, receive app version. If we don't do this, the replay test will fail
case *kvstore.PersistentKVStoreApplication:
state.Version.Consensus.App = kvstore.ProtocolVersion
case *kvstore.Application:
state.Version.Consensus.App = kvstore.ProtocolVersion
}
app.InitChain(abci.RequestInitChain{Validators: vals})
@@ -973,10 +976,6 @@ func newEpehemeralKVStore(_ log.Logger, _ string) abci.Application {
return kvstore.NewApplication()
}
func newPersistentKVStore(logger log.Logger, dbDir string) abci.Application {
return kvstore.NewPersistentKVStoreApplication(logger, dbDir)
}
func signDataIsEqual(v1 *types.Vote, v2 *tmproto.Vote) bool {
if v1 == nil || v2 == nil {
return false

View File

@@ -351,6 +351,8 @@ func (r *Reactor) broadcastHasVoteMessage(ctx context.Context, vote *types.Vote)
// internal pubsub defined in the consensus state to broadcast them to peers
// upon receiving.
func (r *Reactor) subscribeToBroadcastEvents() {
onStopCh := r.state.getOnStopCh()
err := r.state.evsw.AddListenerForEvent(
listenerIDConsensus,
types.EventNewRoundStepValue,
@@ -359,7 +361,7 @@ func (r *Reactor) subscribeToBroadcastEvents() {
return err
}
select {
case r.state.onStopCh <- data.(*cstypes.RoundState):
case onStopCh <- data.(*cstypes.RoundState):
return nil
case <-ctx.Done():
return ctx.Err()

View File

@@ -8,7 +8,6 @@ import (
"io"
"math/rand"
"os"
"path/filepath"
"runtime"
"testing"
"time"
@@ -338,13 +337,13 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
nPeers,
"replay_test",
newMockTickerFunc(true),
newPersistentKVStore)
newEpehemeralKVStore)
sim.Config = cfg
defer func() { t.Cleanup(cleanup) }()
var err error
sim.GenesisState, err = sm.MakeGenesisState(genDoc)
require.NoError(t, err)
sim.CleanupFunc = cleanup
partSize := types.BlockPartSizeBytes
@@ -584,9 +583,6 @@ func setupSimulator(ctx context.Context, t *testing.T) *simulatorTestSuite {
sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i)))
sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i)))
}
if sim.CleanupFunc != nil {
t.Cleanup(sim.CleanupFunc)
}
return sim
}
@@ -743,19 +739,14 @@ func testHandshakeReplay(
)
latestAppHash := state.AppHash
// make a new client creator
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_a_r%d", nBlocks, mode, rand.Int())))
t.Cleanup(func() { require.NoError(t, kvstoreApp.Close()) })
eventBus := eventbus.NewDefault(logger)
require.NoError(t, eventBus.Start(ctx))
clientCreator2 := abciclient.NewLocalClient(logger, kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
if nBlocks > 0 {
// run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
stateDB1 := dbm.NewMemDB()
stateStore := sm.NewStore(stateDB1)
err := stateStore.Save(genesisState)
@@ -776,7 +767,7 @@ func testHandshakeReplay(
genDoc, err := sm.MakeGenesisDocFromFile(cfg.GenesisFile())
require.NoError(t, err)
handshaker := NewHandshaker(logger, stateStore, state, store, eventBus, genDoc)
proxyApp := proxy.New(clientCreator2, logger, proxy.NopMetrics())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx), "Error starting proxy app connections")
require.True(t, proxyApp.IsRunning())
require.NotNil(t, proxyApp)
@@ -905,10 +896,7 @@ func buildTMStateFromChain(
t.Helper()
// run the whole chain against this client to build up the tendermint state
kvstoreApp := kvstore.NewPersistentKVStoreApplication(logger,
filepath.Join(cfg.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))
defer kvstoreApp.Close()
client := abciclient.NewLocalClient(logger, kvstoreApp)
client := abciclient.NewLocalClient(logger, kvstore.NewApplication())
proxyApp := proxy.New(client, logger, proxy.NopMetrics())
require.NoError(t, proxyApp.Start(ctx))
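The replay tests now build their app state against a throwaway in-memory kvstore instead of a persistent application backed by a database directory, so there is nothing to Close or clean up afterwards. A sketch of the new wiring, using only calls that appear in this diff; the import paths are assumptions, and since proxy is an internal package this would only compile inside the tendermint module:

// assumed to sit alongside the replay tests in package consensus
package consensus

import (
    "context"
    "testing"

    "github.com/stretchr/testify/require"

    abciclient "github.com/tendermint/tendermint/abci/client"
    "github.com/tendermint/tendermint/abci/example/kvstore"
    "github.com/tendermint/tendermint/internal/proxy"
    "github.com/tendermint/tendermint/libs/log"
)

// buildAppState is a hypothetical helper, not part of the commit, showing
// the shape of the new setup.
func buildAppState(ctx context.Context, t *testing.T) {
    t.Helper()
    logger := log.NewTestingLogger(t)

    // in-memory application: nothing left on disk, nothing to Close
    client := abciclient.NewLocalClient(logger, kvstore.NewApplication())

    proxyApp := proxy.New(client, logger, proxy.NopMetrics())
    require.NoError(t, proxyApp.Start(ctx))
    // ...replay nBlocks against proxyApp to rebuild the application state...
}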

View File

@@ -452,10 +452,6 @@ func (cs *State) OnStart(ctx context.Context) error {
}
}
if err := cs.evsw.Start(ctx); err != nil {
return err
}
// Double Signing Risk Reduction
if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
return err
@@ -497,12 +493,19 @@ func (cs *State) loadWalFile(ctx context.Context) error {
return nil
}
func (cs *State) getOnStopCh() chan *cstypes.RoundState {
cs.mtx.RLock()
defer cs.mtx.RUnlock()
return cs.onStopCh
}
// OnStop implements service.Service.
func (cs *State) OnStop() {
// If the node is committing a new block, wait until it is finished!
if cs.GetRoundState().Step == cstypes.RoundStepCommit {
select {
case <-cs.onStopCh:
case <-cs.getOnStopCh():
case <-time.After(cs.config.TimeoutCommit):
cs.logger.Error("OnStop: timeout waiting for commit to finish", "time", cs.config.TimeoutCommit)
}
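getOnStopCh exists so the consensus reactor (see the subscribeToBroadcastEvents hunk earlier in this commit) reads onStopCh under the state mutex instead of reaching into the field directly. A minimal sketch of that lock-guarded accessor pattern, with a simplified State rather than the real consensus type:

package main

import (
    "fmt"
    "sync"
)

// RoundState stands in for cstypes.RoundState.
type RoundState struct{ Height int64 }

// State is a stripped-down stand-in for the consensus State.
type State struct {
    mtx      sync.RWMutex
    onStopCh chan *RoundState
}

// getOnStopCh returns the channel under the read lock, so readers in other
// goroutines never race writers that guard the rest of the struct.
func (s *State) getOnStopCh() chan *RoundState {
    s.mtx.RLock()
    defer s.mtx.RUnlock()
    return s.onStopCh
}

func main() {
    s := &State{onStopCh: make(chan *RoundState, 1)}

    // the broadcast-events listener would send the latest round state here
    s.getOnStopCh() <- &RoundState{Height: 10}

    // OnStop receives it (or times out) while waiting for a commit to finish
    fmt.Println("height at stop:", (<-s.getOnStopCh()).Height)
}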
@@ -510,10 +513,6 @@ func (cs *State) OnStop() {
close(cs.onStopCh)
if cs.evsw.IsRunning() {
cs.evsw.Stop()
}
if cs.timeoutTicker.IsRunning() {
cs.timeoutTicker.Stop()
}

View File

@@ -7,7 +7,6 @@ import (
"fmt"
"io"
mrand "math/rand"
"path/filepath"
"testing"
"time"
@@ -26,18 +25,17 @@ import (
"github.com/tendermint/tendermint/types"
)
// WALGenerateNBlocks generates a consensus WAL. It does this by spinning up a
// stripped down version of node (proxy app, event bus, consensus state) with a
// persistent kvstore application and special consensus wal instance
// (byteBufferWAL) and waits until numBlocks are created.
// WALGenerateNBlocks generates a consensus WAL. It does this by
// spinning up a stripped down version of node (proxy app, event bus,
// consensus state) with a kvstore application and special consensus
// wal instance (byteBufferWAL) and waits until numBlocks are created.
// If the node fails to produce the given numBlocks, it fails the test.
func WALGenerateNBlocks(ctx context.Context, t *testing.T, logger log.Logger, wr io.Writer, numBlocks int) {
t.Helper()
cfg := getConfig(t)
app := kvstore.NewPersistentKVStoreApplication(logger, filepath.Join(cfg.DBDir(), "wal_generator"))
t.Cleanup(func() { require.NoError(t, app.Close()) })
app := kvstore.NewApplication()
logger.Info("generating WAL (last height msg excluded)", "numBlocks", numBlocks)

View File

@@ -69,11 +69,6 @@ type Group struct {
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}
// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}
@@ -101,7 +96,6 @@ func OpenGroup(ctx context.Context, logger log.Logger, headPath string, groupOpt
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
doneProcessTicks: make(chan struct{}),
}
for _, option := range groupOptions {
@@ -154,13 +148,6 @@ func (g *Group) OnStop() {
}
}
// Wait blocks until all internal goroutines are finished. Supposed to be
// called after Stop.
func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}
// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
if err := g.FlushAndSync(); err != nil {
@@ -241,8 +228,6 @@ func (g *Group) FlushAndSync() error {
}
func (g *Group) processTicks(ctx context.Context) {
defer close(g.doneProcessTicks)
for {
select {
case <-ctx.Done():
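The deleted Wait/doneProcessTicks machinery is the usual done-channel pattern for blocking until a background goroutine has exited. Roughly, with a stripped-down Group standing in for the autofile type:

package main

import (
    "context"
    "fmt"
    "time"
)

// Group is a stripped-down stand-in for the autofile Group.
type Group struct {
    doneProcessTicks chan struct{}
}

// processTicks closes doneProcessTicks on exit, which unblocks Wait.
func (g *Group) processTicks(ctx context.Context) {
    defer close(g.doneProcessTicks)
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // head-file rotation checks would happen here
        }
    }
}

// Wait blocks until processTicks has returned, so the caller can safely
// remove the directory the goroutine was reading and writing.
func (g *Group) Wait() { <-g.doneProcessTicks }

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    g := &Group{doneProcessTicks: make(chan struct{})}
    go g.processTicks(ctx)

    cancel()
    g.Wait()
    fmt.Println("processTicks finished; safe to clean up")
}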

View File

@@ -42,7 +42,8 @@ const (
type PeerScore uint8
const (
PeerScorePersistent PeerScore = math.MaxUint8 // persistent peers
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1
)
// PeerUpdate is a peer update event sent via PeerUpdates.
@@ -1283,6 +1284,9 @@ func (p *peerInfo) Score() PeerScore {
}
score := p.MutableScore
if score > int64(MaxPeerScoreNotPersistent) {
score = int64(MaxPeerScoreNotPersistent)
}
for _, addr := range p.AddressInfo {
// DialFailures is reset when dials succeed, so this
@@ -1294,10 +1298,6 @@ func (p *peerInfo) Score() PeerScore {
return 0
}
if score >= math.MaxUint8 {
return PeerScore(math.MaxUint8)
}
return PeerScore(score)
}
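This change caps a non-persistent peer's mutable score one point below the persistent score before the dial-failure penalties are subtracted, instead of clamping the final result at MaxUint8. A self-contained sketch of the resulting arithmetic, with a hypothetical peerInfo that mirrors the constants above:

package main

import (
    "fmt"
    "math"
)

type PeerScore uint8

const (
    PeerScorePersistent       PeerScore = math.MaxUint8           // reserved for persistent peers
    MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 // 254
)

// peerInfo is a hypothetical stand-in for the peer manager's bookkeeping.
type peerInfo struct {
    Persistent   bool
    MutableScore int64  // incremented on PeerStatusGood events
    DialFailures uint32 // simplified; the real code tracks failures per address
}

func (p *peerInfo) Score() PeerScore {
    if p.Persistent {
        return PeerScorePersistent
    }
    score := p.MutableScore
    // cap first, so no amount of good behaviour lets a non-persistent peer
    // reach the score reserved for persistent peers
    if score > int64(MaxPeerScoreNotPersistent) {
        score = int64(MaxPeerScoreNotPersistent)
    }
    score -= int64(p.DialFailures) // penalties only ever pull the score down
    if score <= 0 {
        return 0
    }
    return PeerScore(score)
}

func main() {
    p := &peerInfo{MutableScore: 1000}
    fmt.Println(p.Score()) // 254, not 255

    p.DialFailures = 3
    fmt.Println(p.Score()) // 251
}

With the cap applied first, good behaviour can never make a non-persistent peer indistinguishable from a persistent one, which is what the new TestNonPersistantPeerUpperBound test in the next file asserts.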

View File

@@ -80,4 +80,20 @@ func TestPeerScoring(t *testing.T) {
time.Millisecond,
"startAt=%d score=%d", start, peerManager.Scores()[id])
})
t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) {
start := int64(peerManager.Scores()[id] + 1)
for i := start; i <= int64(PeerScorePersistent); i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
})
if i == int64(PeerScorePersistent) {
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
} else {
require.EqualValues(t, i, peerManager.Scores()[id])
}
}
})
}

View File

@@ -7,7 +7,6 @@ import (
"sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
)
// ErrListenerWasRemoved is returned by AddEvent if the listener was removed.
@@ -45,16 +44,11 @@ type Fireable interface {
// They can be removed by calling either RemoveListenerForEvent or
// RemoveListener (for all events).
type EventSwitch interface {
service.Service
Fireable
Stop()
AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error
}
type eventSwitch struct {
service.BaseService
mtx sync.RWMutex
eventCells map[string]*eventCell
listeners map[string]*eventListener
@@ -65,13 +59,9 @@ func NewEventSwitch(logger log.Logger) EventSwitch {
eventCells: make(map[string]*eventCell),
listeners: make(map[string]*eventListener),
}
evsw.BaseService = *service.NewBaseService(logger, "EventSwitch", evsw)
return evsw
}
func (evsw *eventSwitch) OnStart(ctx context.Context) error { return nil }
func (evsw *eventSwitch) OnStop() {}
func (evsw *eventSwitch) AddListenerForEvent(listenerID, eventValue string, cb EventCallback) error {
// Get/Create eventCell and listener.
evsw.mtx.Lock()
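With the BaseService plumbing gone, the event switch is essentially a mutex-guarded listener registry; the no-op OnStart/OnStop hooks disappear, and the tests in the next file drop their t.Cleanup(evsw.Wait) calls accordingly. A much-simplified, hypothetical sketch of such a registry (the real eventSwitch keeps per-event cells and per-listener state, and AddListenerForEvent returns an error):

package main

import (
    "fmt"
    "sync"
)

type EventData interface{}
type EventCallback func(data EventData)

// eventSwitch here is a hypothetical, pared-down registry: no BaseService,
// no lifecycle hooks, just a mutex around the listener map.
type eventSwitch struct {
    mtx       sync.RWMutex
    listeners map[string][]EventCallback // eventValue -> callbacks
}

func newEventSwitch() *eventSwitch {
    return &eventSwitch{listeners: make(map[string][]EventCallback)}
}

func (evsw *eventSwitch) AddListenerForEvent(eventValue string, cb EventCallback) {
    evsw.mtx.Lock()
    defer evsw.mtx.Unlock()
    evsw.listeners[eventValue] = append(evsw.listeners[eventValue], cb)
}

func (evsw *eventSwitch) FireEvent(eventValue string, data EventData) {
    evsw.mtx.RLock()
    cbs := evsw.listeners[eventValue]
    evsw.mtx.RUnlock()
    for _, cb := range cbs {
        cb(data)
    }
}

func main() {
    evsw := newEventSwitch()
    evsw.AddListenerForEvent("NewRoundStep", func(data EventData) {
        fmt.Println("got:", data)
    })
    evsw.FireEvent("NewRoundStep", "round state")
}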

View File

@@ -21,8 +21,6 @@ func TestAddListenerForEventFireOnce(t *testing.T) {
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
messages := make(chan EventData)
require.NoError(t, evsw.AddListenerForEvent("listener", "event",
@@ -50,8 +48,6 @@ func TestAddListenerForEventFireMany(t *testing.T) {
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum := make(chan uint64)
doneSending := make(chan uint64)
@@ -88,8 +84,6 @@ func TestAddListenerForDifferentEvents(t *testing.T) {
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum := make(chan uint64)
doneSending1 := make(chan uint64)
@@ -151,9 +145,6 @@ func TestAddDifferentListenerForDifferentEvents(t *testing.T) {
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)
@@ -247,8 +238,6 @@ func TestManageListenersAsync(t *testing.T) {
logger := log.NewTestingLogger(t)
evsw := NewEventSwitch(logger)
require.NoError(t, evsw.Start(ctx))
t.Cleanup(evsw.Wait)
doneSum1 := make(chan uint64)
doneSum2 := make(chan uint64)