`test_schema_versioning_with_recovery` is currently flaky. It performs a write with CL=ALL and then checks that the schema version is the same on all nodes by calling `verify_table_versions_synced`. All nodes are expected to sync their schema before handling the replica write: the node in RECOVERY mode should do it through a schema pull, and the other nodes should do it through a group 0 read barrier. The problem is in `verify_local_schema_versions_synced`, which compares the schema versions in `system.local`. The node in RECOVERY mode updates the schema version in `system.local` after it acknowledges the replica write as completed, so the check can fail. We fix the problem by making the function wait until the schema versions match. Note that RECOVERY mode is about to be retired together with the whole gossip-based topology in 2026.2, so this test is about to be deleted. However, we still want to fix it so that it doesn't bother us in older branches. Fixes #23803. Closes scylladb/scylladb#28114.
359 lines
16 KiB
Python
359 lines
16 KiB
Python
#
|
|
# Copyright (C) 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import asyncio
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import re
|
|
from uuid import UUID
|
|
|
|
from cassandra.cluster import Session, ConsistencyLevel # type: ignore
|
|
from cassandra.query import SimpleStatement # type: ignore
|
|
from cassandra.pool import Host # type: ignore
|
|
|
|
from test.pylib.manager_client import ManagerClient, ServerInfo
|
|
from test.pylib.util import wait_for, wait_for_cql_and_get_hosts
|
|
from test.pylib.log_browsing import ScyllaLogFile
|
|
from test.cluster.util import reconnect_driver, wait_until_upgrade_finishes, \
|
|
enter_recovery_state, delete_raft_data_and_upgrade_state, new_test_keyspace
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
    """Return the `schema_version` column of the `system.local` row on host `h`.

    The query is sent to `h` explicitly. The result is asserted to be
    non-empty before its first row is read.
    """
    query = "select schema_version from system.local where key = 'local'"
    rows = await cql.run_async(query, host=h)
    assert rows
    first_row = rows[0]
    return first_row.schema_version
|
|
|
|
|
|
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
    """Return the `group0_schema_version` stored in `system.scylla_local` on host `h`.

    The stored value is parsed into a `UUID`. Returns `None` when the query
    yields no row.
    """
    rows = await cql.run_async(
        "select value from system.scylla_local where key = 'group0_schema_version'",
        host=h)
    if not rows:
        return None
    return UUID(rows[0].value)
|
|
|
|
|
|
async def get_scylla_tables_versions(cql: Session, h: Host) -> list[tuple[str, str, UUID | None]]:
    """Return `(keyspace_name, table_name, version)` for every row of
    `system_schema.scylla_tables` as read from host `h`.
    """
    rows = await cql.run_async("select keyspace_name, table_name, version from system_schema.scylla_tables", host=h)
    triples = []
    for row in rows:
        triples.append((row.keyspace_name, row.table_name, row.version))
    return triples
|
|
|
|
|
|
async def get_scylla_tables_version(cql: Session, h: Host, keyspace_name: str, table_name: str) -> UUID | None:
    """Return the `version` column of one `system_schema.scylla_tables` row on host `h`.

    The row is selected by `keyspace_name` and `table_name`. If the query
    returns no row, the test is failed through `pytest.fail`.
    """
    query = (
        f"select version from system_schema.scylla_tables"
        f" where keyspace_name = '{keyspace_name}' and table_name = '{table_name}'"
    )
    rows = await cql.run_async(query, host=h)
    if rows:
        return rows[0].version
    # `pytest.fail` raises, so nothing is returned on this path.
    pytest.fail(f"No scylla_tables row found for {keyspace_name}.{table_name}")
|
|
|
|
|
|
async def verify_local_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Wait until every host in `hs` reports the same `system.local` schema version.

    The versions are re-read from all hosts on each attempt and compared
    against the first host's version. Polling is delegated to `wait_for`
    with a deadline 5 seconds from now and a 1 second period.
    """
    async def versions_match() -> bool | None:
        # Read the hosts one by one, in the order given.
        observed = {}
        for host in hs:
            observed[host] = await get_local_schema_version(cql, host)
        logger.info(f"system.local schema_versions: {observed}")

        ref_host, ref_version = next(iter(observed.items()))
        mismatch = next(((host, version) for host, version in observed.items() if version != ref_version), None)
        if mismatch is None:
            return True

        other_host, other_version = mismatch
        logger.info(f"{ref_host}'s system.local schema_version {ref_version} is different than {other_host}'s version {other_version}; retrying")
        # `None` signals "not yet" to `wait_for`, which then retries.
        return None

    deadline = time.time() + 5.0
    await wait_for(versions_match, deadline=deadline, period=1.0)
|
|
|
|
|
|
async def verify_group0_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Fail the test unless every host in `hs` stores the same `group0_schema_version`.

    Each host's value is compared against the first host's value. The first
    mismatch is reported through `pytest.fail`.
    """
    per_host = {}
    for host in hs:
        per_host[host] = await get_group0_schema_version(cql, host)
    logger.info(f"system.scylla_local group0_schema_versions: {per_host}")

    ref_host, ref_version = next(iter(per_host.items()))
    for host, version in per_host.items():
        if version == ref_version:
            continue
        pytest.fail(f"{ref_host}'s system.scylla_local group0_schema_version {ref_version} is different than {host}'s version {version}")
