diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 05d987b98f..76d39a5591 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -4237,6 +4237,7 @@ public: , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker) , _async_gate("topology_coordinator") { + _lifecycle_notifier.register_subscriber(this); _db.get_notifier().register_listener(this); // When the delay_cdc_stream_finalization error injection is disabled // (test releases it), wake the topology coordinator so it retries @@ -4400,6 +4401,7 @@ future topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds( } future<> topology_coordinator::refresh_tablet_load_stats() { + co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min)); auto tm = get_token_metadata_ptr(); locator::load_stats stats; @@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() { co_await _async_gate.close(); co_await std::move(tablet_load_stats_refresher); - co_await _tablet_load_stats_refresh.join(); co_await std::move(cdc_generation_publisher); co_await std::move(cdc_streams_gc); co_await std::move(gossiper_orphan_remover); @@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() { co_await _db.get_notifier().unregister_listener(this); utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization"); _topo_sm.on_tablet_split_ready = nullptr; + co_await _lifecycle_notifier.unregister_subscriber(this); + co_await _tablet_load_stats_refresh.join(); // if topology_coordinator::run() is aborted either because we are not a // leader anymore, or we are shutting down as a leader, we have to handle @@ -4797,7 +4800,6 @@ future<> run_topology_coordinator( topology_cmd_rpc_tracker}; std::exception_ptr ex; - lifecycle_notifier.register_subscriber(&coordinator); try { rtlogger.info("start topology coordinator fiber"); co_await with_scheduling_group(group0.get_scheduling_group(), [&] { @@ -4818,7 +4820,7 @@ 
future<> run_topology_coordinator( } on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); } - co_await lifecycle_notifier.unregister_subscriber(&coordinator); + co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min)); co_await coordinator.stop(); } diff --git a/test/cluster/test_tablet_stats.py b/test/cluster/test_tablet_stats.py index 903cdade01..33d19d5224 100644 --- a/test/cluster/test_tablet_stats.py +++ b/test/cluster/test_tablet_stats.py @@ -4,7 +4,8 @@ # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 # from test.pylib.manager_client import ManagerClient -from test.cluster.util import get_topology_coordinator, trigger_stepdown +from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table +from test.pylib.rest_client import read_barrier import pytest import logging @@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient): coord3 = await get_topology_coordinator(manager) if coord3: break + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_load_stats_refresh_during_shutdown(manager: ManagerClient): +    """Verify that _tablet_load_stats_refresh is properly joined during +    topology coordinator shutdown, even when a schema change notification +    triggers a refresh between run() completing and stop() being called. + +    Reproduces the scenario using two injection points: +    - topology_coordinator_pause_before_stop: pauses after run() finishes +      but before stop() is called +    - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats() +      so it's still in-flight during shutdown + +    Without the join in stop(), the refresh task outlives the coordinator +    and accesses freed memory. 
+ """ + servers = await manager.servers_add(3) + await manager.get_ready_cql(servers) + + async with new_test_keyspace(manager, + "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks: + coord = await get_topology_coordinator(manager) + host_ids = [await manager.get_host_id(s.server_id) for s in servers] + coord_idx = host_ids.index(coord) + coord_server = servers[coord_idx] + + log = await manager.server_open_log(coord_server.server_id) + mark = await log.mark() + + # Injection B: pause between run() returning and stop() being called. + await manager.api.enable_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True) + + # Stepdown causes the topology coordinator to abort and shut down. + logger.info("Triggering stepdown on coordinator") + await trigger_stepdown(manager, coord_server) + + # Wait for injection B to fire. The coordinator has finished run() but + # the schema change listener is still registered. + mark, _ = await log.wait_for( + "topology_coordinator_pause_before_stop: waiting", from_mark=mark) + + # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm. + # Enable it now so it only catches the notification-triggered call. + await manager.api.enable_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True) + + # CREATE TABLE fires on_create_column_family on the old coordinator which + # fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task + # via with_scheduling_group on the gossip scheduling group. + logger.info("Issuing CREATE TABLE while coordinator is paused before stop()") + async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False): + # Wait for injection A: refresh_tablet_load_stats() is now blocked before + # accessing _shared_tm. The topology_coordinator is still alive (paused at B). 
+ await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark) + + # Release injection B: coordinator proceeds through stop(). + # Without the fix, stop() returns quickly and run_topology_coordinator + # frees the topology_coordinator frame. With the fix, stop() blocks at + # _tablet_load_stats_refresh.join() until injection A is released. + logger.info("Releasing injection B: coordinator will stop") + await manager.api.message_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop") + + # Release injection A: refresh_tablet_load_stats() resumes and accesses + # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this' + # points to freed memory and ASan detects heap-use-after-free. + logger.info("Releasing injection A: refresh resumes") + await manager.api.message_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause") + + # If the bug is present, the node crashed. read_barrier will fail. + await read_barrier(manager.api, coord_server.ip_addr)