Files
scylladb/test/cqlpy/test_cdc.py
Dawid Mędrek 0602afc085 cdc: Preserve properties when reattaching log table
When we enable CDC on a table, Scylla creates a log table for it.
It has default properties, but the user may change them later on.
Furthermore, it's possible to detach that log table by simply
disabling CDC on the base table:

```cql
/* Create a table with CDC enabled. The log table is created. */
CREATE TABLE ks.t (pk int PRIMARY KEY) WITH cdc = {'enabled': true};

/* Detach the log table. */
ALTER TABLE ks.t WITH cdc = {'enabled': false};

/* Modify a property of the log table. */
ALTER TABLE ks.t_scylla_cdc_log WITH bloom_filter_fp_chance = 0.13;
```

The log table can also be reattached by enabling CDC on the base table
again:

```cql
/* Reattach the log table */
ALTER TABLE ks.t WITH cdc = {'enabled': true};
```

However, because the process of reattachment goes through the same
code that created it in the first place, the properties of the log
table are rolled back to their default values. This may be confusing
to the user and, if unnoticed, also have other consequences, e.g.
affecting performance.

To prevent that, we ensure that the properties are preserved.

A reproducer test,
`test_log_table_preserves_properties_after_reattachment`, has been
provided to verify that the changes are correct. It fails before this
commit.

Another test, `test_log_table_preserves_id_after_reattachment`, has
also been added because the current implementation sets properties
and the ID separately.

Fixes scylladb/scylladb#25523
2025-11-17 11:56:30 +01:00

451 lines
24 KiB
Python

# Copyright 2021-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
import re
from cassandra.cluster import ConsistencyLevel
from cassandra.query import SimpleStatement
from cassandra.protocol import InvalidRequest
from .util import new_test_table, unique_name, keyspace_has_tablets
from .nodetool import flush
import pytest
import time
# Waits until at least one CDC generation timestamp is visible in
# system_distributed.cdc_generation_timestamps. Only that table is polled here.
# It may happen after the first node bootstraps.
def wait_for_first_cdc_generation(cql, timeout):
    """Poll until the first CDC generation timestamp row can be read.

    Parameters:
        cql: session object whose ``execute()`` runs the polling query.
        timeout: maximum time to wait, in seconds.

    Raises:
        AssertionError: if no row became visible before the timeout elapsed.
    """
    query = SimpleStatement(
        "select time from system_distributed.cdc_generation_timestamps where key = 'timestamps'",
        consistency_level=ConsistencyLevel.ONE)
    # Use the monotonic clock: a wall-clock adjustment during the wait must not
    # shorten or stretch the timeout.
    deadline = time.monotonic() + timeout
    while not list(cql.execute(query)):
        assert time.monotonic() < deadline, "Timed out waiting for the first CDC generation"
        # Poll once per second.
        time.sleep(1)
@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True)
def test_cdc_log_entries_use_cdc_streams(scylla_only, cql, test_keyspace):
    '''Check that every stream ID written to the CDC log is one of the streams
    listed in the streams description table. The test runs on a single-node
    cluster, so only one generation is expected.'''
    uses_tablets = keyspace_has_tablets(cql, test_keyspace)
    if not uses_tablets:
        wait_for_first_cdc_generation(cql, 60)

    known_streams = set()

    with new_test_table(cql, test_keyspace, "pk int primary key", " with cdc = {'enabled': true}") as table:
        insert = cql.prepare(f"insert into {table} (pk) values (?) using timeout 5m")
        for pk in range(100):
            cql.execute(insert, [pk])

        log_rows = cql.execute(f'select "cdc$stream_id" from {table}_scylla_cdc_log')
        logged_streams = {row[0] for row in log_rows}

        if uses_tablets:
            tablet_streams_query = SimpleStatement(
                'select stream_id from system.cdc_streams',
                consistency_level=ConsistencyLevel.ONE)
            for entry in cql.execute(tablet_streams_query):
                known_streams.add(entry.stream_id)

    # There should be exactly one generation, so we just select the streams
    if not uses_tablets:
        vnode_streams_query = SimpleStatement(
            'select streams from system_distributed.cdc_streams_descriptions_v2',
            consistency_level=ConsistencyLevel.ONE)
        for entry in cql.execute(vnode_streams_query):
            known_streams.update(entry.streams)

    assert logged_streams.issubset(known_streams)