|
|
|
|
|
|
async def verify_scylla_tables_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool) -> None:
    """Fail the test unless `system_schema.scylla_tables` has the same contents on all hosts.

    Each host's set of `(keyspace_name, table_name, version)` rows is compared
    against the first host's set. When `ignore_system_tables` is true, rows
    whose keyspace is `system` are left out of the comparison.
    """
    rows_by_host = {}
    for host in hs:
        rows_by_host[host] = set(await get_scylla_tables_versions(cql, host))
    logger.info(f"system_schema.scylla_tables: {rows_by_host}")

    ref_host, ref_rows = next(iter(rows_by_host.items()))
    for host, rows in rows_by_host.items():
        mismatched = rows.symmetric_difference(ref_rows)
        if ignore_system_tables:
            mismatched = {entry for entry in mismatched if entry[0] != "system"}
        if mismatched:
            pytest.fail(f"{ref_host}'s system_schema.scylla_tables contents is different than {host}'s, symmetric diff: {mismatched}")
|
|
|
|
|
|
async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool = False) -> None:
    """Run all table-stored schema version checks against the hosts in `hs`.

    The checks run one after another: the group 0 schema version, then the
    `system.local` schema version, then the contents of
    `system_schema.scylla_tables`. `ignore_system_tables` is passed on to the
    last check only.
    """
    logger.info("Verifying that versions stored in tables are in sync")

    await verify_group0_schema_versions_synced(cql, hs)
    await verify_local_schema_versions_synced(cql, hs)
    await verify_scylla_tables_versions_synced(cql, hs, ignore_system_tables)
