Files
scylladb/test/cluster/test_cdc_generation_data.py
Pavel Emelyanov cd7d9a63bc error_injection: Convert handler-style breakpoints to wait_for_message sugar
Replace verbose handler lambdas that only log and call
wait_for_message() with the equivalent one-liner breakpoint sugar.
The behavior is identical -- the sugar produces the same log messages
in the format "{name}: waiting for message" / "{name}: message received".

Update Python tests that waited for the old ad-hoc log messages to
match the new standardized format.

Converted injections:
 - topology_state_load_before_update_cdc (storage_service.cc)
 - migration_streaming_wait x2 (storage_service.cc)
 - pause_after_streaming_tablet (storage_service.cc)
 - cdc_generation_publisher_fiber (topology_coordinator.cc)
 - wait_after_tablet_cleanup (topology_coordinator.cc)
 - fast_orphan_removal_fiber (topology_coordinator.cc)
 - split_storage_groups_wait (table.cc)
 - wait_before_stop_compaction_groups (table.cc)
 - tasks_vt_get_children (task_manager.cc)
 - truncate_compaction_disabled_wait (database.cc)

Signed-off-by: Pavel Emelyanov <xemul@scylladb.com>

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-26 15:01:01 +03:00

74 lines
2.8 KiB
Python

from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.cluster.util import check_token_ring_and_group0_consistency
import logging
import pytest
import asyncio
logger = logging.getLogger(__name__)
"""
The injection forces the topology coordinator to send CDC generation data in multiple parts.
If the data were sent in one piece, the command size would exceed the commitlog segment size limit, making it impossible to commit and apply the command.
"""
async def test_send_data_in_parts(manager: ManagerClient):
    """Check that CDC generation data shows up in group 0 history split into parts.

    Both nodes are started with 'schema_commitlog_segment_size_in_mb' set to 2.
    The second node is added while the two CDC generation error injections are
    held on the first node. The test fails unless system.group0_history holds
    at least one description that starts with
    'insert CDC generation data (UUID: ' and ends with '), part'.
    """
    small_segment_cfg = {'schema_commitlog_segment_size_in_mb': 2}
    bootstrap_node = await manager.server_add(config=small_segment_cfg)

    # Both injection contexts are entered (in this order) before the second
    # node is added, and exited once it has been added.
    async with inject_error(manager.api, bootstrap_node.ip_addr, 'cdc_generation_mutations_replication'), \
            inject_error(manager.api, bootstrap_node.ip_addr,
                         'cdc_generation_mutations_topology_snapshot_replication'):
        await manager.server_add(config=small_segment_cfg)

    # NOTE(review): indentation was reconstructed; the consistency check is
    # assumed to run after the injection contexts are closed -- confirm.
    await check_token_ring_and_group0_consistency(manager)

    session = manager.get_cql()
    history = await session.run_async("SELECT description FROM system.group0_history")

    def is_partial_cdc_insert(description) -> bool:
        # A history entry counts when its description has both the expected
        # prefix and the trailing "part" marker.
        return (description.startswith('insert CDC generation data (UUID: ')
                and description.endswith('), part'))

    if not any(is_partial_cdc_insert(entry.description) for entry in history):
        pytest.fail("No CDC generation data sent in parts was found")
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_group0_apply_while_node_is_being_shutdown(manager: ManagerClient):
    """Regression test for #24401.

    Scenario:
      1. Start s0 with raft_group0 debug logging and enable the
         'topology_state_load_before_update_cdc' injection on it.
      2. Start adding s1 in the background and wait until the s0 log reports
         that the injection is waiting for a message.
      3. Begin a graceful shutdown of s0 and wait for
         'Aborting raft group0 service...' to appear in its log.
      4. Release the injection with a message and wait for the shutdown to
         complete.
      5. Await the s1 task, ignoring (but logging) any failure, then assert
         that the s0 log contains no errors left after manager.filter_errors().
    """
    logger.info("Starting s0")
    s0 = await manager.server_add(cmdline=['--logger-log-level', 'raft_group0=debug'])

    logger.info("Injecting topology_state_load_before_update_cdc into s0")
    await manager.api.enable_injection(s0.ip_addr, "topology_state_load_before_update_cdc", False)

    logger.info("Starting s1")
    # s1 is added in the background so the test can keep driving s0 meanwhile.
    s1_start_task = asyncio.create_task(manager.server_add())

    logger.info("Waiting for topology_state_load_before_update_cdc on s0")
    log = await manager.server_open_log(s0.server_id)
    await log.wait_for('topology_state_load_before_update_cdc: waiting for message')

    logger.info("Triggering s0 shutdown")
    # The shutdown also runs in the background: the injection is released only
    # after the abort message below has been seen in the log.
    stop_s0_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))

    logger.info("Waiting for 'Aborting raft group0 service...' on s0")
    await log.wait_for('Aborting raft group0 service...')

    logger.info("Releasing topology_state_load_before_update_cdc on s0")
    await manager.api.message_injection(s0.ip_addr, 'topology_state_load_before_update_cdc')

    await stop_s0_task

    try:
        await s1_start_task
    except Exception as exc:
        # The outcome of adding s1 is irrelevant to this test, so the failure
        # is ignored -- but it is logged so it stays visible when debugging.
        logger.info("Ignoring s1 startup failure: %r", exc)

    errors = await log.grep_for_errors()
    errors = await manager.filter_errors(errors)
    assert errors == []