topology_custom/test_mv_admission_control: use new_test_keyspace
Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit ef85c4b27e)
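The pattern this commit adopts, in isolation: new_test_keyspace (from test.topology.util) is an async context manager that yields a uniquely named test keyspace and drops it when the block exits, replacing the hand-rolled CREATE KEYSPACE ks / DROP KEYSPACE ks pair. A minimal sketch of the usage seen in the diff below (the cleanup-on-exit behavior is assumed from how the diff uses the helper; manager and cql are the test fixtures):

    from test.topology.util import new_test_keyspace

    async def sketch(manager, cql):
        opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}"
        async with new_test_keyspace(manager, opts) as ks:
            # ks holds the generated keyspace name; qualify every table with it
            await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
            ...
        # The keyspace is dropped on exit, even if the test body raises;
        # there is no trailing DROP KEYSPACE statement to forget.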
--- a/test/topology_custom/test_mv_admission_control.py
+++ b/test/topology_custom/test_mv_admission_control.py
@@ -13,6 +13,7 @@ import logging
 from test.topology.conftest import skip_mode
 from test.pylib.util import wait_for_view
 from test.topology_custom.test_mv_tablets import pin_the_only_tablet, get_tablet_replicas
+from test.topology.util import new_test_keyspace

 from cassandra.cluster import ConsistencyLevel, EXEC_PROFILE_DEFAULT # type: ignore
 from cassandra.cqltypes import Int32Type # type: ignore
@@ -31,31 +32,28 @@ async def test_mv_admission_control_exception(manager: ManagerClient) -> None:
     config = {'error_injections_at_startup': ['view_update_limit', 'update_backlog_immediately'], 'tablets_mode_for_new_keyspaces': 'enabled'}
     servers = await manager.servers_add(node_count, config=config)
     cql, hosts = await manager.get_ready_cql(servers)
-    await cql.run_async(f"CREATE KEYSPACE ks WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}"
-                        "AND tablets = {'initial': 1}")
-    await cql.run_async(f"CREATE TABLE ks.tab (key int, c int, v text, PRIMARY KEY (key, c))")
-    await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab "
-                        "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ")
-    await wait_for_view(cql, 'mv_cf_view', node_count)
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
+        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv_cf_view AS SELECT * FROM {ks}.tab "
+                            "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ")
+        await wait_for_view(cql, 'mv_cf_view', node_count)

-    # Only remote updates hold on to memory, so make the update remote by pinning base and view tablets to different nodes.
-    await pin_the_only_tablet(manager, "ks", "tab", servers[0])
-    await pin_the_only_tablet(manager, "ks", "mv_cf_view", servers[1])
+        # Only remote updates hold on to memory, so make the update remote by pinning base and view tablets to different nodes.
+        await pin_the_only_tablet(manager, ks, "tab", servers[0])
+        await pin_the_only_tablet(manager, ks, "mv_cf_view", servers[1])

-    # Prepare the statement so that the write goes to the same shard both
-    # times (the first write will cause only the shard on which it was
-    # performed to have the updated view update backlog).
-    stmt = cql.prepare(f"INSERT INTO ks.tab (key, c, v) VALUES (?, ?, ?)")
-    # To inspect the error message, we need to disable retries, which can't
-    # be done in `prepare()` or `run_async()`. Instead, we use `BoundStatement`.
-    bnd_stmt = BoundStatement(stmt, retry_policy=FallthroughRetryPolicy())
-    await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, "never_finish_remote_view_updates", one_shot=False) for s in servers))
-    await cql.run_async(bnd_stmt.bind([0, 0, 240000*'a']), host=hosts[0])
-    with pytest.raises(Exception, match="View update backlog is too high"):
-        await cql.run_async(bnd_stmt.bind([0, 0, 'a']), host=hosts[0])
-    await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, "never_finish_remote_view_updates") for s in servers))
-
-    await cql.run_async(f"DROP KEYSPACE ks")
+        # Prepare the statement so that the write goes to the same shard both
+        # times (the first write will cause only the shard on which it was
+        # performed to have the updated view update backlog).
+        stmt = cql.prepare(f"INSERT INTO {ks}.tab (key, c, v) VALUES (?, ?, ?)")
+        # To inspect the error message, we need to disable retries, which can't
+        # be done in `prepare()` or `run_async()`. Instead, we use `BoundStatement`.
+        bnd_stmt = BoundStatement(stmt, retry_policy=FallthroughRetryPolicy())
+        await asyncio.gather(*(manager.api.enable_injection(s.ip_addr, "never_finish_remote_view_updates", one_shot=False) for s in servers))
+        await cql.run_async(bnd_stmt.bind([0, 0, 240000*'a']), host=hosts[0])
+        with pytest.raises(Exception, match="View update backlog is too high"):
+            await cql.run_async(bnd_stmt.bind([0, 0, 'a']), host=hosts[0])
+        await asyncio.gather(*(manager.api.disable_injection(s.ip_addr, "never_finish_remote_view_updates") for s in servers))

 # In this test we have a table with a materialized view and a replication factor of 3
 # and 4 nodes so that not all views get paired with replicas on the same nodes.
@@ -73,62 +71,64 @@ async def test_mv_retried_writes_reach_all_replicas(manager: ManagerClient) -> N
     server = await manager.server_add(config={'error_injections_at_startup': ['view_update_limit', 'delay_before_remote_view_update', 'update_backlog_immediately'], 'tablets_mode_for_new_keyspaces': 'enabled'})

     cql, hosts = await manager.get_ready_cql(servers)
-    await cql.run_async(f"CREATE KEYSPACE ks WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}"
-                        "AND tablets = {'initial': 1}")
-    await cql.run_async(f"CREATE TABLE ks.tab (key int, c int, v text, PRIMARY KEY (key, c))")
-    await cql.run_async(f"CREATE MATERIALIZED VIEW ks.mv_cf_view AS SELECT * FROM ks.tab "
-                        "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ")
-    await wait_for_view(cql, 'mv_cf_view', node_count)
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3} AND tablets = {'initial': 1}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.tab (key int, c int, v text, PRIMARY KEY (key, c))")
+        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv_cf_view AS SELECT * FROM {ks}.tab "
+                            "WHERE c IS NOT NULL and key IS NOT NULL PRIMARY KEY (c, key) ")
+        await wait_for_view(cql, 'mv_cf_view', node_count)

-    # Disable tablet balancing so that the slow node doesn't get tablets moved away from it.
-    for s in servers:
-        await manager.api.disable_tablet_balancing(s.ip_addr)
-    await manager.api.disable_tablet_balancing(server.ip_addr)
+        # Disable tablet balancing so that the slow node doesn't get tablets moved away from it.
+        for s in servers:
+            await manager.api.disable_tablet_balancing(s.ip_addr)
+        await manager.api.disable_tablet_balancing(server.ip_addr)

-    # Make sure that the slow node has a base table tablet and no view tablets, so that the
-    # view updates from it are remote. (using shard 0 and token 0 when moving tablets as they don't make a difference here)
-    base_tablet_replicas = await get_tablet_replicas(manager, servers[0], "ks", "tab", 0)
-    base_tablet_hosts = [str(replica[0]) for replica in base_tablet_replicas]
-    slow_host_id = await manager.get_host_id(server.server_id)
-    if str(slow_host_id) not in base_tablet_hosts:
-        base_tablet_host_id, base_tablet_shard = base_tablet_replicas[0]
-        await manager.api.move_tablet(servers[0].ip_addr, "ks", "tab", base_tablet_host_id, base_tablet_shard, slow_host_id, 0, 0)
-    view_tablet_replicas = await get_tablet_replicas(manager, servers[0], "ks", "mv_cf_view", 0)
-    view_tablet_hosts = [str(replica[0]) for replica in view_tablet_replicas]
-    for replica_host, replica_shard in view_tablet_replicas:
-        if str(replica_host) != str(slow_host_id):
-            continue
-        slow_host_shard = replica_shard
-    # Move the view tablet to the node that doesn't have one
-    for s in servers:
-        fast_host_id = await manager.get_host_id(s.server_id)
-        if str(fast_host_id) not in view_tablet_hosts:
-            await manager.api.move_tablet(servers[0].ip_addr, "ks", "mv_cf_view", slow_host_id, slow_host_shard, fast_host_id, 0, 0)
-            break
+        # Make sure that the slow node has a base table tablet and no view tablets, so that the
+        # view updates from it are remote. (using shard 0 and token 0 when moving tablets as they don't make a difference here)
+        base_tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, "tab", 0)
+        base_tablet_hosts = [str(replica[0]) for replica in base_tablet_replicas]
+        slow_host_id = await manager.get_host_id(server.server_id)
+        if str(slow_host_id) not in base_tablet_hosts:
+            base_tablet_host_id, base_tablet_shard = base_tablet_replicas[0]
+            await manager.api.move_tablet(servers[0].ip_addr, ks, "tab", base_tablet_host_id, base_tablet_shard, slow_host_id, 0, 0)
+        view_tablet_replicas = await get_tablet_replicas(manager, servers[0], ks, "mv_cf_view", 0)
+        view_tablet_hosts = [str(replica[0]) for replica in view_tablet_replicas]
+        for replica_host, replica_shard in view_tablet_replicas:
+            if str(replica_host) != str(slow_host_id):
+                continue
+            slow_host_shard = replica_shard
+        # Move the view tablet to the node that doesn't have one
+        for s in servers:
+            fast_host_id = await manager.get_host_id(s.server_id)
+            if str(fast_host_id) not in view_tablet_hosts:
+                await manager.api.move_tablet(servers[0].ip_addr, ks, "mv_cf_view", slow_host_id, slow_host_shard, fast_host_id, 0, 0)
+                break

-    # Prepare the statement so that the write goes to the same shard
-    # for all requests (the backlog increase caused by a write is only
-    # immediately noted on the shard that the write was performed on).
-    stmt = cql.prepare(f"INSERT INTO ks.tab (key, c, v) VALUES (?, ?, ?)")
-    for i in range(10):
-        # Perform a write that will increase the view update backlog on the slow node
-        # to a level causing admission control to reject the following writes.
-        await cql.run_async(stmt, [0, i, 240000*'a'], host=hosts[0])
-        # Based on whether the response from the slow node is received before the next write,
-        # the following small write can serve two purposes:
-        # 1. If the response is received before the next write, the write will be rejected by
-        # admission control and retried until it reaches all replicas.
-        # 2. If the response is not received before the next write, the write will be sent to
-        # the slow node without causing the view update backlog limit to be exceeded. Then,
-        # due to cl=ALL, the coordinator will wait for the response from the slow node, which
-        # will carry an up-to-date view update backlog for the next large write.
-        cl_all_execution_profile = cql.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, consistency_level = ConsistencyLevel.ALL)
-        await cql.run_async(stmt, [0, 10 + i, 'a'], host=hosts[0], execution_profile=cl_all_execution_profile)
+        # Prepare the statement so that the write goes to the same shard
+        # for all requests (the backlog increase caused by a write is only
+        # immediately noted on the shard that the write was performed on).
+        stmt = cql.prepare(f"INSERT INTO {ks}.tab (key, c, v) VALUES (?, ?, ?)")
+        for i in range(10):
+            # Perform a write that will increase the view update backlog on the slow node
+            # to a level causing admission control to reject the following writes.
+            await cql.run_async(stmt, [0, i, 240000*'a'], host=hosts[0])
+            # Based on whether the response from the slow node is received before the next write,
+            # the following small write can serve two purposes:
+            # 1. If the response is received before the next write, the write will be rejected by
+            # admission control and retried until it reaches all replicas.
+            # 2. If the response is not received before the next write, the write will be sent to
+            # the slow node without causing the view update backlog limit to be exceeded. Then,
+            # due to cl=ALL, the coordinator will wait for the response from the slow node, which
+            # will carry an up-to-date view update backlog for the next large write.
+            cl_all_execution_profile = cql.execution_profile_clone_update(EXEC_PROFILE_DEFAULT, consistency_level = ConsistencyLevel.ALL)
+            await cql.run_async(stmt, [0, 10 + i, 'a'], host=hosts[0], execution_profile=cl_all_execution_profile)

-    # Verify that all writes reached the slow node
-    await asyncio.gather(*(manager.server_stop_gracefully(s.server_id) for s in servers))
-    print(f"Connecting to {server.ip_addr}")
-    await manager.driver_connect(server=server)
-    cql = manager.get_cql()
+        # Verify that all writes reached the slow node
+        await asyncio.gather(*(manager.server_stop_gracefully(s.server_id) for s in servers))
+        print(f"Connecting to {server.ip_addr}")
+        await manager.driver_connect(server=server)
+        cql = manager.get_cql()

-    assert len(await cql.run_async(SimpleStatement(f"SELECT * FROM ks.tab", consistency_level=ConsistencyLevel.ONE))) == 20
+        assert len(await cql.run_async(SimpleStatement(f"SELECT * FROM {ks}.tab", consistency_level=ConsistencyLevel.ONE))) == 20
+
+        # For dropping the keyspace
+        await asyncio.gather(*(manager.server_start(s.server_id) for s in servers))
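For reference, the retry-disabling trick the first test relies on, as a standalone cassandra-driver sketch (contact point, keyspace, and table names here are illustrative): prepare() and run_async() expose no per-call retry-policy knob, but a statement object accepts one, and FallthroughRetryPolicy propagates the server's error instead of retrying, so pytest.raises can match the original message.

    from cassandra.cluster import Cluster
    from cassandra.policies import FallthroughRetryPolicy
    from cassandra.query import BoundStatement

    session = Cluster(['127.0.0.1']).connect()  # illustrative contact point
    prepared = session.prepare("INSERT INTO ks.tab (key, c, v) VALUES (?, ?, ?)")
    # Wrapping the prepared statement lets us attach a per-statement retry
    # policy; FallthroughRetryPolicy never retries and surfaces the error as-is.
    bound = BoundStatement(prepared, retry_policy=FallthroughRetryPolicy())
    session.execute(bound.bind([0, 0, 'a']))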
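Likewise, the per-request consistency-level override used for the small cl=ALL writes in the second test: the driver's execution profiles are not mutated in place; the default profile is cloned with the overridden attribute and the clone is passed per call. A sketch under the same illustrative schema:

    from cassandra.cluster import Cluster, ConsistencyLevel, EXEC_PROFILE_DEFAULT

    session = Cluster(['127.0.0.1']).connect()  # illustrative contact point
    # Clone the default profile, overriding only the consistency level, and pass
    # the clone for the requests that need CL=ALL; other requests are unaffected.
    cl_all = session.execution_profile_clone_update(EXEC_PROFILE_DEFAULT,
                                                    consistency_level=ConsistencyLevel.ALL)
    session.execute("INSERT INTO ks.tab (key, c, v) VALUES (0, 0, 'a')",
                    execution_profile=cl_all)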