test: raft topology: test prepare_and_broadcast_cdc_generation_data

This test limits `commitlog_segment_size_in_mb` to 2, thus `max_command_size`
is limited to less than 1 MB. It adds an injection which copies mutations
generated by `get_cdc_generation_mutations` n times, where n is picked that
the memory size of all mutations exceeds `max_command_size`.

This test passes if cdc generation data is committed by raft in multiple commands.
If all the data is committed in a single command, the leader node will loop trying
to send raft command and getting the error:
```
storage_service - raft topology: topology change coordinator fiber got error raft::command_is_too_big_error (Command size {} is greater than the configured limit {})
```
This commit is contained in:
Mikołaj Grzebieluch
2023-05-11 16:56:08 +02:00
parent 8d6c95f9e3
commit 4e3c97d8d4
2 changed files with 48 additions and 0 deletions

View File

@@ -25,6 +25,7 @@
#include "gms/inet_address.hh"
#include "gms/gossiper.hh"
#include "gms/feature_service.hh"
#include "utils/error_injection.hh"
#include "utils/UUID_gen.hh"
#include "cdc/generation.hh"
@@ -272,8 +273,10 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
res.emplace_back(s, partition_key::from_singular(*s, id));
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
size_t size_estimate = 0;
size_t total_size_estimate = 0;
for (auto& e : desc.entries()) {
if (size_estimate >= mutation_size_threshold) {
total_size_estimate += size_estimate;
res.emplace_back(s, partition_key::from_singular(*s, id));
size_estimate = 0;
}
@@ -292,6 +295,20 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
co_await coroutine::maybe_yield();
}
total_size_estimate += size_estimate;
// Copy mutations n times, where n is picked so that the memory size of all mutations together exceeds `max_command_size`.
utils::get_local_injector().inject("cdc_generation_mutations_replication", [&res, total_size_estimate, mutation_size_threshold] {
utils::chunked_vector<mutation> new_res;
size_t number_of_copies = (mutation_size_threshold / total_size_estimate + 1) * 2;
for (size_t i = 0; i < number_of_copies; ++i) {
std::copy(res.begin(), res.end(), std::back_inserter(new_res));
}
res = std::move(new_res);
});
co_return res;
}

View File

@@ -0,0 +1,31 @@
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.topology.util import check_token_ring_and_group0_consistency
import pytest
"""
The injection forces the topology coordinator to send CDC generation data in multiple parts,
if it didn't the command size would go over commitlog segment size limit making it impossible to commit and apply the command.
"""
@pytest.mark.asyncio
async def test_send_data_in_parts(manager: ManagerClient):
config = {
'commitlog_segment_size_in_mb': 2
}
first_server = await manager.server_add(config=config)
async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'):
await manager.server_add(config=config)
await check_token_ring_and_group0_consistency(manager)
cql = manager.get_cql()
rows = await cql.run_async("SELECT description FROM system.group0_history")
for row in rows:
if row.description.startswith('insert CDC generation data (UUID: ') and row.description.endswith('), part'):
break
else:
pytest.fail("No CDC generation data sent in parts was found")