# Test for #10473 - the CDC log must remain readable (from sstables)
# after a column has been dropped from the base table.
def test_cdc_alter_table_drop_column(scylla_only, cql, test_keyspace):
    with new_test_table(cql, test_keyspace, "pk int primary key, v int", " with cdc = {'enabled': true}") as table:
        log_table = table + "_scylla_cdc_log"
        for values in ("0, 0", "1, null"):
            cql.execute(f"insert into {table} (pk, v) values ({values})")
        # Flush both the base table and its log table before the schema change.
        for flushed in (table, log_table):
            flush(cql, flushed)
        cql.execute(f"alter table {table} drop v")
        # The read itself is the check: it must not fail.
        cql.execute(f"select * from {log_table}")
# Regression test for #12098 - an LWT insert must not observe itself
# inside a preimage.
def test_cdc_with_lwt_preimage(scylla_only, cql, test_keyspace):
    cdc_with_preimage = " with cdc = {'enabled': true, 'preimage':true}"
    with new_test_table(cql, test_keyspace, "pk int primary key", cdc_with_preimage) as table:
        lwt_insert = cql.prepare(f"insert into {table} (pk) values (?) if not exists")
        for key in range(500):
            cql.execute(lwt_insert, (key,))
        log_rows = cql.execute(f'select "cdc$operation" from {table}_scylla_cdc_log')
        # No key was overwritten, so there must be no preimage rows:
        # every `cdc$operation` value must be 2 (denoting INSERT).
        assert all(row[0] == 2 for row in log_rows)
# The CDC log table of a table "xyz" is always called "xyz_scylla_cdc_log".
# This test covers the case where a regular table with that name already
# exists and we then either create "xyz" with CDC enabled, or create "xyz"
# without CDC and enable it afterwards.
# The secondary-index code picks a different name for its backing view in a
# similar situation; the CDC code does not. Instead, creating the table with
# CDC (or enabling CDC) is expected to fail gracefully with a clear error
# message, which is what we verify here.
def test_cdc_taken_log_name(scylla_only, cql, test_keyspace):
    base = f"{test_keyspace}.{unique_name()}"
    taken_log_name = f"{base}_scylla_cdc_log"
    already_exists = f"{taken_log_name} already exists"
    columns = "pk int primary key, v int"
    enable_cdc = " with cdc = {'enabled': true}"

    cql.execute(f"CREATE TABLE {taken_log_name} (p int PRIMARY KEY)")
    try:
        # Creating the base table with CDC enabled must be rejected. The DROP
        # only runs if the CREATE unexpectedly succeeded.
        with pytest.raises(InvalidRequest, match=already_exists):
            cql.execute(f"CREATE TABLE {base} ({columns}) {enable_cdc}")
            cql.execute(f"DROP TABLE {base}")
        # Creating the base table *without* CDC works, but enabling CDC on it
        # afterwards must be rejected:
        try:
            cql.execute(f"CREATE TABLE {base} ({columns})")
            with pytest.raises(InvalidRequest, match=already_exists):
                cql.execute(f"ALTER TABLE {base} {enable_cdc}")
        finally:
            cql.execute(f"DROP TABLE {base}")
    finally:
        cql.execute(f"DROP TABLE {taken_log_name}")
# Verify that the set of columns of an attached CDC log table cannot be
# modified directly: adding, dropping and retyping columns are all rejected.
def test_alter_column_of_cdc_log_table(cql, test_keyspace, scylla_only):
    expected_error = (
        "You cannot modify the set of columns of a CDC log table directly. "
        "Modify the base table instead."
    )
    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int, u int", "with cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"

        def assert_rejected(alteration):
            with pytest.raises(InvalidRequest, match=expected_error):
                cql.execute(f"ALTER TABLE {log_table} {alteration}")

        for alteration in (
            'ADD c int',
            'DROP u',
            'DROP "cdc$stream_id"',
            'ALTER u TYPE float',
            'ALTER "cdc$stream_id" TYPE float',
        ):
            assert_rejected(alteration)

        # After the base table drops `u`, the log table's "cdc$deleted_u"
        # column still must not be droppable directly.
        cql.execute(f"ALTER TABLE {table} DROP u")
        assert_rejected('DROP "cdc$deleted_u"')
