From 740b240e9df24ea85fcb925fdfe9f065545abb9b Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Tue, 5 Mar 2024 13:22:35 +0400 Subject: [PATCH] test_cdc_generation_data: test snapshot transfer The test only looked at the initial cdc_generation generation. It made the changes bigger to go past the raft max_command_size limit. It then made sure this large mutation set is saved in several raft commands. In this commit we enhance the test to check that the mutations are properly handled during snapshot transfer. The problem is that the entire system.cdc_generations_v3 table is read into the topology_snapshot and it's total size can exceed the commitlog max_record_size limit. We need a separate injection since the compaction could nullify the effects of the previous injection. The test fails without the fix from the previous commit. --- service/storage_service.cc | 19 +++++++++++++++++++ .../test_cdc_generation_data.py | 4 +++- 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/service/storage_service.cc b/service/storage_service.cc index 3d2a56339d..19123e4bc8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6263,6 +6263,25 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) // Alternatively, a node would wait for some time before switching to normal state. auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); cdc_generation_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + + utils::get_local_injector().inject("cdc_generation_mutations_topology_snapshot_replication", + [target_size=ss._db.local().commitlog()->max_record_size() * 2, &muts = cdc_generation_mutations] { + // Copy mutations n times, where n is picked so that the memory size of all mutations + // together exceeds `commitlog()->max_record_size()`. + // We multiply by two to account for all possible deltas (like segment::entry_overhead_size). + + size_t current_size = 0; + for (const auto& m: muts) { + current_size += m.representation().size(); + } + const auto number_of_copies = (target_size / current_size + 1) * 2; + muts.reserve(muts.size() * number_of_copies); + const auto it_begin = muts.begin(); + const auto it_end = muts.end(); + for (unsigned i = 0; i < number_of_copies; ++i) { + std::copy(it_begin, it_end, std::back_inserter(muts)); + } + }); } std::vector topology_requests_mutations; diff --git a/test/topology_experimental_raft/test_cdc_generation_data.py b/test/topology_experimental_raft/test_cdc_generation_data.py index 0151a3ceeb..3b65724880 100644 --- a/test/topology_experimental_raft/test_cdc_generation_data.py +++ b/test/topology_experimental_raft/test_cdc_generation_data.py @@ -17,7 +17,9 @@ async def test_send_data_in_parts(manager: ManagerClient): first_server = await manager.server_add(config=config) async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'): - await manager.server_add(config=config) + async with inject_error(manager.api, first_server.ip_addr, + 'cdc_generation_mutations_topology_snapshot_replication'): + await manager.server_add(config=config) await check_token_ring_and_group0_consistency(manager)