Files
scylladb/test/cluster/test_view_build_status.py
2026-03-10 10:46:48 +02:00

391 lines
17 KiB
Python

#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import pytest
import time
import asyncio
import logging
from test.pylib.util import wait_for_cql_and_get_hosts, wait_for_view, wait_for_view_v1
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.internal_types import ServerInfo
from test.cluster.util import trigger_snapshot, wait_for, create_new_test_keyspace
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra.protocol import InvalidRequest
from test.cluster.util import new_test_keyspace
from test.cluster.test_view_building_coordinator import mark_all_servers, pause_view_building_tasks, \
unpause_view_building_tasks, wait_for_some_view_build_tasks_to_get_stuck
logger = logging.getLogger(__name__)
async def create_keyspace(cql, disable_tablets=False):
ks_options = "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}"
if disable_tablets:
ks_options = ks_options + " AND tablets={'enabled': false}"
return await create_new_test_keyspace(cql, ks_options)
async def create_table(cql, ks):
await cql.run_async(f"CREATE TABLE {ks}.t (p int, c int, PRIMARY KEY (p, c))")
async def create_mv(cql, ks, view_name):
await cql.run_async(f"CREATE MATERIALIZED VIEW {ks}.{view_name} AS SELECT * FROM {ks}.t WHERE c IS NOT NULL and p IS NOT NULL PRIMARY KEY (c, p)")
async def get_view_builder_version(cql, **kwargs):
result = await cql.run_async("SELECT value FROM system.scylla_local WHERE key='view_builder_version'", **kwargs)
if len(result) == 0:
return 1
else:
return int(result[0].value) // 10
async def view_builder_is_v2(cql, **kwargs):
v = await get_view_builder_version(cql, **kwargs)
return v == 2 or None
async def view_is_built_v2(cql, ks_name, view_name, node_count, **kwargs):
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE keyspace_name='{ks_name}' \
AND view_name = '{view_name}' AND status = 'SUCCESS' ALLOW FILTERING", **kwargs)
return done[0][0] == node_count or None
async def wait_for_view_v2(cql, ks_name, view_name, node_count, **kwargs):
await wait_for(lambda: view_is_built_v2(cql, ks_name, view_name, node_count, **kwargs), time.time() + 60)
async def wait_for_row_count(cql, table, n, host):
async def row_count_is_n():
cnt = (await cql.run_async(f"SELECT count(*) FROM {table}", host=host))[0].count
return cnt == n or None
await wait_for(row_count_is_n, time.time() + 60)
async def wait_for_view_build_status(cql, ks_name, view_name, status, node_count):
async def view_build_status_is():
result = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE keyspace_name='{ks_name}' AND view_name='{view_name}' AND status='{status}' ALLOW FILTERING")
return len(result) == node_count or None
await wait_for(view_build_status_is, time.time() + 60)
# Verify a new cluster uses the view_build_status_v2 table.
# Create a materialized view and check that the view's build status
# is stored in view_build_status_v2 and all nodes see all the other
# node's statuses.
@pytest.mark.asyncio
async def test_view_build_status_v2_table(manager: ManagerClient):
node_count = 3
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
# The cluster should init with view_builder v2
for h in hosts:
v = await get_view_builder_version(cql, host=h)
assert v == 2
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt")
await asyncio.gather(*(wait_for_view_v2(cql, ks, 'vt', node_count, host=h) for h in hosts))
# The table system_distributed.view_build_status is set to be a virtual table reading
# from system.view_build_status_v2, so verify that reading from each of them provides
# the same output.
@pytest.mark.asyncio
async def test_view_build_status_virtual_table(manager: ManagerClient):
node_count = 3
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
async def select_v1():
r = await cql.run_async("SELECT * FROM system_distributed.view_build_status")
return r
async def select_v2():
r = await cql.run_async("SELECT * FROM system.view_build_status_v2")
return r
async def assert_v1_eq_v2():
r1, r2 = await select_v1(), await select_v2()
assert r1 == r2
async def wait_for_view_on_host(cql, name, node_count, host, timeout: int = 120):
async def view_is_built():
done = await cql.run_async(f"SELECT COUNT(*) FROM system.view_build_status_v2 WHERE status = 'SUCCESS' AND view_name = '{name}' ALLOW FILTERING", host=host)
return done[0][0] == node_count or None
deadline = time.time() + timeout
await wait_for(view_is_built, deadline)
ks_name = await create_keyspace(cql)
await create_table(cql, ks_name)
await assert_v1_eq_v2()
await create_mv(cql, ks_name, 'vt1')
await asyncio.gather(*(wait_for_view_on_host(cql, 'vt1', node_count, h) for h in hosts))
await assert_v1_eq_v2()
assert len(await select_v2()) == node_count
await create_mv(cql, ks_name, 'vt2')
await asyncio.gather(*(wait_for_view_on_host(cql, 'vt2', node_count, h) for h in hosts))
await assert_v1_eq_v2()
assert len(await select_v2()) == node_count * 2
# verify SELECT ... WHERE works
r1 = await cql.run_async("SELECT * FROM system_distributed.view_build_status WHERE keyspace_name='{ks_name}' AND view_name='vt1'")
r2 = await cql.run_async("SELECT * FROM system.view_build_status_v2 WHERE keyspace_name='{ks_name}' AND view_name='vt1'")
assert r1 == r2
# verify SELECT COUNT(*) works
r1 = await cql.run_async("SELECT COUNT(*) FROM system_distributed.view_build_status")
r2 = await cql.run_async("SELECT COUNT(*) FROM system.view_build_status_v2")
assert r1 == r2
# verify paging
r1 = await cql.run_async(SimpleStatement(f"SELECT * FROM system_distributed.view_build_status", fetch_size=1))
r2 = await cql.run_async(SimpleStatement(f"SELECT * FROM system.view_build_status_v2", fetch_size=1))
assert r1 == r2
# select with range
s0_host_id = await manager.get_host_id(servers[0].server_id)
r1 = await cql.run_async(f"SELECT * FROM system_distributed.view_build_status WHERE keyspace_name='{ks_name}' AND view_name='vt1' AND host_id >= {s0_host_id}")
r2 = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE keyspace_name='{ks_name}' AND view_name='vt1' AND host_id >= {s0_host_id}")
assert r1 == r2
# select with allow filtering
r1 = await cql.run_async(f"SELECT * FROM system_distributed.view_build_status WHERE status='SUCCESS' ALLOW FILTERING")
r2 = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE status='SUCCESS' ALLOW FILTERING")
assert r1 == r2
await cql.run_async(f"DROP MATERIALIZED VIEW {ks_name}.vt1")
async def view_rows_removed_and_v1_eq_v2():
r1, r2 = await select_v1(), await select_v2()
if len(r2) == node_count and r1 == r2:
return True
await wait_for(view_rows_removed_and_v1_eq_v2, time.time() + 60)
# Cluster with 3 nodes.
# Create materialized views. Start new server and it should get a snapshot on bootstrap.
# Stop 3 `old` servers and query the new server to validate if it has the same view build status.
@pytest.mark.asyncio
async def test_view_build_status_snapshot(manager: ManagerClient):
servers = await manager.servers_add(3)
cql, _ = await manager.get_ready_cql(servers)
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
for s in servers:
await manager.driver_connect(server=s)
cql = manager.get_cql()
await wait_for_view(cql, "vt1", 3)
await wait_for_view(cql, "vt2", 3)
# we don't know who the leader is, so trigger the snapshot on all nodes
await asyncio.gather(*(trigger_snapshot(manager, s) for s in servers))
# Add a new server which will recover from the snapshot
new_server = await manager.server_add()
all_servers = servers + [new_server]
await wait_for_cql_and_get_hosts(cql, all_servers, time.time() + 60)
await manager.servers_see_each_other(all_servers)
await asyncio.gather(*(manager.server_stop_gracefully(s.server_id) for s in servers))
# Read the table on the new server, verify it contains all the previous table content
await manager.driver_connect(server=new_server)
cql = manager.get_cql()
await wait_for_view(cql, "vt1", 4)
await wait_for_view(cql, "vt2", 4)
# Test that when removing a node from the cluster, we clean its rows from
# the view build status table.
@pytest.mark.asyncio
async def test_view_build_status_cleanup_on_remove_node(manager: ManagerClient):
node_count = 4
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[0])
await manager.server_stop_gracefully(servers[-1].server_id)
await manager.remove_node(servers[0].server_id, servers[-1].server_id)
servers.pop()
cql, hosts = await manager.get_ready_cql(servers)
# The 2 rows belonging to the node that was removed, one for each view, should
# be deleted from the table.
await wait_for_row_count(cql, "system.view_build_status_v2", (node_count-1)*2, hosts[0])
# Replace a node and verify that the view_build_status has rows for the new node and
# no rows for the old node
@pytest.mark.asyncio
async def test_view_build_status_with_replace_node(manager: ManagerClient):
node_count = 4
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
ks = await create_keyspace(cql)
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await create_mv(cql, ks, "vt2")
await wait_for_row_count(cql, "system.view_build_status_v2", node_count*2, hosts[1])
# replace a node
removed_host_id = await manager.get_host_id(servers[0].server_id)
await manager.server_stop_gracefully(servers[0].server_id);
replace_cfg = ReplaceConfig(replaced_id = servers[0].server_id, reuse_ip_addr = False, use_host_id = True)
servers.append(await manager.server_add(replace_cfg))
servers = servers[1:]
added_host_id = await manager.get_host_id(servers[-1].server_id)
await manager.driver_connect(server=servers[1])
cql = manager.get_cql()
# wait for the old node rows to be removed and new node rows to be added
async def node_rows_replaced():
result = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE host_id={removed_host_id} ALLOW FILTERING")
if len(result) != 0:
return None
result = await cql.run_async(f"SELECT * FROM system.view_build_status_v2 WHERE host_id={added_host_id} ALLOW FILTERING")
if len(result) != 2:
return None
result = await cql.run_async("SELECT * FROM system.view_build_status_v2")
if len(result) != node_count * 2:
return None
return True
await wait_for(node_rows_replaced, time.time() + 60)
# Reproduces scylladb/scylladb#20754
# View build status migration is doing read with CL=ALL, so it requires all nodes to be up.
# Before the fix, the migration was triggered too early, causing unavailable exception in topology coordinator.
# The error was triggered when the cluster started in raft topology without view build status v2.
# It wasn't happening in gossip topology -> raft topology upgrade.
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injection is not supported in release mode')
async def test_migration_on_existing_raft_topology(request, manager: ManagerClient):
cfg = {
"error_injections_at_startup": [
{
"name": "suppress_features",
"value": "VIEW_BUILD_STATUS_ON_GROUP0"
},
"skip_vb_v2_version_mut"
]
}
servers = await manager.servers_add(3, config=cfg)
logging.info("Waiting until driver connects to every server")
cql, hosts = await manager.get_ready_cql(servers)
ks = await create_new_test_keyspace(cql, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}")
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
# Verify we're using v1 now
v = await get_view_builder_version(cql)
assert v == 1
await wait_for_row_count(cql, "system_distributed.view_build_status", 3, hosts[0])
result = await cql.run_async("SELECT * FROM system.view_build_status_v2")
assert len(result) == 0
# Enable suppressed `VIEW_BUILD_STATUS_ON_GROUP0` cluster feature
for srv in servers:
await manager.server_stop_gracefully(srv.server_id)
await manager.server_update_config(srv.server_id, "error_injections_at_startup", [])
await manager.server_start(srv.server_id)
await wait_for_cql_and_get_hosts(manager.get_cql(), servers, time.time() + 60)
logging.info("Waiting until view builder status is migrated")
await asyncio.gather(*(wait_for(lambda: view_builder_is_v2(cql, host=h), time.time() + 60) for h in hosts))
# Check that new writes are written to the v2 table
await create_mv(cql, ks, "vt2")
await asyncio.gather(*(wait_for_view_v2(cql, ks, "vt2", 3, host=h) for h in hosts))
await wait_for_row_count(cql, "system.view_build_status_v2", 6, hosts[0])
# Check if there is no error logs from raft topology
for srv in servers:
log = await manager.server_open_log(srv.server_id)
res = await log.grep(r'ERROR.*\[shard [0-9]: [a-z]+\] raft_topology - topology change coordinator fiber got error exceptions::unavailable_exception \(Cannot achieve consistency level for')
assert len(res) == 0
# Test that when removing the view, its build status is cleaned from the status table
@pytest.mark.asyncio
async def test_view_build_status_cleanup_on_drop_view(manager: ManagerClient):
node_count = 4
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await wait_for_view_build_status(cql, ks, "vt1", "SUCCESS", node_count)
await cql.run_async(f"DROP MATERIALIZED VIEW {ks}.vt1")
await wait_for_view_build_status(cql, ks, "vt1", "SUCCESS", 0)
# Test that when removing the view, its build status is cleaned from the status table
@pytest.mark.asyncio
async def test_view_build_status_extended_on_added_node(manager: ManagerClient):
node_count = 4
servers = await manager.servers_add(node_count)
cql, hosts = await manager.get_ready_cql(servers)
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await create_table(cql, ks)
await create_mv(cql, ks, "vt1")
await wait_for_view_build_status(cql, ks, "vt1", "SUCCESS", node_count)
await manager.server_add()
await wait_for_view_build_status(cql, ks, "vt1", "SUCCESS", node_count+1)
# Test that when removing the view, its build status is cleaned from the status table
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_view_build_status_marked_started_on_node_added_during_building(manager: ManagerClient):
node_count = 4
servers = await manager.servers_add(node_count, cmdline=[
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',
'--logger-log-level', 'view_building_coordinator=debug',
'--logger-log-level', 'view_building_worker=debug',
])
cql, hosts = await manager.get_ready_cql(servers)
await manager.disable_tablet_balancing()
async with new_test_keyspace(manager, f"WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 1}}") as ks:
await create_table(cql, ks)
await pause_view_building_tasks(manager, pause_all=True)
marks = await mark_all_servers(manager)
await create_mv(cql, ks, "vt1")
await wait_for_some_view_build_tasks_to_get_stuck(manager, marks)
await wait_for_view_build_status(cql, ks, "vt1", "STARTED", node_count)
await manager.server_add(cmdline=[
'--logger-log-level', 'storage_service=debug',
'--logger-log-level', 'raft_topology=debug',
'--logger-log-level', 'view_building_coordinator=debug',
'--logger-log-level', 'view_building_worker=debug',
])
await wait_for_view_build_status(cql, ks, "vt1", "STARTED", node_count+1)
await unpause_view_building_tasks(manager)
await wait_for_view_build_status(cql, ks, "vt1", "SUCCESS", node_count+1)