#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#

from test.pylib.manager_client import ManagerClient
from test.cluster.util import new_test_keyspace, reconnect_driver
from cassandra.protocol import InvalidRequest

import asyncio
import logging
import threading
import pytest

logger = logging.getLogger(__name__)


@pytest.mark.asyncio
async def test_add_and_drop_column_with_cdc(manager: ManagerClient):
    """
    Test writing to a table with CDC enabled while adding and dropping a column.

    In particular we are interested in the behavior when the schemas of the base table
    and the CDC log may not be in sync, and we write a value to a column that exists in
    the base table but not in the CDC table.

    Reproduces #24952
    """
    servers = await manager.servers_add(3, auto_rack_dc="dc1")
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")

        # sleep before CDC augmentation, because we want to have a write that starts with some base schema, and then
        # the table is altered while the write is in progress, and the CDC augmentation will use the new schema that
        # is not compatible with the base schema.
        await asyncio.gather(*[manager.api.enable_injection(s.ip_addr, "sleep_before_cdc_augmentation", one_shot=False) for s in servers])

        # The writer thread writes to the column 'a' while it's being added and dropped.
        # We want to write a value to that column while it's in different stages - may exist
        # in one table but not in the other.
        stop_writer = threading.Event()
        # Unexpected exceptions raised by the writer thread; kept so the failure can report them.
        writer_errors = []

        def do_writes():
            """Insert rows touching column 'a' until stop_writer is set."""
            i = 0
            try:
                while not stop_writer.is_set():
                    try:
                        cql.execute(f"INSERT INTO {ks}.test(pk, v, a) VALUES({i}, {i + 1}, {i + 2})")
                    except InvalidRequest as e:
                        # Expected while column 'a' is not (yet / anymore) part of the table.
                        if "Unknown identifier" not in str(e):
                            raise
                    i += 1
            except Exception as e:
                # logger.exception also records the traceback, which the bare message would lose.
                logger.exception(f"Unexpected error while writing to {ks}.test: {e}")
                writer_errors.append(e)

        # daemon=True: a writer that is somehow never stopped cannot keep the interpreter alive.
        writer_thread = threading.Thread(target=do_writes, daemon=True)
        writer_thread.start()
        try:
            await cql.run_async(f"ALTER TABLE {ks}.test ADD a int")
            await asyncio.sleep(1)
            await cql.run_async(f"ALTER TABLE {ks}.test DROP a")
        finally:
            # Always stop and join the writer, even if a schema change failed; otherwise
            # the thread would keep writing after the test body has exited.
            stop_writer.set()
            writer_thread.join()

        if writer_errors:
            pytest.fail(f"Unexpected error occurred during writes to the table: {writer_errors[0]!r}")

        base_rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test")
        cdc_rows = await cql.run_async(f"SELECT COUNT(*) FROM {ks}.test_scylla_cdc_log")
        assert base_rows[0].count == cdc_rows[0].count, f"Base table rows: {base_rows[0].count}, CDC log rows: {cdc_rows[0].count}"


@pytest.mark.asyncio
async def test_cdc_compatible_schema(manager: ManagerClient):
    """
    Basic test that we can write to a table with CDC enabled when the schema of the base table is altered,
    or when the schema is loaded after node restart.
    We want to ensure the schemas of the base table and its CDC table are loaded correctly.
    """
    servers = await manager.servers_add(1)
    cql = manager.get_cql()
    log = await manager.server_open_log(servers[0].server_id)

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'enabled': false}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
        await cql.run_async(f"INSERT INTO {ks}.test(pk, v) VALUES(1, 10)")

        await cql.run_async(f"ALTER TABLE {ks}.test ADD a int")
        await cql.run_async(f"INSERT INTO {ks}.test(pk, v, a) VALUES(1, 20, 30)")

        # Verify the CDC schema is set after node restart.
        await manager.server_restart(servers[0].server_id)
        cql = await reconnect_driver(manager)

        await cql.run_async(f"INSERT INTO {ks}.test(pk, v, a) VALUES(2, 40, 50)")

        # validate rows in the CDC log
        cdc_rows = await cql.run_async(f"SELECT * FROM {ks}.test_scylla_cdc_log")
        assert len(cdc_rows) == 3, f"Expected 3 rows in CDC log, got {len(cdc_rows)}"
        assert {row.a for row in cdc_rows} == {None, 30, 50}, \
            f"Unexpected values in column 'a' of CDC log: {[row.a for row in cdc_rows]}"

        matches = await log.grep("has no CDC schema set")
        assert len(matches) == 0, "Found unexpected log messages indicating missing CDC schema"


