Files
scylladb/test/cluster/dtest/schema_management_test.py
Piotr Smaron 32225797cd dtest: fix flaky test_writes_schema_recreated_while_node_down
`read_barrier(session2)` was supposed to ensure `node2` has caught up on schema
before a CL=ALL write. But `patient_cql_connection(node2)` creates a
cluster-aware driver session (using `TokenAwarePolicy(DCAwareRoundRobinPolicy())`)
that can route the barrier CQL statement to any node, not necessarily `node2`.
If the barrier runs on `node1` or `node3` (which already have the new schema),
it is a no-op, `node2` stays stale, and the CL=ALL write fails with the observed `WriteFailure`.
The fix is to switch to `patient_exclusive_cql_connection(node2)`,
which uses `WhiteListRoundRobinPolicy([node2_ip])` to pin all CQL to `node2`.
This is already the established pattern used by other tests in the same file.
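
For illustration, here is a minimal sketch of the difference between the two
connection styles, assuming the Python driver's legacy `load_balancing_policy`
argument; the node address is a placeholder and this is not the dtest helper
code itself:

from cassandra.cluster import Cluster
from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
                                WhiteListRoundRobinPolicy)

node2_ip = "127.0.0.2"  # placeholder address for node2

# patient_cql_connection-style session: the token-aware policy may send any
# statement, including the read barrier, to whichever node the query plan picks.
cluster_aware = Cluster(
    contact_points=[node2_ip],
    load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()),
)

# patient_exclusive_cql_connection-style session: the whitelist restricts the
# query plan to node2, so the barrier is guaranteed to execute there.
pinned = Cluster(
    contact_points=[node2_ip],
    load_balancing_policy=WhiteListRoundRobinPolicy([node2_ip]),
)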

Fixes: SCYLLADB-1139

No need to backport yet; the issue appeared only on master.

Closes scylladb/scylladb#29151
2026-03-23 10:25:54 +02:00


#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import functools
import logging
import string
import threading
import time
from concurrent import futures
from typing import NamedTuple
import pytest
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement, dict_factory
from concurrent.futures import ThreadPoolExecutor
from dtest_class import Tester, create_cf, create_ks, read_barrier
from tools.assertions import assert_all, assert_invalid
from tools.cluster_topology import generate_cluster_topology
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
logger = logging.getLogger(__name__)
class TestSchemaManagement(Tester):
def prepare(self, racks_num: int, has_config: bool = True):
cluster = self.cluster
cluster_topology = generate_cluster_topology(rack_num=racks_num)
if has_config:
config = {
"ring_delay_ms": 5000,
}
cluster.set_configuration_options(values=config)
cluster.populate(cluster_topology)
cluster.start(wait_other_notice=True)
return cluster
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema...")
create_ks(session, "ks", 3)
session.execute(
"""
CREATE TABLE users (
id int,
firstname text,
lastname text,
PRIMARY KEY (id)
);
"""
)
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
insert_statement.consistency_level = ConsistencyLevel.ALL
session.execute(insert_statement, [0])
logger.debug("Altering schema")
session.execute("ALTER TABLE users WITH comment = 'updated'")
logger.debug("Restarting node2")
node2.stop(gently=True)
node2.start(wait_for_binary_proto=True)
logger.debug("Restarting node3")
node3.stop(gently=True)
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
n_partitions = 20
for i in range(n_partitions):
session.execute(insert_statement, [i])
rows = session.execute("SELECT * FROM users")
res = sorted(rows)
assert len(res) == n_partitions
for i in range(n_partitions):
expected = [i, "A", "B"]
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
def test_dropping_keyspace_with_many_columns(self):
"""
Exploits https://github.com/scylladb/scylla/issues/1484
"""
cluster = self.prepare(racks_num=1, has_config=False)
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
for i in range(8):
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
session.execute("drop keyspace testxyz")
for node in cluster.nodelist():
s = self.patient_cql_connection(node)
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
s.execute("drop keyspace testxyz")
def test_multiple_create_table_in_parallel(self):
"""
Run multiple create table statements via different nodes
1. Create a cluster of 3 nodes
2. Run create table with different table names in parallel - check all complete
3. Run create table with the same table name in parallel - check if they complete
"""
logger.debug("1. Create a cluster of 3 nodes")
nodes_count = 3
cluster = self.prepare(racks_num=nodes_count)
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
ks = "ks"
create_ks(sessions[0], ks, nodes_count)
def create_table(session, table_name):
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
logger.debug(f"create_statement {create_statement}")
session.execute(create_statement)
logger.debug("2. Run create table with different table names in parallel - check all complete")
step2_tables = [f"t{i}" for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
list(executor.map(create_table, sessions, step2_tables))
for table in step2_tables:
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
step3_table = "test"
step3_tables = [step3_table for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
for res_future in res_futures:
try:
res_future.result()
except AlreadyExists as e:
logger.info(f"expected cassandra.AlreadyExists error {e}")
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
def test_alter_table_in_parallel_to_read_and_write(self, case):
"""
Create a table and write into while altering the table
1. Create a cluster of 3 nodes and populate a table
2. Run write/read/mixed (read and write) statements in a loop
3. Alter table while inserts are running
"""
logger.debug("1. Create a cluster of 3 nodes and populate a table")
cluster = self.prepare(racks_num=3)
col_number = 20
[node1, node2, node3] = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
def run_stress(stress_type, col=col_number - 2):
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
logger.debug("Populate")
run_stress("write", col_number)
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. Run {case} statement in a loop")
statement_future = executor.submit(functools.partial(run_stress, case))
logger.debug(f"let's {case} statement work some time")
time.sleep(2)
logger.debug("3. Alter table while inserts are running")
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
logger.debug(f"alter_statement {alter_statement}")
alter_result = session.execute(alter_statement)
logger.debug(alter_result.all())
logger.debug(f"wait till {case} statement finished")
statement_future.result()
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
logger.debug("read and check data")
run_stress("read")
@pytest.mark.skip("unimplemented")
def commitlog_replays_after_schema_change(self):
"""
Commitlog can be replayed even though schema has been changed
1. Create a table and insert data
2. Alter table
3. Kill node
4. Boot node and verify that the commitlog has been replayed and that all data is restored
"""
raise NotImplementedError
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
def test_update_schema_while_node_is_killed(self, case):
"""
Check that a node that is killed during a table creation/alter/drop is able to rejoin and sync on schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
def create_table_case():
try:
logger.debug("Creating table")
create_c1c2_table(session)
logger.debug("Populating")
insert_c1c2(session, n=10)
except AlreadyExists:
# the CQL command can be called multiple times in case of retries
pass
def alter_table_case():
try:
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple times in case of retries
assert "Invalid column name c3" in str(exc)
def drop_table_case():
try:
session.execute("DROP TABLE cf;", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple times in case of retries
assert "Cannot drop non existing table" in str(exc)
logger.debug("Creating keyspace")
create_ks(session, "ks", 3)
if case != "create_table":
create_table_case()
case_map = {
"create_table": create_table_case,
"alter_table": alter_table_case,
"drop_table": drop_table_case,
}
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. kill node during {case}")
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
case_map[case]()
kill_node_future.result()
logger.debug("3. Start the stopped node2")
node2.start(wait_for_binary_proto=True)
session = self.patient_exclusive_cql_connection(node2)
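# read_barrier ensures node2 has caught up on schema changes committed while it was stopped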
read_barrier(session)
def create_or_alter_table_expected_result(col_num):
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
assert len(rows_to_list(rows)[0]) == col_num, f"Expected {col_num} columns but got rows:{rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
expected_case_result_map = {
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
}
logger.debug("verify that commitlog has been replayed and that all data is restored")
expected_case_result_map[case]()
@pytest.mark.parametrize("is_gently_stop", [True, False])
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
"""
Nodes rejoining the cluster synch on schema changes
1. Create a cluster and insert data
2. Stop a node
3. Alter table
4. Insert additional data
5. Start the stopped node
6. Verify the stopped node synchs on the updated schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
create_c1c2_table(session)
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
logger.debug("Populating")
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
logger.debug("2 Stop a node1")
node1.stop(gently=is_gently_stop, wait_other_notice=True)
logger.debug("3 Alter table")
session = self.patient_cql_connection(node2)
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
logger.debug("4 Insert additional data")
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
logger.debug("5. Start the stopped node1")
node1.start(wait_for_binary_proto=True)
logger.debug("6. Verify the stopped node synchs on the updated schema")
session = self.patient_exclusive_cql_connection(node1)
read_barrier(session)
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
expected = [["test", "test", "test", "test"]]
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
def test_reads_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_exclusive_cql_connection(node2)
read_barrier(session2)
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
def test_writes_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True, wait_other_notice=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
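# Pin the session to node2 (WhiteListRoundRobinPolicy) so the read barrier below runs on node2 itself;
# a token-aware session could route it to a node that already has the new schema, making it a no-op and leaving node2 stale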
session2 = self.patient_exclusive_cql_connection(node2)
read_barrier(session2)
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
expected = [[2, "2"]]
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
class TestLargePartitionAlterSchema(Tester):
# Issue scylladb/scylla#5135:
#
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
# partition entry update
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
# entry being preempted.
#
# The scenario in which the problem could have happened has to involve:
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
# - appending writes to the partition (not overwrites)
# - scans of the partition
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
# column lands in the middle (in alphabetical order) of the old column set.
#
# Memtable flush has to happen after a schema alter concurrently with a read.
#
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
PARTITIONS = 50
STRING_VALUE = string.ascii_lowercase
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
if not self.cluster.nodelist():
self.cluster.populate(cluster_topology)
self.cluster.start(wait_other_notice=True)
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node=node1)
self.create_schema(session=session, rf=rf)
return session
def create_schema(self, session, rf):
logger.debug("Creating schema")
create_ks(session=session, name="ks", rf=rf)
session.execute(
"""
CREATE TABLE lp_table (
pk int,
ck1 int,
val1 text,
val2 text,
PRIMARY KEY (pk, ck1)
);
"""
)
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
ck = ck_start
def _populate_loop():
nonlocal ck
while True:
if stop_populating is not None and stop_populating.is_set():
return
if ck_end is not None and ck >= ck_end:
return
for pk in range(self.PARTITIONS):
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
data.append(row)
yield tuple(row)
ck += 1
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
parameters = _populate_loop()
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
# compute after the inserts so the log reports how many rows per partition were actually written
records_written = ck - ck_start
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
return data
def read(self, session, ck_max, stop_reading: threading.Event = None):
def _read_loop():
while True:
for ck in range(ck_max):
for pk in range(self.PARTITIONS):
if stop_reading is not None and stop_reading.is_set():
return
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
if stop_reading is None:
return
logger.debug(f"Start reading..")
_read_loop()
logger.debug(f"Finish reading..")
def add_column(self, session, column_name, column_type):
logger.debug(f"Add {column_name} column")
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
def drop_column(self, session, column_name):
logger.debug(f"Drop {column_name} column")
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
def test_large_partition_with_add_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.add_column(session, "new_clmn", "int")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
for future in futures.as_completed(threads, timeout=timeout):
try:
future.result()
except Exception as exc: # noqa: BLE001
pytest.fail(f"Generated an exception: {exc}")
# Add 'null' values for the new column `new_clmn` in the expected data
for i, _ in enumerate(data):
data[i].append(None)
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
def test_large_partition_with_drop_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.drop_column(session=session, column_name="val1")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
result = []
for future in futures.as_completed(threads, timeout=timeout):
try:
result.append(future.result())
except Exception as exc: # noqa: BLE001
# "Unknown identifier val1" is expected error
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
pytest.fail(f"Generated an exception: {exc}")
class HistoryVerifier:
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
"""
Initialize parameters for further verification of schema history.
:param table_name: the table whose schema we change and whose schema history we verify.
:param keyspace_name: the keyspace that contains the table.
"""
self.table_name = table_name
self.keyspace_name = keyspace_name
self.versions = []
self.versions_dict = {}
self.query = ""
def verify(self, session, expected_current_diff, expected_prev_diff, query):
"""
Verify the current schema history entry by comparing it to the previous schema entry.
:param session: python cql session
:param expected_current_diff: difference of current schema from previous schema
:param expected_prev_diff: difference of previous schema from current schema
:param query: the query that produced the new schema version
"""
def get_table_id(session, keyspace_name, table_name):
assert keyspace_name, f"Input keyspace_name should have a value, keyspace_name={keyspace_name}"
assert table_name, f"Input table_name should have a value, table_name={table_name}"
query = "select keyspace_name,table_name,id from system_schema.tables"
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
current_rows = session.execute(query).current_rows
assert len(current_rows) == 1, f"Table description not found, ks={keyspace_name} table_name={table_name}"
res = current_rows[0]
return res["id"]
def read_schema_history_table(session, cf_id):
"""
Read system.scylla_table_schema_history and verify the current version's diff from the previous version
:param session: python cql session
:param cf_id: uuid of the table whose schema we changed
"""
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
res = session.execute(query).current_rows
new_versions = list({
entry["schema_version"]
for entry in res
if str(entry["schema_version"]) not in self.versions
})
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
assert len(new_versions) == 1, msg
current_version = str(new_versions[0])
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
columns_list = (
{"column_name": entry["column_name"], "type": entry["type"]}
for entry in res
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
)
self.versions_dict[current_version] = {}
for item in columns_list:
self.versions_dict[current_version][item["column_name"]] = item["type"]
self.versions.append(current_version)
if len(self.versions) > 1:
current_id = self.versions[-1]
previous_id = self.versions[-2]
set_current = set(self.versions_dict[current_id].items())
set_previous = set(self.versions_dict[previous_id].items())
current_diff = set_current - set_previous
previous_diff = set_previous - set_current
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
if current_diff != expected_current_diff:
logger.debug(msg1 + msg2)
assert current_diff == expected_current_diff, msg1 + msg2
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
assert previous_diff == expected_prev_diff, msg1 + msg2
self.query = query
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
read_schema_history_table(session, cf_id)
class DDL(NamedTuple):
ddl_command: str
expected_current_diff: set | None
expected_prev_diff: set | None
class TestSchemaHistory(Tester):
def prepare(self):
cluster = self.cluster
# to support tablets and rf-rack-valid-keyspaces,
# create a cluster with 3 racks and one node in each rack
cluster_topology = generate_cluster_topology(rack_num=3)
rf = 3
cluster.populate(cluster_topology).start(wait_other_notice=True)
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
create_ks(self.session, "lwt_load_ks", rf)
def test_schema_history_alter_table(self):
"""test schema history changes following alter table cql commands"""
self.prepare()
verifier = HistoryVerifier(table_name="table2")
queries_and_expected_diffs = [
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
]
for ddl in queries_and_expected_diffs:
self.session.execute(ddl.ddl_command)
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)