test_cdc_generation_data: test snapshot transfer

The test only looked at the initial cdc_generation
generation. It made the changes bigger to go
past the raft max_command_size limit.
It then made sure this large mutation set is saved
in several raft commands.

In this commit we enhance the test to check that the
mutations are properly handled during snapshot transfer.
The problem is that the entire system.cdc_generations_v3
table is read into the topology_snapshot and it's total
size can exceed the commitlog max_record_size limit.

We need a separate injection since the compaction
could nullify the effects of the previous injection.

The test fails without the fix from the previous commit.
This commit is contained in:
Petr Gusev
2024-03-05 13:22:35 +04:00
parent 276d58114d
commit 740b240e9d
2 changed files with 22 additions and 1 deletions

View File

@@ -6263,6 +6263,25 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
// Alternatively, a node would wait for some time before switching to normal state.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
cdc_generation_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
utils::get_local_injector().inject("cdc_generation_mutations_topology_snapshot_replication",
[target_size=ss._db.local().commitlog()->max_record_size() * 2, &muts = cdc_generation_mutations] {
// Copy mutations n times, where n is picked so that the memory size of all mutations
// together exceeds `commitlog()->max_record_size()`.
// We multiply by two to account for all possible deltas (like segment::entry_overhead_size).
size_t current_size = 0;
for (const auto& m: muts) {
current_size += m.representation().size();
}
const auto number_of_copies = (target_size / current_size + 1) * 2;
muts.reserve(muts.size() * number_of_copies);
const auto it_begin = muts.begin();
const auto it_end = muts.end();
for (unsigned i = 0; i < number_of_copies; ++i) {
std::copy(it_begin, it_end, std::back_inserter(muts));
}
});
}
std::vector<canonical_mutation> topology_requests_mutations;

View File

@@ -17,7 +17,9 @@ async def test_send_data_in_parts(manager: ManagerClient):
first_server = await manager.server_add(config=config)
async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'):
await manager.server_add(config=config)
async with inject_error(manager.api, first_server.ip_addr,
'cdc_generation_mutations_topology_snapshot_replication'):
await manager.server_add(config=config)
await check_token_ring_and_group0_consistency(manager)