Files
scylladb/test/cluster/dtest/schema_management_test.py
Dario Mirovic f1d63d014c test: dtest: schema_management_test.py: speed up TestLargePartitionAlterSchema tests
The tests in `TestLargePartitionAlterSchema` are `test_large_partition_with_add_column`
and `test_large_partition_with_drop_column`.

These tests need to replicate the following conditions that led to a bug before a fix from around 5 years ago.

The scenario in which the problem could have happened has to involve:
- a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
- appending writes to the partition (not overwrites)
- scans of the partition
- schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
  column lands in the middle (in alphabetical order) of the old column set.

The way the test is set up is:
- fixed number of writes per populate call
- fixed number of reads

This has the following implications:
- if the machine executing the test is fast, all the writes are done before the 10 seconds sleep
- there are too many reads - most of them get executed after the test logic is done

This patch solves these issues in the following way:
- populate lazily generates write data, and stops when instructed by `stop_populating` event
- read, which is done sequentially, stops when instructed by `stop_reading` event
- number of max operations is increased significantly, but the operations are stopped 1 second
  after node flush; this makes sure there are enough operations during the test, but also that
  the test does not take unnecessary time

Test execution time has been reduced severalfold. On dev machine the time the tests take is
reduced from 110 seconds to 34 seconds.

The patch also introduces a few small improvements:
- `cs_run` renamed to `run_stress` for clarity
- Stopped checking if cluster is `ScyllaCluster`, since it is the only one we use
- `case_map` removed from `test_alter_table_in_parallel_to_read_and_write`, used `mixed` param directly
- Added explanation comment on why we do `data[i].append(None)`
- Replaced `alter_table` inner function with its body, for simplicity
- Removed unnecessary `ck_rows` variable in `populate`
- Removed unnecessary `isinstance(self.cluster. ScyllaCluster)`
- Adjusted `ThreadPoolExecutor` size in several places where 5 workers are not needed
- Replaced functional programming style expressions for `new_versions` and `columns_list` with
  comprehension/generator statement python style code, improving readability

Refs #26932

fix
2025-12-18 17:07:27 +01:00

691 lines
30 KiB
Python

