To allow auto-expansion of numeric RF to rack list. Otherwise, keyspace creation will be rejected if rf-rack-valid keyspaces are enforced.
355 lines
16 KiB
Python
355 lines
16 KiB
Python
#
|
|
# Copyright (C) 2023-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
import asyncio
|
|
import time
|
|
import pytest
|
|
import logging
|
|
import re
|
|
from uuid import UUID
|
|
|
|
from cassandra.cluster import Session, ConsistencyLevel # type: ignore
|
|
from cassandra.query import SimpleStatement # type: ignore
|
|
from cassandra.pool import Host # type: ignore
|
|
|
|
from test.pylib.manager_client import ManagerClient, ServerInfo
|
|
from test.pylib.util import wait_for_cql_and_get_hosts
|
|
from test.pylib.log_browsing import ScyllaLogFile
|
|
from test.cluster.util import reconnect_driver, wait_until_upgrade_finishes, \
|
|
enter_recovery_state, delete_raft_data_and_upgrade_state, new_test_keyspace
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def get_local_schema_version(cql: Session, h: Host) -> UUID:
    """Return the schema version recorded in system.local on host `h`."""
    query = "select schema_version from system.local where key = 'local'"
    rows = await cql.run_async(query, host=h)
    # system.local always has the 'local' row on a live node.
    assert rows
    return rows[0].schema_version
|
|
|
|
|
|
async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None:
    """Return the group 0 schema version persisted in system.scylla_local on `h`.

    Returns None when the `group0_schema_version` key is absent.
    """
    rows = await cql.run_async(
        "select value from system.scylla_local where key = 'group0_schema_version'", host=h)
    return UUID(rows[0].value) if rows else None
|
|
|
|
|
|
async def get_scylla_tables_versions(cql: Session, h: Host) -> list[tuple[str, str, UUID | None]]:
    """List (keyspace, table, version) triples from system_schema.scylla_tables as seen by `h`."""
    query = "select keyspace_name, table_name, version from system_schema.scylla_tables"
    rows = await cql.run_async(query, host=h)
    return [(row.keyspace_name, row.table_name, row.version) for row in rows]
|
|
|
|
|
|
async def get_scylla_tables_version(cql: Session, h: Host, keyspace_name: str, table_name: str) -> UUID | None:
    """Return the `version` column of one row of system_schema.scylla_tables on `h`.

    Fails the test if no row exists for `keyspace_name`.`table_name`.
    A None return means the row exists but carries no persisted version
    (i.e. the table uses digest-based schema versioning).
    """
    # Fixed: the first fragment had a spurious `f` prefix — it contains no placeholders.
    rs = await cql.run_async(
        "select version from system_schema.scylla_tables"
        f" where keyspace_name = '{keyspace_name}' and table_name = '{table_name}'",
        host=h)
    if not rs:
        pytest.fail(f"No scylla_tables row found for {keyspace_name}.{table_name}")
    return rs[0].version
