# Reconstructed from a unified diff against test/cluster/test_alternator.py.
# Besides the test below, that diff adds this module-level import next to the
# module's existing test.pylib imports:
from test.pylib.rest_client import inject_error


@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_deferred_stream_enablement_on_tablets(manager: ManagerClient):
    """Test that enabling Alternator Streams on a tablet table uses deferred
    enablement: the table goes through an ENABLING state (where the intent
    is stored but CDC is not yet active) before the topology coordinator
    finalizes it to ENABLED. While streams are active (ENABLING or ENABLED),
    tablet merges must be blocked but splits must still be allowed. After
    streams are disabled, merges must be allowed again.

    The test exercises an elaborate sequence of tablet count changes:
    in each of the ENABLING and ENABLED states, first a shrinkage is
    attempted (must be blocked), then an increase (must succeed). After
    disabling streams, a shrinkage is attempted (must succeed).

    Cleanup: the table is deleted, and its disappearance awaited, in a single
    ``finally`` that runs after the error injection has been released, no
    matter which phase fails.
    """
    server = (await manager.servers_add(1, config=alternator_config))[0]
    cql = manager.get_cql()
    alternator = get_alternator(server.ip_addr)

    async def get_table_id(ks, table_name):
        """Return the schema id of table ``ks.table_name``."""
        rows = await cql.run_async(
            f"SELECT id FROM system_schema.tables "
            f"WHERE keyspace_name='{ks}' AND table_name='{table_name}'")
        return rows[0].id

    async def get_tablet_count(table_id):
        """Return the current tablet count recorded in system.tablets."""
        rows = await cql.run_async(
            f"SELECT tablet_count FROM system.tablets "
            f"WHERE table_id={table_id} LIMIT 1")
        return rows[0].tablet_count

    async def set_tablet_target(ks, table_name, count):
        """Set both min and max tablet count to force a specific target."""
        await cql.run_async(
            f'ALTER TABLE "{ks}"."{table_name}" '
            f"WITH tablets = {{'min_tablet_count': {count}, 'max_tablet_count': {count}}}")

    async def wait_for_tablet_count(table_id, expected_count):
        """Wait (up to 60 seconds) for tablet count to reach exactly expected_count."""
        async def check():
            count = await get_tablet_count(table_id)
            if count == expected_count:
                return count
            return None
        return await wait_for(check, time.time() + 60, period=0.1)

    async def assert_tablet_count_stable(table_id, expected_count, duration=5):
        """Assert tablet count stays at expected_count for duration seconds."""
        deadline = time.time() + duration
        while time.time() < deadline:
            count = await get_tablet_count(table_id)
            # Report the observed value too, so a failure shows which way
            # the count moved.
            assert count == expected_count, \
                f"Tablet count changed unexpectedly from {expected_count} to {count}"
            await asyncio.sleep(0.1)

    # Stays None until create_table succeeds, so the cleanup below knows
    # whether there is anything to delete.
    table = None
    try:
        # === Phase 1: ENABLING state ===
        # Hold finalization with an error injection so the table stays in
        # ENABLING (enable_requested=true, enabled=false) state.
        async with inject_error(manager.api, server.ip_addr, "delay_cdc_stream_finalization"):
            table = alternator.create_table(
                TableName=unique_table_name(),
                Tags=[{'Key': 'system:initial_tablets', 'Value': '4'}],
                BillingMode='PAY_PER_REQUEST',
                KeySchema=[{'AttributeName': 'p', 'KeyType': 'HASH'}],
                AttributeDefinitions=[{'AttributeName': 'p', 'AttributeType': 'S'}])

            # create_table enables Streams immediately, so to test deferred
            # Streams enablement, we need to use update_table.
            table.update(StreamSpecification={'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})

            # Verify ENABLING state: StreamSpecification is present but
            # LatestStreamArn is not (CDC log table does not exist yet).
            desc = table.meta.client.describe_table(TableName=table.name)['Table']
            assert 'StreamSpecification' in desc
            assert desc['StreamSpecification']['StreamEnabled'] is True
            assert desc['StreamSpecification']['StreamViewType'] == 'KEYS_ONLY'
            assert 'LatestStreamArn' not in desc

            # Double-enable must be rejected while ENABLING.
            with pytest.raises(ClientError, match='ValidationException.*already has an enabled stream'):
                table.update(StreamSpecification={
                    'StreamEnabled': True, 'StreamViewType': 'KEYS_ONLY'})

            ks = f'alternator_{table.name}'
            table_id = await get_table_id(ks, table.name)
            count = await get_tablet_count(table_id)
            assert count == 4

            # Shrinkage attempt (ENABLING): must be BLOCKED.
            # Setting target to 2 would normally trigger a merge (4 -> 2),
            # but tablet_merge_blocked prevents it.
            await set_tablet_target(ks, table.name, 2)
            await assert_tablet_count_stable(table_id, 4, duration=5)

            # Increase (ENABLING): must succeed (4 -> 8).
            await set_tablet_target(ks, table.name, 8)
            await wait_for_tablet_count(table_id, 8)
        # <-- delay_cdc_stream_finalization disabled; finalization proceeds.

        # === Phase 2: Transition to ENABLED ===
        async def check_stream_enabled():
            desc = table.meta.client.describe_table(TableName=table.name)['Table']
            if 'LatestStreamArn' in desc:
                return True
            return None
        await wait_for(check_stream_enabled, time.time() + 60, period=0.1)

        count = await get_tablet_count(table_id)
        assert count == 8

        # === Phase 3: ENABLED state ===
        # Shrinkage attempt (ENABLED): must be BLOCKED.
        await set_tablet_target(ks, table.name, 4)
        await assert_tablet_count_stable(table_id, 8, duration=5)

        # Increase (ENABLED): must succeed (8 -> 16).
        await set_tablet_target(ks, table.name, 16)
        await wait_for_tablet_count(table_id, 16)

        # === Phase 4: Disable streams, merges must be unblocked ===
        table.update(StreamSpecification={'StreamEnabled': False})

        # Shrinkage (streams disabled): must SUCCEED (16 -> 8).
        await set_tablet_target(ks, table.name, 8)
        await wait_for_tablet_count(table_id, 8)
    finally:
        # One cleanup path for every phase. It runs after the `async with`
        # above has exited, so the injection is no longer held when the
        # table is dropped.
        if table is not None:
            table.delete()
            table.meta.client.get_waiter('table_not_exists').wait(TableName=table.name)