Finishing the deprecation of the skip_mode function in favor of pytest.mark.skip_mode. This PR is only cleaning and migrating leftover tests that are still used and old way of skip_mode. Closes scylladb/scylladb#28299
424 lines
23 KiB
Python
424 lines
23 KiB
Python
#
|
|
# Copyright (C) 2025-present ScyllaDB
|
|
#
|
|
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
#
|
|
|
|
# Tests for CDC tables in tablets enabled keyspaces
|
|
|
|
from collections import defaultdict
|
|
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import read_barrier
|
|
from test.pylib.util import wait_for
|
|
from test.pylib.tablets import get_tablet_count, get_base_table, get_tablet_replicas
|
|
from test.cluster.util import new_test_keyspace
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import time
|
|
import pytest
|
|
from enum import IntEnum
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# follows cdc::stream_state
|
|
class CdcStreamState(IntEnum):
|
|
CURRENT = 0
|
|
CLOSED = 1
|
|
OPENED = 2
|
|
|
|
# Basic test creating a table with CDC enabled, using either CREATE or ALTER to enable CDC, and
|
|
# verifying that CDC streams for the table are created. Then we write to the table and verify
|
|
# the CDC log entries are created in the table's streams.
|
|
@pytest.mark.parametrize("with_alter", [pytest.param(False, id="create"), pytest.param(True, id="alter")])
|
|
@pytest.mark.asyncio
|
|
async def test_create_cdc_with_tablets(manager: ManagerClient, with_alter: bool):
|
|
servers = await manager.servers_add(1)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
|
if with_alter:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int)")
|
|
await cql.run_async(f"ALTER TABLE {ks}.test WITH cdc={{'enabled': true}}")
|
|
else:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 2
|
|
|
|
log_table_id = await manager.get_table_id(ks, "test_scylla_cdc_log")
|
|
for r in rows:
|
|
assert r.table_id == log_table_id
|
|
streams = [r.stream_id for r in rows]
|
|
|
|
row_count=1000
|
|
await asyncio.gather(*[cql.run_async(f"INSERT INTO {ks}.test(pk, v) VALUES({i},{i+1})") for i in range(row_count)])
|
|
|
|
rows = await cql.run_async(f"SELECT * FROM {ks}.test_scylla_cdc_log")
|
|
assert len(rows) == row_count
|
|
|
|
total_log_rows = 0
|
|
for s in streams:
|
|
rows = await cql.run_async(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\"=0x{s.hex()}")
|
|
total_log_rows += len(rows)
|
|
assert total_log_rows == row_count
|
|
|
|
# Verify the cdc log table is created as a colocated table.
|
|
base_id = await manager.get_table_id(ks, 'test')
|
|
cdc_id = await manager.get_table_id(ks, 'test_scylla_cdc_log')
|
|
assert base_id == (await get_base_table(manager, cdc_id))
|
|
|
|
# Create tables with CDC and verify the CDC streams are removed from the
|
|
# system tables when the tables or keyspaces are dropped.
|
|
@pytest.mark.asyncio
|
|
async def test_drop_table_and_drop_keyspace_removes_cdc_streams(manager: ManagerClient):
|
|
servers = await manager.servers_add(1)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks1:
|
|
await cql.run_async(f"CREATE TABLE {ks1}.test1 (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 2
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_history")
|
|
assert len(rows) == 0
|
|
|
|
await cql.run_async(f"CREATE TABLE {ks1}.test2 (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 4
|
|
|
|
await cql.run_async(f"DROP TABLE {ks1}.test2")
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 2
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks2:
|
|
await cql.run_async(f"CREATE TABLE {ks2}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 4
|
|
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 2
|
|
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 0
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_history")
|
|
assert len(rows) == 0
|
|
|
|
# Create a table with CDC, then disable CDC and drop the CDC log table, and create CDC again.
|
|
# Verify streams are created and cleaned up correctly.
|
|
@pytest.mark.asyncio
|
|
async def test_drop_and_recreate_cdc(manager: ManagerClient):
|
|
await manager.servers_add(1)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test1 (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
await cql.run_async(f"ALTER TABLE {ks}.test1 WITH cdc={{'enabled': false}}")
|
|
|
|
# The CDC table still exists and streams are still present
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) > 0
|
|
|
|
await cql.run_async(f"DROP TABLE {ks}.test1_scylla_cdc_log")
|
|
|
|
# the streams should be removed now
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 0
|
|
|
|
await cql.run_async(f"ALTER TABLE {ks}.test1 WITH cdc={{'enabled': true}}")
|
|
|
|
# new streams are created
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) > 0
|
|
|
|
# after dropping the keyspace, the streams should be removed
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_state")
|
|
assert len(rows) == 0
|
|
|
|
async def validate_virtual_tables(manager, servers, cql, log_table_id, ks, table):
|
|
await asyncio.gather(*[read_barrier(manager.api, s.ip_addr) for s in servers])
|
|
|
|
# read all streams from the internal tables and verify against the virtual tables
|
|
base_rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) AS ts, stream_id FROM system.cdc_streams_state WHERE table_id={log_table_id}")
|
|
history_rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) AS ts, stream_id, stream_state FROM system.cdc_streams_history WHERE table_id={log_table_id} ORDER BY timestamp ASC")
|
|
|
|
all_streams = defaultdict(list)
|
|
for r in base_rows:
|
|
all_streams[r.ts].append((r.stream_id, CdcStreamState.OPENED))
|
|
for r in history_rows:
|
|
all_streams[r.ts].append((r.stream_id, r.stream_state))
|
|
|
|
# verify the timestamps in the internal table match those in the virtual table
|
|
virtual_ts_rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='{table}' ORDER BY timestamp ASC")
|
|
virtual_ts = [r.ts for r in virtual_ts_rows]
|
|
|
|
assert list(all_streams.keys()) == virtual_ts, f"Timestamps mismatch: internal {all_streams.keys()}, virtual {virtual_ts}"
|
|
|
|
# verify the current stream set for each timestamp in the virtual table matches the stream set
|
|
# we get from the internal tables by starting from the base and applying the diffs
|
|
current_streams = set()
|
|
for ts, streams in all_streams.items():
|
|
closed_streams = set([s for s, state in streams if state == CdcStreamState.CLOSED])
|
|
opened_streams = set([s for s, state in streams if state == CdcStreamState.OPENED])
|
|
|
|
assert closed_streams.issubset(current_streams)
|
|
|
|
current_streams = (current_streams - closed_streams) | opened_streams
|
|
|
|
rows = await cql.run_async(f"SELECT stream_id FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='{table}' AND timestamp = {ts} AND stream_state = {CdcStreamState.CURRENT}")
|
|
current_streams_virtual = set([r.stream_id for r in rows])
|
|
assert current_streams_virtual == current_streams
|
|
|
|
# Read CDC stream information from the virtual tables system.cdc_timestamps and system.cdc_streams
|
|
# and verify it against the internal CDC tables.
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
async def test_cdc_virtual_tables(manager: ManagerClient):
|
|
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
|
servers = await manager.servers_add(1, config=cfg)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
|
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_timestamps")
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_streams")
|
|
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
log_table_id = await manager.get_table_id(ks, "test_scylla_cdc_log")
|
|
|
|
await validate_virtual_tables(manager, servers, cql, log_table_id, ks, 'test')
|
|
|
|
# trigger multiple tablet splits to create new cdc timestamps and streams and validate the
|
|
# virtual tables after each one.
|
|
for _ in range(3):
|
|
prev_history_count = (await cql.run_async(f"SELECT COUNT(*) AS cnt FROM system.cdc_streams_history WHERE table_id={log_table_id}"))[0].cnt
|
|
prev_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
await cql.run_async(f"ALTER TABLE {ks}.test_scylla_cdc_log WITH tablets = {{'min_tablet_count': {prev_tablet_count * 2}}};")
|
|
|
|
async def tablet_count_is(expected_tablet_count):
|
|
new_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
if new_tablet_count == expected_tablet_count:
|
|
return True
|
|
await wait_for(lambda: tablet_count_is(prev_tablet_count * 2), time.time() + 60)
|
|
new_history_count = (await cql.run_async(f"SELECT COUNT(*) AS cnt FROM system.cdc_streams_history WHERE table_id={log_table_id}"))[0].cnt
|
|
assert new_history_count > prev_history_count
|
|
|
|
await validate_virtual_tables(manager, servers, cql, log_table_id, ks, 'test')
|
|
|
|
# drop the table and verify the virtual tables are cleared
|
|
await cql.run_async(f"DROP TABLE {ks}.test")
|
|
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_timestamps")
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_streams")
|
|
await validate_virtual_tables(manager, servers, cql, log_table_id, ks, 'test')
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
async def test_cdc_virtual_tables_with_multiple_cdc_tables(manager: ManagerClient):
|
|
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
|
servers = await manager.servers_add(1, config=cfg)
|
|
cql = manager.get_cql()
|
|
N = 3
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
|
for i in range(N):
|
|
await cql.run_async(f"CREATE TABLE {ks}.test{i} (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
|
|
log_table_id = [await manager.get_table_id(ks, f"test{i}_scylla_cdc_log") for i in range(N)]
|
|
|
|
virt_tables = await cql.run_async(f"SELECT table_name FROM system.cdc_timestamps WHERE keyspace_name='{ks}' ALLOW FILTERING")
|
|
assert set([r.table_name for r in virt_tables]) == set([f'test{i}' for i in range(N)])
|
|
|
|
for i in range(N):
|
|
await validate_virtual_tables(manager, servers, cql, log_table_id[i], ks, f'test{i}')
|
|
|
|
# drop one table and verify it's removed from the virtual tables and the others remain valid.
|
|
await cql.run_async(f"DROP TABLE {ks}.test{N-1}")
|
|
log_table_id = log_table_id[:-1]
|
|
N = N - 1
|
|
|
|
virt_tables = await cql.run_async(f"SELECT table_name FROM system.cdc_timestamps WHERE keyspace_name='{ks}' ALLOW FILTERING")
|
|
assert set([r.table_name for r in virt_tables]) == set([f'test{i}' for i in range(N)])
|
|
|
|
for i in range(N):
|
|
await validate_virtual_tables(manager, servers, cql, log_table_id[i], ks, f'test{i}')
|
|
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_timestamps")
|
|
assert [] == await cql.run_async("SELECT * FROM system.cdc_streams")
|
|
|
|
|
|
# Split the tablets of a CDC table and wait for the CDC streams to split and become synchronized.
|
|
# Then read the sequence of stream sets in two ways - by reading the current stream set for each
|
|
# timestamp, and by applying the differences (closed / opened) in each timestamp, and verify we
|
|
# get the same result.
|
|
# Then trigger tablet merge and do the same, verifying streams are merged.
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
async def test_cdc_stream_split_and_merge_basic(manager: ManagerClient):
|
|
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1 }
|
|
servers = await manager.servers_add(1, config=cfg)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 1}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}} AND tablets = {{'min_tablet_count': 2}}")
|
|
|
|
async def assert_streams_are_synchronized_with_tablets():
|
|
tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test' ORDER BY timestamp DESC LIMIT 1")
|
|
ts = rows[0].ts # most recent
|
|
rows = await cql.run_async(f"SELECT * FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='test' AND timestamp = {ts} AND stream_state = {CdcStreamState.CURRENT}")
|
|
assert len(rows) == tablet_count
|
|
|
|
await assert_streams_are_synchronized_with_tablets()
|
|
|
|
prev_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
await cql.run_async(f"ALTER TABLE {ks}.test_scylla_cdc_log WITH tablets = {{'min_tablet_count': {prev_tablet_count * 2}}};")
|
|
|
|
async def tablet_count_is(expected_tablet_count):
|
|
new_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
if new_tablet_count == expected_tablet_count:
|
|
return True
|
|
|
|
await wait_for(lambda: tablet_count_is(prev_tablet_count * 2), time.time() + 60)
|
|
await assert_streams_are_synchronized_with_tablets()
|
|
|
|
# Reconstruct the stream set in each timestamp in two ways:
|
|
# * read the streams with kind=current
|
|
# * apply diffs
|
|
# and verify we get the same result
|
|
async def validate_stream_history():
|
|
rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) AS ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test' ORDER BY timestamp ASC")
|
|
timestamps = [r.ts for r in rows]
|
|
current_streams = set()
|
|
for ts in timestamps:
|
|
rows = await cql.run_async(f"SELECT * FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='test' AND timestamp = {ts} AND stream_state >= 1 and stream_state <= 2")
|
|
closed_streams = set([r.stream_id for r in rows if r.stream_state == CdcStreamState.CLOSED])
|
|
opened_streams = set([r.stream_id for r in rows if r.stream_state == CdcStreamState.OPENED])
|
|
|
|
assert closed_streams.issubset(current_streams)
|
|
|
|
# Construct the stream set by applying the difference in each timestamp, and verify it's the same
|
|
# as reading the current stream set.
|
|
current_streams = (current_streams - closed_streams) | opened_streams
|
|
|
|
rows = await cql.run_async(f"SELECT * FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='test' AND timestamp = {ts} AND stream_state = {CdcStreamState.CURRENT}")
|
|
current_streams2 = set([r.stream_id for r in rows])
|
|
assert current_streams2 == current_streams
|
|
|
|
await validate_stream_history()
|
|
|
|
prev_tablet_count = await get_tablet_count(manager, servers[0], ks, 'test_scylla_cdc_log')
|
|
await cql.run_async(f"ALTER TABLE {ks}.test_scylla_cdc_log WITH tablets = {{'min_tablet_count': {prev_tablet_count // 2}}};")
|
|
await wait_for(lambda: tablet_count_is(prev_tablet_count // 2), time.time() + 60)
|
|
await assert_streams_are_synchronized_with_tablets()
|
|
|
|
await validate_stream_history()
|
|
|
|
# Test that base table writes and their corresponding CDC log writes are co-located on the same replica.
|
|
# We create a 2-node cluster with RF=1, stop one node, and verify that the partitions available
|
|
# on the alive node work correctly for both base table and CDC log.
|
|
@pytest.mark.asyncio
|
|
async def test_cdc_colocation(manager: ManagerClient):
|
|
# Create 2-node cluster to test co-location
|
|
servers = await manager.servers_add(2)
|
|
cql = manager.get_cql()
|
|
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 16}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true}}")
|
|
|
|
# Insert diverse test data to ensure distribution across both nodes
|
|
test_data = {}
|
|
for i in range(100):
|
|
test_data[i] = i * 10
|
|
await cql.run_async(f"INSERT INTO {ks}.test(pk, v) VALUES({i}, {i * 10})")
|
|
|
|
await manager.disable_tablet_balancing()
|
|
|
|
# create map that maps each stream_id to a list of all partitions it contains
|
|
rows = await cql.run_async(f"SELECT \"cdc$stream_id\" as sid, pk FROM {ks}.test_scylla_cdc_log")
|
|
stream_partitions = {}
|
|
pk_to_stream = {}
|
|
for r in rows:
|
|
stream_id = r.sid
|
|
if r.pk in pk_to_stream:
|
|
assert pk_to_stream[r.pk] == stream_id, f"Partition {r.pk} is in multiple streams: {pk_to_stream[r.pk]} and {stream_id}"
|
|
if stream_id not in stream_partitions:
|
|
stream_partitions[stream_id] = []
|
|
stream_partitions[stream_id].append(r.pk)
|
|
|
|
pk_to_host = {}
|
|
rows = await cql.run_async(f"SELECT pk, token(pk) AS tok FROM {ks}.test")
|
|
for r in rows:
|
|
replicas = await get_tablet_replicas(manager, servers[0], ks, 'test', r.tok)
|
|
assert len(replicas) == 1, f"Partition {r.pk} has multiple replicas: {replicas}"
|
|
pk_to_host[r.pk] = replicas[0][0]
|
|
|
|
s0_host_id = await manager.get_host_id(servers[0].server_id)
|
|
s1_host_id = await manager.get_host_id(servers[1].server_id)
|
|
|
|
# Stop first node and test partitions available on second node
|
|
await manager.server_stop(servers[0].server_id)
|
|
|
|
accessible_partitions = set([pk for pk, host_id in pk_to_host.items() if host_id == s1_host_id])
|
|
inaccessible_partitions = set([pk for pk, host_id in pk_to_host.items() if host_id == s0_host_id])
|
|
|
|
for stream_id, partitions in stream_partitions.items():
|
|
partitions_set = set(partitions)
|
|
if partitions_set.issubset(accessible_partitions):
|
|
# Verify that the CDC log entries for this stream are accessible
|
|
rows = await cql.run_async(f"SELECT * FROM {ks}.test_scylla_cdc_log WHERE \"cdc$stream_id\" = 0x{stream_id.hex()}")
|
|
assert len(rows) > 0, f"No CDC log entries found for stream {stream_id.hex()}"
|
|
else:
|
|
assert partitions_set.issubset(inaccessible_partitions), \
|
|
f"Stream {stream_id} partitions {partitions} are not fully contained in inaccessible partitions"
|
|
|
|
await manager.server_start(servers[0].server_id)
|
|
|
|
# Test garbage collection of CDC streams.
|
|
# Create a CDC table with short TTL, then split the tablets to create a new stream set,
|
|
# and wait until the old streams are garbage collected and we have a single stream set again.
|
|
# Verify the remaining stream set is equal to the most recent stream set.
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
|
async def test_cdc_stream_garbage_collection(manager: ManagerClient):
|
|
cfg = { 'tablet_load_stats_refresh_interval_in_seconds': 1, 'error_injections_at_startup': ['short_cdc_streams_gc_refresh_interval' ] }
|
|
servers = await manager.servers_add(1, config=cfg)
|
|
cql = manager.get_cql()
|
|
async with new_test_keyspace(manager, "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} AND tablets = {'initial': 2}") as ks:
|
|
await cql.run_async(f"CREATE TABLE {ks}.test (pk int PRIMARY KEY, v int) WITH cdc={{'enabled': true, 'ttl': 3600}}")
|
|
|
|
rows = await cql.run_async(f"SELECT toUnixTimestamp(timestamp) as ts FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test'")
|
|
assert len(rows) == 1
|
|
ts1 = rows[0].ts
|
|
|
|
await cql.run_async(f"ALTER TABLE {ks}.test_scylla_cdc_log WITH tablets = {{'min_tablet_count': 4}};")
|
|
|
|
async def new_stream_timestamp_created():
|
|
rows = await cql.run_async(f"SELECT * FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test' AND timestamp > {ts1}")
|
|
if len(rows) > 0:
|
|
return True
|
|
await wait_for(new_stream_timestamp_created, time.time() + 60)
|
|
|
|
ts_before = await cql.run_async(f"SELECT * FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test' ORDER BY timestamp DESC")
|
|
streams_before = await cql.run_async(f"SELECT * FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='test'")
|
|
assert len(ts_before) == 2
|
|
|
|
# set short TTL to trigger garbage collection
|
|
await cql.run_async(f"ALTER TABLE {ks}.test WITH cdc = {{'enabled': true, 'ttl': 5}}")
|
|
|
|
async def streams_garbage_collected():
|
|
rows = await cql.run_async(f"SELECT * FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test'")
|
|
if len(rows) == 1:
|
|
return True
|
|
await wait_for(streams_garbage_collected, time.time() + 60)
|
|
|
|
ts_after = await cql.run_async(f"SELECT * FROM system.cdc_timestamps WHERE keyspace_name='{ks}' AND table_name='test' ORDER BY timestamp DESC")
|
|
streams_after = await cql.run_async(f"SELECT * FROM system.cdc_streams WHERE keyspace_name='{ks}' AND table_name='test'")
|
|
|
|
# now we have a single timestamp, and it is the most recent one
|
|
assert len(ts_after) == 1
|
|
assert ts_after[0].timestamp == ts_before[0].timestamp
|
|
|
|
# now there is a single stream set, and it is the same as the stream set for the most recent timestamp
|
|
assert set([r.stream_id for r in streams_after if r.stream_state == CdcStreamState.CURRENT]) == \
|
|
set([r.stream_id for r in streams_before if r.timestamp == ts_before[0].timestamp and r.stream_state == CdcStreamState.CURRENT])
|
|
|
|
# cdc_streams_history is empty. we have only a base stream set
|
|
rows = await cql.run_async("SELECT * FROM system.cdc_streams_history")
|
|
assert len(rows) == 0
|