|
|
|
|
|
|
async def verify_local_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Fail the test unless every host in `hs` reports the same system.local schema_version."""
    versions = {h: await get_local_schema_version(cql, h) for h in hs}
    logger.info(f"system.local schema_versions: {versions}")
    # Compare everything against the first host's version.
    ref_host, ref_version = next(iter(versions.items()))
    for host, version in versions.items():
        if version != ref_version:
            pytest.fail(f"{ref_host}'s system.local schema_version {ref_version} is different than {host}'s version {version}")
|
|
|
|
|
|
async def verify_group0_schema_versions_synced(cql: Session, hs: list[Host]) -> None:
    """Fail the test unless every host in `hs` agrees on the persisted group 0 schema version."""
    versions = {h: await get_group0_schema_version(cql, h) for h in hs}
    logger.info(f"system.scylla_local group0_schema_versions: {versions}")
    # Compare everything against the first host's version.
    ref_host, ref_version = next(iter(versions.items()))
    for host, version in versions.items():
        if version != ref_version:
            pytest.fail(f"{ref_host}'s system.scylla_local group0_schema_version {ref_version} is different than {host}'s version {version}")
|
|
|
|
|
|
async def verify_scylla_tables_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool) -> None:
    """Fail the test unless system_schema.scylla_tables contents match on every host in `hs`.

    When `ignore_system_tables` is set, rows of the `system` keyspace are
    dropped from the comparison.
    """
    versions = {h: set(await get_scylla_tables_versions(cql, h)) for h in hs}
    logger.info(f"system_schema.scylla_tables: {versions}")
    ref_host, ref_rows = next(iter(versions.items()))
    for host, rows in versions.items():
        diff = rows.symmetric_difference(ref_rows)
        if ignore_system_tables:
            # Keep only rows outside the `system` keyspace.
            diff = {(ks, tbl, ver) for ks, tbl, ver in diff if ks != "system"}
        if diff:
            pytest.fail(f"{ref_host}'s system_schema.scylla_tables contents is different than {host}'s, symmetric diff: {diff}")
|
|
|
|
|
|
async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool = False) -> None:
    """Verify that all persisted schema-version stores agree across the hosts `hs`.

    Checks, in order: the group 0 schema version in system.scylla_local, the
    node-local schema version in system.local, and the per-table versions in
    system_schema.scylla_tables. Each check fails the test on a mismatch.

    If `ignore_system_tables` is true, rows of the `system` keyspace are
    excluded from the scylla_tables comparison.
    """
    logger.info("Verifying that versions stored in tables are in sync")
    await verify_group0_schema_versions_synced(cql, hs)
    await verify_local_schema_versions_synced(cql, hs)
    await verify_scylla_tables_versions_synced(cql, hs, ignore_system_tables)
|
|
|
|
|
|
async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int], table):
    """
    Verify that every server logged the same in-memory schema version for `table`.

    Assumes that `logs` are log files of servers `srvs`, correspondingly in order,
    that `marks` are log markers (obtained by `ScyllaLogFile.mark()`) corresponding
    to `logs` in order, and that an 'alter table {table} ...' statement was
    performed after obtaining `marks`. Checks that every server printed the same
    version in its 'Altering {table} ... version=...' log message.
    """
    logger.info("Verifying that in-memory table schema versions are in sync")
    pattern = f"Altering {table}.*version=(.*)"
    matches = [await log.grep(pattern, from_mark=mark) for log, mark in zip(logs, marks)]

    def extract_version(srv: ServerInfo, found: list[tuple[str, re.Match[str]]]):
        # A server that did not log the message never saw the schema change.
        if not found:
            pytest.fail(f"Server {srv} didn't log 'Altering' message")
        _, first_match = found[0]
        return UUID(first_match.group(1))

    versions = {srv: extract_version(srv, m) for srv, m in zip(srvs, matches)}
    logger.info(f"In-memory table versions: {versions}")

    ref_srv, ref_version = next(iter(versions.items()))
    for srv, version in versions.items():
        if version != ref_version:
            pytest.fail(f"{ref_srv}'s in-memory table version {ref_version} is different than {srv}'s version {version}")
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_schema_versioning_with_recovery(manager: ManagerClient):
    """
    Perform schema changes while mixing nodes in RECOVERY mode with nodes in group 0 mode.
    Schema changes originating from RECOVERY node use digest-based schema versioning.
    Schema changes originating from group 0 nodes use persisted versions committed through group 0.

    Verify that schema versions are in sync after each schema change.
    """
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    # Must bootstrap sequentially because of gossip topology changes
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks_name:
        await verify_table_versions_synced(cql, hosts)
        table_name = "t"
        table = f"{ks_name}.{table_name}"
        await cql.run_async(f"create table {table} (pk int primary key)")

        # Fixed: no-placeholder f-string dropped.
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)
        ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert ks_t_version

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]
        marks = [await log.mark() for log in logs]

        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # A group 0 schema change must bump the persisted table version.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        assert new_ks_t_version != ks_t_version
        ks_t_version = new_ks_t_version

        # We still have a group 0 majority, don't do this at home.
        srv1 = servers[0]
        logger.info(f"Rebooting {srv1} in RECOVERY mode")
        h1 = next(h for h in hosts if h.address == srv1.ip_addr)
        await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1)
        await manager.server_restart(srv1.server_id)

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode.
        # Don't do this at home.
        #
        # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything:
        # the RECOVERY node is operating using the old schema change procedure, which means
        # that it pushes the schema mutations to other nodes directly with RPC, modifying
        # the group 0 state machine on other two nodes.
        #
        # There is one problem with this however. If the RECOVERY node considers some other node
        # as DOWN, it will silently *not* push the schema change, completing the operation
        # "successfully" nevertheless (it will return to the driver without error).
        # Usually in this case we rely on eventual convergence of schema through gossip,
        # which will not happen here, because the group 0 nodes are not doing schema pulls!
        # So we need to make sure that the RECOVERY node sees the other nodes as UP before
        # we perform the schema change, so it pushes the mutations to them.
        logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP")
        await manager.server_sees_others(srv1.server_id, 2)

        marks = [await log.mark() for log in logs]
        logger.info(f"Altering table on RECOVERY node ({srv1})")
        await cql.run_async(f"alter table {table} with comment = ''", host=h1)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # A change made on the RECOVERY node uses digest-based versioning,
        # so the persisted version column becomes null.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert not new_ks_t_version
        ks_t_version = new_ks_t_version

        logger.info(f"Stopping {srv1} gracefully")
        await manager.server_stop_gracefully(srv1.server_id)

        srv2 = servers[1]
        logger.info(f"Waiting until {srv2} sees {srv1} as dead")
        await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # Now we modify schema through group 0 while the RECOVERY node is dead.
        # Don't do this at home.
        marks = [await log.mark() for log in logs]
        h2 = next(h for h in hosts if h.address == srv2.ip_addr)
        logger.info(f"Altering table on group 0 node {srv2}")
        await cql.run_async(f"alter table {table} with comment = ''", host=h2)

        await manager.server_start(srv1.server_id)
        cql = await reconnect_driver(manager)
        # Fixed: no-placeholder f-string dropped.
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info(f"Waiting until {srv2} sees {srv1} as UP")
        await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr)

        # The RECOVERY node will pull schema when it gets a write.
        # The other group 0 node will do a barrier so it will also sync schema before the write returns.
        logger.info("Forcing schema sync through CL=ALL INSERT")
        await cql.run_async(SimpleStatement(f"insert into {table} (pk) values (0)", consistency_level=ConsistencyLevel.ALL),
                            host=h2)

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # The group 0 change restores a persisted (non-null) version.
        new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
        assert new_ks_t_version
        ks_t_version = new_ks_t_version

        srv3 = servers[2]
        h3 = next(h for h in hosts if h.address == srv3.ip_addr)
        logger.info("Finishing recovery")
        for h in [h2, h3]:
            await cql.run_async(
                "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h)
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3]))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        for h in [h1, h2, h3]:
            await delete_raft_data_and_upgrade_state(cql, h)

        logger.info("Restarting servers")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))

        cql = await reconnect_driver(manager)
        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        # Fixed: used the module-level `logger` (was `logging.info`) and dropped
        # the no-placeholder f-string.
        logger.info("Waiting until upgrade finishes")
        for h in [h1, h2, h3]:
            await wait_until_upgrade_finishes(cql, h, time.time() + 60)

        await verify_table_versions_synced(cql, hosts)

        # After the upgrade, every group 0 schema change must bump the persisted version.
        for change in [
                f"alter table {table} with comment = ''",
                f"alter table {table} add v int",
                f"alter table {table} alter v type blob"]:

            marks = [await log.mark() for log in logs]
            logger.info(f"Altering table with \"{change}\"")
            await cql.run_async(change)

            new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], ks_name, table_name)
            assert new_ks_t_version
            assert new_ks_t_version != ks_t_version
            ks_t_version = new_ks_t_version

            await verify_table_versions_synced(cql, hosts)
            await verify_in_memory_table_versions(servers, logs, marks, table)
|
|
|
|
@pytest.mark.asyncio
async def test_upgrade(manager: ManagerClient):
    """
    This test uses the gossip-based recovery procedure.

    While Raft is disabled, we use digest-based schema versioning.
    Once Raft upgrade is complete, we use persisted versions committed through group 0.
    """
    # Raft upgrade tests had to be replaced with recovery tests (scylladb/scylladb#16192)
    # as prerequisite for getting rid of `consistent_cluster_management` flag.
    # So we do the same here: start a cluster in Raft mode, then enter recovery
    # to simulate a non-Raft cluster.
    cfg = {'enable_user_defined_functions': False,
           'force_gossip_topology_changes': True,
           'tablets_mode_for_new_keyspaces': 'disabled'}
    logger.info("Booting cluster")
    servers = [await manager.server_add(config=cfg, property_file={"dc":"dc1", "rack":f"rack{i+1}"}) for i in range(3)]
    cql = manager.get_cql()

    # Fixed throughout: use the module-level `logger` instead of the root
    # `logging` module, for consistency with the rest of the file.
    logger.info("Waiting until driver connects to every server")
    hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info(f"Setting recovery state on {hosts} and restarting")
    await asyncio.gather(*(enter_recovery_state(cql, h) for h in hosts))
    await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
    cql = await reconnect_driver(manager)

    logger.info("Waiting until driver connects to every server")
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

    logger.info("Creating keyspace and table")
    async with new_test_keyspace(manager, "with replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2}") as ks_name:
        table = f"{ks_name}.t"
        await verify_table_versions_synced(cql, hosts)
        await cql.run_async(f"create table {table} (pk int primary key)")

        logger.info(f"Deleting Raft data and upgrade state on {hosts}")
        await asyncio.gather(*(delete_raft_data_and_upgrade_state(cql, h) for h in hosts))

        logger.info(f"Restarting {servers}")
        await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers))
        cql = await reconnect_driver(manager)

        logger.info("Waiting for driver")
        await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)

        logger.info("Waiting until Raft upgrade procedure finishes")
        await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts))

        logs = [await manager.server_open_log(srv.server_id) for srv in servers]

        marks = [await log.mark() for log in logs]
        logger.info("Altering table")
        await cql.run_async(f"alter table {table} with comment = ''")

        await verify_table_versions_synced(cql, hosts)
        await verify_in_memory_table_versions(servers, logs, marks, table)

        # `group0_schema_version` should be present
        # and the version column for `{table}` should be non-null.
        for h in hosts:
            logger.info(f"Checking that `group0_schema_version` is set on {h}")
            assert (await get_group0_schema_version(cql, h)) is not None

        for h in hosts:
            logger.info(f"Checking that `version` column for `{table}` is set on {h}")
            versions = await get_scylla_tables_versions(cql, h)
            for ks, _, v in versions:
                # Fixed: this compared against the literal "ks", which never matches
                # the generated keyspace name from new_test_keyspace(), so the
                # assertion below was silently skipped. Compare against ks_name.
                if ks == ks_name:
                    assert v is not None
|