From 88fa8fe348eda92eb8c5443037539e2e7f65b5a2 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 8 Jan 2021 08:38:12 -0500 Subject: [PATCH] blockchain v0: update TestBadBlockStopsPeer --- blockchain/v0/reactor.go | 15 ++- blockchain/v0/reactor_test.go | 172 +++++++++++++++++----------------- 2 files changed, 95 insertions(+), 92 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index 6dc053d54..1445b24ae 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -433,7 +433,12 @@ FOR_LOOP: lastAdvance = r.pool.LastAdvance() ) - r.Logger.Debug("consensus ticker", "num_pending", numPending, "total", lenRequesters) + r.Logger.Debug( + "consensus ticker", + "num_pending", numPending, + "total", lenRequesters, + "height", height, + ) switch { case r.pool.IsCaughtUp(): @@ -462,7 +467,7 @@ FOR_LOOP: break FOR_LOOP - case <-trySyncTicker.C: // chan time + case <-trySyncTicker.C: select { case didProcessCh <- struct{}{}: default: @@ -476,7 +481,7 @@ FOR_LOOP: // better to split these routines rather than coupling them as it is // written here. // - // TODO: uncouple from request routine. + // TODO: Uncouple from request routine. // see if there are any blocks to sync first, second := r.pool.PeekTwoBlocks() @@ -534,10 +539,10 @@ FOR_LOOP: // TODO: batch saves so we do not persist to disk every block r.store.SaveBlock(first, firstParts, second.LastCommit) - // TODO: Same thing for app - but we would need a way to get the hash - // without persisting the state. var err error + // TODO: Same thing for app - but we would need a way to get the hash + // without persisting the state. state, _, err = r.blockExec.ApplyBlock(state, firstID, first) if err != nil { // TODO: This is bad, are we zombie? diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index 30c45bd28..21601efc2 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -38,7 +38,8 @@ type reactorTestSuite struct { blockchainOutCh chan p2p.Envelope blockchainPeerErrCh chan p2p.PeerError - peerUpdates *p2p.PeerUpdatesCh + peerUpdatesCh chan p2p.PeerUpdate + peerUpdates *p2p.PeerUpdatesCh } func setup( @@ -118,12 +119,15 @@ func setup( _, err = rng.Read(pID) require.NoError(t, err) + peerUpdatesCh := make(chan p2p.PeerUpdate) + rts := &reactorTestSuite{ app: proxyApp, blockchainInCh: make(chan p2p.Envelope, chBuf), blockchainOutCh: make(chan p2p.Envelope, chBuf), blockchainPeerErrCh: make(chan p2p.PeerError, chBuf), - peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)), + peerUpdatesCh: peerUpdatesCh, + peerUpdates: p2p.NewPeerUpdates(peerUpdatesCh), peerID: p2p.NodeID(fmt.Sprintf("%x", pID)), } @@ -136,7 +140,7 @@ func setup( ) rts.reactor = NewReactor( - log.TestingLogger().With("module", "blockchain"), + log.TestingLogger().With("module", "blockchain", "node", rts.peerID), state.Copy(), blockExec, blockStore, @@ -232,7 +236,7 @@ func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQue return } -func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) { +func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite, dropChErr bool) { // create a mapping for efficient suite lookup by peer ID suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) for _, suite := range suites { @@ -255,9 +259,7 @@ func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) { } } } else { - other := suitesByPeerID[envelope.To] - - other.blockchainInCh <- p2p.Envelope{ + suitesByPeerID[envelope.To].blockchainInCh <- p2p.Envelope{ From: primary.peerID, To: envelope.To, Message: envelope.Message, @@ -268,7 +270,14 @@ func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) { go func() { for pErr := range primary.blockchainPeerErrCh { - primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) + if dropChErr { + primary.reactor.Logger.Debug("dropped peer error", "err", pErr.Err) + } else { + primary.peerUpdatesCh <- p2p.PeerUpdate{ + PeerID: pErr.PeerID, + Status: p2p.PeerStatusRemoved, + } + } } }() } @@ -286,8 +295,8 @@ func TestNoBlockResponse(t *testing.T) { require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) - simulateRouter(testSuites[0], testSuites) - simulateRouter(testSuites[1], testSuites) + simulateRouter(testSuites[0], testSuites, true) + simulateRouter(testSuites[1], testSuites, true) testCases := []struct { height int64 @@ -319,96 +328,85 @@ func TestNoBlockResponse(t *testing.T) { } } -// ---------------------------------------------------------------------------- -// ---------------------------------------------------------------------------- -// ---------------------------------------------------------------------------- +func TestBadBlockStopsPeer(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) -// // NOTE: This is too hard to test without -// // an easy way to add test peer to switch -// // or without significant refactoring of the module. -// // Alternatively we could actually dial a TCP conn but -// // that seems extreme. -// func TestBadBlockStopsPeer(t *testing.T) { -// config = cfg.ResetTestRoot("blockchain_reactor_test") -// defer os.RemoveAll(config.RootDir) -// genDoc, privVals := randGenesisDoc(1, false, 30) + maxBlockHeight := int64(50) // int64(148) + genDoc, privVals := randGenesisDoc(config, 1, false, 30) -// maxBlockHeight := int64(148) + testSuites := []*reactorTestSuite{ + setup(t, genDoc, privVals, maxBlockHeight, 0), // fully synced node + setup(t, genDoc, privVals, 0, 0), + setup(t, genDoc, privVals, 0, 0), + setup(t, genDoc, privVals, 0, 0), + setup(t, genDoc, privVals, 0, 0), // new node + } -// // Other chain needs a different validator set -// otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) -// otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) + require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) -// defer func() { -// err := otherChain.reactor.Stop() -// require.Error(t, err) -// err = otherChain.app.Stop() -// require.NoError(t, err) -// }() + for _, s := range testSuites[:len(testSuites)-1] { + simulateRouter(s, testSuites, true) -// reactorPairs := make([]BlockchainReactorPair, 4) + // connect reactor to every other reactor except the new node + for _, ss := range testSuites[:len(testSuites)-1] { + if s.peerID != ss.peerID { + s.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: ss.peerID, + } + } + } + } -// reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) -// reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) -// reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) -// reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + for { + caughtUp := true + for _, s := range testSuites[1 : len(testSuites)-1] { + if s.reactor.pool.MaxPeerHeight() == 0 || !s.reactor.pool.IsCaughtUp() { + caughtUp = false + } + } -// switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { -// s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) -// return s + if caughtUp { + break + } -// }, p2p.Connect2Switches) + time.Sleep(10 * time.Millisecond) + } -// defer func() { -// for _, r := range reactorPairs { -// err := r.reactor.Stop() -// require.NoError(t, err) + for _, s := range testSuites[:len(testSuites)-1] { + require.Len(t, s.reactor.pool.peers, 3) + } -// err = r.app.Stop() -// require.NoError(t, err) -// } -// }() + // Mark testSuites[3] as an invalid peer which will cause newSuite to disconnect + // from this peer. + otherGenDoc, otherPrivVals := randGenesisDoc(config, 1, false, 30) + otherSuite := setup(t, otherGenDoc, otherPrivVals, maxBlockHeight, 0) + testSuites[3].reactor.store = otherSuite.reactor.store -// for { -// time.Sleep(1 * time.Second) -// caughtUp := true -// for _, r := range reactorPairs { -// if !r.reactor.pool.IsCaughtUp() { -// caughtUp = false -// } -// } -// if caughtUp { -// break -// } -// } + // start the new peer's faux router + newSuite := testSuites[len(testSuites)-1] + simulateRouter(newSuite, testSuites, false) -// // at this time, reactors[0-3] is the newest -// assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) + // connect all nodes to the new peer + for _, s := range testSuites[:len(testSuites)-1] { + newSuite.peerUpdatesCh <- p2p.PeerUpdate{ + Status: p2p.PeerStatusUp, + PeerID: s.peerID, + } + } -// // Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data -// // race, but can't be easily avoided. -// reactorPairs[3].reactor.store = otherChain.reactor.store + // wait for the new peer to catch up and become fully synced + for { + if newSuite.reactor.pool.MaxPeerHeight() > 0 && newSuite.reactor.pool.IsCaughtUp() { + break + } -// lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) -// reactorPairs = append(reactorPairs, lastReactorPair) + time.Sleep(10 * time.Millisecond) + } -// switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { -// s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) -// return s + time.Sleep(5 * time.Second) -// }, p2p.Connect2Switches)...) - -// for i := 0; i < len(reactorPairs)-1; i++ { -// p2p.Connect2Switches(switches, i, len(reactorPairs)-1) -// } - -// for { -// if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { -// break -// } - -// time.Sleep(1 * time.Second) -// } - -// assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) -// } + // ensure two peers were removed that provided conflicting commits + require.Len(t, newSuite.reactor.pool.peers, len(testSuites)-3) +}