diff --git a/internal/blocksync/reactor_test.go b/internal/blocksync/reactor_test.go index eec36bbae..2febd9814 100644 --- a/internal/blocksync/reactor_test.go +++ b/internal/blocksync/reactor_test.go @@ -75,10 +75,14 @@ func setup( chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)} rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc) - i := 0 - for nodeID := range rts.network.Nodes { - rts.addNode(ctx, t, nodeID, genDoc, privValArray, maxBlockHeights[i]) - i++ + if maxBlockHeights[1] != 0 { + rts.addMultipleNodes(ctx, t, rts.network.NodeIDs(), genDoc, privValArray, maxBlockHeights, 0) + } else { + i := 0 + for nodeID := range rts.network.Nodes { + rts.addNode(ctx, t, nodeID, genDoc, privValArray, maxBlockHeights[i]) + i++ + } } t.Cleanup(func() { @@ -97,6 +101,143 @@ func setup( return rts } +// We add multiple nodes with varying initial heights +// Allows us to test whether block sync works when a node +// has previous state +// maxBlockHeightPerNode - the heights for which the node already has state +// maxBlockHeightIdx - the index of the node with maximum height +func (rts *reactorTestSuite) addMultipleNodes( + ctx context.Context, + t *testing.T, + nodeIDs []types.NodeID, + genDoc *types.GenesisDoc, + privValArray []types.PrivValidator, + maxBlockHeightPerNode []int64, + maxBlockHeightIdx int64, + +) { + t.Helper() + + logger := log.NewNopLogger() + blockDB := make([]*dbm.MemDB, len(nodeIDs)) + stateDB := make([]*dbm.MemDB, len(nodeIDs)) + blockExecutors := make([]*sm.BlockExecutor, len(nodeIDs)) + blockStores := make([]*store.BlockStore, len(nodeIDs)) + stateStores := make([]sm.Store, len(nodeIDs)) + + state, err := sm.MakeGenesisState(genDoc) + require.NoError(t, err) + + for idx, nodeID := range nodeIDs { + rts.nodes = append(rts.nodes, nodeID) + rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics()) + require.NoError(t, rts.app[nodeID].Start(ctx)) + stateDB[idx] = dbm.NewMemDB() + stateStores[idx] = sm.NewStore(stateDB[idx]) + + blockDB[idx] = dbm.NewMemDB() + blockStores[idx] = store.NewBlockStore(blockDB[idx]) + + require.NoError(t, stateStores[idx].Save(state)) + mp := &mpmocks.Mempool{} + mp.On("Lock").Return() + mp.On("Unlock").Return() + mp.On("FlushAppConn", mock.Anything).Return(nil) + mp.On("Update", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything).Return(nil) + + eventbus := eventbus.NewDefault(logger) + require.NoError(t, eventbus.Start(ctx)) + + blockExecutors[idx] = sm.NewBlockExecutor(stateStores[idx], + log.NewNopLogger(), + rts.app[nodeID], + mp, + sm.EmptyEvidencePool{}, + blockStores[idx], + eventbus, + sm.NopMetrics(), + ) + } + + for blockHeight := int64(1); blockHeight <= maxBlockHeightPerNode[maxBlockHeightIdx]; blockHeight++ { + lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil) + + if blockHeight > 1 { + lastBlockMeta := blockStores[maxBlockHeightIdx].LoadBlockMeta(blockHeight - 1) + lastBlock := blockStores[maxBlockHeightIdx].LoadBlock(blockHeight - 1) + + commitSigs := make([]types.CommitSig, len(privValArray)) + votes := make([]types.Vote, len(privValArray)) + for i, val := range privValArray { + vote, err := factory.MakeVote( + ctx, + val, + lastBlock.Header.ChainID, 0, + lastBlock.Header.Height, 0, 2, + lastBlockMeta.BlockID, + time.Now(), + ) + require.NoError(t, err) + votes[i] = *vote + commitSigs[i] = vote.CommitSig() + } + lastCommit = types.NewCommit( + votes[0].Height, + votes[0].Round, + lastBlockMeta.BlockID, + commitSigs, + ) + + } + thisBlock := sf.MakeBlock(state, blockHeight, lastCommit) + thisParts, err := thisBlock.MakePartSet(types.BlockPartSizeBytes) + require.NoError(t, err) + blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()} + + for idx, _ := range nodeIDs { + + if blockHeight <= maxBlockHeightPerNode[idx] { + lastState, err := stateStores[idx].Load() + require.NoError(t, err) + state, err = blockExecutors[idx].ApplyBlock(ctx, lastState, blockID, thisBlock) + require.NoError(t, err) + blockStores[idx].SaveBlock(thisBlock, thisParts, lastCommit) + } + } + } + + for idx, nodeID := range nodeIDs { + rts.peerChans[nodeID] = make(chan p2p.PeerUpdate) + rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1) + rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID]) + + chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) { + return rts.blockSyncChannels[nodeID], nil + } + rts.reactors[nodeID] = NewReactor( + rts.logger.With("nodeID", nodeID), + stateStores[idx], + blockExecutors[idx], + blockStores[idx], + nil, + chCreator, + func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }, + rts.blockSync, + consensus.NopMetrics(), + nil, // eventbus, can be nil + ) + + require.NoError(t, rts.reactors[nodeID].Start(ctx)) + require.True(t, rts.reactors[nodeID].IsRunning()) + } +} + func (rts *reactorTestSuite) addNode( ctx context.Context, t *testing.T, @@ -264,6 +405,46 @@ func TestReactor_AbruptDisconnect(t *testing.T) { //@jmalicevic ToDO Add tests that support syncing from nodes that are at different heights // rts := setup(ctx, t, genDoc, privVals, []int64{maxBlockHeight, 0, 0, 0}) +func TestReactor_NonGenesisSync(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test") + require.NoError(t, err) + defer os.RemoveAll(cfg.RootDir) + + valSet, privVals := factory.ValidatorSet(ctx, t, 4, 30) + genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams()) + maxBlockHeight := int64(101) + + rts := setup(ctx, t, genDoc, privVals, []int64{maxBlockHeight, 50, 4, 0}) + require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height()) + rts.start(ctx, t) + + require.Eventually( + t, + func() bool { + matching := true + for idx, _ := range rts.nodes { + if idx == 0 { + continue + } + matching = matching && rts.reactors[rts.nodes[idx]].GetRemainingSyncTime() > time.Nanosecond && + rts.reactors[rts.nodes[idx]].pool.getLastSyncRate() > 0.001 + + if !matching { + height, _, _ := rts.reactors[rts.nodes[idx]].pool.GetStatus() + t.Logf("%d %d %s %f", height, idx, rts.reactors[rts.nodes[idx]].GetRemainingSyncTime(), rts.reactors[rts.nodes[idx]].pool.getLastSyncRate()) + } + } + return matching + }, + 20*time.Second, + 10*time.Millisecond, + "expected node to be partially synced", + ) +} + func TestReactor_SyncTime(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()