From ff9c8428df3039bd28ec3ce2202381ef196df46c Mon Sep 17 00:00:00 2001
From: Benny Halevy
Date: Sun, 19 Jan 2025 08:52:50 +0200
Subject: [PATCH] topology_custom/test_mv_topology_change: use new_test_keyspace

Signed-off-by: Benny Halevy
---
 .../mv/test_mv_topology_change.py | 193 +++++++++---------
 1 file changed, 97 insertions(+), 96 deletions(-)

diff --git a/test/topology_custom/mv/test_mv_topology_change.py b/test/topology_custom/mv/test_mv_topology_change.py
index 20bac33889..8e11b3f9c0 100644
--- a/test/topology_custom/mv/test_mv_topology_change.py
+++ b/test/topology_custom/mv/test_mv_topology_change.py
@@ -16,6 +16,7 @@ from test.pylib.manager_client import ManagerClient
 from test.pylib.tablets import get_tablet_replica
 from test.topology.conftest import skip_mode
 from test.pylib.util import wait_for
+from test.topology.util import new_test_keyspace
 
 logger = logging.getLogger(__name__)
 
@@ -36,51 +37,51 @@ async def test_mv_topology_change(manager: ManagerClient):
     servers = [await manager.server_add(config=cfg, timeout=60) for _ in range(3)]
 
     cql = manager.get_cql()
-    await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};")
-    await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
-    await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)")
+        await cql.run_async(f"CREATE materialized view {ks}.t_view AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
 
-    stop_event = asyncio.Event()
-    concurrency = 10
-    async def do_writes(start_it, repeat) -> int:
-        iteration = start_it
-        while not stop_event.is_set():
-            start_time = time.time()
-            try:
-                await cql.run_async(f"insert into ks.t (pk, v) values ({iteration}, {iteration})")
-            except NoHostAvailable as e:
-                for _, err in e.errors.items():
-                    # ConnectionException can be raised when the node is shutting down.
-                    if not isinstance(err, ConnectionException):
-                        logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
-                        raise
-            except Exception as e:
-                logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
-                raise
-            iteration += concurrency
-            if not repeat:
-                break
-            await asyncio.sleep(0.01)
-        return iteration
+        stop_event = asyncio.Event()
+        concurrency = 10
+        async def do_writes(start_it, repeat) -> int:
+            iteration = start_it
+            while not stop_event.is_set():
+                start_time = time.time()
+                try:
+                    await cql.run_async(f"insert into {ks}.t (pk, v) values ({iteration}, {iteration})")
+                except NoHostAvailable as e:
+                    for _, err in e.errors.items():
+                        # ConnectionException can be raised when the node is shutting down.
+                        if not isinstance(err, ConnectionException):
+                            logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
+                            raise
+                except Exception as e:
+                    logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
+                    raise
+                iteration += concurrency
+                if not repeat:
+                    break
+                await asyncio.sleep(0.01)
+            return iteration
 
-    # to hit the issue #18709 it's enough to start one batch of writes, the effective
-    # replication maps for base and view will change after the writes start but before they finish
-    tasks = [asyncio.create_task(do_writes(i, repeat=False)) for i in range(concurrency)]
+        # to hit the issue #18709 it's enough to start one batch of writes, the effective
+        # replication maps for base and view will change after the writes start but before they finish
+        tasks = [asyncio.create_task(do_writes(i, repeat=False)) for i in range(concurrency)]
 
-    server = await manager.server_add()
+        server = await manager.server_add()
 
-    await asyncio.gather(*tasks)
+        await asyncio.gather(*tasks)
 
-    [await manager.api.disable_injection(s.ip_addr, "delay_before_get_view_natural_endpoint") for s in servers]
+        [await manager.api.disable_injection(s.ip_addr, "delay_before_get_view_natural_endpoint") for s in servers]
 
-    # to hit the issue #17786 we need to run multiple batches of writes, so that some write is processed while the
-    # effective replication maps for base and view are different
-    tasks = [asyncio.create_task(do_writes(i, repeat=True)) for i in range(concurrency)]
-    await manager.decommission_node(server.server_id)
+        # to hit the issue #17786 we need to run multiple batches of writes, so that some write is processed while the
+        # effective replication maps for base and view are different
+        tasks = [asyncio.create_task(do_writes(i, repeat=True)) for i in range(concurrency)]
+        await manager.decommission_node(server.server_id)
 
-    stop_event.set()
-    await asyncio.gather(*tasks)
+        stop_event.set()
+        await asyncio.gather(*tasks)
 
 # Reproduces #19152
 # Verify a pending replica is not doing unnecessary work of building and sending view updates.
@@ -103,68 +104,68 @@ async def test_mv_update_on_pending_replica(manager: ManagerClient, intranode):
     await manager.api.disable_tablet_balancing(servers[0].ip_addr)
 
     cql = manager.get_cql()
-    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1};")
-    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int);")
-    await cql.run_async("CREATE MATERIALIZED VIEW test.mv1 AS SELECT * FROM test.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, pk);")
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, c int);")
+        await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL PRIMARY KEY (c, pk);")
 
-    table_id = await manager.get_table_id('test', 'test')
+        table_id = await manager.get_table_id(ks, 'test')
 
-    servers.append(await manager.server_add(config=cfg, cmdline=cmd))
+        servers.append(await manager.server_add(config=cfg, cmdline=cmd))
 
-    key = 7 # Whatever
-    tablet_token = 0 # Doesn't matter since there is one tablet
-    await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, 0)")
+        key = 7 # Whatever
+        tablet_token = 0 # Doesn't matter since there is one tablet
+        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, 0)")
 
-    replica = await get_tablet_replica(manager, servers[0], 'test', 'test', tablet_token)
-    s0_host_id = await manager.get_host_id(servers[0].server_id)
-    s1_host_id = await manager.get_host_id(servers[1].server_id)
-    src_shard = replica[1]
-    dst_shard = 1-replica[1]
-    assert replica[0] == s0_host_id
+        replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
+        s0_host_id = await manager.get_host_id(servers[0].server_id)
+        s1_host_id = await manager.get_host_id(servers[1].server_id)
+        src_shard = replica[1]
+        dst_shard = 1-replica[1]
+        assert replica[0] == s0_host_id
 
-    if intranode:
-        dst_host = s0_host_id
-        dst_ip = servers[0].ip_addr
-        streaming_wait_injection = "intranode_migration_streaming_wait"
-    else:
-        dst_host = s1_host_id
-        dst_ip = servers[1].ip_addr
-        streaming_wait_injection = "stream_mutation_fragments"
+        if intranode:
+            dst_host = s0_host_id
+            dst_ip = servers[0].ip_addr
+            streaming_wait_injection = "intranode_migration_streaming_wait"
+        else:
+            dst_host = s1_host_id
+            dst_ip = servers[1].ip_addr
+            streaming_wait_injection = "stream_mutation_fragments"
 
-    await manager.api.enable_injection(dst_ip, streaming_wait_injection, one_shot=True)
+        await manager.api.enable_injection(dst_ip, streaming_wait_injection, one_shot=True)
 
-    migration_task = asyncio.create_task(
-        manager.api.move_tablet(servers[0].ip_addr, "test", "test", s0_host_id, src_shard, dst_host, dst_shard, tablet_token))
+        migration_task = asyncio.create_task(
+            manager.api.move_tablet(servers[0].ip_addr, ks, "test", s0_host_id, src_shard, dst_host, dst_shard, tablet_token))
 
-    async def tablet_is_streaming():
-        res = await cql.run_async(f"SELECT stage FROM system.tablets WHERE table_id={table_id}")
-        stage = res[0].stage
-        return stage == 'streaming' or None
+        async def tablet_is_streaming():
+            res = await cql.run_async(f"SELECT stage FROM system.tablets WHERE table_id={table_id}")
+            stage = res[0].stage
+            return stage == 'streaming' or None
 
-    await wait_for(tablet_is_streaming, time.time() + 60)
+        await wait_for(tablet_is_streaming, time.time() + 60)
 
-    await cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({key}, {1})")
+        await cql.run_async(f"INSERT INTO {ks}.test (pk, c) VALUES ({key}, {1})")
 
-    # Release abandoned streaming
-    await manager.api.message_injection(dst_ip, streaming_wait_injection)
+        # Release abandoned streaming
+        await manager.api.message_injection(dst_ip, streaming_wait_injection)
 
-    logger.info("Waiting for migration to finish")
-    await migration_task
-    logger.info("Migration done")
+        logger.info("Waiting for migration to finish")
+        await migration_task
+        logger.info("Migration done")
 
-    def get_view_updates_on_wrong_node_count(server):
-        metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
-        pattern = re.compile("^scylla_database_total_view_updates_on_wrong_node")
-        for metric in metrics.split('\n'):
-            if pattern.match(metric) is not None:
-                return int(float(metric.split()[1]))
+        def get_view_updates_on_wrong_node_count(server):
+            metrics = requests.get(f"http://{server.ip_addr}:9180/metrics").text
+            pattern = re.compile("^scylla_database_total_view_updates_on_wrong_node")
+            for metric in metrics.split('\n'):
+                if pattern.match(metric) is not None:
+                    return int(float(metric.split()[1]))
 
-    assert all(map(lambda x: x is None or x == 0, [get_view_updates_on_wrong_node_count(server) for server in servers]))
+        assert all(map(lambda x: x is None or x == 0, [get_view_updates_on_wrong_node_count(server) for server in servers]))
 
-    res = await cql.run_async(f"SELECT c FROM test.test WHERE pk={key}")
-    assert [1] == [x.c for x in res]
-    res = await cql.run_async(f"SELECT c FROM test.mv1 WHERE pk={key} ALLOW FILTERING")
-    assert [1] == [x.c for x in res]
+        res = await cql.run_async(f"SELECT c FROM {ks}.test WHERE pk={key}")
+        assert [1] == [x.c for x in res]
+        res = await cql.run_async(f"SELECT c FROM {ks}.mv1 WHERE pk={key} ALLOW FILTERING")
+        assert [1] == [x.c for x in res]
 
 # Reproduces issue #19529
 # Write to a table with MV while one node is stopped, and verify
@@ -179,18 +180,18 @@ async def test_mv_write_to_dead_node(manager: ManagerClient):
     servers = await manager.servers_add(4)
 
     cql = manager.get_cql()
-    await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}")
-    await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
-    await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
+    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
+        await cql.run_async(f"CREATE TABLE {ks}.t (pk int primary key, v int)")
+        await cql.run_async(f"CREATE materialized view {ks}.t_view AS select pk, v from {ks}.t where v is not null primary key (v, pk)")
 
-    await manager.server_stop_gracefully(servers[-1].server_id)
+        await manager.server_stop_gracefully(servers[-1].server_id)
 
-    # Do inserts. some should generate MV writes to the stopped node
-    for i in range(100):
-        await cql.run_async(f"insert into ks.t (pk, v) values ({i}, {i+1})")
+        # Do inserts. some should generate MV writes to the stopped node
+        for i in range(100):
+            await cql.run_async(f"insert into {ks}.t (pk, v) values ({i}, {i+1})")
 
-    # Remove the node to trigger a topology change.
-    # If the MV write is not completed, as in issue #19529, the topology change
-    # will be held for long time until the write timeouts.
-    # Otherwise, it is expected to complete in short time.
-    await manager.remove_node(servers[0].server_id, servers[-1].server_id, timeout=30)
+        # Remove the node to trigger a topology change.
+        # If the MV write is not completed, as in issue #19529, the topology change
+        # will be held for long time until the write timeouts.
+        # Otherwise, it is expected to complete in short time.
+        await manager.remove_node(servers[0].server_id, servers[-1].server_id, timeout=30)
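
For reference, a minimal sketch of the new_test_keyspace pattern this patch converts to. The real helper is imported from test.topology.util and its implementation is not part of this diff, so the body below is an assumption inferred from usage: an async context manager that creates a uniquely named keyspace, yields the name, and drops the keyspace on exit, so tests no longer collide on hardcoded names like "ks" or "test".

    # Illustrative sketch only -- the actual helper lives in test.topology.util
    # and may differ; this approximates the contract the patch relies on.
    from contextlib import asynccontextmanager
    import uuid

    @asynccontextmanager
    async def new_test_keyspace(manager, opts: str):
        cql = manager.get_cql()
        # A unique name prevents clashes between tests sharing a cluster.
        ks = f"ks_{uuid.uuid4().hex[:8]}"
        await cql.run_async(f"CREATE KEYSPACE {ks} {opts}")
        try:
            yield ks
        finally:
            # Cleanup runs even if the test body raises.
            await cql.run_async(f"DROP KEYSPACE {ks}")

Under this contract each test body nests inside the async with block, which accounts for the wholesale re-indentation making up most of the 97 insertions and 96 deletions above.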