From d8a9fdddbd3ed3e3375a035e07c6d98acdc73000 Mon Sep 17 00:00:00 2001 From: Wojciech Mitros Date: Mon, 11 May 2026 15:12:33 +0200 Subject: [PATCH] test: run test_mv_admission_control_exception on one shard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In the test we perform 2 consecutive writes where the first write is supposed to increase the view update backlog above the mv admission control threshold and the second one is expected to be rejected because of that. On each node/shard we have 2 types of view update backlogs: 1. for deciding whether we should admit writes 2. for propagating the backlog information to other nodes/shards. For the second write to be rejected, it must be performed on a node and shard which updated its backlog of type 1. The view update backlog of type 2. is immediately increased on the base table replica. For this backlog to be registered as a backlog of type 1., it needs to be either carried by gossip (happening once every second) or by attaching it to a replica write response. We don't want to increase the runtime of tests unnecessarily, so we don't wait and we rely on the second mechanism. The response to the first base table write (the one causing increase in the backlog) carries the increased backlog to the coordinator of this write. So for the second write to observe the increased backlog, it needs to be coordinated on the same node+shard as the first write. We make sure that both writes are coordinated on the same node+shard by using prepared statements combined with setting the host in `run_async`. Both writes target the same partition and with prepared statements we route them directly to the correct shard. That was the idea, at least. In practice, for the driver to learn the correct shard, it first needs to learn the token->shard mapping from the server. For vnodes it can expect a shard by calculating the token of the affected partition, but for tablets, it had no opportunity to learn the tablet->shard mapping so the first write may route to any shard. Additionally, we aren't guaranteed that the driver established connections to all shards on all nodes at the point of any write. So if a connection finishes establishing between the two writes, this may also cause us to coordinate these 2 writes on different shards, leading to a missed view backlog growth and not-rejected second write. We fix this in this patch by running the test using one shard on each node. This way, as long as we perform both writes on the same node, they'll also be coordinated on the same shard. This also makes the prepared statement and BoundStatement unnecessary — we can use SimpleStatement with FallthroughRetryPolicy directly. Fixes: SCYLLADB-1957 Closes scylladb/scylladb#29862 (cherry picked from commit f3cf20803b557418668f035b4387c1ba84105306) Closes scylladb/scylladb#29873 Closes scylladb/scylladb#29879 --- test/cluster/mv/test_mv_admission_control.py | 22 +++++++++----------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/test/cluster/mv/test_mv_admission_control.py b/test/cluster/mv/test_mv_admission_control.py index 6d9b609189..e80924a984 100644 --- a/test/cluster/mv/test_mv_admission_control.py +++ b/test/cluster/mv/test_mv_admission_control.py @@ -16,9 +16,8 @@ from test.cluster.mv.tablets.test_mv_tablets import pin_the_only_tablet, get_tab from test.cluster.util import new_test_keyspace from cassandra.cluster import ConsistencyLevel, EXEC_PROFILE_DEFAULT # type: ignore -from cassandra.cqltypes import Int32Type # type: ignore from cassandra.policies import FallthroughRetryPolicy # type: ignore -from cassandra.query import SimpleStatement, BoundStatement # type: ignore +from cassandra.query import SimpleStatement # type: ignore logger = logging.getLogger(__name__) @@ -30,7 +29,8 @@ logger = logging.getLogger(__name__) async def test_mv_admission_control_exception(manager: ManagerClient) -> None: node_count = 2 config = {'error_injections_at_startup': ['view_update_limit', 'update_backlog_immediately'], 'tablets_mode_for_new_keyspaces': 'enabled'} - servers = await manager.servers_add(node_count, config=config) + # Use 1 shard to make sure that the same shard handles both writes, so that the second write sees the backlog increase caused by the first one. + servers = await manager.servers_add(node_count, config=config, cmdline=['--smp=1']) cql, hosts = await manager.get_ready_cql(servers) async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks: await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))") @@ -42,17 +42,15 @@ async def test_mv_admission_control_exception(manager: ManagerClient) -> None: await pin_the_only_tablet(manager, ks, "tab", servers[0]) await pin_the_only_tablet(manager, ks, "mv_cf_view", servers[1]) - # Prepare the statement so that the write goes to the same shard both - # times (the first write will cause only the shard on which it was - # performed to have the updated view update backlog). - stmt = cql.prepare(f"INSERT INTO {ks}.tab (key, c, v) VALUES (?, ?, ?)") - # To inspect the error message, we need to disable retries, which can't - # be done in `prepare()` or `run_async()`. Instead, we use `BoundStatement`. - bnd_stmt = BoundStatement(stmt, retry_policy=FallthroughRetryPolicy()) + # To inspect the error message, we need to disable retries. + stmt = SimpleStatement(f"INSERT INTO {ks}.tab (key, c, v) VALUES (0, 0, '{240000*'a'}')", + retry_policy=FallthroughRetryPolicy()) await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, "never_finish_remote_view_updates", one_shot=False) for s in servers)) - await cql.run_async(bnd_stmt.bind([0, 0, 240000*'a']), host=hosts[0]) + await cql.run_async(stmt, host=hosts[0]) with pytest.raises(Exception, match="View update backlog is too high"): - await cql.run_async(bnd_stmt.bind([0, 0, 'a']), host=hosts[0]) + stmt2 = SimpleStatement(f"INSERT INTO {ks}.tab (key, c, v) VALUES (0, 0, 'a')", + retry_policy=FallthroughRetryPolicy()) + await cql.run_async(stmt2, host=hosts[0]) await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, "never_finish_remote_view_updates") for s in servers)) # In this test we have a table with a materialized view and a replication factor of 3