storage_service: test_group0_apply_while_node_is_being_shutdown

This commit is contained in:
Petr Gusev
2025-06-06 10:00:45 +02:00
parent 6b85ab79d6
commit b1050944a3
3 changed files with 50 additions and 1 deletions

View File

@@ -418,6 +418,7 @@ future<> raft_group0::abort() {
co_return;
}
_aborted = true;
group0_log.debug("Raft group0 service is aborting...");
co_await smp::invoke_on_all([this]() {
return uninit_rpc_verbs(_ms.local());
@@ -430,6 +431,8 @@ future<> raft_group0::abort() {
co_await std::move(_leadership_monitor);
co_await stop_group0();
group0_log.debug("Raft group0 service is aborted");
}
future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) {

View File

@@ -817,6 +817,10 @@ future<> storage_service::topology_state_load(state_change_hint hint) {
for (const auto& gen_id : _topology_state_machine._topology.committed_cdc_generations) {
rtlogger.trace("topology_state_load: process committed cdc generation {}", gen_id);
co_await utils::get_local_injector().inject("topology_state_load_before_update_cdc", [](auto& handler) -> future<> {
rtlogger.info("topology_state_load_before_update_cdc hit, wait for message");
co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5));
});
co_await _cdc_gens.local().handle_cdc_generation(gen_id);
if (gen_id == _topology_state_machine._topology.committed_cdc_generations.back()) {
co_await _sys_ks.local().update_cdc_generation_id(gen_id);

View File

@@ -1,8 +1,13 @@
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.cluster.util import check_token_ring_and_group0_consistency
from test.cluster.conftest import skip_mode
import logging
import pytest
import asyncio
logger = logging.getLogger(__name__)
"""
The injection forces the topology coordinator to send CDC generation data in multiple parts,
@@ -31,3 +36,40 @@ async def test_send_data_in_parts(manager: ManagerClient):
break
else:
pytest.fail("No CDC generation data sent in parts was found")
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_group0_apply_while_node_is_being_shutdown(manager: ManagerClient):
# This a regression test for #24401.
logger.info("Starting s0")
s0 = await manager.server_add(cmdline=['--logger-log-level', 'raft_group0=debug'])
logger.info("Injecting topology_state_load_before_update_cdc into s0")
await manager.api.enable_injection(s0.ip_addr, "topology_state_load_before_update_cdc", False)
logger.info("Starting s1")
s1_start_task = asyncio.create_task(manager.server_add())
logger.info("Waiting for topology_state_load_before_update_cdc on s0")
log = await manager.server_open_log(s0.server_id)
await log.wait_for('topology_state_load_before_update_cdc hit, wait for message')
logger.info("Triggering s0 shutdown")
stop_s0_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))
logger.info("Waiting for group0 to start aborting on s0")
await log.wait_for('Raft group0 service is aborting...')
logger.info("Releasing topology_state_load_before_update_cdc on s0")
await manager.api.message_injection(s0.ip_addr, 'topology_state_load_before_update_cdc')
await stop_s0_task
try:
await s1_start_task
except Exception:
pass # ingore errors, since we don't care
errors = await log.grep_for_errors()
assert errors == []