#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import functools
import logging
import string
import threading
import time
from concurrent import futures
from typing import NamedTuple
import pytest
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement, dict_factory
from concurrent.futures import ThreadPoolExecutor
from dtest_class import Tester, create_cf, create_ks, read_barrier
from tools.assertions import assert_all, assert_invalid
from tools.cluster_topology import generate_cluster_topology
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
logger = logging.getLogger(__name__)
class TestSchemaManagement(Tester):
def prepare(self, racks_num: int, has_config: bool = True):
cluster = self.cluster
cluster_topology = generate_cluster_topology(rack_num=racks_num)
if has_config:
config = {
"ring_delay_ms": 5000,
}
cluster.set_configuration_options(values=config)
cluster.populate(cluster_topology)
cluster.start(wait_other_notice=True)
return cluster
def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema...")
create_ks(session, "ks", 3)
session.execute(
"""
CREATE TABLE users (
id int,
firstname text,
lastname text,
PRIMARY KEY (id)
);
"""
)
insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
insert_statement.consistency_level = ConsistencyLevel.ALL
session.execute(insert_statement, [0])
logger.debug("Altering schema")
session.execute("ALTER TABLE users WITH comment = 'updated'")
logger.debug("Restarting node2")
node2.stop(gently=True)
node2.start(wait_for_binary_proto=True)
logger.debug("Restarting node3")
node3.stop(gently=True)
node3.start(wait_for_binary_proto=True, wait_other_notice=True)
n_partitions = 20
for i in range(n_partitions):
session.execute(insert_statement, [i])
rows = session.execute("SELECT * FROM users")
res = sorted(rows)
assert len(res) == n_partitions
for i in range(n_partitions):
expected = [i, "A", "B"]
assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"
def test_dropping_keyspace_with_many_columns(self):
"""
Exploits https://github.com/scylladb/scylla/issues/1484
"""
cluster = self.prepare(racks_num=1, has_config=False)
node1 = cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
for i in range(8):
session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
session.execute("drop keyspace testxyz")
for node in cluster.nodelist():
s = self.patient_cql_connection(node)
s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
s.execute("drop keyspace testxyz")
def test_multiple_create_table_in_parallel(self):
"""
Run multiple create table statements via different nodes
1. Create a cluster of 3 nodes
2. Run create table with different table names in parallel - check all complete
3. Run create table with the same table name in parallel - check if they complete
"""
logger.debug("1. Create a cluster of 3 nodes")
nodes_count = 3
cluster = self.prepare(racks_num=nodes_count)
sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
ks = "ks"
create_ks(sessions[0], ks, nodes_count)
def create_table(session, table_name):
create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
logger.debug(f"create_statement {create_statement}")
session.execute(create_statement)
logger.debug("2. Run create table with different table names in parallel - check all complete")
step2_tables = [f"t{i}" for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
list(executor.map(create_table, sessions, step2_tables))
for table in step2_tables:
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
logger.debug("3. Run create table with the same table name in parallel - check if they complete")
step3_table = "test"
step3_tables = [step3_table for i in range(nodes_count)]
with ThreadPoolExecutor(max_workers=nodes_count) as executor:
res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
for res_future in res_futures:
try:
res_future.result()
except AlreadyExists as e:
logger.info(f"expected cassandra.AlreadyExists error {e}")
sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"
@pytest.mark.parametrize("case", ("write", "read", "mixed"))
def test_alter_table_in_parallel_to_read_and_write(self, case):
"""
Create a table and write into while altering the table
1. Create a cluster of 3 nodes and populate a table
2. Run write/read/read_and_write" statement in a loop
3. Alter table while inserts are running
"""
logger.debug("1. Create a cluster of 3 nodes and populate a table")
cluster = self.prepare(racks_num=3)
col_number = 20
[node1, node2, node3] = cluster.nodelist()
session = self.patient_exclusive_cql_connection(node1)
def run_stress(stress_type, col=col_number - 2):
node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])
logger.debug("Populate")
run_stress("write", col_number)
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. Run {case} statement in a loop")
statement_future = executor.submit(functools.partial(run_stress, case))
logger.debug(f"let's {case} statement work some time")
time.sleep(2)
logger.debug("3. Alter table while inserts are running")
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
logger.debug(f"alter_statement {alter_statement}")
alter_result = session.execute(alter_statement)
logger.debug(alter_result.all())
logger.debug(f"wait till {case} statement finished")
statement_future.result()
rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"
logger.debug("read and check data")
run_stress("read")
@pytest.mark.skip("unimplemented")
def commitlog_replays_after_schema_change(self):
"""
Commitlog can be replayed even though schema has been changed
1. Create a table and insert data
2. Alter table
3. Kill node
4. Boot node and verify that commitlog have been replayed and that all data is restored
"""
raise NotImplementedError
@pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
def test_update_schema_while_node_is_killed(self, case):
"""
Check that a node that is killed durring a table creation/alter/drop is able to rejoin and to synch on schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
def create_table_case():
try:
logger.debug("Creating table")
create_c1c2_table(session)
logger.debug("Populating")
insert_c1c2(session, n=10)
except AlreadyExists:
# the CQL command can be called multiple time case of retries
pass
def alter_table_case():
try:
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Invalid column name c3" in str(exc)
def drop_table_case():
try:
session.execute("DROP TABLE cf;", timeout=180)
except InvalidRequest as exc:
# the CQL command can be called multiple time case of retries
assert "Cannot drop non existing table" in str(exc)
logger.debug("Creating keyspace")
create_ks(session, "ks", 3)
if case != "create_table":
create_table_case()
case_map = {
"create_table": create_table_case,
"alter_table": alter_table_case,
"drop_table": drop_table_case,
}
with ThreadPoolExecutor(max_workers=1) as executor:
logger.debug(f"2. kill node during {case}")
kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
case_map[case]()
kill_node_future.result()
logger.debug("3. Start the stopped node2")
node2.start(wait_for_binary_proto=True)
session = self.patient_exclusive_cql_connection(node2)
read_barrier(session)
def create_or_alter_table_expected_result(col_mun):
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)
expected_case_result_map = {
"create_table": functools.partial(create_or_alter_table_expected_result, 3),
"alter_table": functools.partial(create_or_alter_table_expected_result, 4),
"drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
}
logger.debug("verify that commitlog has been replayed and that all data is restored")
expected_case_result_map[case]()
@pytest.mark.parametrize("is_gently_stop", [True, False])
def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
"""
Nodes rejoining the cluster synch on schema changes
1. Create a cluster and insert data
2. Stop a node
3. Alter table
4. Insert additional data
5. Start the stopped node
6. Verify the stopped node synchs on the updated schema
"""
logger.debug("1. Create a cluster and insert data")
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
create_c1c2_table(session)
create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})
logger.debug("Populating")
insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)
logger.debug("2 Stop a node1")
node1.stop(gently=is_gently_stop, wait_other_notice=True)
logger.debug("3 Alter table")
session = self.patient_cql_connection(node2)
session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
logger.debug("4 Insert additional data")
session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))
logger.debug("5. Start the stopped node1")
node1.start(wait_for_binary_proto=True)
logger.debug("6. Verify the stopped node synchs on the updated schema")
session = self.patient_exclusive_cql_connection(node1)
read_barrier(session)
rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
expected = [["test", "test", "test", "test"]]
assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
for key in range(10):
query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)
def test_reads_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"
def test_writes_schema_recreated_while_node_down(self):
cluster = self.prepare(racks_num=3)
[node1, node2, node3] = cluster.nodelist()
session = self.patient_cql_connection(node1)
logger.debug("Creating schema")
create_ks(session, "ks", 3)
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Populating")
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))
logger.debug("Stopping node2")
node2.stop(gently=True, wait_other_notice=True)
logger.debug("Re-creating schema")
session.execute("DROP TABLE cf;")
session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")
logger.debug("Restarting node2")
node2.start(wait_for_binary_proto=True)
session2 = self.patient_cql_connection(node2)
read_barrier(session2)
session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))
rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
expected = [[2, "2"]]
assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
class TestLargePartitionAlterSchema(Tester):
# Issue scylladb/scylla: #5135:
#
# Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
# partition entry update
# Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
# entry being preempted.
#
# The scenario in which the problem could have happened has to involve:
# - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
# - appending writes to the partition (not overwrites)
# - scans of the partition
# - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
# column lands in the middle (in alphabetical order) of the old column set.
#
# Memtable flush has to happen after a schema alter concurrently with a read.
#
# The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).
PARTITIONS = 50
STRING_VALUE = string.ascii_lowercase
def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
if not self.cluster.nodelist():
self.cluster.populate(cluster_topology)
self.cluster.start(wait_other_notice=True)
node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node=node1)
self.create_schema(session=session, rf=rf)
return session
def create_schema(self, session, rf):
logger.debug("Creating schema")
create_ks(session=session, name="ks", rf=rf)
session.execute(
"""
CREATE TABLE lp_table (
pk int,
ck1 int,
val1 text,
val2 text,
PRIMARY KEY (pk, ck1)
);
"""
)
def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
ck = ck_start
def _populate_loop():
nonlocal ck
while True:
if stop_populating is not None and stop_populating.is_set():
return
if ck_end is not None and ck >= ck_end:
return
for pk in range(self.PARTITIONS):
row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
data.append(row)
yield tuple(row)
ck += 1
records_written = ck - ck_start
logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")
parameters = _populate_loop()
stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")
execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
return data
def read(self, session, ck_max, stop_reading: threading.Event = None):
def _read_loop():
while True:
for ck in range(ck_max):
for pk in range(self.PARTITIONS):
if stop_reading is not None and stop_reading.is_set():
return
session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
if stop_reading is None:
return
logger.debug(f"Start reading..")
_read_loop()
logger.debug(f"Finish reading..")
def add_column(self, session, column_name, column_type):
logger.debug(f"Add {column_name} column")
session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")
def drop_column(self, session, column_name):
logger.debug(f"Drop {column_name} column")
session.execute(f"ALTER TABLE lp_table DROP {column_name}")
def test_large_partition_with_add_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.add_column(session, "new_clmn", "int")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
for future in futures.as_completed(threads, timeout=timeout):
try:
future.result()
except Exception as exc: # noqa: BLE001
pytest.fail(f"Generated an exception: {exc}")
# Add 'null' values for the new column `new_clmn` in the expected data
for i, _ in enumerate(data):
data[i].append(None)
assert_all(session, f"select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)
def test_large_partition_with_drop_column(self):
cluster_topology = generate_cluster_topology()
session = self.prepare(cluster_topology, rf=1)
data = self.populate(session=session, data=[], ck_start=0, ck_end=10)
threads = []
timeout = 300
ck_end = 5000
if self.cluster.scylla_mode == "debug":
timeout = 900
ck_end = 500
with ThreadPoolExecutor(max_workers=2) as executor:
stop_populating = threading.Event()
stop_reading = threading.Event()
# Insert new rows in background
threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
# Wait for running load
time.sleep(10)
self.drop_column(session=session, column_name="val1")
# Memtable flush has to happen after a schema alter concurrently with a read
logger.debug("Flush data")
self.cluster.nodelist()[0].flush()
# Stop populating and reading soon after flush
time.sleep(1)
logger.debug("Stop populating and reading")
stop_populating.set()
stop_reading.set()
result = []
for future in futures.as_completed(threads, timeout=timeout):
try:
result.append(future.result())
except Exception as exc: # noqa: BLE001
# "Unknown identifier val1" is expected error
if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
pytest.fail(f"Generated an exception: {exc}")
class HistoryVerifier:
def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
"""
Initialize parameters for further verification of schema history.
:param table_name: table thats we change it's schema and verify schema history accordingly.
"""
self.table_name = table_name
self.keyspace_name = keyspace_name
self.versions = []
self.versions_dict = {}
self.query = ""
def verify(self, session, expected_current_diff, expected_prev_diff, query):
"""
Verify current schema history entry by comparing to previous schema entry.
:param session: python cql session
:param expected_current_diff: difference of current schema from previous schema
:param expected_prev_diff: difference of previous schema from current schema
:param query: The query that created new schema
"""
def get_table_id(session, keyspace_name, table_name):
assert keyspace_name, f"Input kesyspcase should have value, keyspace_name={keyspace_name}"
assert table_name, f"Input table_name should have value, table_name={table_name}"
query = "select keyspace_name,table_name,id from system_schema.tables"
query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
current_rows = session.execute(query).current_rows
assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
res = current_rows[0]
return res["id"]
def read_schema_history_table(session, cf_id):
"""
read system.scylla_table_schema_history and verify current version diff from previous vesion
:param session: python cql session
:param cf_id: uuid of the table we changed it's schema
"""
query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
res = session.execute(query).current_rows
new_versions = list({
entry["schema_version"]
for entry in res
if str(entry["schema_version"]) not in self.versions
})
msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
assert len(new_versions) == 1, msg
current_version = str(new_versions[0])
logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
columns_list = (
{"column_name": entry["column_name"], "type": entry["type"]}
for entry in res
if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
)
self.versions_dict[current_version] = {}
for item in columns_list:
self.versions_dict[current_version][item["column_name"]] = item["type"]
self.versions.append(current_version)
if len(self.versions) > 1:
current_id = self.versions[-1]
previous_id = self.versions[-2]
set_current = set(self.versions_dict[current_id].items())
set_previous = set(self.versions_dict[previous_id].items())
current_diff = set_current - set_previous
previous_diff = set_previous - set_current
msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
if current_diff != expected_current_diff:
logger.debug(msg1 + msg2)
assert current_diff == expected_current_diff, msg1 + msg2
msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
assert previous_diff == expected_prev_diff, msg1 + msg2
self.query = query
cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
read_schema_history_table(session, cf_id)
class DDL(NamedTuple):
ddl_command: str
expected_current_diff: set | None
expected_prev_diff: set | None
class TestSchemaHistory(Tester):
def prepare(self):
cluster = self.cluster
# in case support tablets and rf-rack-valid-keyspaces
# create cluster with 3 racks with 1 node in each rack
cluster_topology = generate_cluster_topology(rack_num=3)
rf = 3
cluster.populate(cluster_topology).start(wait_other_notice=True)
self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
create_ks(self.session, "lwt_load_ks", rf)
def test_schema_history_alter_table(self):
"""test schema history changes following alter table cql commands"""
self.prepare()
verifier = HistoryVerifier(table_name="table2")
queries_and_expected_diffs = [
DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
]
for ddl in queries_and_expected_diffs:
self.session.execute(ddl.ddl_command)
verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)