diff --git a/service/storage_service.cc b/service/storage_service.cc index 3d2a56339d..19123e4bc8 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -6263,6 +6263,25 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled) // Alternatively, a node would wait for some time before switching to normal state. auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex(); cdc_generation_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); + + utils::get_local_injector().inject("cdc_generation_mutations_topology_snapshot_replication", + [target_size=ss._db.local().commitlog()->max_record_size() * 2, &muts = cdc_generation_mutations] { + // Copy mutations n times, where n is picked so that the memory size of all mutations + // together exceeds `commitlog()->max_record_size()`. + // We multiply by two to account for all possible deltas (like segment::entry_overhead_size). + + size_t current_size = 0; + for (const auto& m: muts) { + current_size += m.representation().size(); + } + const auto number_of_copies = (target_size / current_size + 1) * 2; + muts.reserve(muts.size() * number_of_copies); + const auto it_begin = muts.begin(); + const auto it_end = muts.end(); + for (unsigned i = 0; i < number_of_copies; ++i) { + std::copy(it_begin, it_end, std::back_inserter(muts)); + } + }); } std::vector topology_requests_mutations; diff --git a/test/topology_experimental_raft/test_cdc_generation_data.py b/test/topology_experimental_raft/test_cdc_generation_data.py index 0151a3ceeb..3b65724880 100644 --- a/test/topology_experimental_raft/test_cdc_generation_data.py +++ b/test/topology_experimental_raft/test_cdc_generation_data.py @@ -17,7 +17,9 @@ async def test_send_data_in_parts(manager: ManagerClient): first_server = await manager.server_add(config=config) async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'): - await manager.server_add(config=config) + async with inject_error(manager.api, first_server.ip_addr, + 'cdc_generation_mutations_topology_snapshot_replication'): + await manager.server_add(config=config) await check_token_ring_and_group0_consistency(manager)