mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-27 20:05:10 +00:00
The tests in `TestLargePartitionAlterSchema` are `test_large_partition_with_add_column` and `test_large_partition_with_drop_column`. These tests need to replicate the following conditions that led to a bug before a fix from around 5 years ago. The scenario in which the problem could have happened has to involve: - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition. - appending writes to the partition (not overwrites) - scans of the partition - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped column lands in the middle (in alphabetical order) of the old column set. The way the test is set up is: - fixed number of writes per populate call - fixed number of reads This has the following implications: - if the machine executing the test is fast, all the writes are done before the 10 seconds sleep - there are too many reads - most of them get executed after the test logic is done This patch solves these issues in the following way: - populate lazily generates write data, and stops when instructed by `stop_populating` event - read, which is done sequentially, stops when instructed by `stop_reading` event - number of max operations is increased significantly, but the operations are stopped 1 second after node flush; this makes sure there are enough operations during the test, but also that the test does not take unnecessary time Test execution time has been reduced severalfold. On dev machine the time the tests take is reduced from 110 seconds to 34 seconds. 
The patch also introduces a few small improvements: - `cs_run` renamed to `run_stress` for clarity - Stopped checking if cluster is `ScyllaCluster`, since it is the only one we use - `case_map` removed from `test_alter_table_in_parallel_to_read_and_write`, used `mixed` param directly - Added explanation comment on why we do `data[i].append(None)` - Replaced `alter_table` inner function with its body, for simplicity - Removed unnecessary `ck_rows` variable in `populate` - Removed unnecessary `isinstance(self.cluster, ScyllaCluster)` - Adjusted `ThreadPoolExecutor` size in several places where 5 workers are not needed - Replaced functional programming style expressions for `new_versions` and `columns_list` with comprehension/generator statement python style code, improving readability Refs #26932 fix
691 lines
30 KiB
Python
691 lines
30 KiB
Python
#
|
|
# Copyright (C) 2015-present The Apache Software Foundation
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
import functools
|
|
import logging
|
|
import string
|
|
import threading
|
|
import time
|
|
from concurrent import futures
|
|
from typing import NamedTuple
|
|
|
|
import pytest
|
|
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
|
|
from cassandra.concurrent import execute_concurrent_with_args
|
|
from cassandra.query import SimpleStatement, dict_factory
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from dtest_class import Tester, create_cf, create_ks, read_barrier
|
|
from tools.assertions import assert_all, assert_invalid
|
|
from tools.cluster_topology import generate_cluster_topology
|
|
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class TestSchemaManagement(Tester):
    """Schema (DDL) management tests: altering, creating and dropping tables and
    keyspaces while nodes restart, are killed, or rejoin the cluster, verifying
    that all nodes converge on the same schema and that data stays readable."""

    def prepare(self, racks_num: int, has_config: bool = True):
        """Populate and start a cluster with ``racks_num`` racks.

        :param racks_num: number of racks in the generated topology
        :param has_config: when True, set ``ring_delay_ms=5000`` before starting
        :return: the started cluster object
        """
        cluster = self.cluster
        cluster_topology = generate_cluster_topology(rack_num=racks_num)

        if has_config:
            config = {
                "ring_delay_ms": 5000,
            }
            cluster.set_configuration_options(values=config)

        cluster.populate(cluster_topology)
        cluster.start(wait_other_notice=True)

        return cluster

    def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
        """A prepared statement must remain usable after an ALTER that does not
        change the column set, even after other nodes have been restarted."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema...")
        create_ks(session, "ks", 3)
        session.execute(
            """
            CREATE TABLE users (
                id int,
                firstname text,
                lastname text,
                PRIMARY KEY (id)
            );
            """
        )

        insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
        insert_statement.consistency_level = ConsistencyLevel.ALL
        session.execute(insert_statement, [0])

        logger.debug("Altering schema")
        # Table-property-only alter: the column set is unchanged.
        session.execute("ALTER TABLE users WITH comment = 'updated'")

        logger.debug("Restarting node2")
        node2.stop(gently=True)
        node2.start(wait_for_binary_proto=True)

        logger.debug("Restarting node3")
        node3.stop(gently=True)
        node3.start(wait_for_binary_proto=True, wait_other_notice=True)

        n_partitions = 20
        # The previously prepared statement must still execute after both restarts.
        for i in range(n_partitions):
            session.execute(insert_statement, [i])

        rows = session.execute("SELECT * FROM users")
        res = sorted(rows)
        assert len(res) == n_partitions
        for i in range(n_partitions):
            expected = [i, "A", "B"]
            assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"

    def test_dropping_keyspace_with_many_columns(self):
        """
        Exploits https://github.com/scylladb/scylla/issues/1484
        """
        cluster = self.prepare(racks_num=1, has_config=False)

        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)

        session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
        for i in range(8):
            session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
        session.execute("drop keyspace testxyz")

        # Re-create and drop the keyspace through every node to verify each of
        # them observed the earlier drop.
        for node in cluster.nodelist():
            s = self.patient_cql_connection(node)
            s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
            s.execute("drop keyspace testxyz")

    def test_multiple_create_table_in_parallel(self):
        """
        Run multiple create table statements via different nodes
        1. Create a cluster of 3 nodes
        2. Run create table with different table names in parallel - check all complete
        3. Run create table with the same table name in parallel - check if they complete
        """
        logger.debug("1. Create a cluster of 3 nodes")
        nodes_count = 3
        cluster = self.prepare(racks_num=nodes_count)
        # One exclusive connection per node so each CREATE goes through a different coordinator.
        sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
        ks = "ks"
        create_ks(sessions[0], ks, nodes_count)

        def create_table(session, table_name):
            # Issue a CREATE TABLE through the given session.
            create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
            logger.debug(f"create_statement {create_statement}")
            session.execute(create_statement)

        logger.debug("2. Run create table with different table names in parallel - check all complete")
        step2_tables = [f"t{i}" for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            list(executor.map(create_table, sessions, step2_tables))

        # Each table must be usable for writes and reads afterwards.
        for table in step2_tables:
            sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
            rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
            assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

        logger.debug("3. Run create table with the same table name in parallel - check if they complete")
        step3_table = "test"
        step3_tables = [step3_table for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
            for res_future in res_futures:
                try:
                    res_future.result()
                except AlreadyExists as e:
                    # Concurrent CREATEs of the same table: losers get AlreadyExists.
                    logger.info(f"expected cassandra.AlreadyExists error {e}")

        sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
        sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
        rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

    @pytest.mark.parametrize("case", ("write", "read", "mixed"))
    def test_alter_table_in_parallel_to_read_and_write(self, case):
        """
        Create a table and write into while altering the table
        1. Create a cluster of 3 nodes and populate a table
        2. Run write/read/read_and_write" statement in a loop
        3. Alter table while inserts are running
        """
        logger.debug("1. Create a cluster of 3 nodes and populate a table")
        cluster = self.prepare(racks_num=3)
        col_number = 20

        [node1, node2, node3] = cluster.nodelist()
        session = self.patient_exclusive_cql_connection(node1)

        def run_stress(stress_type, col=col_number - 2):
            # Default column count is col_number - 2 because the ALTER below
            # drops the last two columns.
            node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])

        logger.debug("Populate")
        run_stress("write", col_number)

        with ThreadPoolExecutor(max_workers=1) as executor:
            logger.debug(f"2. Run {case} statement in a loop")
            statement_future = executor.submit(functools.partial(run_stress, case))

            logger.debug(f"let's {case} statement work some time")
            time.sleep(2)

            logger.debug("3. Alter table while inserts are running")
            alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
            logger.debug(f"alter_statement {alter_statement}")
            alter_result = session.execute(alter_statement)
            logger.debug(alter_result.all())

            logger.debug(f"wait till {case} statement finished")
            statement_future.result()

        # col_number data columns + key, minus the 2 dropped columns.
        rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"

        logger.debug("read and check data")
        run_stress("read")

    @pytest.mark.skip("unimplemented")
    def commitlog_replays_after_schema_change(self):
        """
        Commitlog can be replayed even though schema has been changed
        1. Create a table and insert data
        2. Alter table
        3. Kill node
        4. Boot node and verify that commitlog have been replayed and that all data is restored
        """
        raise NotImplementedError

    @pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
    def test_update_schema_while_node_is_killed(self, case):
        """
        Check that a node that is killed during a table creation/alter/drop is able to rejoin and to sync on schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        def create_table_case():
            # Create ks.cf and insert 10 rows.
            try:
                logger.debug("Creating table")
                create_c1c2_table(session)
                logger.debug("Populating")
                insert_c1c2(session, n=10)
            except AlreadyExists:
                # the CQL command can be called multiple times in case of retries
                pass

        def alter_table_case():
            try:
                session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Invalid column name c3" in str(exc)

        def drop_table_case():
            try:
                session.execute("DROP TABLE cf;", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Cannot drop non existing table" in str(exc)

        logger.debug("Creating keyspace")
        create_ks(session, "ks", 3)
        # For alter/drop cases the table must already exist before the DDL under test.
        if case != "create_table":
            create_table_case()

        case_map = {
            "create_table": create_table_case,
            "alter_table": alter_table_case,
            "drop_table": drop_table_case,
        }
        with ThreadPoolExecutor(max_workers=1) as executor:
            logger.debug(f"2. kill node during {case}")
            # Kill node2 concurrently with the DDL statement.
            kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
            case_map[case]()
            kill_node_future.result()

        logger.debug("3. Start the stopped node2")
        node2.start(wait_for_binary_proto=True)

        session = self.patient_exclusive_cql_connection(node2)
        # Make sure node2 has caught up with the latest schema changes.
        read_barrier(session)

        def create_or_alter_table_expected_result(col_mun):
            # Verify the rejoined node sees the expected column count and the data.
            rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
            assert len(rows_to_list(rows)[0]) == col_mun, f"Expected {col_mun} columns but got rows:{rows} instead"
            for key in range(10):
                query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)

        expected_case_result_map = {
            "create_table": functools.partial(create_or_alter_table_expected_result, 3),
            "alter_table": functools.partial(create_or_alter_table_expected_result, 4),
            "drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
        }
        logger.debug("verify that commitlog has been replayed and that all data is restored")
        expected_case_result_map[case]()

    @pytest.mark.parametrize("is_gently_stop", [True, False])
    def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
        """
        Nodes rejoining the cluster synch on schema changes
        1. Create a cluster and insert data
        2. Stop a node
        3. Alter table
        4. Insert additional data
        5. Start the stopped node
        6. Verify the stopped node synchs on the updated schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        create_c1c2_table(session)
        create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})

        logger.debug("Populating")
        insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)

        logger.debug("2 Stop a node1")
        node1.stop(gently=is_gently_stop, wait_other_notice=True)

        logger.debug("3 Alter table")
        # node1 is down, so the alter goes through node2.
        session = self.patient_cql_connection(node2)
        session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)

        logger.debug("4 Insert additional data")
        session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))

        logger.debug("5. Start the stopped node1")
        node1.start(wait_for_binary_proto=True)

        logger.debug("6. Verify the stopped node synchs on the updated schema")
        session = self.patient_exclusive_cql_connection(node1)
        read_barrier(session)

        rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
        expected = [["test", "test", "test", "test"]]
        assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
        for key in range(10):
            query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)

    def test_reads_schema_recreated_while_node_down(self):
        """Drop and re-create a table (with a different column set) while a node
        is down; after it restarts, reads at CL=ALL must see the new, empty table."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True)

        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_cql_connection(node2)
        # Ensure node2 has applied the drop/create before reading at CL=ALL.
        read_barrier(session2)

        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        # The pre-drop row must not resurface in the re-created table.
        assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"

    def test_writes_schema_recreated_while_node_down(self):
        """Drop and re-create a table (same column set) while a node is down;
        after it restarts, writes at CL=ALL succeed and only the new row is visible."""
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True, wait_other_notice=True)

        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_cql_connection(node2)
        # Ensure node2 has applied the drop/create before the CL=ALL write below.
        read_barrier(session2)

        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))

        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        # Only the post-recreate row is expected; the pre-drop row must be gone.
        expected = [[2, "2"]]
        assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"
