diff --git a/service/raft/raft_group0.cc b/service/raft/raft_group0.cc index 025b584927..39b34fc287 100644 --- a/service/raft/raft_group0.cc +++ b/service/raft/raft_group0.cc @@ -418,6 +418,7 @@ future<> raft_group0::abort() { co_return; } _aborted = true; + group0_log.debug("Raft group0 service is aborting..."); co_await smp::invoke_on_all([this]() { return uninit_rpc_verbs(_ms.local()); @@ -430,6 +431,8 @@ future<> raft_group0::abort() { co_await std::move(_leadership_monitor); co_await stop_group0(); + + group0_log.debug("Raft group0 service is aborted"); } future<> raft_group0::start_server_for_group0(raft::group_id group0_id, service::storage_service& ss, cql3::query_processor& qp, service::migration_manager& mm, bool topology_change_enabled) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 97a96ea40c..a16646a47e 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -817,6 +817,10 @@ future<> storage_service::topology_state_load(state_change_hint hint) { for (const auto& gen_id : _topology_state_machine._topology.committed_cdc_generations) { rtlogger.trace("topology_state_load: process committed cdc generation {}", gen_id); + co_await utils::get_local_injector().inject("topology_state_load_before_update_cdc", [](auto& handler) -> future<> { + rtlogger.info("topology_state_load_before_update_cdc hit, wait for message"); + co_await handler.wait_for_message(db::timeout_clock::now() + std::chrono::minutes(5)); + }); co_await _cdc_gens.local().handle_cdc_generation(gen_id); if (gen_id == _topology_state_machine._topology.committed_cdc_generations.back()) { co_await _sys_ks.local().update_cdc_generation_id(gen_id); diff --git a/test/cluster/test_cdc_generation_data.py b/test/cluster/test_cdc_generation_data.py index f50a0f33db..b765051d96 100644 --- a/test/cluster/test_cdc_generation_data.py +++ b/test/cluster/test_cdc_generation_data.py @@ -1,8 +1,13 @@ from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error from test.cluster.util import check_token_ring_and_group0_consistency - +from test.cluster.conftest import skip_mode +import logging import pytest +import asyncio + + +logger = logging.getLogger(__name__) """ The injection forces the topology coordinator to send CDC generation data in multiple parts, @@ -31,3 +36,40 @@ async def test_send_data_in_parts(manager: ManagerClient): break else: pytest.fail("No CDC generation data sent in parts was found") + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_group0_apply_while_node_is_being_shutdown(manager: ManagerClient): + # This a regression test for #24401. + + logger.info("Starting s0") + s0 = await manager.server_add(cmdline=['--logger-log-level', 'raft_group0=debug']) + + logger.info("Injecting topology_state_load_before_update_cdc into s0") + await manager.api.enable_injection(s0.ip_addr, "topology_state_load_before_update_cdc", False) + + logger.info("Starting s1") + s1_start_task = asyncio.create_task(manager.server_add()) + + logger.info("Waiting for topology_state_load_before_update_cdc on s0") + log = await manager.server_open_log(s0.server_id) + await log.wait_for('topology_state_load_before_update_cdc hit, wait for message') + + logger.info("Triggering s0 shutdown") + stop_s0_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id)) + + logger.info("Waiting for group0 to start aborting on s0") + await log.wait_for('Raft group0 service is aborting...') + + logger.info("Releasing topology_state_load_before_update_cdc on s0") + await manager.api.message_injection(s0.ip_addr, 'topology_state_load_before_update_cdc') + + await stop_s0_task + try: + await s1_start_task + except Exception: + pass # ingore errors, since we don't care + + errors = await log.grep_for_errors() + assert errors == []