@pytest.mark.asyncio
async def test_recreate_column_too_soon(manager: ManagerClient):
    """
    Test that recreating a dropped column too soon fails with an appropriate error.

    When dropping a column from a CDC log table, the drop timestamp is set several seconds into the future
    to prevent race conditions with concurrent writes. This test verifies that attempting to recreate
    a column with the same name before the drop timestamp has passed results in a proper error message,
    preventing potential data corruption.
    """
    await manager.servers_add(1, auto_rack_dc="dc1")
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int, dropped_col int) WITH cdc={{'enabled': true}}")

        await cql.run_async(f"ALTER TABLE {ks}.test DROP dropped_col")

        # recreating too soon
        with pytest.raises(Exception, match="a column with the same name was dropped too recently"):
            await cql.run_async(f"ALTER TABLE {ks}.test ADD dropped_col int")


@pytest.mark.asyncio
async def test_concurrent_writes_and_drop_column_with_cdc_preimage(manager: ManagerClient):
    """
    Test concurrent writes and column drop with CDC preimage enabled.

    This test reproduces an issue where writes concurrent with column drop can cause malformed SSTables
    when CDC preimage is enabled.

    The problem occurs because:
    1. The table has CDC with preimage='full', which means CDC preimage generation accesses all columns
       in the table, including ones not touched by the write
    2. Writes continuously update existing rows (triggering preimage generation)
    3. Concurrently, a column is dropped from the table
    4. The preimage generation may access the dropped column even though the actual write doesn't touch it
    5. This can result in writes having newer timestamps than the column drop timestamp, leading to
       malformed SSTables where dropped columns appear with data newer than their drop time

    The test validates that the resulting SSTables are well-formed by running compaction, which would
    fail if the SSTables were corrupted.

    Reproduces #26340.
    """
    servers = await manager.servers_add(3, auto_rack_dc="dc1")
    cql = manager.get_cql()

    async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") as ks:
        await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int, dropped_col int) WITH cdc={{'enabled': true, 'preimage': 'full'}}")

        await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test (pk, v) VALUES ({pk}, 0)") for pk in range(50)])

        stop_writer = asyncio.Event()

        async def continuous_writer():
            """Task that continuously writes to the table without touching dropped_col"""
            v = 1
            while not stop_writer.is_set():
                try:
                    # Update existing rows to trigger preimage generation
                    await asyncio.gather(*[cql.run_async(f"UPDATE {ks}.test SET v = {v} WHERE pk = {pk}") for pk in range(50)])
                    v += 1
                except Exception as e:
                    # Some writes might fail due to #26405 - ignore those and keep writing.
                    if "does not have base column" not in str(e):
                        raise

        async def drop_column():
            await asyncio.sleep(0.5)  # Let some writes happen first
            # Drop the column and flush concurrently.
            # we want values that are written at the time the column is dropped to be flushed
            await asyncio.gather(*[
                cql.run_async(f"ALTER TABLE {ks}.test DROP dropped_col"),
                manager.api.flush_keyspace(servers[0].ip_addr, ks)
            ])

        # do writes while dropping the column
        writer_task = asyncio.create_task(continuous_writer())
        try:
            await drop_column()
        finally:
            # Always stop and await the writer, even if the drop/flush failed; otherwise the
            # task would keep issuing updates after the test body has exited.
            stop_writer.set()
            await writer_task

        # run compaction to trigger validation of the sstables
        await manager.api.keyspace_compaction(servers[0].ip_addr, ks)