|
|
|
|
|
|
class TestLargePartitionAlterSchema(Tester):
    # Issue scylladb/scylla: #5135:
    #
    # Issue: Cache reads may miss some writes if schema alter followed by a read happened concurrently with preempted
    # partition entry update
    # Affects only tables with multi-row partitions, which are the only ones that can experience the update of partition
    # entry being preempted.
    #
    # The scenario in which the problem could have happened has to involve:
    # - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
    # - appending writes to the partition (not overwrites)
    # - scans of the partition
    # - schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
    #   column lands in the middle (in alphabetical order) of the old column set.
    #
    # Memtable flush has to happen after a schema alter concurrently with a read.
    #
    # The bug could result in cache corruption which manifests as some past writes being missing (not visible to reads).

    # Number of distinct partition keys written/read by every test.
    PARTITIONS = 50
    # Payload stored in both text columns of every row.
    STRING_VALUE = string.ascii_lowercase

    def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
        """Start the cluster (if not already running), create the schema, and
        return a CQL session connected to the first node."""
        if not self.cluster.nodelist():
            self.cluster.populate(cluster_topology)
            self.cluster.start(wait_other_notice=True)

        node1 = self.cluster.nodelist()[0]
        session = self.patient_cql_connection(node=node1)
        self.create_schema(session=session, rf=rf)

        return session

    def create_schema(self, session, rf):
        """Create keyspace ``ks`` with the given replication factor and the
        multi-row table ``lp_table`` used by the large-partition tests."""
        logger.debug("Creating schema")
        create_ks(session=session, name="ks", rf=rf)

        session.execute(
            """
            CREATE TABLE lp_table (
                pk int,
                ck1 int,
                val1 text,
                val2 text,
                PRIMARY KEY (pk, ck1)
            );
            """
        )

    def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event | None = None):
        """Append rows to all ``PARTITIONS`` partitions, lazily generating the
        write data.

        Clustering keys start at ``ck_start``; writing stops when ``ck_end`` is
        reached (if given) or when ``stop_populating`` is set.  Every written
        row is also appended to ``data`` so callers can later verify reads.

        :param session: CQL session to write through
        :param data: list collecting all written rows (mutated in place)
        :param ck_start: first clustering key to write
        :param ck_end: exclusive upper bound for clustering keys, or None for unbounded
        :param stop_populating: event that stops the write loop when set
        :return: ``data``, extended with every row written
        """
        ck = ck_start

        def _populate_loop():
            # Generator feeding execute_concurrent_with_args: one row per
            # partition per clustering key, until told to stop.  Each row is
            # recorded in `data` for later verification.
            nonlocal ck
            while True:
                if stop_populating is not None and stop_populating.is_set():
                    return
                if ck_end is not None and ck >= ck_end:
                    return
                for pk in range(self.PARTITIONS):
                    row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
                    data.append(row)
                    yield tuple(row)
                ck += 1

        logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")

        parameters = _populate_loop()

        stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")

        execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
        # Count the records only after the generator has been fully consumed by
        # execute_concurrent_with_args; computing `ck - ck_start` before the
        # execution (as previously done) always reported 0 records written.
        records_written = ck - ck_start
        logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
        return data

    def read(self, session, ck_max, stop_reading: threading.Event | None = None):
        """Sequentially read single rows (pk, ck1) from ``lp_table``.

        With ``stop_reading`` set to an event, reading loops forever until the
        event fires; with ``stop_reading=None`` a single pass over all
        ``ck_max`` clustering keys is performed.
        """
        def _read_loop():
            while True:
                for ck in range(ck_max):
                    for pk in range(self.PARTITIONS):
                        if stop_reading is not None and stop_reading.is_set():
                            return
                        session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
                # No stop event means a single pass is enough.
                if stop_reading is None:
                    return

        logger.debug("Start reading..")
        _read_loop()
        logger.debug("Finish reading..")

    def add_column(self, session, column_name, column_type):
        """ALTER ``lp_table`` to add ``column_name`` of ``column_type``."""
        logger.debug(f"Add {column_name} column")
        session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")

    def drop_column(self, session, column_name):
        """ALTER ``lp_table`` to drop ``column_name``."""
        logger.debug(f"Drop {column_name} column")
        session.execute(f"ALTER TABLE lp_table DROP {column_name}")

    def test_large_partition_with_add_column(self):
        """Add a column (landing mid-alphabet in the column set) while appending
        writes and scans are running, then flush and verify no writes are lost."""
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            # Debug mode is much slower: longer timeout, fewer reads.
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.add_column(session, "new_clmn", "int")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    future.result()
                except Exception as exc:  # noqa: BLE001
                    pytest.fail(f"Generated an exception: {exc}")

        # Rows were written without a value for `new_clmn`, so reads return
        # null for it; extend the expected data with None accordingly.
        for i, _ in enumerate(data):
            data[i].append(None)

        assert_all(session, "select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)

    def test_large_partition_with_drop_column(self):
        """Drop a mid-alphabet column while appending writes and scans are
        running, then flush; in-flight statements referring to the dropped
        column are allowed to fail with "Unknown identifier"."""
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            # Debug mode is much slower: longer timeout, fewer reads.
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.drop_column(session=session, column_name="val1")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    future.result()
                except Exception as exc:  # noqa: BLE001
                    # "Unknown identifier val1" is expected error
                    if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
                        pytest.fail(f"Generated an exception: {exc}")
|
|
|
|
|
|
class HistoryVerifier:
    """Verifies entries in ``system.scylla_table_schema_history`` after each DDL
    statement, by diffing the newest schema version against the previous one."""

    def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
        """
        Initialize parameters for further verification of schema history.

        :param table_name: table whose schema we change and whose schema history we verify accordingly.
        :param keyspace_name: keyspace that contains the table.
        """

        self.table_name = table_name
        self.keyspace_name = keyspace_name
        # Schema versions seen so far, oldest first (stored as strings).
        self.versions = []
        # Maps schema version -> {column_name: type} for regular columns.
        self.versions_dict = {}
        # The DDL query that produced the most recently verified version.
        self.query = ""

    def verify(self, session, expected_current_diff, expected_prev_diff, query):
        """
        Verify current schema history entry by comparing to previous schema entry.

        :param session: python cql session
        :param expected_current_diff: difference of current schema from previous schema
        :param expected_prev_diff: difference of previous schema from current schema
        :param query: The query that created new schema
        """

        def get_table_id(session, keyspace_name, table_name):
            # Look up the table's UUID in system_schema.tables.
            assert keyspace_name, f"Input keyspace_name should have value, keyspace_name={keyspace_name}"
            assert table_name, f"Input table_name should have value, table_name={table_name}"
            query = "select keyspace_name,table_name,id from system_schema.tables"
            query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
            current_rows = session.execute(query).current_rows
            assert len(current_rows) == 1, f"Not found table description, ks={keyspace_name} table_name={table_name}"
            res = current_rows[0]
            return res["id"]

        def read_schema_history_table(session, cf_id):
            """
            read system.scylla_table_schema_history and verify current version diff from previous version
            :param session: python cql session
            :param cf_id: uuid of the table we changed it's schema
            """

            query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
            res = session.execute(query).current_rows
            # Exactly one schema version must be new since the last verify() call.
            new_versions = list({
                entry["schema_version"]
                for entry in res
                if str(entry["schema_version"]) not in self.versions
            })
            msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
            assert len(new_versions) == 1, msg
            current_version = str(new_versions[0])
            logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
            # Collect the regular columns recorded for the new version.
            columns_list = (
                {"column_name": entry["column_name"], "type": entry["type"]}
                for entry in res
                if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
            )
            self.versions_dict[current_version] = {}
            for item in columns_list:
                self.versions_dict[current_version][item["column_name"]] = item["type"]

            self.versions.append(current_version)
            # The first recorded version has nothing to diff against.
            if len(self.versions) > 1:
                current_id = self.versions[-1]
                previous_id = self.versions[-2]
                set_current = set(self.versions_dict[current_id].items())
                set_previous = set(self.versions_dict[previous_id].items())
                current_diff = set_current - set_previous
                previous_diff = set_previous - set_current
                msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
                msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
                if current_diff != expected_current_diff:
                    logger.debug(msg1 + msg2)
                assert current_diff == expected_current_diff, msg1 + msg2
                msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
                assert previous_diff == expected_prev_diff, msg1 + msg2

        self.query = query
        cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
        read_schema_history_table(session, cf_id)
|
|
|
|
|
|
class DDL(NamedTuple):
    """A DDL statement paired with the schema-history diffs it is expected to produce."""

    # The CQL DDL statement to execute.
    ddl_command: str
    # Expected diff(new schema, old schema); None for the first statement,
    # which has no previous version to compare against.
    expected_current_diff: set | None
    # Expected diff(old schema, new schema); None for the first statement.
    expected_prev_diff: set | None
|
|
|
|
|
|
class TestSchemaHistory(Tester):
    """Tests that ``system.scylla_table_schema_history`` records the expected
    column diffs for a sequence of ALTER TABLE statements."""

    def prepare(self):
        """Start a 3-rack cluster, open a session on node1, and create the
        ``lwt_load_ks`` keyspace with rf=3."""
        cluster = self.cluster
        # in case support tablets and rf-rack-valid-keyspaces
        # create cluster with 3 racks with 1 node in each rack
        cluster_topology = generate_cluster_topology(rack_num=3)
        rf = 3
        cluster.populate(cluster_topology).start(wait_other_notice=True)
        # dict_factory lets HistoryVerifier read result rows by column name.
        self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
        create_ks(self.session, "lwt_load_ks", rf)

    def test_schema_history_alter_table(self):
        """test schema history changes following alter table cql commands"""
        self.prepare()
        verifier = HistoryVerifier(table_name="table2")
        # Each DDL carries the expected (new - old) and (old - new) column-set diffs.
        queries_and_expected_diffs = [
            DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
        ]
        for ddl in queries_and_expected_diffs:
            self.session.execute(ddl.ddl_command)
            verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)