test: add UT to test retrying ALTER tablets KEYSPACE

The newly added testcase is based on the already existing
`test_alter_dropped_tablets_keyspace`.
A new error injection is created, which stops the ALTER execution just
before the changes are submitted to RAFT. In the meantime, a new schema
change is performed using the 2nd node in the cluster, thus causing the
1st node to retry the ALTER statement.
This commit is contained in:
Piotr Smaron
2024-10-15 15:40:43 +02:00
parent 3f4c8a30e3
commit 522bede8ec
2 changed files with 68 additions and 0 deletions

View File

@@ -842,6 +842,11 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.trace("do update {} reason {}", updates, reason);
mixed_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
co_await utils::get_local_injector().inject("wait-before-committing-rf-change-event", [] (auto& handler) -> future<> {
rtlogger.info("wait-before-committing-rf-change-event injection hit");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{30});
rtlogger.info("wait-before-committing-rf-change-event injection done");
});
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), _as);
}
break;

View File

@@ -12,6 +12,7 @@ from cassandra.protocol import InvalidRequest
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.topology.conftest import skip_mode
from test.topology.util import disable_schema_agreement_wait
logger = logging.getLogger(__name__)
@@ -66,3 +67,65 @@ async def test_alter_dropped_tablets_keyspace(manager: ManagerClient) -> None:
with pytest.raises(InvalidRequest, match="Can't ALTER keyspace ks, keyspace doesn't exist") as e:
await task
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_alter_tablets_keyspace_concurrent_modification(manager: ManagerClient) -> None:
config = {
'enable_tablets': 'true'
}
logger.info("starting a node (the leader)")
servers = [await manager.server_add(config=config)]
logger.info("starting a second node (the follower)")
servers += [await manager.server_add(config=config)]
await manager.get_cql().run_async("create keyspace ks with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} and "
"tablets = {'initial': 2}")
await manager.get_cql().run_async("create table ks.t (pk int primary key)")
logger.info(f"injecting wait-before-committing-rf-change-event into the leader node {servers[0]}")
injection_handler = await inject_error_one_shot(manager.api, servers[0].ip_addr,
'wait-before-committing-rf-change-event')
# ALTER tablets KS only accepts a specific DC, it rejects the generic 'replication_factor' tag
res = await manager.get_cql().run_async("select data_center from system.local")
this_dc = res[0].data_center
async def alter_tablets_ks_without_waiting_to_complete():
logger.info("scheduling ALTER KS to change the RF from 1 to 2")
await manager.get_cql().run_async("alter keyspace ks "
f"with replication = {{'class': 'NetworkTopologyStrategy', '{this_dc}': 2}}")
# by creating a task this way we ensure it's immediately executed,
# but we don't want to wait until the task is completed here,
# because we want to do something else in the meantime
task = asyncio.create_task(alter_tablets_ks_without_waiting_to_complete())
logger.info(f"waiting for the leader node {servers[0]} to start handling the keyspace-rf-change request")
leader_log_file = await manager.server_open_log(servers[0].server_id)
await leader_log_file.wait_for("wait-before-committing-rf-change-event injection hit", timeout=10)
logger.info(f"creating another keyspace from the follower node {servers[1]} so that the leader, which hangs on injected sleep, "
f"wakes up with a changed schema")
host = manager.get_cql().cluster.metadata.get_host(servers[1].ip_addr)
with disable_schema_agreement_wait(manager.get_cql()):
await manager.get_cql().run_async("create keyspace ks2 with "
"replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} "
"and tablets = {'enabled': true}", host=host)
logger.info("waking up the leader to continue processing ALTER on a changed schema, which should cause a retry")
await injection_handler.message()
logger.info("waiting for ALTER to complete")
await task
# ensure that the concurrent modification error really did take place
matches = await leader_log_file.grep("topology change coordinator fiber got group0_concurrent_modification")
assert matches
# ensure that the ALTER has eventually succeeded and we changed RF from 1 to 2
res = manager.get_cql().execute(f"SELECT * FROM system_schema.keyspaces WHERE keyspace_name = 'ks'")
assert res[0].replication[this_dc] == '2'