diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index bc9627923d..1a5d17df53 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -842,6 +842,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber { rtlogger.trace("do update {} reason {}", updates, reason); mixed_change change{std::move(updates)}; group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason); + co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> { + rtlogger.info("wait-before-committing-rf-change-event injection hit"); + co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30}); + rtlogger.info("wait-before-committing-rf-change-event injection done"); + }); co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as); } break; diff --git a/test/topology_custom/test_tablets_cql.py b/test/topology_custom/test_tablets_cql.py index 298cf03b5b..044b6a9991 100644 --- a/test/topology_custom/test_tablets_cql.py +++ b/test/topology_custom/test_tablets_cql.py @@ -12,6 +12,7 @@ from cassandra.protocol import InvalidRequest from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot from test.topology.conftest import skip_mode +from test.topology.util import disable_schema_agreement_wait logger = logging.getLogger(__name__) @@ -66,3 +67,65 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None: with pytest.raises(InvalidRequest, match="Can't ALTER keyspace ks, keyspace doesn't exist") as e: await task + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerClient) -> None: + config = { + 'enable_tablets': 'true' + } + + logger.info("starting a node (the leader)") + servers = [await manager.server_add(config=config)] + + logger.info("starting a second node (the follower)") + servers += [await manager.server_add(config=config)] + + await manager.get_cql().run_async("create keyspace ks with " + "replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and " + "tablets = {'initial': 2}") + await manager.get_cql().run_async("create table ks.t (pk int primary key)") + + logger.info(f"injecting wait-before-committing-rf-change-event into the leader node {servers[0]}") + injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, + 'wait-before-committing-rf-change-event') + + # ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag + res = await manager.get_cql().run_async("select data_center from system.local") + this_dc = res[0].data_center + + async def alter_tablets_ks_without_waiting_to_complete(): + logger.info("scheduling ALTER KS to change the RF from 1 to 2") + await manager.get_cql().run_async("alter keyspace ks " + f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}") + + # by creating a task this way we ensure it's immediately executed, + # but we don't want to wait until the task is completed here, + # because we want to do something else in the meantime + task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete()) + + logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request") + leader_log_file = await manager.server_open_log(servers[0].server_id) + await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10) + + logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, " + f"wakes up with a changed schema") + host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr) + with disable_schema_agreement_wait(manager.get_cql()): + await manager.get_cql().run_async("create keyspace ks2 with " + "replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} " + "and tablets = {'enabled': true}", host=host) + + logger.info("waking up the leader to continue processing ALTER on a changed schema, which should cause a retry") + await injection_handler.message() + + logger.info("waiting for ALTER to complete") + await task + + # ensure that the concurrent modification error really did take place + matches = await leader_log_file.grep("topology change coordinator fiber got group0_concurrent_modification") + assert matches + + # ensure that the ALTER has eventually succeeded and we changed RF from 1 to 2 + res = manager.get_cql().execute(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'") + assert res[0].replication[this_dc] == '2'