|
|
|
|
|
|
async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int], table):
    """
    Check that every server logged the same version in its `Altering {table}...` message.

    Assumes that `logs` are the log files of servers `srvs`, in the same order.
    Assumes that `marks` are log markers (obtained by `ScyllaLogFile.mark()`) for `logs`, in the same order.
    Assumes that an 'alter table {table} ...' statement was performed after obtaining `marks`.

    Only the first matching line after each mark is considered. A server with
    no matching line, or with a version different from the first server's,
    fails the test.
    """
    logger.info("Verifying that in-memory table schema versions are in sync")

    # Grep every log first, one at a time, starting from its own mark.
    grep_results = []
    for log, mark in zip(logs, marks):
        grep_results.append(await log.grep(f"Altering {table}.*version=(.*)", from_mark=mark))

    versions = {}
    for srv, found in zip(srvs, grep_results):
        if not found:
            pytest.fail(f"Server {srv} didn't log 'Altering' message")
        _, first_match = found[0]
        versions[srv] = UUID(first_match.group(1))
    logger.info(f"In-memory table versions: {versions}")

    ref_srv, ref_version = next(iter(versions.items()))
    for srv, version in versions.items():
        if version == ref_version:
            continue
        pytest.fail(f"{ref_srv}'s in-memory table version {ref_version} is different than {srv}'s version {version}")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_versioning_with_recovery(manager: ManagerClient):
    """
    Perform schema changes while mixing nodes in RECOVERY mode with nodes in group 0 mode.
    Schema changes originating from RECOVERY node use digest-based schema versioning.
    Schema changes originating from group 0 nodes use persisted versions committed through group 0.

    Verify that schema versions are in sync after each schema change.

    Steps, in order:
      1. Boot three servers, create a keyspace and table, alter the table.
      2. Restart the first server in RECOVERY mode and alter the table through it.
      3. Stop the RECOVERY server, alter the table through a group 0 server,
         start the RECOVERY server again and force a schema sync with a CL=ALL insert.
      4. Put the remaining servers in RECOVERY mode, delete Raft data and upgrade
         state on all servers, restart them, and wait for the upgrade to finish.
      5. Perform several more schema changes, checking versions after each.
    """
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    # Must bootstrap sequentially because of gossip topology changes
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name:
        await verify_table_versions_synced(cql, hosts)
        table_name = "t"
        table = f"{ks_name}.{table_name}"
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)
        ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert ks_t_version

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]
        marks = [await log.mark() for log in logs]

        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # The alter must have produced a new, non-empty persisted version.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        assert new_ks_t_version != ks_t_version
        ks_t_version = new_ks_t_version

        # We still have a group 0 majority, don't do this at home.
        srv1 = servers[0]
        logger.info(f"Rebooting {srv1} in RECOVERY mode")
        h1 = next(h for h in hosts if h.address == srv1.ip_addr)
        await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1)
        await manager.server_restart(srv1.server_id)

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode.
        # Don't do this at home.
        #
        # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything:
        # the RECOVERY node is operating using the old schema change procedure, which means
        # that it pushes the schema mutations to other nodes directly with RPC, modifying
        # the group 0 state machine on other two nodes.
        #
        # There is one problem with this however. If the RECOVERY node considers some other node
        # as DOWN, it will silently *not* push the schema change, completing the operation
        # "successfully" nevertheless (it will return to the driver without error).
        # Usually in this case we rely on eventual convergence of schema through gossip,
        # which will not happen here, because the group 0 nodes are not doing schema pulls!
        # So we need to make sure that the RECOVERY node sees the other nodes as UP before
        # we perform the schema change, so it pushes the mutations to them.
        logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP")
        await manager.server_sees_others(srv1.server_id, 2)

        marks = [await log.mark() for log in logs]
        logger.info(f"Altering table on RECOVERY node ({srv1})")
        await cql.run_async(f"alter table {table} with comment = ''", host=h1)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # After a schema change made through the RECOVERY node, the persisted
        # `version` column is expected to be empty.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert not new_ks_t_version
        ks_t_version = new_ks_t_version

        logger.info(f"Stopping {srv1} gracefully")
        await manager.server_stop_gracefully(srv1.server_id)

        srv2 = servers[1]
        logger.info(f"Waiting until {srv2} sees {srv1} as dead")
        await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # Now we modify schema through group 0 while the RECOVERY node is dead.
        # Don't do this at home.
        marks = [await log.mark() for log in logs]
        h2 = next(h for h in hosts if h.address == srv2.ip_addr)
        logger.info(f"Altering table on group 0 node {srv2}")
        await cql.run_async(f"alter table {table} with comment = ''", host=h2)

        await manager.server_start(srv1.server_id)
        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info(f"Waiting until {srv2} sees {srv1} as UP")
        await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # The RECOVERY node will pull schema when it gets a write.
        # The other group 0 node will do a barrier so it will also sync schema before the write returns.
        logger.info("Forcing schema sync through CL=ALL INSERT")
        await cql.run_async(SimpleStatement(f"insert into {table} (pk) values (0)", consistency_level=ConsistencyLevel.ALL),
                            host=h2)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # The change went through group 0, so a persisted version is expected again.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        ks_t_version = new_ks_t_version

        srv3 = servers[2]
        h3 = next(h for h in hosts if h.address == srv3.ip_addr)
        logger.info("Finishing recovery")
        for h in [h2, h3]:
            await cql.run_async(
                "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h)
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3]))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        for h in [h1, h2, h3]:
            await delete_raft_data_and_upgrade_state(cql, h)

        logger.info("Restarting servers")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Use the module-level `logger` here, like every other message in this test.
        logger.info("Waiting until upgrade finishes")
        for h in [h1, h2, h3]:
            await wait_until_upgrade_finishes(cql, h, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        for change in [
                f"alter table {table} with comment = ''",
                f"alter table {table} add v int",
                f"alter table {table} alter v type blob"]:

            marks = [await log.mark() for log in logs]
            logger.info(f"Altering table with \"{change}\"")
            await cql.run_async(change)

            # Every change must produce a fresh, non-empty persisted version.
            new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
            assert new_ks_t_version
            assert new_ks_t_version != ks_t_version
            ks_t_version = new_ks_t_version

            await verify_table_versions_synced(cql, hosts)
            await verify_in_memory_table_versions(servers, logs, marks, table)
|
|
|
|
@pytest.mark.asyncio
async def test_upgrade(manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    While Raft is disabled, we use digest-based schema versioning.
    Once Raft upgrade is complete, we use persisted versions committed through group 0.

    Steps, in order:
      1. Boot three servers, put all of them in recovery state and restart them.
      2. Create a keyspace and table, checking that table-stored versions are in sync.
      3. Delete Raft data and upgrade state, restart, and wait for the upgrade to finish.
      4. Alter the table and check that versions are in sync, that
         `group0_schema_version` is set, and that the table's `version` column is set.
    """
    # Raft upgrade tests had to be replaced with recovery tests (scylladb/scylladb#16192)
    # as prerequisite for getting rid of `consistent_cluster_management` flag.
    # So we do the same here: start a cluster in Raft mode, then enter recovery
    # to simulate a non-Raft cluster.
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()

    logger.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info(f"Setting recovery state on {hosts} and restarting")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
    await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
    cql = await reconnect_driver(manager)

    logger.info("Waiting until driver connects to every server")
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks_name:
        table = f"{ks_name}.t"
        await verify_table_versions_synced(cql, hosts)
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info(f"Deleting Raft data and upgrade state on {hosts}")
        await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

        logger.info(f"Restarting {servers}")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
        cql = await reconnect_driver(manager)

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info("Waiting until Raft upgrade procedure finishes")
        await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]

        marks = [await log.mark() for log in logs]
        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # `group0_schema_version` should be present
        # and the version column for `{table}` should be non-null.
        for h in hosts:
            logger.info(f"Checking that `group0_schema_version` is set on {h}")
            assert (await get_group0_schema_version(cql, h)) is not None

        for h in hosts:
            logger.info(f"Checking that `version` column for `{table}` is set on {h}")
            versions = await get_scylla_tables_versions(cql, h)
            for ks, _, v in versions:
                # Compare against `ks_name`, the keyspace the table was created in.
                # A hard-coded keyspace name would only match by coincidence,
                # leaving this assertion unexercised.
                if ks == ks_name:
                    assert v is not None
|