From 8756f7c068609ed0412906347eacc9c54b217a16 Mon Sep 17 00:00:00 2001 From: Andrzej Jackowski Date: Tue, 28 Apr 2026 07:37:28 +0200 Subject: [PATCH 1/2] topology_coordinator: join tablet load stats refresh in stop() Commit 2b7aa3211d made schema changes trigger tablet load stats refreshes in the background. A notification can still arrive after run() stops the periodic refresher and before the coordinator object is destroyed. Move lifecycle subscription cleanup to stop() and join the serialized refresh there after unregistering refresh trigger sources. This keeps the coordinator alive until notification-triggered refresh work has completed. Fixes SCYLLADB-1728 --- service/topology_coordinator.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index 05d987b98f..cff7cb3118 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -4237,6 +4237,7 @@ public: , _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker) , _async_gate("topology_coordinator") { + _lifecycle_notifier.register_subscriber(this); _db.get_notifier().register_listener(this); // When the delay_cdc_stream_finalization error injection is disabled // (test releases it), wake the topology coordinator so it retries @@ -4723,7 +4724,6 @@ future<> topology_coordinator::run() { co_await _async_gate.close(); co_await std::move(tablet_load_stats_refresher); - co_await _tablet_load_stats_refresh.join(); co_await std::move(cdc_generation_publisher); co_await std::move(cdc_streams_gc); co_await std::move(gossiper_orphan_remover); @@ -4736,6 +4736,8 @@ future<> topology_coordinator::stop() { co_await _db.get_notifier().unregister_listener(this); utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization"); _topo_sm.on_tablet_split_ready = nullptr; + co_await _lifecycle_notifier.unregister_subscriber(this); + co_await _tablet_load_stats_refresh.join(); // if topology_coordinator::run() is aborted either because we are not a // leader anymore, or we are shutting down as a leader, we have to handle @@ -4797,7 +4799,6 @@ future<> run_topology_coordinator( topology_cmd_rpc_tracker}; std::exception_ptr ex; - lifecycle_notifier.register_subscriber(&coordinator); try { rtlogger.info("start topology coordinator fiber"); co_await with_scheduling_group(group0.get_scheduling_group(), [&] { @@ -4818,7 +4819,6 @@ future<> run_topology_coordinator( } on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); } - co_await lifecycle_notifier.unregister_subscriber(&coordinator); co_await coordinator.stop(); } From 459e3970cd14e8b7eb5da683cf7ac3cf4858135c Mon Sep 17 00:00:00 2001 From: Andrzej Jackowski Date: Tue, 28 Apr 2026 07:37:45 +0200 Subject: [PATCH 2/2] test: tablet_stats: reproduce shutdown refresh race The coordinator can receive a schema-change notification after run() finishes but before stop() unregisters listeners. The test pins that window with error injections and verifies stop() waits for the refresh instead of letting it outlive the coordinator. Test time in dev: 9.51s Refs SCYLLADB-1728 --- service/topology_coordinator.cc | 2 + test/cluster/test_tablet_stats.py | 77 ++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index cff7cb3118..76d39a5591 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -4401,6 +4401,7 @@ future topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds( } future<> topology_coordinator::refresh_tablet_load_stats() { + co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min)); auto tm = get_token_metadata_ptr(); locator::load_stats stats; @@ -4819,6 +4820,7 @@ future<> run_topology_coordinator( } on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); } + co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min)); co_await coordinator.stop(); } diff --git a/test/cluster/test_tablet_stats.py b/test/cluster/test_tablet_stats.py index 903cdade01..33d19d5224 100644 --- a/test/cluster/test_tablet_stats.py +++ b/test/cluster/test_tablet_stats.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 # from test.pylib.manager_client import ManagerClient -from test.cluster.util import get_topology_coordinator, trigger_stepdown +from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table import pytest import logging @@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient): coord3 = await get_topology_coordinator(manager) if coord3: break + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_load_stats_refresh_during_shutdown(manager: ManagerClient): + """Verify that _tablet_load_stats_refresh is properly joined during + topology coordinator shutdown, even when a schema change notification + triggers a refresh between run() completing and stop() being called. + + Reproduces the scenario using two injection points: + - topology_coordinator_pause_before_stop: pauses after run() finishes + but before stop() is called + - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats() + so it's still in-flight during shutdown + + Without the join in stop(), the refresh task outlives the coordinator + and accesses freed memory. + """ + servers = await manager.servers_add(3) + await manager.get_ready_cql(servers) + + async with new_test_keyspace(manager, + "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks: + coord = await get_topology_coordinator(manager) + host_ids = [await manager.get_host_id(s.server_id) for s in servers] + coord_idx = host_ids.index(coord) + coord_server = servers[coord_idx] + + log = await manager.server_open_log(coord_server.server_id) + mark = await log.mark() + + # Injection B: pause between run() returning and stop() being called. + await manager.api.enable_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True) + + # Stepdown causes the topology coordinator to abort and shut down. + logger.info("Triggering stepdown on coordinator") + await trigger_stepdown(manager, coord_server) + + # Wait for injection B to fire. The coordinator has finished run() but + # the schema change listener is still registered. + mark, _ = await log.wait_for( + "topology_coordinator_pause_before_stop: waiting", from_mark=mark) + + # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm. + # Enable it now so it only catches the notification-triggered call. + await manager.api.enable_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True) + + # CREATE TABLE fires on_create_column_family on the old coordinator which + # fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task + # via with_scheduling_group on the gossip scheduling group. + logger.info("Issuing CREATE TABLE while coordinator is paused before stop()") + async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False): + # Wait for injection A: refresh_tablet_load_stats() is now blocked before + # accessing _shared_tm. The topology_coordinator is still alive (paused at B). + await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark) + + # Release injection B: coordinator proceeds through stop(). + # Without the fix, stop() returns quickly and run_topology_coordinator + # frees the topology_coordinator frame. With the fix, stop() blocks at + # _tablet_load_stats_refresh.join() until injection A is released. + logger.info("Releasing injection B: coordinator will stop") + await manager.api.message_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop") + + # Release injection A: refresh_tablet_load_stats() resumes and accesses + # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this' + # points to freed memory and ASan detects heap-use-after-free. + logger.info("Releasing injection A: refresh resumes") + await manager.api.message_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause") + + # If the bug is present, the node crashed. read_barrier will fail. + await read_barrier(manager.api, coord_server.ip_addr)