From bd42353ee5fb62123c1e99ce6ae2008c1451e7c0 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 6 Jan 2021 13:33:13 -0500 Subject: [PATCH] blockchain v0: refactor TestNoBlockResponse --- blockchain/v0/reactor.go | 2 + blockchain/v0/reactor_test.go | 486 ++++++++++++++++++++-------------- 2 files changed, 288 insertions(+), 200 deletions(-) diff --git a/blockchain/v0/reactor.go b/blockchain/v0/reactor.go index bf6109fcc..6dc053d54 100644 --- a/blockchain/v0/reactor.go +++ b/blockchain/v0/reactor.go @@ -268,6 +268,8 @@ func (r *Reactor) handleMessage(chID p2p.ChannelID, envelope p2p.Envelope) (err } }() + r.Logger.Debug("received message", "message", envelope.Message, "peer", envelope.From) + switch chID { case BlockchainChannel: err = r.handleBlockchainMessage(envelope) diff --git a/blockchain/v0/reactor_test.go b/blockchain/v0/reactor_test.go index a88b499f4..b5fee74ce 100644 --- a/blockchain/v0/reactor_test.go +++ b/blockchain/v0/reactor_test.go @@ -2,12 +2,12 @@ package v0 import ( "fmt" + "math/rand" "os" "sort" "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" dbm "github.com/tendermint/tm-db" @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/mempool/mock" "github.com/tendermint/tendermint/p2p" + bcproto "github.com/tendermint/tendermint/proto/tendermint/blockchain" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/store" @@ -24,49 +25,38 @@ import ( tmtime "github.com/tendermint/tendermint/types/time" ) -var config *cfg.Config +var rng = rand.New(rand.NewSource(time.Now().UnixNano())) -func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { - validators := make([]types.GenesisValidator, numValidators) - privValidators := make([]types.PrivValidator, numValidators) - for i := 0; i < numValidators; i++ { - val, privVal := types.RandValidator(randPower, minPower) - validators[i] = types.GenesisValidator{ - PubKey: val.PubKey, - Power: val.VotingPower, - } - privValidators[i] = privVal - } - sort.Sort(types.PrivValidatorsByAddress(privValidators)) - - return &types.GenesisDoc{ - GenesisTime: tmtime.Now(), - ChainID: config.ChainID(), - Validators: validators, - }, privValidators -} - -type BlockchainReactorPair struct { - reactor *BlockchainReactor +type reactorTestSuite struct { + reactor *Reactor app proxy.AppConns + + peerID p2p.NodeID + + blockchainChannel *p2p.Channel + blockchainInCh chan p2p.Envelope + blockchainOutCh chan p2p.Envelope + blockchainPeerErrCh chan p2p.PeerError + + peerUpdates *p2p.PeerUpdatesCh } -func newBlockchainReactor( - logger log.Logger, +func setup( + t *testing.T, genDoc *types.GenesisDoc, privVals []types.PrivValidator, - maxBlockHeight int64) BlockchainReactorPair { - if len(privVals) != 1 { - panic("only support one validator") - } + maxBlockHeight int64, + chBuf uint, +) *reactorTestSuite { + t.Helper() + + require.Len(t, privVals, 1, "only one validator can be supported") app := &testApp{} cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) - err := proxyApp.Start() - if err != nil { - panic(fmt.Errorf("error start app: %w", err)) - } + require.NoError(t, proxyApp.Start()) blockDB := dbm.NewMemDB() stateDB := dbm.NewMemDB() @@ -74,25 +64,24 @@ func newBlockchainReactor( blockStore := store.NewBlockStore(blockDB) state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc) - if err != nil { - panic(fmt.Errorf("error constructing state from genesis file: %w", err)) - } + require.NoError(t, err) - // Make the BlockchainReactor itself. - // NOTE we have to create and commit the blocks first because - // pool.height is determined from the store. fastSync := true db := dbm.NewMemDB() stateStore = sm.NewStore(db) - blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), - mock.Mempool{}, sm.EmptyEvidencePool{}) - if err = stateStore.Save(state); err != nil { - panic(err) - } - // let's add some blocks in + blockExec := sm.NewBlockExecutor( + stateStore, + log.TestingLogger(), + proxyApp.Consensus(), + mock.Mempool{}, + sm.EmptyEvidencePool{}, + ) + require.NoError(t, stateStore.Save(state)) + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) + if blockHeight > 1 { lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) lastBlock := blockStore.LoadBlock(blockHeight - 1) @@ -105,182 +94,98 @@ func newBlockchainReactor( lastBlock.Header.ChainID, time.Now(), ) - if err != nil { - panic(err) - } - lastCommit = types.NewCommit(vote.Height, vote.Round, - lastBlockMeta.BlockID, []types.CommitSig{vote.CommitSig()}) + require.NoError(t, err) + + lastCommit = types.NewCommit( + vote.Height, + vote.Round, + lastBlockMeta.BlockID, + []types.CommitSig{vote.CommitSig()}, + ) } thisBlock := makeBlock(blockHeight, state, lastCommit) - thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} state, _, err = blockExec.ApplyBlock(state, blockID, thisBlock) - if err != nil { - panic(fmt.Errorf("error apply block: %w", err)) - } + require.NoError(t, err) blockStore.SaveBlock(thisBlock, thisParts, lastCommit) } - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) + pID := make([]byte, 16) + _, err = rng.Read(pID) + require.NoError(t, err) - return BlockchainReactorPair{bcReactor, proxyApp} + rts := &reactorTestSuite{ + app: proxyApp, + blockchainInCh: make(chan p2p.Envelope, chBuf), + blockchainOutCh: make(chan p2p.Envelope, chBuf), + blockchainPeerErrCh: make(chan p2p.PeerError, chBuf), + peerUpdates: p2p.NewPeerUpdates(make(chan p2p.PeerUpdate)), + peerID: p2p.NodeID(fmt.Sprintf("%x", pID)), + } + + rts.blockchainChannel = p2p.NewChannel( + BlockchainChannel, + new(bcproto.Message), + rts.blockchainInCh, + rts.blockchainOutCh, + rts.blockchainPeerErrCh, + ) + + rts.reactor = NewReactor( + log.TestingLogger().With("module", "blockchain"), + state.Copy(), + blockExec, + blockStore, + nil, + rts.blockchainChannel, + rts.peerUpdates, + fastSync, + ) + + require.NoError(t, rts.reactor.Start()) + require.True(t, rts.reactor.IsRunning()) + + t.Cleanup(func() { + require.NoError(t, rts.reactor.Stop()) + require.NoError(t, rts.app.Stop()) + require.False(t, rts.reactor.IsRunning()) + }) + + return rts } -func TestNoBlockResponse(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) +func randGenesisDoc( + config *cfg.Config, + numValidators int, + randPower bool, + minPower int64, +) (*types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) - maxBlockHeight := int64(65) - - reactorPairs := make([]BlockchainReactorPair, 2) - - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - err = r.app.Stop() - require.NoError(t, err) - } - }() - - tests := []struct { - height int64 - existent bool - }{ - {maxBlockHeight + 2, false}, - {10, true}, - {1, true}, - {100, false}, - } - - for { - if reactorPairs[1].reactor.pool.IsCaughtUp() { - break + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, } - time.Sleep(10 * time.Millisecond) + privValidators[i] = privVal } - assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) + sort.Sort(types.PrivValidatorsByAddress(privValidators)) - for _, tt := range tests { - block := reactorPairs[1].reactor.store.LoadBlock(tt.height) - if tt.existent { - assert.True(t, block != nil) - } else { - assert.True(t, block == nil) - } - } + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: config.ChainID(), + Validators: validators, + }, privValidators } -// NOTE: This is too hard to test without -// an easy way to add test peer to switch -// or without significant refactoring of the module. -// Alternatively we could actually dial a TCP conn but -// that seems extreme. -func TestBadBlockStopsPeer(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - maxBlockHeight := int64(148) - - // Other chain needs a different validator set - otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) - otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) - - defer func() { - err := otherChain.reactor.Stop() - require.Error(t, err) - err = otherChain.app.Stop() - require.NoError(t, err) - }() - - reactorPairs := make([]BlockchainReactorPair, 4) - - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - - switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - - err = r.app.Stop() - require.NoError(t, err) - } - }() - - for { - time.Sleep(1 * time.Second) - caughtUp := true - for _, r := range reactorPairs { - if !r.reactor.pool.IsCaughtUp() { - caughtUp = false - } - } - if caughtUp { - break - } - } - - // at this time, reactors[0-3] is the newest - assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) - - // Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data - // race, but can't be easily avoided. - reactorPairs[3].reactor.store = otherChain.reactor.store - - lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs = append(reactorPairs, lastReactorPair) - - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) - return s - - }, p2p.Connect2Switches)...) - - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) - } - - for { - if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { - break - } - - time.Sleep(1 * time.Second) - } - - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) -} - -//---------------------------------------------- -// utility funcs - func makeTxs(height int64) (txs []types.Tx) { for i := 0; i < 10; i++ { txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) @@ -326,3 +231,184 @@ func (app *testApp) Commit() abci.ResponseCommit { func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { return } + +func simulateRouter(primary *reactorTestSuite, suites []*reactorTestSuite) { + // create a mapping for efficient suite lookup by peer ID + suitesByPeerID := make(map[p2p.NodeID]*reactorTestSuite) + for _, suite := range suites { + suitesByPeerID[suite.peerID] = suite + } + + // Simulate a router by listening for all outbound envelopes and proxying the + // envelope to the respective peer (suite). + go func() { + for envelope := range primary.blockchainOutCh { + if envelope.Broadcast { + for _, s := range suites { + // broadcast to everyone except source + if s.peerID != envelope.From { + s.blockchainInCh <- p2p.Envelope{ + From: primary.peerID, + To: s.peerID, + Message: envelope.Message, + } + } + } + } else { + other := suitesByPeerID[envelope.To] + + other.blockchainInCh <- p2p.Envelope{ + From: primary.peerID, + To: envelope.To, + Message: envelope.Message, + } + } + } + }() + + go func() { + for range primary.blockchainPeerErrCh { + // drop peer errors + } + }() +} + +func TestNoBlockResponse(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + + genDoc, privVals := randGenesisDoc(config, 1, false, 30) + maxBlockHeight := int64(65) + testSuites := []*reactorTestSuite{ + setup(t, genDoc, privVals, maxBlockHeight, 0), + setup(t, genDoc, privVals, 0, 0), + } + + require.Equal(t, maxBlockHeight, testSuites[0].reactor.store.Height()) + + simulateRouter(testSuites[0], testSuites) + simulateRouter(testSuites[1], testSuites) + + testCases := []struct { + height int64 + existent bool + }{ + {maxBlockHeight + 2, false}, + {10, true}, + {1, true}, + {100, false}, + } + + secondaryPool := testSuites[1].reactor.pool + + for { + if secondaryPool.MaxPeerHeight() > 0 && secondaryPool.IsCaughtUp() { + break + } + + time.Sleep(10 * time.Millisecond) + } + + for _, tc := range testCases { + block := testSuites[1].reactor.store.LoadBlock(tc.height) + if tc.existent { + require.True(t, block != nil) + } else { + require.True(t, block == nil, block) + } + } +} + +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- +// ---------------------------------------------------------------------------- + +// // NOTE: This is too hard to test without +// // an easy way to add test peer to switch +// // or without significant refactoring of the module. +// // Alternatively we could actually dial a TCP conn but +// // that seems extreme. +// func TestBadBlockStopsPeer(t *testing.T) { +// config = cfg.ResetTestRoot("blockchain_reactor_test") +// defer os.RemoveAll(config.RootDir) +// genDoc, privVals := randGenesisDoc(1, false, 30) + +// maxBlockHeight := int64(148) + +// // Other chain needs a different validator set +// otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) +// otherChain := newBlockchainReactor(log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) + +// defer func() { +// err := otherChain.reactor.Stop() +// require.Error(t, err) +// err = otherChain.app.Stop() +// require.NoError(t, err) +// }() + +// reactorPairs := make([]BlockchainReactorPair, 4) + +// reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) +// reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) +// reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) +// reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) + +// switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { +// s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) +// return s + +// }, p2p.Connect2Switches) + +// defer func() { +// for _, r := range reactorPairs { +// err := r.reactor.Stop() +// require.NoError(t, err) + +// err = r.app.Stop() +// require.NoError(t, err) +// } +// }() + +// for { +// time.Sleep(1 * time.Second) +// caughtUp := true +// for _, r := range reactorPairs { +// if !r.reactor.pool.IsCaughtUp() { +// caughtUp = false +// } +// } +// if caughtUp { +// break +// } +// } + +// // at this time, reactors[0-3] is the newest +// assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) + +// // Mark reactorPairs[3] as an invalid peer. Fiddling with .store without a mutex is a data +// // race, but can't be easily avoided. +// reactorPairs[3].reactor.store = otherChain.reactor.store + +// lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) +// reactorPairs = append(reactorPairs, lastReactorPair) + +// switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { +// s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) +// return s + +// }, p2p.Connect2Switches)...) + +// for i := 0; i < len(reactorPairs)-1; i++ { +// p2p.Connect2Switches(switches, i, len(reactorPairs)-1) +// } + +// for { +// if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { +// break +// } + +// time.Sleep(1 * time.Second) +// } + +// assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) +// }