blocksync: added test for syncing blocks from different heights

This commit is contained in:
Jasmina Malicevic
2022-04-28 16:41:55 +02:00
parent 993c90ce07
commit 600f84dae6

View File

@@ -75,10 +75,14 @@ func setup(
chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(ctx, t, chDesc)
i := 0
for nodeID := range rts.network.Nodes {
rts.addNode(ctx, t, nodeID, genDoc, privValArray, maxBlockHeights[i])
i++
if maxBlockHeights[1] != 0 {
rts.addMultipleNodes(ctx, t, rts.network.NodeIDs(), genDoc, privValArray, maxBlockHeights, 0)
} else {
i := 0
for nodeID := range rts.network.Nodes {
rts.addNode(ctx, t, nodeID, genDoc, privValArray, maxBlockHeights[i])
i++
}
}
t.Cleanup(func() {
@@ -97,6 +101,143 @@ func setup(
return rts
}
// We add multiple nodes with varying initial heights
// Allows us to test whether block sync works when a node
// has previous state
// maxBlockHeightPerNode - the heights for which the node already has state
// maxBlockHeightIdx - the index of the node with maximum height
func (rts *reactorTestSuite) addMultipleNodes(
ctx context.Context,
t *testing.T,
nodeIDs []types.NodeID,
genDoc *types.GenesisDoc,
privValArray []types.PrivValidator,
maxBlockHeightPerNode []int64,
maxBlockHeightIdx int64,
) {
t.Helper()
logger := log.NewNopLogger()
blockDB := make([]*dbm.MemDB, len(nodeIDs))
stateDB := make([]*dbm.MemDB, len(nodeIDs))
blockExecutors := make([]*sm.BlockExecutor, len(nodeIDs))
blockStores := make([]*store.BlockStore, len(nodeIDs))
stateStores := make([]sm.Store, len(nodeIDs))
state, err := sm.MakeGenesisState(genDoc)
require.NoError(t, err)
for idx, nodeID := range nodeIDs {
rts.nodes = append(rts.nodes, nodeID)
rts.app[nodeID] = proxy.New(abciclient.NewLocalClient(logger, &abci.BaseApplication{}), logger, proxy.NopMetrics())
require.NoError(t, rts.app[nodeID].Start(ctx))
stateDB[idx] = dbm.NewMemDB()
stateStores[idx] = sm.NewStore(stateDB[idx])
blockDB[idx] = dbm.NewMemDB()
blockStores[idx] = store.NewBlockStore(blockDB[idx])
require.NoError(t, stateStores[idx].Save(state))
mp := &mpmocks.Mempool{}
mp.On("Lock").Return()
mp.On("Unlock").Return()
mp.On("FlushAppConn", mock.Anything).Return(nil)
mp.On("Update",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything).Return(nil)
eventbus := eventbus.NewDefault(logger)
require.NoError(t, eventbus.Start(ctx))
blockExecutors[idx] = sm.NewBlockExecutor(stateStores[idx],
log.NewNopLogger(),
rts.app[nodeID],
mp,
sm.EmptyEvidencePool{},
blockStores[idx],
eventbus,
sm.NopMetrics(),
)
}
for blockHeight := int64(1); blockHeight <= maxBlockHeightPerNode[maxBlockHeightIdx]; blockHeight++ {
lastCommit := types.NewCommit(blockHeight-1, 0, types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStores[maxBlockHeightIdx].LoadBlockMeta(blockHeight - 1)
lastBlock := blockStores[maxBlockHeightIdx].LoadBlock(blockHeight - 1)
commitSigs := make([]types.CommitSig, len(privValArray))
votes := make([]types.Vote, len(privValArray))
for i, val := range privValArray {
vote, err := factory.MakeVote(
ctx,
val,
lastBlock.Header.ChainID, 0,
lastBlock.Header.Height, 0, 2,
lastBlockMeta.BlockID,
time.Now(),
)
require.NoError(t, err)
votes[i] = *vote
commitSigs[i] = vote.CommitSig()
}
lastCommit = types.NewCommit(
votes[0].Height,
votes[0].Round,
lastBlockMeta.BlockID,
commitSigs,
)
}
thisBlock := sf.MakeBlock(state, blockHeight, lastCommit)
thisParts, err := thisBlock.MakePartSet(types.BlockPartSizeBytes)
require.NoError(t, err)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartSetHeader: thisParts.Header()}
for idx, _ := range nodeIDs {
if blockHeight <= maxBlockHeightPerNode[idx] {
lastState, err := stateStores[idx].Load()
require.NoError(t, err)
state, err = blockExecutors[idx].ApplyBlock(ctx, lastState, blockID, thisBlock)
require.NoError(t, err)
blockStores[idx].SaveBlock(thisBlock, thisParts, lastCommit)
}
}
}
for idx, nodeID := range nodeIDs {
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
chCreator := func(ctx context.Context, chdesc *p2p.ChannelDescriptor) (*p2p.Channel, error) {
return rts.blockSyncChannels[nodeID], nil
}
rts.reactors[nodeID] = NewReactor(
rts.logger.With("nodeID", nodeID),
stateStores[idx],
blockExecutors[idx],
blockStores[idx],
nil,
chCreator,
func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] },
rts.blockSync,
consensus.NopMetrics(),
nil, // eventbus, can be nil
)
require.NoError(t, rts.reactors[nodeID].Start(ctx))
require.True(t, rts.reactors[nodeID].IsRunning())
}
}
func (rts *reactorTestSuite) addNode(
ctx context.Context,
t *testing.T,
@@ -264,6 +405,46 @@ func TestReactor_AbruptDisconnect(t *testing.T) {
//@jmalicevic ToDO Add tests that support syncing from nodes that are at different heights
// rts := setup(ctx, t, genDoc, privVals, []int64{maxBlockHeight, 0, 0, 0})
func TestReactor_NonGenesisSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cfg, err := config.ResetTestRoot(t.TempDir(), "block_sync_reactor_test")
require.NoError(t, err)
defer os.RemoveAll(cfg.RootDir)
valSet, privVals := factory.ValidatorSet(ctx, t, 4, 30)
genDoc := factory.GenesisDoc(cfg, time.Now(), valSet.Validators, factory.ConsensusParams())
maxBlockHeight := int64(101)
rts := setup(ctx, t, genDoc, privVals, []int64{maxBlockHeight, 50, 4, 0})
require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
rts.start(ctx, t)
require.Eventually(
t,
func() bool {
matching := true
for idx, _ := range rts.nodes {
if idx == 0 {
continue
}
matching = matching && rts.reactors[rts.nodes[idx]].GetRemainingSyncTime() > time.Nanosecond &&
rts.reactors[rts.nodes[idx]].pool.getLastSyncRate() > 0.001
if !matching {
height, _, _ := rts.reactors[rts.nodes[idx]].pool.GetStatus()
t.Logf("%d %d %s %f", height, idx, rts.reactors[rts.nodes[idx]].GetRemainingSyncTime(), rts.reactors[rts.nodes[idx]].pool.getLastSyncRate())
}
}
return matching
},
20*time.Second,
10*time.Millisecond,
"expected node to be partially synced",
)
}
func TestReactor_SyncTime(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()