From cf138da853afbf93dd72121de6e45c30b2e30bbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Jadwiszczak?= Date: Thu, 24 Apr 2025 13:42:22 +0200 Subject: [PATCH] test: adjust existing tests - Disable tablets in `test_migration_on_existing_raft_topology`. Because views on tablets are experimental now, we can safely assume that view building coordinator will start with view build status on raft. - Add error injection to pause view building on worker. Used to pause view building process, there is analogous error injection in view_builder. - Do a read barrier in `test_view_in_system_tables` Increases test stability by making sure that the node sees up-to-date group0 state and `system.built_views` is synced. - Wait for view to be built in some tests Increases test stability by making sure that the view is built. - Remove xfail marker from `test_tablet_streaming_with_unbuilt_view` This series fixes https://github.com/scylladb/scylladb/issues/21564 and this test should work now. --- db/view/view_building_worker.cc | 1 + test/cluster/mv/test_mv_delete_partitions.py | 2 ++ test/cluster/test_tablets.py | 38 ++++++++++++++++---- test/cluster/test_tablets_colocation.py | 3 +- test/cluster/test_view_build_status.py | 14 +++++--- test/cqlpy/test_compaction.py | 2 ++ test/cqlpy/test_materialized_view.py | 16 ++++++--- 7 files changed, 61 insertions(+), 15 deletions(-) diff --git a/db/view/view_building_worker.cc b/db/view/view_building_worker.cc index 1ed4d467db..a158d0393b 100644 --- a/db/view/view_building_worker.cc +++ b/db/view/view_building_worker.cc @@ -780,6 +780,7 @@ future<> view_building_worker::batch::do_build_range(view_building_worker& local as.check(); std::exception_ptr eptr; try { + utils::get_local_injector().inject("view_building_worker_pause_before_consume", 5min, as).get(); vbw_logger.info("Starting range {} building for base table: {}.{}", range, base_cf->schema()->ks_name(), base_cf->schema()->cf_name()); auto end_token = 
reader.consume_in_thread(std::move(consumer)); vbw_logger.info("Built range {} for base table: {}.{}", dht::token_range(range.start(), end_token), base_cf->schema()->ks_name(), base_cf->schema()->cf_name()); diff --git a/test/cluster/mv/test_mv_delete_partitions.py b/test/cluster/mv/test_mv_delete_partitions.py index c52201cd3c..952e8c5ed5 100644 --- a/test/cluster/mv/test_mv_delete_partitions.py +++ b/test/cluster/mv/test_mv_delete_partitions.py @@ -91,6 +91,7 @@ async def test_base_partition_deletion_with_metrics(manager: ManagerClient, perm insert = cql.prepare(f'INSERT INTO {table} (p1,p2,c) VALUES (?,?,?)') # The view partition key is a permutation of the base partition key. async with new_materialized_view(manager, table, '*', '(p2,p1),c' if permuted else '(p1,p2),c', 'p1 is not null and p2 is not null and c is not null') as mv: + await wait_for_view(cql, mv.split('.')[1], 1) # the metric total_view_updates_pushed_local is incremented by 1 for each 100 row view # updates, because it is collected in batches according to max_rows_for_view_updates. # To verify the behavior, we want the metric to increase by at least 2 without the optimization, @@ -138,6 +139,7 @@ async def test_base_partition_deletion_in_batch_with_delete_row_with_metrics(man insert = cql.prepare(f'INSERT INTO {table} (p,c,v) VALUES (?,?,?)') # The view partition key is the same as the base partition key. 
async with new_materialized_view(manager, table, '*', '(p,c),v', 'p is not null and c is not null and v is not null') as mv: + await wait_for_view(cql, mv.split('.')[1], 1) N = 101 # See comment above for i in range(N): await cql.run_async(insert, [1, 10, i]) diff --git a/test/cluster/test_tablets.py b/test/cluster/test_tablets.py index d67dc84198..0fa5591d4b 100644 --- a/test/cluster/test_tablets.py +++ b/test/cluster/test_tablets.py @@ -12,7 +12,7 @@ from test.pylib.repair import create_table_insert_data_for_repair from test.pylib.rest_client import HTTPError, read_barrier from test.pylib.scylla_cluster import ReplaceConfig from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas -from test.pylib.util import unique_name +from test.pylib.util import unique_name, wait_for from test.cluster.conftest import skip_mode from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, get_topology_coordinator from contextlib import nullcontext as does_not_raise @@ -612,7 +612,6 @@ async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerCli @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') -@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/21564") async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient): """ Reproducer for https://github.com/scylladb/scylladb/issues/21564 @@ -626,6 +625,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient): cmdline = [ '--logger-log-level', 'storage_service=debug', '--logger-log-level', 'raft_topology=debug', + '--logger-log-level', 'view_building_coordinator=debug', + '--logger-log-level', 'view_building_worker=debug', ] servers = [await manager.server_add(cmdline=cmdline, config=cfg)] await manager.api.disable_tablet_balancing(servers[0].ip_addr) @@ -642,8 +643,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: 
ManagerClient): servers.append(await manager.server_add(cmdline=cmdline, config=cfg)) s1_host_id = await manager.get_host_id(servers[1].server_id) - logger.info("Inject error to make view generator pause before processing the sstable") - injection_name = "view_builder_pause_add_new_view" + logger.info("Inject error to make view building worker pause before processing the sstable") + injection_name = "view_building_worker_pause_before_consume" await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True) logger.info("Create view") @@ -657,6 +658,14 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient): await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token) logger.info("Migration done") + async def check_table(): + result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'") + if len(result) == 1: + return True + else: + return None + await wait_for(check_table, time.time() + 30) + # Verify the table has expected number of rows rows = await cql.run_async(f"SELECT pk from {ks}.test") assert len(list(rows)) == num_of_rows @@ -666,7 +675,6 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient): @pytest.mark.asyncio @skip_mode('release', 'error injections are not supported in release mode') -@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19149") async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient): """ Reproducer for https://github.com/scylladb/scylladb/issues/19149 @@ -682,6 +690,8 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient): cmdline = [ '--logger-log-level', 'storage_service=debug', '--logger-log-level', 'raft_topology=debug', + '--logger-log-level', 'view_building_coordinator=debug', + '--logger-log-level', 'view_building_worker=debug', ] servers = [await manager.server_add(cmdline=cmdline, config=cfg)] 
await manager.api.disable_tablet_balancing(servers[0].ip_addr) @@ -694,13 +704,21 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient): await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test") logger.info("Create view") + # Pause view building + await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True) await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \ SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \ PRIMARY KEY (c, pk);") + + # Check if view building was started + async def check_mv_status(): + mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv1'") + return len(mv_status) == 1 and mv_status[0].status == "STARTED" + await wait_for(check_mv_status, time.time() + 30) logger.info("Generate an sstable and move it to upload directory of test table") # create an sstable using a dummy table - await cql.run_async("CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);") + await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);") await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)]) await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy") node_workdir = await manager.server_get_workdir(servers[0].server_id) @@ -730,6 +748,14 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient): await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token) logger.info("Migration done") + async def check_view_is_built(): + result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'") + if len(result) == 1: + return True + else: + return None + await wait_for(check_view_is_built, time.time() + 60) + expected_num_of_rows = 128 # Verify the table has expected number of rows rows = await 
cql.run_async(f"SELECT pk from {ks}.test") diff --git a/test/cluster/test_tablets_colocation.py b/test/cluster/test_tablets_colocation.py index d73aab51a9..9b487ecc85 100644 --- a/test/cluster/test_tablets_colocation.py +++ b/test/cluster/test_tablets_colocation.py @@ -7,7 +7,7 @@ from contextlib import asynccontextmanager from test.pylib.manager_client import ManagerClient from test.pylib.rest_client import inject_error_one_shot from test.pylib.tablets import get_tablet_replica, get_base_table, get_tablet_count, get_tablet_info -from test.pylib.util import wait_for, wait_for_cql_and_get_hosts +from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, wait_for_view from test.cluster.conftest import skip_mode from test.cluster.util import new_test_keyspace import time @@ -371,6 +371,7 @@ async def test_create_colocated_table_while_base_is_migrating(manager: ManagerCl new_replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token) assert new_replica[0] == dst_host_id + await wait_for_view(cql, 'tv', 2) rows = await cql.run_async(f"SELECT * FROM {ks}.tv") assert len(rows) == total_keys+1 diff --git a/test/cluster/test_view_build_status.py b/test/cluster/test_view_build_status.py index f923a0f9c6..00ffad9bf6 100644 --- a/test/cluster/test_view_build_status.py +++ b/test/cluster/test_view_build_status.py @@ -21,8 +21,11 @@ from cassandra.protocol import InvalidRequest logger = logging.getLogger(__name__) -async def create_keyspace(cql): - return await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") +async def create_keyspace(cql, disable_tablets=False): + ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}" + if disable_tablets: + ks_options = ks_options + " AND tablets={'enabled': false}" + return await create_new_test_keyspace(cql, ks_options) async def create_table(cql, ks): await cql.run_async(f"CREATE TABLE {ks}.t (p int, c int, 
PRIMARY KEY (p, c))") @@ -507,7 +510,7 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie logging.info("Waiting until driver connects to every server") cql, hosts = await manager.get_ready_cql(servers) - ks = await create_keyspace(cql) + ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") await create_table(cql, ks) await create_mv(cql, ks, "vt1") @@ -553,7 +556,10 @@ async def test_view_build_status_with_synchronize_wait(manager: ManagerClient): servers.append(await manager.server_add()) cql, hosts = await manager.get_ready_cql(servers) - ks = await create_keyspace(cql) + # With tablets and view building coordinator, view build status is marked + # in the same group0 batch that sets new node status to normal, + # so `start_operation()` is not called and this test doesn't work. + ks = await create_keyspace(cql, disable_tablets=True) await create_table(cql, ks) # 'raft_group0_client::start_operation' gets called underneath this. await create_mv(cql, ks, "vt1") diff --git a/test/cqlpy/test_compaction.py b/test/cqlpy/test_compaction.py index 429d335f33..cfa407c658 100644 --- a/test/cqlpy/test_compaction.py +++ b/test/cqlpy/test_compaction.py @@ -6,6 +6,7 @@ import pytest import requests from .util import new_materialized_view, new_test_table, unique_name, sleep_till_whole_second from . 
import nodetool +from .test_materialized_view import wait_for_view_built def test_tombstone_gc_with_conflict_in_memtable(scylla_only, cql, test_keyspace): """ @@ -103,6 +104,7 @@ def test_tombstone_gc_with_materialized_view_update_in_memtable(scylla_only, cql with new_test_table(cql, test_keyspace, schema) as table: # Create a materialized view with same partition key as the base, and using a regular column in the base as a clustering key in the view with new_materialized_view(cql, table, '*', 'k, v', 'k is not null and v is not null', extra="with gc_grace_seconds = 0") as mv: + wait_for_view_built(cql, mv) with nodetool.no_autocompaction_context(cql, mv): # Insert initial data into the base table cql.execute(f"insert into {table} (k, v, w) values (1, 1, 1)") diff --git a/test/cqlpy/test_materialized_view.py b/test/cqlpy/test_materialized_view.py index f08eb6ab1d..b93dd911a3 100644 --- a/test/cqlpy/test_materialized_view.py +++ b/test/cqlpy/test_materialized_view.py @@ -143,10 +143,11 @@ def test_mv_quoted_column_names_build(cql, test_keyspace): # that simply hasn't completed (besides looking at the logs, # which we don't). This means, unfortunately, that a failure # of this test is slow - it needs to wait for a timeout. 
- start_time = time.time() - while time.time() < start_time + 30: - if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]: - break + wait_for_view_built(cql, mv) + # start_time = time.time() + # while time.time() < start_time + 30: + # if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]: + # break assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)] # The previous test (test_mv_empty_string_partition_key) verifies that a @@ -1501,6 +1502,13 @@ def test_view_in_system_tables(cql, test_keyspace): with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int") as base: with new_materialized_view(cql, base, '*', 'v,p', 'v is not null and p is not null') as view: wait_for_view_built(cql, view) + + # In view_building_coordinator path, `built_views` table is updated by view_building_worker, + # so there is a short window when a view is built (information is in view_build_status_v2) + # but it isn't marked in `built_views` locally. + # Doing a read barrier is enough to ensure that the worker updated the table. + cql.execute("DROP TABLE IF EXISTS nosuchkeyspace.nosuchtable") + res = [ f'{r.keyspace_name}.{r.view_name}' for r in cql.execute('select * from system.built_views')] assert view in res res = [ f'{r.table_name}.{r.index_name}' for r in cql.execute('select * from system."IndexInfo"')]