test: add UT to test retrying ALTER tablets KEYSPACE

The newly added testcase is based on the already existing
`test_alter_dropped_tablets_keyspace`.
A new error injection is created, which stops the ALTER execution just
before the changes are submitted to RAFT. In the meantime, a new schema
change is performed using the 2nd node in the cluster, thus causing the
1st node to retry the ALTER statement.

(cherry picked from commit 522bede8ec)
This commit is contained in:
Piotr Smaron
2024-10-15 15:40:43 +02:00
committed by Mergify
parent 47042a19f6
commit ab4b0069a1
2 changed files with 68 additions and 0 deletions

View File

@@ -844,6 +844,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.trace("do update {} reason {}", updates, reason);
mixed_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> {
rtlogger.info("wait-before-committing-rf-change-event injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
rtlogger.info("wait-before-committing-rf-change-event injection done");
});
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
>>>>>>> 3f4c8a30e3 (cql/tablets: fix indentation in `rf_change` event handler)
}

View File

@@ -12,6 +12,7 @@ from cassandra.protocol import InvalidRequest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.topology.conftest import skip_mode
from test.topology.util import disable_schema_agreement_wait
logger = logging.getLogger(__name__)
@@ -66,3 +67,65 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
with pytest.raises(InvalidRequest, match="Can't ALTER keyspace ks, keyspace doesn't exist") as e:
await task
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerClient) -> None:
config = {
'enable_tablets': 'true'
}
logger.info("starting a node (the leader)")
servers = [await manager.server_add(config=config)]
logger.info("starting a second node (the follower)")
servers += [await manager.server_add(config=config)]
await manager.get_cql().run_async("create keyspace ks with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "
"tablets = {'initial': 2}")
await manager.get_cql().run_async("create table ks.t (pk int primary key)")
logger.info(f"injecting wait-before-committing-rf-change-event into the leader node {servers[0]}")
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
'wait-before-committing-rf-change-event')
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
res = await manager.get_cql().run_async("select data_center from system.local")
this_dc = res[0].data_center
async def alter_tablets_ks_without_waiting_to_complete():
logger.info("scheduling ALTER KS to change the RF from 1 to 2")
await manager.get_cql().run_async("alter keyspace ks "
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
# by creating a task this way we ensure it's immediately executed,
# but we don't want to wait until the task is completed here,
# because we want to do something else in the meantime
task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete())
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
leader_log_file = await manager.server_open_log(servers[0].server_id)
await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10)
logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
f"wakes up with a changed schema")
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
with disable_schema_agreement_wait(manager.get_cql()):
await manager.get_cql().run_async("create keyspace ks2 with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
"and tablets = {'enabled': true}", host=host)
logger.info("waking up the leader to continue processing ALTER on a changed schema, which should cause a retry")
await injection_handler.message()
logger.info("waiting for ALTER to complete")
await task
# ensure that the concurrent modification error really did take place
matches = await leader_log_file.grep("topology change coordinator fiber got group0_concurrent_modification")
assert matches
# ensure that the ALTER has eventually succeeded and we changed RF from 1 to 2
res = manager.get_cql().execute(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
assert res[0].replication[this_dc] == '2'