# Verify that columns of an attached CDC log table cannot be renamed.
def test_rename_column_of_cdc_log_table(cql, test_keyspace, scylla_only):
    expected_error = "Cannot rename a column of a CDC log table."
    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int, u int", "with cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"

        def assert_rename_rejected(rename):
            with pytest.raises(InvalidRequest, match=expected_error):
                cql.execute(f"ALTER TABLE {log_table} RENAME {rename}")

        for rename in (
            'u TO c',
            '"cdc$stream_id" TO c',
            '"cdc$stream_id" TO "cdc$c"',
        ):
            assert_rename_rejected(rename)

        # After the base table drops `u`, renaming the log table's
        # "cdc$deleted_u" column must be rejected as well.
        cql.execute(f"ALTER TABLE {table} DROP u")
        assert_rename_rejected('"cdc$deleted_u" TO c')
# Verify that the set of columns of a CDC log table cannot be modified
# directly, even after the log table has stopped being active.
def test_alter_column_of_inactive_cdc_log_table(cql, test_keyspace, scylla_only):
    expected_error = (
        "You cannot modify the set of columns of a CDC log table directly. "
        "Although the base table has deactivated CDC, this table will continue being "
        "a CDC log table until it is dropped. If you want to modify the columns in it, "
        "you can only do that by reenabling CDC on the base table, which will reattach "
        "this log table. Then you will be able to modify the columns in the base table, "
        "and that will have effect on the log table too. Modifying the columns of a CDC "
        "log table directly is never allowed."
    )
    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int, u int", "with cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"
        # Write a row so that the tables are not empty. This should have no
        # impact on the outcome of the test, but we do it anyway.
        cql.execute(f"INSERT INTO {table}(p, v, u) VALUES (1, 2, 3)")
        # Detach the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': false}}")

        rejected_alterations = (
            'ADD c int',
            'DROP u',
            'DROP "cdc$stream_id"',
            'ALTER u TYPE float',
            'ALTER "cdc$stream_id" TYPE float',
        )
        for alteration in rejected_alterations:
            with pytest.raises(InvalidRequest, match=expected_error):
                cql.execute(f"ALTER TABLE {log_table} {alteration}")
# Verify that modifying the set of columns is possible for a table whose name
# merely resembles that of a CDC log table.
def test_alter_column_of_fake_cdc_log_table(cql, test_keyspace, scylla_only):
    lookalike = f"{test_keyspace}.{unique_name()}_scylla_cdc_log"
    try:
        cql.execute(f"CREATE TABLE {lookalike} (p int PRIMARY KEY, v int)")
        cql.execute(f"ALTER TABLE {lookalike} DROP v")
    finally:
        cql.execute(f"DROP TABLE IF EXISTS {lookalike}")
# Verify that columns of a CDC log table cannot be renamed, even after the
# log table has stopped being active.
def test_rename_column_of_inactive_cdc_log_table(cql, test_keyspace, scylla_only):
    expected_error = (
        "You cannot rename a column of a CDC log table. Although the base table "
        "has deactivated CDC, this table will continue being a CDC log table until it "
        "is dropped."
    )
    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY, v int, u int", "with cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"
        # Write a row so that the tables are not empty. This should have no
        # impact on the outcome of the test, but we do it anyway.
        cql.execute(f"INSERT INTO {table}(p, v, u) VALUES (1, 2, 3)")
        # Detach the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': false}}")

        rejected_renames = (
            'u TO c',
            '"cdc$stream_id" TO c',
            '"cdc$stream_id" TO "cdc$c"',
        )
        for rename in rejected_renames:
            with pytest.raises(InvalidRequest, match=expected_error):
                cql.execute(f"ALTER TABLE {log_table} RENAME {rename}")
