From 459e3970cd14e8b7eb5da683cf7ac3cf4858135c Mon Sep 17 00:00:00 2001 From: Andrzej Jackowski Date: Tue, 28 Apr 2026 07:37:45 +0200 Subject: [PATCH] test: tablet_stats: reproduce shutdown refresh race The coordinator can receive a schema-change notification after run() finishes but before stop() unregisters listeners. The test pins that window with error injections and verifies stop() waits for the refresh instead of letting it outlive the coordinator. Test time in dev: 9.51s Refs SCYLLADB-1728 --- service/topology_coordinator.cc | 2 + test/cluster/test_tablet_stats.py | 77 ++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc index cff7cb3118..76d39a5591 100644 --- a/service/topology_coordinator.cc +++ b/service/topology_coordinator.cc @@ -4401,6 +4401,7 @@ future topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds( } future<> topology_coordinator::refresh_tablet_load_stats() { + co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min)); auto tm = get_token_metadata_ptr(); locator::load_stats stats; @@ -4819,6 +4820,7 @@ future<> run_topology_coordinator( } on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex)); } + co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min)); co_await coordinator.stop(); } diff --git a/test/cluster/test_tablet_stats.py b/test/cluster/test_tablet_stats.py index 903cdade01..33d19d5224 100644 --- a/test/cluster/test_tablet_stats.py +++ b/test/cluster/test_tablet_stats.py @@ -4,7 +4,7 @@ # SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1 # from test.pylib.manager_client import ManagerClient -from test.cluster.util import get_topology_coordinator, trigger_stepdown +from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table import pytest import logging @@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient): coord3 = await get_topology_coordinator(manager) if coord3: break + + +@pytest.mark.asyncio +@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode') +async def test_load_stats_refresh_during_shutdown(manager: ManagerClient): + """Verify that _tablet_load_stats_refresh is properly joined during + topology coordinator shutdown, even when a schema change notification + triggers a refresh between run() completing and stop() being called. + + Reproduces the scenario using two injection points: + - topology_coordinator_pause_before_stop: pauses after run() finishes + but before stop() is called + - refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats() + so it's still in-flight during shutdown + + Without the join in stop(), the refresh task outlives the coordinator + and accesses freed memory. + """ + servers = await manager.servers_add(3) + await manager.get_ready_cql(servers) + + async with new_test_keyspace(manager, + "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks: + coord = await get_topology_coordinator(manager) + host_ids = [await manager.get_host_id(s.server_id) for s in servers] + coord_idx = host_ids.index(coord) + coord_server = servers[coord_idx] + + log = await manager.server_open_log(coord_server.server_id) + mark = await log.mark() + + # Injection B: pause between run() returning and stop() being called. + await manager.api.enable_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True) + + # Stepdown causes the topology coordinator to abort and shut down. + logger.info("Triggering stepdown on coordinator") + await trigger_stepdown(manager, coord_server) + + # Wait for injection B to fire. The coordinator has finished run() but + # the schema change listener is still registered. + mark, _ = await log.wait_for( + "topology_coordinator_pause_before_stop: waiting", from_mark=mark) + + # Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm. + # Enable it now so it only catches the notification-triggered call. + await manager.api.enable_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True) + + # CREATE TABLE fires on_create_column_family on the old coordinator which + # fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task + # via with_scheduling_group on the gossip scheduling group. + logger.info("Issuing CREATE TABLE while coordinator is paused before stop()") + async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False): + # Wait for injection A: refresh_tablet_load_stats() is now blocked before + # accessing _shared_tm. The topology_coordinator is still alive (paused at B). + await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark) + + # Release injection B: coordinator proceeds through stop(). + # Without the fix, stop() returns quickly and run_topology_coordinator + # frees the topology_coordinator frame. With the fix, stop() blocks at + # _tablet_load_stats_refresh.join() until injection A is released. + logger.info("Releasing injection B: coordinator will stop") + await manager.api.message_injection( + coord_server.ip_addr, "topology_coordinator_pause_before_stop") + + # Release injection A: refresh_tablet_load_stats() resumes and accesses + # this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this' + # points to freed memory and ASan detects heap-use-after-free. + logger.info("Releasing injection A: refresh resumes") + await manager.api.message_injection( + coord_server.ip_addr, "refresh_tablet_load_stats_pause") + + # If the bug is present, the node crashed. read_barrier will fail. + await read_barrier(manager.api, coord_server.ip_addr)