Previously, raft_group0::abort() was called in storage_service::do_drain (introduced in #24418) to stop the group0 Raft server before destroying local storage. This was necessary because raft::server depends on storage (via raft_sys_table_storage and group0_state_machine). However, this caused issues: services like sstable_dict_autotrainer and auth::service, which use group0_client but are not stopped by storage_service, could trigger use-after-free if raft_group0 was destroyed too early. This can happen both during normal shutdown and when 'nodetool drain' is used. This commit reworks the shutdown logic: * Introduces abort_and_drain(), which aborts the server and waits for background tasks to finish, but keeps the server object alive. Clients will see raft::stopped_error if they try to access group0 after abort_and_drain(). * Final destruction happens in a separate method destroy(), called later from main.cc. raft_server_for_group::aborted is changed to a shared_future -- abort_server now returns a future so that we can wait for it in abort_and_drain(); it returns the future from a previous abort_server call if one already happened (which can be the case when the on_background_error callback triggered the abort). Node startup can fail before reaching storage_service, in which case ss.drain_on_shutdown() and abort_and_drain() are never called. To ensure proper cleanup, abort_and_drain() is called from main.cc before destroy(). Clients of raft_group_registry are expected to call destroy_server() for the servers they own. Currently, the only such client is raft_group0, which satisfies this requirement. As a result, raft_group_registry::stop_servers() is no longer needed. Instead, raft_group_registry::stop() now verifies that all servers have been properly destroyed. If any remain, it calls on_internal_error(). The call to drain_on_shutdown() in cql_test_env.cc appears redundant. 
The only source of raft::server instances in raft_group_registry is group0_service, and if group0_service.start() succeeds, both abort_and_drain() and destroy() are guaranteed to be called during shutdown.
76 lines
2.8 KiB
Python
76 lines
2.8 KiB
Python
from test.pylib.manager_client import ManagerClient
|
|
from test.pylib.rest_client import inject_error
|
|
from test.cluster.util import check_token_ring_and_group0_consistency
|
|
from test.cluster.conftest import skip_mode
|
|
import logging
|
|
import pytest
|
|
import asyncio
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
"""
|
|
The injection forces the topology coordinator to send CDC generation data in multiple parts,
|
|
if it didn't the command size would go over commitlog segment size limit making it impossible to commit and apply the command.
|
|
"""
|
|
@pytest.mark.asyncio
|
|
async def test_send_data_in_parts(manager: ManagerClient):
|
|
config = {
|
|
'schema_commitlog_segment_size_in_mb': 2
|
|
}
|
|
|
|
first_server = await manager.server_add(config=config)
|
|
|
|
async with inject_error(manager.api, first_server.ip_addr, 'cdc_generation_mutations_replication'):
|
|
async with inject_error(manager.api, first_server.ip_addr,
|
|
'cdc_generation_mutations_topology_snapshot_replication'):
|
|
await manager.server_add(config=config)
|
|
|
|
await check_token_ring_and_group0_consistency(manager)
|
|
|
|
cql = manager.get_cql()
|
|
rows = await cql.run_async("SELECT description FROM system.group0_history")
|
|
|
|
for row in rows:
|
|
if row.description.startswith('insert CDC generation data (UUID: ') and row.description.endswith('), part'):
|
|
break
|
|
else:
|
|
pytest.fail("No CDC generation data sent in parts was found")
|
|
|
|
|
|
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_group0_apply_while_node_is_being_shutdown(manager: ManagerClient):
    # This is a regression test for #24401: applying a group0 command (here, a
    # topology state load triggered by s1's bootstrap) while s0 is shutting
    # down must not produce errors in s0's log.

    logger.info("Starting s0")
    s0 = await manager.server_add(cmdline=['--logger-log-level', 'raft_group0=debug'])

    # Pause s0's topology state load just before the CDC update step, so we
    # can time the shutdown to race with command application.
    logger.info("Injecting topology_state_load_before_update_cdc into s0")
    await manager.api.enable_injection(s0.ip_addr, "topology_state_load_before_update_cdc", False)

    # Adding s1 makes s0 (the topology coordinator) apply topology commands,
    # which will hit the injection above.
    logger.info("Starting s1")
    s1_start_task = asyncio.create_task(manager.server_add())

    logger.info("Waiting for topology_state_load_before_update_cdc on s0")
    log = await manager.server_open_log(s0.server_id)
    await log.wait_for('topology_state_load_before_update_cdc hit, wait for message')

    # Start a graceful shutdown while the state load is still parked at the
    # injection point.
    logger.info("Triggering s0 shutdown")
    stop_s0_task = asyncio.create_task(manager.server_stop_gracefully(s0.server_id))

    logger.info("Waiting for 'Aborting raft group0 service...' on s0")
    await log.wait_for('Aborting raft group0 service...')

    # Unblock the paused state load only after group0 abort has started, so
    # the command application races with the shutdown.
    logger.info("Releasing topology_state_load_before_update_cdc on s0")
    await manager.api.message_injection(s0.ip_addr, 'topology_state_load_before_update_cdc')

    await stop_s0_task
    try:
        await s1_start_task
    except Exception:
        pass # ignore errors, since we don't care whether s1's bootstrap
             # succeeded -- only s0's clean shutdown matters here

    # The regression manifests as errors logged by s0 during shutdown.
    errors = await log.grep_for_errors()
    assert errors == []
|