test: adjust existing tests

- Disable tablets in `test_migration_on_existing_raft_topology`.
Because views on tablets are experimental for now, we can safely
assume that the view building coordinator will start with view build status
on raft.
- Add error injection to pause view building on worker.
Used to pause the view building process; there is an analogous error
injection in view_builder.
- Do a read barrier in `test_view_in_system_tables`
Increases test stability by making sure that the node sees up-to-date
group0 state and `system.built_views` is synced.
- Wait for the view to be built in some tests
Increases test stability by making sure that the view is built.
- Remove xfail marker from `test_tablet_streaming_with_unbuilt_view`
This series fixes https://github.com/scylladb/scylladb/issues/21564,
so this test should work now.
This commit is contained in:
Michał Jadwiszczak
2025-04-24 13:42:22 +02:00
parent 6056b55309
commit cf138da853
7 changed files with 61 additions and 15 deletions

View File

@@ -780,6 +780,7 @@ future<> view_building_worker::batch::do_build_range(view_building_worker& local
as.check();
std::exception_ptr eptr;
try {
utils::get_local_injector().inject("view_building_worker_pause_before_consume", 5min, as).get();
vbw_logger.info("Starting range {} building for base table: {}.{}", range, base_cf->schema()->ks_name(), base_cf->schema()->cf_name());
auto end_token = reader.consume_in_thread(std::move(consumer));
vbw_logger.info("Built range {} for base table: {}.{}", dht::token_range(range.start(), end_token), base_cf->schema()->ks_name(), base_cf->schema()->cf_name());

View File

@@ -91,6 +91,7 @@ async def test_base_partition_deletion_with_metrics(manager: ManagerClient, perm
insert = cql.prepare(f'INSERT INTO {table} (p1,p2,c) VALUES (?,?,?)')
# The view partition key is a permutation of the base partition key.
async with new_materialized_view(manager, table, '*', '(p2,p1),c' if permuted else '(p1,p2),c', 'p1 is not null and p2 is not null and c is not null') as mv:
await wait_for_view(cql, mv.split('.')[1], 1)
# the metric total_view_updates_pushed_local is incremented by 1 for each 100 row view
# updates, because it is collected in batches according to max_rows_for_view_updates.
# To verify the behavior, we want the metric to increase by at least 2 without the optimization,
@@ -138,6 +139,7 @@ async def test_base_partition_deletion_in_batch_with_delete_row_with_metrics(man
insert = cql.prepare(f'INSERT INTO {table} (p,c,v) VALUES (?,?,?)')
# The view partition key is the same as the base partition key.
async with new_materialized_view(manager, table, '*', '(p,c),v', 'p is not null and c is not null and v is not null') as mv:
await wait_for_view(cql, mv.split('.')[1], 1)
N = 101 # See comment above
for i in range(N):
await cql.run_async(insert, [1, 10, i])

View File

@@ -12,7 +12,7 @@ from test.pylib.repair import create_table_insert_data_for_repair
from test.pylib.rest_client import HTTPError, read_barrier
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
from test.pylib.util import unique_name
from test.pylib.util import unique_name, wait_for
from test.cluster.conftest import skip_mode
from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, get_topology_coordinator
from contextlib import nullcontext as does_not_raise
@@ -612,7 +612,6 @@ async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerCli
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/21564")
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
"""
Reproducer for https://github.com/scylladb/scylladb/issues/21564
@@ -626,6 +625,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',
'--logger-log-level', 'view_building_coordinator=debug',
'--logger-log-level', 'view_building_worker=debug',
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -642,8 +643,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
s1_host_id = await manager.get_host_id(servers[1].server_id)
logger.info("Inject error to make view generator pause before processing the sstable")
injection_name = "view_builder_pause_add_new_view"
logger.info("Inject error to make view building worker pause before processing the sstable")
injection_name = "view_building_worker_pause_before_consume"
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
logger.info("Create view")
@@ -657,6 +658,14 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
async def check_table():
    # Polled by wait_for: a truthy return means the view is marked as
    # built; None tells wait_for to keep retrying until the deadline.
    rows = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
    return True if len(rows) == 1 else None
await wait_for(check_table, time.time() + 30)
# Verify the table has expected number of rows
rows = await cql.run_async(f"SELECT pk from {ks}.test")
assert len(list(rows)) == num_of_rows
@@ -666,7 +675,6 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19149")
async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
"""
Reproducer for https://github.com/scylladb/scylladb/issues/19149
@@ -682,6 +690,8 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
cmdline = [
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',
'--logger-log-level', 'view_building_coordinator=debug',
'--logger-log-level', 'view_building_worker=debug',
]
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
@@ -694,13 +704,21 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
logger.info("Create view")
# Pause view building
await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
PRIMARY KEY (c, pk);")
# Check if view building was started
async def check_mv_status():
    # Polled by wait_for: True once view building has been marked STARTED.
    # NOTE: the view created above is {ks}.mv1, so we must query for
    # view_name='mv1'; the previous 'mv' literal could never match and
    # wait_for would always time out.
    mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv1'")
    return len(mv_status) == 1 and mv_status[0].status == "STARTED"
await wait_for(check_mv_status, time.time() + 30)
logger.info("Generate an sstable and move it to upload directory of test table")
# create an sstable using a dummy table
await cql.run_async("CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
node_workdir = await manager.server_get_workdir(servers[0].server_id)
@@ -730,6 +748,14 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
logger.info("Migration done")
async def check_view_is_built():
    # Completion predicate for wait_for: returns True once the view
    # appears in system.built_views, otherwise None so wait_for retries.
    built = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
    return len(built) == 1 or None
await wait_for(check_view_is_built, time.time() + 60)
expected_num_of_rows = 128
# Verify the table has expected number of rows
rows = await cql.run_async(f"SELECT pk from {ks}.test")

View File

@@ -7,7 +7,7 @@ from contextlib import asynccontextmanager
from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.pylib.tablets import get_tablet_replica, get_base_table, get_tablet_count, get_tablet_info
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, wait_for_view
from test.cluster.conftest import skip_mode
from test.cluster.util import new_test_keyspace
import time
@@ -371,6 +371,7 @@ async def test_create_colocated_table_while_base_is_migrating(manager: ManagerCl
new_replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
assert new_replica[0] == dst_host_id
await wait_for_view(cql, 'tv', 2)
rows = await cql.run_async(f"SELECT * FROM {ks}.tv")
assert len(rows) == total_keys+1

View File

@@ -21,8 +21,11 @@ from cassandra.protocol import InvalidRequest
logger = logging.getLogger(__name__)
async def create_keyspace(cql):
return await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
async def create_keyspace(cql, disable_tablets=False):
    """Create a fresh test keyspace with NetworkTopologyStrategy, RF=1.

    When disable_tablets is True the keyspace is created with tablets
    explicitly disabled, for tests that rely on vnode-based behavior.
    """
    opts = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
    if disable_tablets:
        opts += " AND tablets={'enabled': false}"
    return await create_new_test_keyspace(cql, opts)
async def create_table(cql, ks):
    """Create the base table {ks}.t used by the tests in this module."""
    ddl = f"CREATE TABLE {ks}.t (p int, c int, PRIMARY KEY (p, c))"
    await cql.run_async(ddl)
@@ -507,7 +510,7 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
logging.info("Waiting until driver connects to every server")
cql, hosts = await manager.get_ready_cql(servers)
ks = await create_keyspace(cql)
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}")
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
@@ -553,7 +556,10 @@ async def test_view_build_status_with_synchronize_wait(manager: ManagerClient):
servers.append(await manager.server_add())
cql, hosts = await manager.get_ready_cql(servers)
ks = await create_keyspace(cql)
# With tablets and view building coordinator, view build status is marked
# in the same group0 batch that sets new node status to normal,
# so `start_operation()` is not called and this test doesn't work.
ks = await create_keyspace(cql, disable_tablets=True)
await create_table(cql, ks)
# 'raft_group0_client::start_operation' gets called underneath this.
await create_mv(cql, ks, "vt1")

View File

@@ -6,6 +6,7 @@ import pytest
import requests
from .util import new_materialized_view, new_test_table, unique_name, sleep_till_whole_second
from . import nodetool
from .test_materialized_view import wait_for_view_built
def test_tombstone_gc_with_conflict_in_memtable(scylla_only, cql, test_keyspace):
"""
@@ -103,6 +104,7 @@ def test_tombstone_gc_with_materialized_view_update_in_memtable(scylla_only, cql
with new_test_table(cql, test_keyspace, schema) as table:
# Create a materialized view with same partition key as the base, and using a regular column in the base as a clustering key in the view
with new_materialized_view(cql, table, '*', 'k, v', 'k is not null and v is not null', extra="with gc_grace_seconds = 0") as mv:
wait_for_view_built(cql, mv)
with nodetool.no_autocompaction_context(cql, mv):
# Insert initial data into the base table
cql.execute(f"insert into {table} (k, v, w) values (1, 1, 1)")

View File

@@ -143,10 +143,11 @@ def test_mv_quoted_column_names_build(cql, test_keyspace):
# that simply hasn't completed (besides looking at the logs,
# which we don't). This means, unfortunately, that a failure
# of this test is slow - it needs to wait for a timeout.
start_time = time.time()
while time.time() < start_time + 30:
if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
break
wait_for_view_built(cql, mv)
# start_time = time.time()
# while time.time() < start_time + 30:
# if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
# break
assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]
# The previous test (test_mv_empty_string_partition_key) verifies that a
@@ -1501,6 +1502,13 @@ def test_view_in_system_tables(cql, test_keyspace):
with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int") as base:
with new_materialized_view(cql, base, '*', 'v,p', 'v is not null and p is not null') as view:
wait_for_view_built(cql, view)
# In the view_building_coordinator path, the `built_views` table is updated by view_building_worker,
# so there is a short window when a view is built (the information is in view_build_status_v2)
# but it isn't yet marked in `built_views` locally.
# Doing a read barrier is enough to ensure that the worker updated the table.
cql.execute("DROP TABLE IF EXISTS nosuchkeyspace.nosuchtable")
res = [ f'{r.keyspace_name}.{r.view_name}' for r in cql.execute('select * from system.built_views')]
assert view in res
res = [ f'{r.table_name}.{r.index_name}' for r in cql.execute('select * from system."IndexInfo"')]