#
# Copyright (C) 2015-present The Apache Software Foundation
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
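
"""Schema management dtests: schema propagation across node restarts and node failures,
concurrent and interleaved DDL, large-partition schema alters, and schema history checks."""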

import functools
import logging
import string
import threading
import time
from concurrent import futures
from typing import NamedTuple

import pytest
from cassandra import AlreadyExists, ConsistencyLevel, InvalidRequest
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import SimpleStatement, dict_factory
from concurrent.futures import ThreadPoolExecutor

from dtest_class import Tester, create_cf, create_ks, read_barrier
from tools.assertions import assert_all, assert_invalid
from tools.cluster_topology import generate_cluster_topology
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2, rows_to_list

logger = logging.getLogger(__name__)


class TestSchemaManagement(Tester):
    def prepare(self, racks_num: int, has_config: bool = True):
        cluster = self.cluster
        cluster_topology = generate_cluster_topology(rack_num=racks_num)

        if has_config:
            config = {
                "ring_delay_ms": 5000,
            }
            cluster.set_configuration_options(values=config)

        cluster.populate(cluster_topology)
        cluster.start(wait_other_notice=True)

        return cluster

    def test_prepared_statements_work_after_node_restart_after_altering_schema_without_changing_columns(self):
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema...")
        create_ks(session, "ks", 3)
        session.execute(
            """
            CREATE TABLE users (
                id int,
                firstname text,
                lastname text,
                PRIMARY KEY (id)
            );
            """
        )

        insert_statement = session.prepare("INSERT INTO users (id, firstname, lastname) VALUES (?, 'A', 'B')")
        insert_statement.consistency_level = ConsistencyLevel.ALL
        session.execute(insert_statement, [0])

        logger.debug("Altering schema")
        session.execute("ALTER TABLE users WITH comment = 'updated'")

        logger.debug("Restarting node2")
        node2.stop(gently=True)
        node2.start(wait_for_binary_proto=True)

        logger.debug("Restarting node3")
        node3.stop(gently=True)
        node3.start(wait_for_binary_proto=True, wait_other_notice=True)

        n_partitions = 20
        for i in range(n_partitions):
            session.execute(insert_statement, [i])

        rows = session.execute("SELECT * FROM users")
        res = sorted(rows)
        assert len(res) == n_partitions
        for i in range(n_partitions):
            expected = [i, "A", "B"]
            assert list(res[i]) == expected, f"Expected {expected}, got {res[i]}"

    def test_dropping_keyspace_with_many_columns(self):
        """
        Exploits https://github.com/scylladb/scylla/issues/1484
        """
        cluster = self.prepare(racks_num=1, has_config=False)

        node1 = cluster.nodelist()[0]
        session = self.patient_cql_connection(node1)

        session.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
        for i in range(8):
            session.execute(f"CREATE TABLE testxyz.test_{i} (k int, c int, PRIMARY KEY (k),)")
        session.execute("drop keyspace testxyz")

        for node in cluster.nodelist():
            s = self.patient_cql_connection(node)
            s.execute("CREATE KEYSPACE testxyz WITH replication = { 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
            s.execute("drop keyspace testxyz")

    def test_multiple_create_table_in_parallel(self):
        """
        Run multiple create table statements via different nodes
        1. Create a cluster of 3 nodes
        2. Run create table with different table names in parallel - check all complete
        3. Run create table with the same table name in parallel - check if they complete
        """
        logger.debug("1. Create a cluster of 3 nodes")
        nodes_count = 3
        cluster = self.prepare(racks_num=nodes_count)
        sessions = [self.patient_exclusive_cql_connection(node) for node in cluster.nodelist()]
        ks = "ks"
        create_ks(sessions[0], ks, nodes_count)

        def create_table(session, table_name):
            create_statement = f"CREATE TABLE {ks}.{table_name} (p int PRIMARY KEY, c0 text, c1 text, c2 text, c3 text, c4 text, c5 text, c6 text, c7 text, c8 text, c9 text);"
            logger.debug(f"create_statement {create_statement}")
            session.execute(create_statement)

        logger.debug("2. Run create table with different table names in parallel - check all complete")
        step2_tables = [f"t{i}" for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            list(executor.map(create_table, sessions, step2_tables))

        for table in step2_tables:
            sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
            rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{table}", consistency_level=ConsistencyLevel.ALL))
            assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

        logger.debug("3. Run create table with the same table name in parallel - check if they complete")
        step3_table = "test"
        step3_tables = [step3_table for i in range(nodes_count)]
        with ThreadPoolExecutor(max_workers=nodes_count) as executor:
            res_futures = [executor.submit(create_table, *args) for args in zip(sessions, step3_tables)]
            for res_future in res_futures:
                try:
                    res_future.result()
                except AlreadyExists as e:
                    logger.info(f"expected cassandra.AlreadyExists error {e}")

        sessions[0].execute(SimpleStatement(f"INSERT INTO {ks}.{step3_table} (p) VALUES (1)", consistency_level=ConsistencyLevel.ALL))
        sessions[0].execute(f"SELECT * FROM {ks}.{step3_table}")
        rows = sessions[0].execute(SimpleStatement(f"SELECT * FROM {ks}.{step3_table}", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)) == 1, f"Expected 1 row but got rows:{rows} instead"

    @pytest.mark.parametrize("case", ("write", "read", "mixed"))
    def test_alter_table_in_parallel_to_read_and_write(self, case):
        """
        Create a table and write into it while altering the table
        1. Create a cluster of 3 nodes and populate a table
        2. Run a write/read/mixed statement in a loop
        3. Alter table while inserts are running
        """
        logger.debug("1. Create a cluster of 3 nodes and populate a table")
        cluster = self.prepare(racks_num=3)
        col_number = 20

        [node1, node2, node3] = cluster.nodelist()
        session = self.patient_exclusive_cql_connection(node1)

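        # run_stress drives cassandra-stress against its default table, keyspace1.standard1 (the same
        # table the ALTER below targets): n=10000 operations at CL=QUORUM with RF=3, a fixed number of
        # value columns C0..C<col-1>, and a single client thread.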
        def run_stress(stress_type, col=col_number - 2):
            node2.stress_object([stress_type, "n=10000", "cl=QUORUM", "-schema", "replication(factor=3)", "-col", f"n=FIXED({col})", "-rate", "threads=1"])

logger.debug("Populate")
|
|
run_stress("write", col_number)
|
|
|
|
with ThreadPoolExecutor(max_workers=1) as executor:
|
|
logger.debug(f"2. Run {case} statement in a loop")
|
|
statement_future = executor.submit(functools.partial(run_stress, case))
|
|
|
|
logger.debug(f"let's {case} statement work some time")
|
|
time.sleep(2)
|
|
|
|
logger.debug("3. Alter table while inserts are running")
|
|
alter_statement = f'ALTER TABLE keyspace1.standard1 DROP ("C{col_number - 1}", "C{col_number - 2}")'
|
|
logger.debug(f"alter_statement {alter_statement}")
|
|
alter_result = session.execute(alter_statement)
|
|
logger.debug(alter_result.all())
|
|
|
|
logger.debug(f"wait till {case} statement finished")
|
|
statement_future.result()
|
|
|
|
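        # The populate step created a key column plus col_number value columns (C0..C19); after
        # dropping the two highest-numbered columns, each row should have col_number - 1 columns left.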
        rows = session.execute(SimpleStatement("SELECT * FROM keyspace1.standard1 LIMIT 1;", consistency_level=ConsistencyLevel.ALL))
        assert len(rows_to_list(rows)[0]) == col_number - 1, f"Expected {col_number - 1} columns but got rows:{rows} instead"

        logger.debug("read and check data")
        run_stress("read")

    @pytest.mark.skip("unimplemented")
    def commitlog_replays_after_schema_change(self):
        """
        Commitlog can be replayed even though the schema has been changed
        1. Create a table and insert data
        2. Alter table
        3. Kill node
        4. Boot node and verify that the commitlog has been replayed and that all data is restored
        """
        raise NotImplementedError

    @pytest.mark.parametrize("case", ("create_table", "alter_table", "drop_table"))
    def test_update_schema_while_node_is_killed(self, case):
        """
        Check that a node that is killed during a table creation/alter/drop is able to rejoin and to sync on schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        def create_table_case():
            try:
                logger.debug("Creating table")
                create_c1c2_table(session)
                logger.debug("Populating")
                insert_c1c2(session, n=10)
            except AlreadyExists:
                # the CQL command can be called multiple times in case of retries
                pass

        def alter_table_case():
            try:
                session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Invalid column name c3" in str(exc)

        def drop_table_case():
            try:
                session.execute("DROP TABLE cf;", timeout=180)
            except InvalidRequest as exc:
                # the CQL command can be called multiple times in case of retries
                assert "Cannot drop non existing table" in str(exc)

        logger.debug("Creating keyspace")
        create_ks(session, "ks", 3)
        if case != "create_table":
            create_table_case()

        case_map = {
            "create_table": create_table_case,
            "alter_table": alter_table_case,
            "drop_table": drop_table_case,
        }
        with ThreadPoolExecutor(max_workers=1) as executor:
            logger.debug(f"2. Kill node during {case}")
            kill_node_future = executor.submit(node2.stop, gently=False, wait_other_notice=True)
            case_map[case]()
            kill_node_future.result()

        logger.debug("3. Start the stopped node2")
        node2.start(wait_for_binary_proto=True)

        session = self.patient_exclusive_cql_connection(node2)
        read_barrier(session)

        def create_or_alter_table_expected_result(col_num):
            rows = session.execute(SimpleStatement("SELECT * FROM ks.cf LIMIT 1;", consistency_level=ConsistencyLevel.QUORUM))
            assert len(rows_to_list(rows)[0]) == col_num, f"Expected {col_num} columns but got rows:{rows} instead"
            for key in range(10):
                query_c1c2(session=session, key=key, consistency=ConsistencyLevel.QUORUM)

        expected_case_result_map = {
            "create_table": functools.partial(create_or_alter_table_expected_result, 3),
            "alter_table": functools.partial(create_or_alter_table_expected_result, 4),
            "drop_table": functools.partial(assert_invalid, session, "SELECT * FROM test1"),
        }
        logger.debug("verify that the commitlog has been replayed and that all data is restored")
        expected_case_result_map[case]()

    @pytest.mark.parametrize("is_gently_stop", [True, False])
    def test_nodes_rejoining_a_cluster_synch_on_schema(self, is_gently_stop):
        """
        Nodes rejoining the cluster sync on schema changes
        1. Create a cluster and insert data
        2. Stop a node
        3. Alter table
        4. Insert additional data
        5. Start the stopped node
        6. Verify the stopped node syncs on the updated schema
        """

        logger.debug("1. Create a cluster and insert data")
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        create_c1c2_table(session)
        create_cf(session, "cf", key_name="p", key_type="int", columns={"v": "text"})

        logger.debug("Populating")
        insert_c1c2(session, n=10, consistency=ConsistencyLevel.ALL)

        logger.debug("2. Stop node1")
        node1.stop(gently=is_gently_stop, wait_other_notice=True)

        logger.debug("3. Alter table")
        session = self.patient_cql_connection(node2)
        session.execute("ALTER TABLE ks.cf ADD (c3 text);", timeout=180)

        logger.debug("4. Insert additional data")
        session.execute(SimpleStatement("INSERT INTO ks.cf (key, c1, c2, c3) VALUES ('test', 'test', 'test', 'test')", consistency_level=ConsistencyLevel.QUORUM))

        logger.debug("5. Start the stopped node1")
        node1.start(wait_for_binary_proto=True)

        logger.debug("6. Verify the stopped node syncs on the updated schema")
        session = self.patient_exclusive_cql_connection(node1)
        read_barrier(session)

        rows = session.execute(SimpleStatement("SELECT * FROM ks.cf WHERE key='test'", consistency_level=ConsistencyLevel.ALL))
        expected = [["test", "test", "test", "test"]]
        assert rows_to_list(rows) == expected, f"Expected {expected} but got {rows} instead"
        for key in range(10):
            query_c1c2(session=session, key=key, consistency=ConsistencyLevel.ALL)

    def test_reads_schema_recreated_while_node_down(self):
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True)

        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v1 bigint, v2 text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
        session2 = self.patient_exclusive_cql_connection(node2)
        read_barrier(session2)

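        # The table was dropped and re-created while node2 was down, so a CL=ALL read, which has to
        # reach node2 as well, is expected to see the new, empty table rather than the old row.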
        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        assert rows_to_list(rows) == [], f"Expected an empty result set, got {rows}"

    def test_writes_schema_recreated_while_node_down(self):
        cluster = self.prepare(racks_num=3)

        [node1, node2, node3] = cluster.nodelist()

        session = self.patient_cql_connection(node1)

        logger.debug("Creating schema")
        create_ks(session, "ks", 3)
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Populating")
        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (1, '1')", consistency_level=ConsistencyLevel.ALL))

        logger.debug("Stopping node2")
        node2.stop(gently=True, wait_other_notice=True)

        logger.debug("Re-creating schema")
        session.execute("DROP TABLE cf;")
        session.execute("CREATE TABLE cf (p int PRIMARY KEY, v text);")

        logger.debug("Restarting node2")
        node2.start(wait_for_binary_proto=True)
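        # The exclusive connection pins CQL to node2 itself, so the read barrier below is guaranteed
        # to run on node2; a regular cluster-aware connection could route the barrier to another node,
        # where it is a no-op, leaving node2 with a stale schema and failing the CL=ALL write below.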
        session2 = self.patient_exclusive_cql_connection(node2)
        read_barrier(session2)

        session.execute(SimpleStatement("INSERT INTO cf (p, v) VALUES (2, '2')", consistency_level=ConsistencyLevel.ALL))

        rows = session.execute(SimpleStatement("SELECT * FROM cf", consistency_level=ConsistencyLevel.ALL))
        expected = [[2, "2"]]
        assert rows_to_list(rows) == expected, f"Expected {expected}, got {rows_to_list(rows)}"


class TestLargePartitionAlterSchema(Tester):
    # Issue scylladb/scylla#5135:
    #
    # Cache reads may miss some writes if a schema alter followed by a read happens concurrently with
    # a preempted partition entry update.
    # Affects only tables with multi-row partitions, which are the only ones that can experience the
    # update of a partition entry being preempted.
    #
    # The scenario in which the problem could have happened has to involve:
    # - a large partition with many rows, large enough for preemption (every 0.5ms) to happen during the scan of the partition.
    # - appending writes to the partition (not overwrites)
    # - scans of the partition
    # - a schema alter of that table. The issue is exposed only by adding or dropping a column, such that the added/dropped
    #   column lands in the middle (in alphabetical order) of the old column set.
    #
    # A memtable flush has to happen after the schema alter, concurrently with a read.
    #
    # The bug could result in cache corruption, which manifests as some past writes being missing (not visible to reads).
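    #
    # The two tests below follow that recipe: they keep appending rows to multi-row partitions and
    # scanning them from background threads, alter the table (add or drop a column), flush the
    # memtable shortly afterwards, and finally verify that no previously written row went missing.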

    PARTITIONS = 50
    STRING_VALUE = string.ascii_lowercase

    def prepare(self, cluster_topology: dict[str, dict[str, int]], rf: int):
        if not self.cluster.nodelist():
            self.cluster.populate(cluster_topology)
            self.cluster.start(wait_other_notice=True)

        node1 = self.cluster.nodelist()[0]
        session = self.patient_cql_connection(node=node1)
        self.create_schema(session=session, rf=rf)

        return session

    def create_schema(self, session, rf):
        logger.debug("Creating schema")
        create_ks(session=session, name="ks", rf=rf)

        session.execute(
            """
            CREATE TABLE lp_table (
                pk int,
                ck1 int,
                val1 text,
                val2 text,
                PRIMARY KEY (pk, ck1)
            );
            """
        )

    def populate(self, session, data, ck_start, ck_end=None, stop_populating: threading.Event = None):
        ck = ck_start

        def _populate_loop():
            nonlocal ck
            while True:
                if stop_populating is not None and stop_populating.is_set():
                    return
                if ck_end is not None and ck >= ck_end:
                    return
                for pk in range(self.PARTITIONS):
                    row = [pk, ck, self.STRING_VALUE, self.STRING_VALUE]
                    data.append(row)
                    yield tuple(row)
                ck += 1

        logger.debug(f"Start populate DB: {self.PARTITIONS} partitions with {ck_end - ck_start if ck_end else 'infinite'} records in each partition")

        parameters = _populate_loop()

        stmt = session.prepare("INSERT INTO lp_table (pk, ck1, val1, val2) VALUES (?, ?, ?, ?)")

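        # execute_concurrent_with_args pulls parameter tuples lazily from the _populate_loop generator,
        # keeping up to 100 inserts in flight until the generator stops (stop_populating is set or ck
        # reaches ck_end), so ck only advances while the inserts are actually being executed.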
        execute_concurrent_with_args(session=session, statement=stmt, parameters=parameters, concurrency=100)
        records_written = ck - ck_start
        logger.debug(f"Finish populate DB: {self.PARTITIONS} partitions with {records_written} records in each partition")
        return data

    def read(self, session, ck_max, stop_reading: threading.Event = None):
        def _read_loop():
            while True:
                for ck in range(ck_max):
                    for pk in range(self.PARTITIONS):
                        if stop_reading is not None and stop_reading.is_set():
                            return
                        session.execute(f"select * from lp_table where pk = {pk} and ck1 = {ck}")
                if stop_reading is None:
                    return

        logger.debug("Start reading..")
        _read_loop()
        logger.debug("Finish reading..")

    def add_column(self, session, column_name, column_type):
        logger.debug(f"Add {column_name} column")
        session.execute(f"ALTER TABLE lp_table ADD {column_name} {column_type}")

    def drop_column(self, session, column_name):
        logger.debug(f"Drop {column_name} column")
        session.execute(f"ALTER TABLE lp_table DROP {column_name}")

    def test_large_partition_with_add_column(self):
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.add_column(session, "new_clmn", "int")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    future.result()
                except Exception as exc:  # noqa: BLE001
                    pytest.fail(f"Generated an exception: {exc}")

        # Add 'null' values for the new column `new_clmn` in the expected data
        for i, _ in enumerate(data):
            data[i].append(None)

        assert_all(session, "select pk, ck1, val1, val2, new_clmn from lp_table", data, ignore_order=True, print_result_on_failure=False)

    def test_large_partition_with_drop_column(self):
        cluster_topology = generate_cluster_topology()
        session = self.prepare(cluster_topology, rf=1)
        data = self.populate(session=session, data=[], ck_start=0, ck_end=10)

        threads = []
        timeout = 300
        ck_end = 5000
        if self.cluster.scylla_mode == "debug":
            timeout = 900
            ck_end = 500
        with ThreadPoolExecutor(max_workers=2) as executor:
            stop_populating = threading.Event()
            stop_reading = threading.Event()
            # Insert new rows in background
            threads.append(executor.submit(self.populate, session=session, data=data, ck_start=10, ck_end=None, stop_populating=stop_populating))
            threads.append(executor.submit(self.read, session=session, ck_max=ck_end, stop_reading=stop_reading))
            # Wait for running load
            time.sleep(10)
            self.drop_column(session=session, column_name="val1")

            # Memtable flush has to happen after a schema alter concurrently with a read
            logger.debug("Flush data")
            self.cluster.nodelist()[0].flush()

            # Stop populating and reading soon after flush
            time.sleep(1)
            logger.debug("Stop populating and reading")
            stop_populating.set()
            stop_reading.set()

            result = []
            for future in futures.as_completed(threads, timeout=timeout):
                try:
                    result.append(future.result())
                except Exception as exc:  # noqa: BLE001
                    # "Unknown identifier val1" is an expected error after the column has been dropped
                    if not len(exc.args) or "Unknown identifier val1" not in exc.args[0]:
                        pytest.fail(f"Generated an exception: {exc}")


class HistoryVerifier:
    def __init__(self, table_name="table1", keyspace_name="lwt_load_ks"):
        """
        Initialize parameters for further verification of schema history.
        :param table_name: the table whose schema we change and whose schema history we verify.
        """

        self.table_name = table_name
        self.keyspace_name = keyspace_name
        self.versions = []
        self.versions_dict = {}
        self.query = ""

    def verify(self, session, expected_current_diff, expected_prev_diff, query):
        """
        Verify the current schema history entry by comparing it to the previous schema entry.
        :param session: python cql session
        :param expected_current_diff: difference of the current schema from the previous schema
        :param expected_prev_diff: difference of the previous schema from the current schema
        :param query: the query that created the new schema
        """

        def get_table_id(session, keyspace_name, table_name):
            assert keyspace_name, f"Input keyspace_name should have a value, keyspace_name={keyspace_name}"
            assert table_name, f"Input table_name should have a value, table_name={table_name}"
            query = "select keyspace_name,table_name,id from system_schema.tables"
            query += f" WHERE keyspace_name='{keyspace_name}' AND table_name='{table_name}'"
            current_rows = session.execute(query).current_rows
            assert len(current_rows) == 1, f"Table description not found, ks={keyspace_name} table_name={table_name}"
            res = current_rows[0]
            return res["id"]

        def read_schema_history_table(session, cf_id):
            """
            Read system.scylla_table_schema_history and verify the current version's diff from the previous version.
            :param session: python cql session
            :param cf_id: UUID of the table whose schema we changed
            """

            query = f"select * from system.scylla_table_schema_history WHERE cf_id={cf_id}"
            res = session.execute(query).current_rows
            new_versions = list({
                entry["schema_version"]
                for entry in res
                if str(entry["schema_version"]) not in self.versions
            })
            msg = f"Expect 1, got len(new_versions)={len(new_versions)}"
            assert len(new_versions) == 1, msg
            current_version = str(new_versions[0])
            logger.debug(f"New schema_version {current_version} after executing '{self.query}'")
            columns_list = (
                {"column_name": entry["column_name"], "type": entry["type"]}
                for entry in res
                if entry["kind"] == "regular" and current_version == str(entry["schema_version"])
            )
            self.versions_dict[current_version] = {}
            for item in columns_list:
                self.versions_dict[current_version][item["column_name"]] = item["type"]

            self.versions.append(current_version)
            if len(self.versions) > 1:
                current_id = self.versions[-1]
                previous_id = self.versions[-2]
                set_current = set(self.versions_dict[current_id].items())
                set_previous = set(self.versions_dict[previous_id].items())
                current_diff = set_current - set_previous
                previous_diff = set_previous - set_current
                msg1 = f"Expect diff(new schema,old schema) to be {expected_current_diff} got {current_diff}"
                msg2 = f" query is '{self.query}' versions={current_id},{previous_id}"
                if current_diff != expected_current_diff:
                    logger.debug(msg1 + msg2)
                assert current_diff == expected_current_diff, msg1 + msg2
                msg1 = f"Expect diff(old schema,new schema) to be {expected_prev_diff} got {previous_diff}"
                assert previous_diff == expected_prev_diff, msg1 + msg2

        self.query = query
        cf_id = get_table_id(session, keyspace_name=self.keyspace_name, table_name=self.table_name)
        read_schema_history_table(session, cf_id)


class DDL(NamedTuple):
    ddl_command: str
    expected_current_diff: set | None
    expected_prev_diff: set | None


class TestSchemaHistory(Tester):
    def prepare(self):
        cluster = self.cluster
        # In case tablets and rf-rack-valid-keyspaces are supported,
        # create a cluster with 3 racks and 1 node in each rack
        cluster_topology = generate_cluster_topology(rack_num=3)
        rf = 3
        cluster.populate(cluster_topology).start(wait_other_notice=True)
        self.session = self.patient_cql_connection(self.cluster.nodelist()[0], row_factory=dict_factory)
        create_ks(self.session, "lwt_load_ks", rf)

    def test_schema_history_alter_table(self):
        """test schema history changes following alter table cql commands"""
        self.prepare()
        verifier = HistoryVerifier(table_name="table2")
        queries_and_expected_diffs = [
            DDL(ddl_command="CREATE TABLE IF NOT EXISTS lwt_load_ks.table2 (pk int PRIMARY KEY, v int, int_col int)", expected_current_diff=None, expected_prev_diff=None),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER v TYPE varint", expected_current_diff={("v", "varint")}, expected_prev_diff={("v", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD (v2 int, v3 int)", expected_current_diff={("v2", "int"), ("v3", "int")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ALTER int_col TYPE varint", expected_current_diff={("int_col", "varint")}, expected_prev_diff={("int_col", "int")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP int_col", expected_current_diff=set(), expected_prev_diff={("int_col", "varint")}),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 ADD int_col bigint", expected_current_diff={("int_col", "bigint")}, expected_prev_diff=set()),
            DDL(ddl_command="ALTER TABLE lwt_load_ks.table2 DROP (int_col,v)", expected_current_diff=set(), expected_prev_diff={("int_col", "bigint"), ("v", "varint")}),
        ]
        for ddl in queries_and_expected_diffs:
            self.session.execute(ddl.ddl_command)
            verifier.verify(self.session, ddl.expected_current_diff, ddl.expected_prev_diff, query=ddl.ddl_command)