Files
scylladb/test/cluster/test_cdc_generation_data.py
Petr Gusev 8b8b7adbe5 raft_group0: split shutdown into abort_and_drain and destroy
Previously, raft_group0::abort() was called in
storage_service::do_drain (introduced in #24418) to
stop the group0 Raft server before destroying local storage.
This was necessary because raft::server depends on storage
(via raft_sys_table_storage and group0_state_machine).

However, this caused issues: services like
sstable_dict_autotrainer and auth::service, which use
group0_client but are not stopped by storage_service,
could trigger use-after-free if raft_group0 was destroyed
too early. This can happen both during normal shutdown
and when 'nodetool drain' is used.

This commit reworks the shutdown logic:
* Introduces abort_and_drain(), which aborts the server
and waits for background tasks to finish, but keeps the
server object alive. Clients will see raft::stopped_error if
they try to access group0 after abort_and_drain().
* Final destruction happens in a separate method destroy(),
called later from main.cc.
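The two-phase shutdown described above can be sketched as follows. This is an illustrative Python analogue, not the actual C++ implementation: the class `Group0Like` and the exception `StoppedError` are hypothetical stand-ins for `raft_group0` and `raft::stopped_error`.

```python
import asyncio

class StoppedError(Exception):
    """Analogue of raft::stopped_error: raised when a client
    accesses group0 after abort_and_drain()."""

class Group0Like:
    """Hypothetical analogue of raft_group0's two-phase shutdown."""

    def __init__(self):
        self._aborted = False
        self._background_tasks: list[asyncio.Task] = []

    async def abort_and_drain(self):
        # Phase 1: stop accepting work and wait for background tasks
        # to finish, but keep the object alive so that late clients
        # (e.g. auth::service in the commit) get a clean error
        # instead of a use-after-free.
        self._aborted = True
        await asyncio.gather(*self._background_tasks, return_exceptions=True)

    def destroy(self):
        # Phase 2: final destruction, called later (from main.cc
        # in the commit), after all clients are stopped.
        self._background_tasks.clear()

    async def client_call(self):
        if self._aborted:
            raise StoppedError("group0 is stopped")
        return "ok"

async def demo():
    g = Group0Like()
    assert await g.client_call() == "ok"
    await g.abort_and_drain()
    try:
        await g.client_call()
    except StoppedError:
        return "stopped"

print(asyncio.run(demo()))  # prints "stopped"
```

The key property is that between phase 1 and phase 2 the object is inert but still safely callable, which is what protects clients that outlive storage_service.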

raft_server_for_group::aborted is changed to a
shared_future. abort_server() now returns a future so that
abort_and_drain() can wait for it, and a repeated call --
which can happen from the on_background_error callback --
returns the future from the previous abort_server() call.
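The shared_future behavior -- the first caller starts the abort, and any later caller gets the same future -- can be sketched with a memoized asyncio task (an `asyncio.Task` may be awaited by many waiters, like a `shared_future`). The class name and method bodies here are illustrative, not the real raft_server_for_group:

```python
import asyncio

class ServerForGroupSketch:
    """Hypothetical analogue of raft_server_for_group: abort_server()
    is memoized so concurrent or repeated callers share one future."""

    def __init__(self):
        self._abort_task: asyncio.Task | None = None

    async def _do_abort(self):
        await asyncio.sleep(0)  # stand-in for the real abort work
        return "aborted"

    def abort_server(self) -> asyncio.Task:
        # First call starts the abort; later calls (e.g. from an
        # on_background_error callback) reuse the same task, just as
        # a shared_future lets multiple waiters wait on one result.
        if self._abort_task is None:
            self._abort_task = asyncio.create_task(self._do_abort())
        return self._abort_task

async def demo_abort():
    s = ServerForGroupSketch()
    t1 = s.abort_server()
    t2 = s.abort_server()  # a second caller, as from on_background_error
    assert t1 is t2        # both callers share the same future
    return await t1

print(asyncio.run(demo_abort()))  # prints "aborted"
```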

Node startup can fail before reaching storage_service,
in which case ss.drain_on_shutdown() and abort_and_drain()
are never called. To ensure proper cleanup,
abort_and_drain() is called from main.cc before destroy().

Clients of raft_group_registry are expected to call
destroy_server() for the servers they own. Currently,
the only such client is raft_group0, which satisfies
this requirement. As a result,
raft_group_registry::stop_servers() is no longer needed.
Instead, raft_group_registry::stop() now verifies that all
servers have been properly destroyed.
If any remain, it calls on_internal_error().
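The ownership contract -- owners call destroy_server(), and stop() only verifies that nothing remains -- might be sketched like this. The `GroupRegistrySketch` class and `InternalError` exception are hypothetical stand-ins for raft_group_registry and on_internal_error():

```python
class InternalError(Exception):
    """Stand-in for on_internal_error()."""

class GroupRegistrySketch:
    """Hypothetical analogue of raft_group_registry: stop() no longer
    destroys servers itself; it checks that every owner (in the commit,
    raft_group0 is the only one) already called destroy_server()."""

    def __init__(self):
        self._servers: dict[str, object] = {}

    def add_server(self, gid: str, server: object):
        self._servers[gid] = server

    def destroy_server(self, gid: str):
        # Owners are expected to call this for the servers they own.
        self._servers.pop(gid)

    def stop(self):
        # Verification only: any surviving server is a bug.
        if self._servers:
            raise InternalError(f"servers not destroyed: {list(self._servers)}")
```

Usage: after `add_server('group0', srv)`, the owner must call `destroy_server('group0')` before `stop()`; otherwise `stop()` raises, mirroring the on_internal_error() call in the commit.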

The call to drain_on_shutdown() in cql_test_env.cc
appears redundant. The only source of raft::server
instances in raft_group_registry is group0_service, and
if group0_service.start() succeeds, both abort_and_drain()
and destroy() are guaranteed to be called during shutdown.
2025-07-25 17:16:14 +02:00


from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error
from test.cluster.util import check_token_ring_and_group0_consistency
from test.cluster.conftest import skip_mode
import logging
import pytest
import asyncio

logger = logging.getLogger(__name__)

"""
The injection forces the topology coordinator to send CDC generation data in multiple parts.
Without it, the command size would exceed the commitlog segment size limit, making it
impossible to commit and apply the command.
"""
@pytest.mark.asyncio
async def test_send_data_in_parts(manager: ManagerClient):
    config = {
        'schema_commitlog_segment_size_in_mb': 2
    }
    first_server = await manager.server_add(config=config)
    async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'):
        async with inject_error(manager.api, first_server.ip_addr,
                                'cdc_generation_mutations_topology_snapshot_replication'):
            await manager.server_add(config=config)
    await check_token_ring_and_group0_consistency(manager)
    cql = manager.get_cql()
    rows = await cql.run_async("SELECT description FROM system.group0_history")
    for row in rows:
        if row.description.startswith('insert CDC generation data (UUID: ') and row.description.endswith('), part'):
            break
    else:
        pytest.fail("No CDC generation data sent in parts was found")

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_group0_apply_while_node_is_being_shutdown(manager: ManagerClient):
    # This is a regression test for #24401.
    logger.info("Starting s0")
    s0 = await manager.server_add(cmdline=['--logger-log-level', 'raft_group0=debug'])

    logger.info("Injecting topology_state_load_before_update_cdc into s0")
    await manager.api.enable_injection(s0.ip_addr, "topology_state_load_before_update_cdc", False)

    logger.info("Starting s1")
    s1_start_task = asyncio.create_task(manager.server_add())

    logger.info("Waiting for topology_state_load_before_update_cdc on s0")
    log = await manager.server_open_log(s0.server_id)
    await log.wait_for('topology_state_load_before_update_cdc hit, wait for message')

    logger.info("Triggering s0 shutdown")
    stop_s0_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))

    logger.info("Waiting for 'Aborting raft group0 service...' on s0")
    await log.wait_for('Aborting raft group0 service...')

    logger.info("Releasing topology_state_load_before_update_cdc on s0")
    await manager.api.message_injection(s0.ip_addr, 'topology_state_load_before_update_cdc')

    await stop_s0_task
    try:
        await s1_start_task
    except Exception:
        pass  # ignore errors from s1's startup; we only care that s0 shuts down cleanly

    errors = await log.grep_for_errors()
    assert errors == []