test/cluster: Test deferred stream enablement on tablet tables

Async cluster test exercising the deferred enablement lifecycle:
ENABLING -> ENABLED -> disabled, verifying tablet merge blocking
and unblocking at each stage. Uses delay_cdc_stream_finalization
error injection and CQL ALTER TABLE with tablet count constraints.

Also adds tablet scheduler config to test_config.yaml (fast refresh
interval, scale factor 1) for reliable tablet count changes.
This commit is contained in:
Piotr Szymaniak
2026-04-08 11:33:12 +02:00
parent 4b6937b570
commit a5d35d2b4c

View File

@@ -30,6 +30,7 @@ import re
from test.cluster.util import get_replication
from test.pylib.manager_client import ManagerClient
from test.pylib.util import wait_for
from test.pylib.rest_client import inject_error
from test.pylib.tablets import get_all_tablet_replicas
from test.pylib.tablets import get_tablet_replica
@@ -1393,3 +1394,135 @@ async def test_alternator_invalid_shard_for_lwt(manager: ManagerClient):
stop_event.set()
t.join()
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_deferred_stream_enablement_on_tablets(manager: ManagerClient):
"""Test that enabling Alternator Streams on a tablet table uses deferred
enablement: the table goes through an ENABLING state (where the intent
is stored but CDC is not yet active) before the topology coordinator
finalizes it to ENABLED. While streams are active (ENABLING or ENABLED),
tablet merges must be blocked but splits must still be allowed. After
streams are disabled, merges must be allowed again.
The test exercises an elaborate sequence of tablet count changes:
in each of the ENABLING and ENABLED states, first a shrinkage is
attempted (must be blocked), then an increase (must succeed). After
disabling streams, a shrinkage is attempted (must succeed).
"""
server = (await manager.servers_add(1, config=alternator_config))[0]
cql = manager.get_cql()
alternator = get_alternator(server.ip_addr)
async def get_table_id(ks, table_name):
rows = await cql.run_async(
f"SELECT id FROM system_schema.tables "
f"WHERE keyspace_name='{ks}' AND table_name='{table_name}'")
return rows[0].id
async def get_tablet_count(table_id):
rows = await cql.run_async(
f"SELECT tablet_count FROM system.tablets "
f"WHERE table_id={table_id} LIMIT 1")
return rows[0].tablet_count
async def set_tablet_target(ks, table_name, count):
"""Set both min and max tablet count to force a specific target."""
await cql.run_async(
f'ALTER TABLE "{ks}"."{table_name}" '
f"WITH tablets = {{'min_tablet_count': {count}, 'max_tablet_count': {count}}}")
async def wait_for_tablet_count(table_id, expected_count):
"""Wait for tablet count to reach exact expected_count."""
async def check():
count = await get_tablet_count(table_id)
if count == expected_count:
return count
return None
return await wait_for(check, time.time() + 60, period=0.1)
async def assert_tablet_count_stable(table_id, expected_count, duration=5):
"""Assert tablet count stays at expected_count for duration seconds."""
deadline = time.time() + duration
while time.time() < deadline:
count = await get_tablet_count(table_id)
assert count == expected_count, \
f"Tablet count changed unexpectedly from {expected_count}"
await asyncio.sleep(0.1)
# === Phase 1: ENABLING state ===
# Hold finalization with an error injection so the table stays in
# ENABLING (enable_requested=true, enabled=false) state.
async with inject_error(manager.api, server.ip_addr, "delay_cdc_stream_finalization"):
table = alternator.create_table(
TableName=unique_table_name(),
Tags=[{'Key': 'system:initial_tablets', 'Value': '4'}],
BillingMode='PAY_PER_REQUEST',
KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])
try:
# create_table enables Streams immediately, so to test deferred
# Streams enablement, we need to use update_table.
table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
# Verify ENABLING state: StreamSpecification is present but
# LatestStreamArn is not (CDC log table does not exist yet).
desc = table.meta.client.describe_table(TableName=table.name)['Table']
assert 'StreamSpecification' in desc
assert desc['StreamSpecification']['StreamEnabled'] == True
assert desc['StreamSpecification']['StreamViewType'] == 'KEYS_ONLY'
assert 'LatestStreamArn' not in desc
# Double-enable must be rejected while ENABLING.
with pytest.raises(ClientError, match='ValidationException.*already has an enabled stream'):
table.update(StreamSpecification={
'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})
ks = f'alternator_{table.name}'
table_id = await get_table_id(ks, table.name)
count = await get_tablet_count(table_id)
assert count == 4
# Shrinkage attempt (ENABLING): must be BLOCKED.
# Setting target to 2 would normally trigger a merge (4 -> 2),
# but tablet_merge_blocked prevents it.
await set_tablet_target(ks, table.name, 2)
await assert_tablet_count_stable(table_id, 4, duration=5)
# Increase (ENABLING): must succeed (4 -> 8).
await set_tablet_target(ks, table.name, 8)
await wait_for_tablet_count(table_id, 8)
except:
table.delete()
raise
# <-- delay_cdc_stream_finalization disabled; finalization proceeds.
# === Phase 2: Transition to ENABLED ===
try:
async def check_stream_enabled():
desc = table.meta.client.describe_table(TableName=table.name)['Table']
if 'LatestStreamArn' in desc:
return True
return None
await wait_for(check_stream_enabled, time.time() + 60, period=0.1)
count = await get_tablet_count(table_id)
assert count == 8
# === Phase 3: ENABLED state ===
# Shrinkage attempt (ENABLED): must be BLOCKED.
await set_tablet_target(ks, table.name, 4)
await assert_tablet_count_stable(table_id, 8, duration=5)
# Increase (ENABLED): must succeed (8 -> 16).
await set_tablet_target(ks, table.name, 16)
await wait_for_tablet_count(table_id, 16)
# === Phase 4: Disable streams, merges must be unblocked ===
table.update(StreamSpecification={'StreamEnabled': False})
# Shrinkage (streams disabled): must SUCCEED (16 -> 8).
await set_tablet_target(ks, table.name, 8)
await wait_for_tablet_count(table_id, 8)
finally:
table.delete()
table.meta.client.get_waiter('table_not_exists').wait(TableName=table.name)