blockchain v0: update TestBadBlockStopsPeer

This commit is contained in:
Aleksandr Bezobchuk
2021-01-08 08:38:12 -05:00
parent 9039b6cd05
commit 88fa8fe348
2 changed files with 95 additions and 92 deletions

View File

@@ -433,7 +433,12 @@ FOR_LOOP:
lastAdvance = r.pool.LastAdvance()
)
r.Logger.Debug("consensus ticker", "num_pending", numPending, "total", lenRequesters)
r.Logger.Debug(
"consensus ticker",
"num_pending", numPending,
"total", lenRequesters,
"height", height,
)
switch {
case r.pool.IsCaughtUp():
@@ -462,7 +467,7 @@ FOR_LOOP:
break FOR_LOOP
case <-trySyncTicker.C: // chan time
case <-trySyncTicker.C:
select {
case didProcessCh <- struct{}{}:
default:
@@ -476,7 +481,7 @@ FOR_LOOP:
// better to split these routines rather than coupling them as it is
// written here.
//
// TODO: uncouple from request routine.
// TODO: Uncouple from request routine.
// see if there are any blocks to sync
first, second := r.pool.PeekTwoBlocks()
@@ -534,10 +539,10 @@ FOR_LOOP:
// TODO: batch saves so we do not persist to disk every block
r.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
var err error
// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
state, _, err = r.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO: This is bad, are we zombie?

View File

@@ -38,7 +38,8 @@ type reactorTestSuite struct {
blockchainOutCh chan p2p.Envelope
blockchainPeerErrCh chan p2p.PeerError
peerUpdates *p2p.PeerUpdatesCh
peerUpdatesCh chan p2p.PeerUpdate
peerUpdates *p2p.PeerUpdatesCh
}
func setup(
@@ -118,12 +119,15 @@ func setup(
_, err = rng.Read(pID)
require.NoError(t, err)
peerUpdatesCh := make(chan p2p.PeerUpdate)
rts := &reactorTestSuite{
app: proxyApp,
blockchainInCh: make(chan p2p.Envelope, chBuf),
blockchainOutCh: make(chan p2p.Envelope, chBuf),
blockchainPeerErrCh: make(chan p2p.PeerError, chBuf),
peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)),
peerUpdatesCh: peerUpdatesCh,
peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh),
peerID: p2p.NodeID(fmt.Sprintf("%x", pID)),
}
@@ -136,7 +140,7 @@ func setup(
)
rts.reactor = NewReactor(
log.TestingLogger().With("module", "blockchain"),
log.TestingLogger().With("module", "blockchain", "node", rts.peerID),
state.Copy(),
blockExec,
blockStore,
@@ -232,7 +236,7 @@ func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQue
return
}
func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) {
func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) {
// create a mapping for efficient suite lookup by peer ID
suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite)
for _, suite := range suites {
@@ -255,9 +259,7 @@ func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) {
}
}
} else {
other := suitesByPeerID[envelope.To]
other.blockchainInCh <- p2p.Envelope{
suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{
From: primary.peerID,
To: envelope.To,
Message: envelope.Message,
@@ -268,7 +270,14 @@ func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) {
go func() {
for pErr := range primary.blockchainPeerErrCh {
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
if dropChErr {
primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err)
} else {
primary.peerUpdatesCh <- p2p.PeerUpdate{
PeerID: pErr.PeerID,
Status: p2p.PeerStatusRemoved,
}
}
}
}()
}
@@ -286,8 +295,8 @@ func TestNoBlockResponse(t *testing.T) {
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
simulateRouter(testSuites[0], testSuites)
simulateRouter(testSuites[1], testSuites)
simulateRouter(testSuites[0], testSuites, true)
simulateRouter(testSuites[1], testSuites, true)
testCases := []struct {
height int64
@@ -319,96 +328,85 @@ func TestNoBlockResponse(t *testing.T) {
}
}
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
// ----------------------------------------------------------------------------
func TestBadBlockStopsPeer(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
// // NOTE: This is too hard to test without
// // an easy way to add test peer to switch
// // or without significant refactoring of the module.
// // Alternatively we could actually dial a TCP conn but
// // that seems extreme.
// func TestBadBlockStopsPeer(t *testing.T) {
// config = cfg.ResetTestRoot("blockchain_reactor_test")
// defer os.RemoveAll(config.RootDir)
// genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(50) // int64(148)
genDoc, privVals := randGenesisDoc(config, 1, false, 30)
// maxBlockHeight := int64(148)
testSuites := []*reactorTestSuite{
setup(t, genDoc, privVals, maxBlockHeight, 0), // fully synced node
setup(t, genDoc, privVals, 0, 0),
setup(t, genDoc, privVals, 0, 0),
setup(t, genDoc, privVals, 0, 0),
setup(t, genDoc, privVals, 0, 0), // new node
}
// // Other chain needs a different validator set
// otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
// otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)
require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height())
// defer func() {
// err := otherChain.reactor.Stop()
// require.Error(t, err)
// err = otherChain.app.Stop()
// require.NoError(t, err)
// }()
for _, s := range testSuites[:len(testSuites)-1] {
simulateRouter(s, testSuites, true)
// reactorPairs := make([]BlockchainReactorPair, 4)
// connect reactor to every other reactor except the new node
for _, ss := range testSuites[:len(testSuites)-1] {
if s.peerID != ss.peerID {
s.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: ss.peerID,
}
}
}
}
// reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
// reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
// reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
// reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
for {
caughtUp := true
for _, s := range testSuites[1 : len(testSuites)-1] {
if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() {
caughtUp = false
}
}
// switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
// s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
// return s
if caughtUp {
break
}
// }, p2p.Connect2Switches)
time.Sleep(10 * time.Millisecond)
}
// defer func() {
// for _, r := range reactorPairs {
// err := r.reactor.Stop()
// require.NoError(t, err)
for _, s := range testSuites[:len(testSuites)-1] {
require.Len(t, s.reactor.pool.peers, 3)
}
// err = r.app.Stop()
// require.NoError(t, err)
// }
// }()
// Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect
// from this peer.
otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30)
otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0)
testSuites[3].reactor.store = otherSuite.reactor.store
// for {
// time.Sleep(1 * time.Second)
// caughtUp := true
// for _, r := range reactorPairs {
// if !r.reactor.pool.IsCaughtUp() {
// caughtUp = false
// }
// }
// if caughtUp {
// break
// }
// }
// start the new peer's faux router
newSuite := testSuites[len(testSuites)-1]
simulateRouter(newSuite, testSuites, false)
// // at this time, reactors[0-3] is the newest
// assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
// connect all nodes to the new peer
for _, s := range testSuites[:len(testSuites)-1] {
newSuite.peerUpdatesCh <- p2p.PeerUpdate{
Status: p2p.PeerStatusUp,
PeerID: s.peerID,
}
}
// // Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data
// // race, but can't be easily avoided.
// reactorPairs[3].reactor.store = otherChain.reactor.store
// wait for the new peer to catch up and become fully synced
for {
if newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() {
break
}
// lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
// reactorPairs = append(reactorPairs, lastReactorPair)
time.Sleep(10 * time.Millisecond)
}
// switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
// s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
// return s
time.Sleep(5 * time.Second)
// }, p2p.Connect2Switches)...)
// for i := 0; i < len(reactorPairs)-1; i++ {
// p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
// }
// for {
// if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
// break
// }
// time.Sleep(1 * time.Second)
// }
// assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
// }
// ensure two peers were removed that provided conflicting commits
require.Len(t, newSuite.reactor.pool.peers, len(testSuites)-3)
}