# Verify that renaming a column works for a table whose name resembles that of
# a CDC log table but which is NOT a CDC log table.
def test_rename_column_of_fake_cdc_log_table(cql, test_keyspace, scylla_only):
    lookalike = f"{test_keyspace}.{unique_name()}_scylla_cdc_log"
    try:
        cql.execute(f"CREATE TABLE {lookalike} (p int PRIMARY KEY, v int)")
        cql.execute(f"ALTER TABLE {lookalike} RENAME p TO q")
    finally:
        cql.execute(f"DROP TABLE IF EXISTS {lookalike}")
# When the user disables CDC on a table, the CDC log table is not removed. Instead, it is detached from the base table
# and functions as a normal table (with some differences). If that log table still exists when the user re-enables
# CDC on the base table, the old log table is re-attached to the base instead of a new one being created.
#
# Verify that changes made to the base table while CDC was disabled are reflected in the log table once CDC is
# re-enabled, and that basic operations on the log table work without problems.
@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True)
def test_reattach_cdc_log_table_after_altering_base(cql, test_keyspace, scylla_only):
    base_schema = "p int, v int, u int, a int, PRIMARY KEY (p, v)"
    with new_test_table(cql, test_keyspace, base_schema, "WITH cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"

        # Prefill the table with some data. It shouldn't matter, but let's do it anyway.
        cql.execute(f"INSERT INTO {table} (p, v, u, a) VALUES (1, 2, 3, 4)")

        # Detach the log table, then change the schema of the base table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': false}}")
        for alteration in ("DROP a", "ADD b double", "RENAME v TO v2"):
            cql.execute(f"ALTER TABLE {table} {alteration}")

        # The detached log table must still exist, and writes to the base table must not reach it.
        cql.execute(f"INSERT INTO {table} (p, v2, u, b) VALUES (5, 6, 7, 9.2)")
        rows_while_detached = cql.execute(f"SELECT * FROM {log_table} WHERE p = 5 ALLOW FILTERING").all()
        assert len(rows_while_detached) == 0

        # Reattach the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': true}}")

        # The definition of the log table must now reflect the base table's changes.
        # NOTE(review): the whitespace inside these literals has to match the DESC output
        # byte-for-byte - confirm the leading indentation of the column lines.
        expected_definition = (
            f'CREATE TABLE {log_table} (\n'
            ' "cdc$stream_id" blob,\n'
            ' "cdc$time" timeuuid,\n'
            ' "cdc$batch_seq_no" int,\n'
            ' b double,\n'
            ' "cdc$deleted_b" boolean,\n'
            ' "cdc$deleted_u" boolean,\n'
            ' "cdc$end_of_batch" boolean,\n'
            ' "cdc$operation" tinyint,\n'
            ' "cdc$ttl" bigint,\n'
            ' p int,\n'
            ' u int,\n'
            ' v2 int,\n'
            ' v int,\n'
            ' "cdc$deleted_a" boolean,\n'
            ' a int,\n'
            ' PRIMARY KEY ("cdc$stream_id", "cdc$time", "cdc$batch_seq_no")\n'
            ')'
        )
        described = cql.execute(f"DESC TABLE {log_table} WITH INTERNALS").one()
        assert hasattr(described, "create_statement")
        create_statement = described.create_statement
        assert expected_definition in create_statement
        # The columns that no longer exist are expected to be listed as explicit DROPs.
        for dropped_column in ('v', '"cdc$deleted_a"', 'a'):
            assert f"ALTER TABLE {log_table} DROP {dropped_column}" in create_statement

        # Writes to the base table must show up in the log table again.
        cql.execute(f"INSERT INTO {table} (p, v2, u, b) VALUES (10, 11, 12, 13.3)")
        rows_after_reattach = cql.execute(f"SELECT * FROM {log_table} WHERE p = 10 ALLOW FILTERING").all()
        assert len(rows_after_reattach) > 0
# When the user disables CDC on a table, the CDC log table is not removed. Instead, it is detached from the base table
# and functions as a normal table (with some differences). If that log table still exists when the user re-enables
# CDC on the base table, the old log table is re-attached to the base instead of a new one being created.
#
# Verify that changes made to the base table while CDC was disabled are reflected in the log table once CDC is
# re-enabled, and that the log table ends up with the same definition as if it had never been detached.
@pytest.mark.parametrize("test_keyspace", ["tablets", "vnodes"], indirect=True)
def test_reattach_cdc_log_table_after_altering_base_schema(cql, test_keyspace, scylla_only):
    base_schema = "p int, v int, u int, a int, PRIMARY KEY (p, v)"
    cdc_enabled = "WITH cdc = {'enabled': true}"

    def describe_log_table(base_table):
        # `WITH INTERNALS` is deliberately not used: it would include the UUIDs
        # of the tables, which obviously differ between the two tables.
        desc_row = cql.execute(f"DESC TABLE {base_table}_scylla_cdc_log").one()
        assert hasattr(desc_row, "create_statement")
        return desc_row.create_statement

    with new_test_table(cql, test_keyspace, base_schema, cdc_enabled) as t1, \
            new_test_table(cql, test_keyspace, base_schema, cdc_enabled) as t2:
        # Both tables go through the same schema changes: t1 with CDC enabled
        # throughout, t2 with CDC disabled for the duration of the changes.
        cql.execute(f"ALTER TABLE {t2} WITH cdc = {{'enabled': false}}")
        for table in (t1, t2):
            for alteration in ("DROP a", "ADD b double", "RENAME v TO v2"):
                cql.execute(f"ALTER TABLE {table} {alteration}")
        cql.execute(f"ALTER TABLE {t2} WITH cdc = {{'enabled': true}}")

        definition_t1 = describe_log_table(t1)
        definition_t2 = describe_log_table(t2)

        # The statements differ in the table name, so substitute it before comparing.
        assert definition_t1 == definition_t2.replace(t2, t1)
# Verify that a freshly created CDC log table has the `tombstone_gc` property.
# Reproducer of scylladb/scylladb#25187.
def test_log_table_tombstone_gc_present(cql, test_keyspace, scylla_only):
    def get_log_alter_stmt(log_table_name: str):
        # Exactly one description in the keyspace is expected to carry the
        # log table's name; the unpacking fails otherwise.
        descriptions = cql.execute(f"DESCRIBE KEYSPACE {test_keyspace} WITH INTERNALS")
        [log_description] = [desc for desc in descriptions if desc.name == log_table_name]
        assert hasattr(log_description, "create_statement")
        return log_description.create_statement

    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY", "WITH cdc = {'enabled': true}") as table:
        _, base_name = table.split(".")
        statement = get_log_alter_stmt(f"{base_name}_scylla_cdc_log")
        # The value is irrelevant; only the presence of the property is checked.
        assert "tombstone_gc = {" in statement
# Verify that a no-op ALTER TABLE statement on the log table (one that sets
# properties to the values they already have) doesn't change anything.
# Reproducer of scylladb/scylladb#25187.
def test_desc_log_table_properties_preserved_after_noop_alter(cql, test_keyspace, scylla_only):
    def get_log_alter_stmt(log_table_name: str):
        # Exactly one description in the keyspace is expected to carry the
        # log table's name; the unpacking fails otherwise.
        descriptions = cql.execute(f"DESCRIBE KEYSPACE {test_keyspace} WITH INTERNALS")
        [log_description] = [desc for desc in descriptions if desc.name == log_table_name]
        assert hasattr(log_description, "create_statement")
        return log_description.create_statement

    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY", "WITH cdc = {'enabled': true}") as table:
        _, base_name = table.split(".")
        log_table_name = f"{base_name}_scylla_cdc_log"

        statement_before = get_log_alter_stmt(log_table_name)
        # Re-apply the described statement as-is; this is meant to be a no-op.
        cql.execute(statement_before)
        statement_after = get_log_alter_stmt(log_table_name)

        assert statement_before == statement_after
# Verify that the properties of a CDC log table are preserved when the log
# table is detached and when it is reattached.
#
# Reproduces scylladb/scylladb#25523.
def test_log_table_preserves_properties_after_reattachment(cql, test_keyspace, scylla_only):
    # Every property here should be set to a non-default value.
    custom_properties = [
        ("bloom_filter_fp_chance", "0.13"),
        ("caching", "{'keys': 'ALL', 'rows_per_partition': 'NONE'}"),
        ("comment", "'some not very interesting custom comment'"),
        ("compaction", "{'class': 'TimeWindowCompactionStrategy', 'compaction_window_size': '37', 'compaction_window_unit': 'MINUTES', 'expired_sstable_check_frequency_seconds': '1351'}"),
        ("compression", "{'sstable_compression': 'org.apache.cassandra.io.compress.SnappyCompressor'}"),
        # FIXME: Due to scylladb/scylladb#2431, we cannot change this property.
        # ("crc_check_chance", "1"),
        ("default_time_to_live", "2"),
        ("gc_grace_seconds", "3"),
        ("max_index_interval", "2371"),
        ("memtable_flush_period_in_ms", "60013"),
        ("min_index_interval", "137"),
        ("speculative_retry", "'95.0PERCENTILE'"),
        ("tombstone_gc", "{'mode': 'timeout', 'propagation_delay_in_seconds': '3719'}"),
    ]

    with new_test_table(cql, test_keyspace, "p int PRIMARY KEY", "WITH cdc = {'enabled': true}") as table:
        log_table = f"{table}_scylla_cdc_log"

        def validate_properties():
            # Each `name = value` pair must appear verbatim in the description.
            described = cql.execute(f"DESC TABLE {log_table} WITH INTERNALS").one()
            statement = described.create_statement
            for prop_name, prop_value in custom_properties:
                assert f"{prop_name} = {prop_value}" in statement

        for prop_name, prop_value in custom_properties:
            cql.execute(f"ALTER TABLE {log_table} WITH {prop_name} = {prop_value}")

        # Step 1. Make sure that we really altered the CDC log table.
        validate_properties()

        # Step 2. The properties must survive detaching the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': false}}")
        validate_properties()

        # Step 3. The properties must survive reattaching the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': true}}")
        validate_properties()
# Verify that the ID of the log table doesn't change due to detaching or reattaching
# it via executing `ALTER TABLE <base_table> WITH cdc = {'enabled': <enabled>}`.
def test_log_table_preserves_id_after_reattachment(cql, test_keyspace, scylla_only):
    def get_definition(table_name: str):
        # Return the statement that `DESC TABLE ... WITH INTERNALS` reports for the table.
        desc_row = cql.execute(f"DESC TABLE {table_name} WITH INTERNALS").one()
        return desc_row.create_statement

    with new_test_table(cql, test_keyspace, "pk int PRIMARY KEY, v int", "WITH cdc = {'enabled': true}") as table:
        cdc_log_table = f"{table}_scylla_cdc_log"

        desc = get_definition(cdc_log_table)
        # String of form `ID = <ID>`
        id_match = re.search(r"(ID = \S+)", desc)
        # Fail with a readable message instead of a TypeError on `None[1]`
        # if the description does not contain an ID at all.
        assert id_match is not None, f"No `ID = <ID>` found in the description of {cdc_log_table}:\n{desc}"
        id_clause = id_match[1]

        # The ID should not change after detaching the log table.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': false}}")
        assert id_clause in get_definition(cdc_log_table), \
            f"The ID of {cdc_log_table} changed after detaching the log table"

        # The ID shouldn't change after reattaching the log table either.
        cql.execute(f"ALTER TABLE {table} WITH cdc = {{'enabled': true}}")
        assert id_clause in get_definition(cdc_log_table), \
            f"The ID of {cdc_log_table} changed after reattaching the log table"
# Verify that `CREATE TABLE IF NOT EXISTS ... WITH cdc = {'enabled': true}`
# can be executed twice in a row without failing.
def test_create_if_not_exists_with_cdc(scylla_only, cql, test_keyspace):
    target = f"{test_keyspace}.{unique_name()}"
    create_stmt = f"CREATE TABLE IF NOT EXISTS {target} (p int PRIMARY KEY) WITH cdc = {{ 'enabled': true }}"
    cql.execute(create_stmt)
    try:
        # The second execution should be a no-op and must not fail.
        cql.execute(create_stmt)
    finally:
        cql.execute(f"DROP TABLE IF EXISTS {target}")