test: adjust existing tests
- Disable tablets in `test_migration_on_existing_raft_topology`. Because views on tablets are experimental now, we can safely assume that the view building coordinator will start with view build status on raft. - Add error injection to pause view building on the worker. Used to pause the view building process; there is an analogous error injection in view_builder. - Do a read barrier in `test_view_in_system_tables`. Increases test stability by making sure that the node sees the up-to-date group0 state and `system.built_views` is synced. - Wait for the view to be built in some tests. Increases test stability by making sure that the view is built. - Remove the xfail marker from `test_tablet_streaming_with_unbuilt_view`. This series fixes https://github.com/scylladb/scylladb/issues/21564 and this test should work now.
This commit is contained in:
@@ -780,6 +780,7 @@ future<> view_building_worker::batch::do_build_range(view_building_worker& local
|
||||
as.check();
|
||||
std::exception_ptr eptr;
|
||||
try {
|
||||
utils::get_local_injector().inject("view_building_worker_pause_before_consume", 5min, as).get();
|
||||
vbw_logger.info("Starting range {} building for base table: {}.{}", range, base_cf->schema()->ks_name(), base_cf->schema()->cf_name());
|
||||
auto end_token = reader.consume_in_thread(std::move(consumer));
|
||||
vbw_logger.info("Built range {} for base table: {}.{}", dht::token_range(range.start(), end_token), base_cf->schema()->ks_name(), base_cf->schema()->cf_name());
|
||||
|
||||
@@ -91,6 +91,7 @@ async def test_base_partition_deletion_with_metrics(manager: ManagerClient, perm
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p1,p2,c) VALUES (?,?,?)')
|
||||
# The view partition key is a permutation of the base partition key.
|
||||
async with new_materialized_view(manager, table, '*', '(p2,p1),c' if permuted else '(p1,p2),c', 'p1 is not null and p2 is not null and c is not null') as mv:
|
||||
await wait_for_view(cql, mv.split('.')[1], 1)
|
||||
# the metric total_view_updates_pushed_local is incremented by 1 for each 100 row view
|
||||
# updates, because it is collected in batches according to max_rows_for_view_updates.
|
||||
# To verify the behavior, we want the metric to increase by at least 2 without the optimization,
|
||||
@@ -138,6 +139,7 @@ async def test_base_partition_deletion_in_batch_with_delete_row_with_metrics(man
|
||||
insert = cql.prepare(f'INSERT INTO {table} (p,c,v) VALUES (?,?,?)')
|
||||
# The view partition key is the same as the base partition key.
|
||||
async with new_materialized_view(manager, table, '*', '(p,c),v', 'p is not null and c is not null and v is not null') as mv:
|
||||
await wait_for_view(cql, mv.split('.')[1], 1)
|
||||
N = 101 # See comment above
|
||||
for i in range(N):
|
||||
await cql.run_async(insert, [1, 10, i])
|
||||
|
||||
@@ -12,7 +12,7 @@ from test.pylib.repair import create_table_insert_data_for_repair
|
||||
from test.pylib.rest_client import HTTPError, read_barrier
|
||||
from test.pylib.scylla_cluster import ReplaceConfig
|
||||
from test.pylib.tablets import get_tablet_replica, get_all_tablet_replicas
|
||||
from test.pylib.util import unique_name
|
||||
from test.pylib.util import unique_name, wait_for
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import wait_for_cql_and_get_hosts, create_new_test_keyspace, new_test_keyspace, reconnect_driver, get_topology_coordinator
|
||||
from contextlib import nullcontext as does_not_raise
|
||||
@@ -612,7 +612,6 @@ async def test_tablets_disabled_with_gossip_topology_changes(manager: ManagerCli
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/21564")
|
||||
async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
"""
|
||||
Reproducer for https://github.com/scylladb/scylladb/issues/21564
|
||||
@@ -626,6 +625,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
cmdline = [
|
||||
'--logger-log-level', 'storage_service=debug',
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
'--logger-log-level', 'view_building_coordinator=debug',
|
||||
'--logger-log-level', 'view_building_worker=debug',
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
@@ -642,8 +643,8 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
servers.append(await manager.server_add(cmdline=cmdline, config=cfg))
|
||||
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
||||
|
||||
logger.info("Inject error to make view generator pause before processing the sstable")
|
||||
injection_name = "view_builder_pause_add_new_view"
|
||||
logger.info("Inject error to make view building worker pause before processing the sstable")
|
||||
injection_name = "view_building_worker_pause_before_consume"
|
||||
await manager.api.enable_injection(servers[0].ip_addr, injection_name, one_shot=True)
|
||||
|
||||
logger.info("Create view")
|
||||
@@ -657,6 +658,14 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
||||
logger.info("Migration done")
|
||||
|
||||
async def check_table():
|
||||
result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
|
||||
if len(result) == 1:
|
||||
return True
|
||||
else:
|
||||
return None
|
||||
await wait_for(check_table, time.time() + 30)
|
||||
|
||||
# Verify the table has expected number of rows
|
||||
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
||||
assert len(list(rows)) == num_of_rows
|
||||
@@ -666,7 +675,6 @@ async def test_tablet_streaming_with_unbuilt_view(manager: ManagerClient):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@skip_mode('release', 'error injections are not supported in release mode')
|
||||
@pytest.mark.xfail(reason="https://github.com/scylladb/scylladb/issues/19149")
|
||||
async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
||||
"""
|
||||
Reproducer for https://github.com/scylladb/scylladb/issues/19149
|
||||
@@ -682,6 +690,8 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
||||
cmdline = [
|
||||
'--logger-log-level', 'storage_service=debug',
|
||||
'--logger-log-level', 'raft_topology=debug',
|
||||
'--logger-log-level', 'view_building_coordinator=debug',
|
||||
'--logger-log-level', 'view_building_worker=debug',
|
||||
]
|
||||
servers = [await manager.server_add(cmdline=cmdline, config=cfg)]
|
||||
await manager.api.disable_tablet_balancing(servers[0].ip_addr)
|
||||
@@ -694,13 +704,21 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
||||
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "test")
|
||||
|
||||
logger.info("Create view")
|
||||
# Pause view building
|
||||
await manager.api.enable_injection(servers[0].ip_addr, "view_building_worker_pause_before_consume", one_shot=True)
|
||||
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.mv1 AS \
|
||||
SELECT * FROM {ks}.test WHERE pk IS NOT NULL AND c IS NOT NULL \
|
||||
PRIMARY KEY (c, pk);")
|
||||
|
||||
# Check if view building was started
|
||||
async def check_mv_status():
|
||||
mv_status = await cql.run_async(f"SELECT status FROM system.view_build_status_v2 WHERE keyspace_name='{ks}' AND view_name='mv'")
|
||||
return len(mv_status) == 1 and mv_status[0].status == "STARTED"
|
||||
await wait_for(check_mv_status, time.time() + 30)
|
||||
|
||||
logger.info("Generate an sstable and move it to upload directory of test table")
|
||||
# create an sstable using a dummy table
|
||||
await cql.run_async("CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
|
||||
await cql.run_async(f"CREATE TABLE {ks}.dummy (pk int PRIMARY KEY, c int);")
|
||||
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.dummy (pk, c) VALUES ({k}, {k%3});") for k in range(64, 128)])
|
||||
await manager.api.keyspace_flush(servers[0].ip_addr, ks, "dummy")
|
||||
node_workdir = await manager.server_get_workdir(servers[0].server_id)
|
||||
@@ -730,6 +748,14 @@ async def test_tablet_streaming_with_staged_sstables(manager: ManagerClient):
|
||||
await manager.api.move_tablet(servers[0].ip_addr, ks, "test", replica[0], replica[1], s1_host_id, 0, tablet_token)
|
||||
logger.info("Migration done")
|
||||
|
||||
async def check_view_is_built():
|
||||
result = await cql.run_async(f"SELECT * FROM system.built_views WHERE keyspace_name='{ks}' AND view_name='mv1'")
|
||||
if len(result) == 1:
|
||||
return True
|
||||
else:
|
||||
return None
|
||||
await wait_for(check_view_is_built, time.time() + 60)
|
||||
|
||||
expected_num_of_rows = 128
|
||||
# Verify the table has expected number of rows
|
||||
rows = await cql.run_async(f"SELECT pk from {ks}.test")
|
||||
|
||||
@@ -7,7 +7,7 @@ from contextlib import asynccontextmanager
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.pylib.rest_client import inject_error_one_shot
|
||||
from test.pylib.tablets import get_tablet_replica, get_base_table, get_tablet_count, get_tablet_info
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
||||
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts, wait_for_view
|
||||
from test.cluster.conftest import skip_mode
|
||||
from test.cluster.util import new_test_keyspace
|
||||
import time
|
||||
@@ -371,6 +371,7 @@ async def test_create_colocated_table_while_base_is_migrating(manager: ManagerCl
|
||||
new_replica = await get_tablet_replica(manager, servers[0], ks, 'test', tablet_token)
|
||||
assert new_replica[0] == dst_host_id
|
||||
|
||||
await wait_for_view(cql, 'tv', 2)
|
||||
rows = await cql.run_async(f"SELECT * FROM {ks}.tv")
|
||||
assert len(rows) == total_keys+1
|
||||
|
||||
|
||||
@@ -21,8 +21,11 @@ from cassandra.protocol import InvalidRequest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def create_keyspace(cql):
|
||||
return await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}")
|
||||
async def create_keyspace(cql, disable_tablets=False):
|
||||
ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
|
||||
if disable_tablets:
|
||||
ks_options = ks_options + " AND tablets={'enabled': false}"
|
||||
return await create_new_test_keyspace(cql, ks_options)
|
||||
|
||||
async def create_table(cql, ks):
|
||||
await cql.run_async(f"CREATE TABLE {ks}.t (p int, c int, PRIMARY KEY (p, c))")
|
||||
@@ -507,7 +510,7 @@ async def test_migration_on_existing_raft_topology(request, manager: ManagerClie
|
||||
logging.info("Waiting until driver connects to every server")
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
ks = await create_keyspace(cql)
|
||||
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}")
|
||||
await create_table(cql, ks)
|
||||
await create_mv(cql, ks, "vt1")
|
||||
|
||||
@@ -553,7 +556,10 @@ async def test_view_build_status_with_synchronize_wait(manager: ManagerClient):
|
||||
servers.append(await manager.server_add())
|
||||
cql, hosts = await manager.get_ready_cql(servers)
|
||||
|
||||
ks = await create_keyspace(cql)
|
||||
# With tablets and view building coordinator, view build status is marked
|
||||
# in the same group0 batch that sets new node status to normal,
|
||||
# so `start_operation()` is not called and this test doesn't work.
|
||||
ks = await create_keyspace(cql, disable_tablets=True)
|
||||
await create_table(cql, ks)
|
||||
# 'raft_group0_client::start_operation' gets called underneath this.
|
||||
await create_mv(cql, ks, "vt1")
|
||||
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
import requests
|
||||
from .util import new_materialized_view, new_test_table, unique_name, sleep_till_whole_second
|
||||
from . import nodetool
|
||||
from .test_materialized_view import wait_for_view_built
|
||||
|
||||
def test_tombstone_gc_with_conflict_in_memtable(scylla_only, cql, test_keyspace):
|
||||
"""
|
||||
@@ -103,6 +104,7 @@ def test_tombstone_gc_with_materialized_view_update_in_memtable(scylla_only, cql
|
||||
with new_test_table(cql, test_keyspace, schema) as table:
|
||||
# Create a materialized view with same partition key as the base, and using a regular column in the base as a clustering key in the view
|
||||
with new_materialized_view(cql, table, '*', 'k, v', 'k is not null and v is not null', extra="with gc_grace_seconds = 0") as mv:
|
||||
wait_for_view_built(cql, mv)
|
||||
with nodetool.no_autocompaction_context(cql, mv):
|
||||
# Insert initial data into the base table
|
||||
cql.execute(f"insert into {table} (k, v, w) values (1, 1, 1)")
|
||||
|
||||
@@ -143,10 +143,11 @@ def test_mv_quoted_column_names_build(cql, test_keyspace):
|
||||
# that simply hasn't completed (besides looking at the logs,
|
||||
# which we don't). This means, unfortunately, that a failure
|
||||
# of this test is slow - it needs to wait for a timeout.
|
||||
start_time = time.time()
|
||||
while time.time() < start_time + 30:
|
||||
if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
|
||||
break
|
||||
wait_for_view_built(cql, mv)
|
||||
# start_time = time.time()
|
||||
# while time.time() < start_time + 30:
|
||||
# if list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]:
|
||||
# break
|
||||
assert list(cql.execute(f'SELECT * from {mv}')) == [(2, 1)]
|
||||
|
||||
# The previous test (test_mv_empty_string_partition_key) verifies that a
|
||||
@@ -1501,6 +1502,13 @@ def test_view_in_system_tables(cql, test_keyspace):
|
||||
with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int") as base:
|
||||
with new_materialized_view(cql, base, '*', 'v,p', 'v is not null and p is not null') as view:
|
||||
wait_for_view_built(cql, view)
|
||||
|
||||
# In view_building_coordinator path, `built_views` table is updated by view_building_worker,
|
||||
# so there is a short window when a view is build (information is in view_build_status_v2)
|
||||
# but it isn't marked in `built_views` locally.
|
||||
# Doing read barrier is enough to ensure that the worker updated the table.
|
||||
cql.execute("DROP TABLE IF EXISTS nosuchkeyspace.nosuchtable")
|
||||
|
||||
res = [ f'{r.keyspace_name}.{r.view_name}' for r in cql.execute('select * from system.built_views')]
|
||||
assert view in res
|
||||
res = [ f'{r.table_name}.{r.index_name}' for r in cql.execute('select * from system."IndexInfo"')]
|
||||
|
||||
Reference in New Issue
Block a user