test: tablet_stats: reproduce shutdown refresh race

The coordinator can receive a schema-change notification after run()
finishes but before stop() unregisters listeners. The test pins that
window with error injections and verifies stop() waits for the refresh
instead of letting it outlive the coordinator.

Test time in dev: 9.51s

Refs SCYLLADB-1728
This commit is contained in:
Andrzej Jackowski
2026-04-28 07:37:45 +02:00
parent 8756f7c068
commit 459e3970cd
2 changed files with 78 additions and 1 deletions

View File

@@ -4401,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
}
future<> topology_coordinator::refresh_tablet_load_stats() {
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
auto tm = get_token_metadata_ptr();
locator::load_stats stats;
@@ -4819,6 +4820,7 @@ future<> run_topology_coordinator(
}
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
}
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
co_await coordinator.stop();
}

View File

@@ -4,7 +4,7 @@
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
#
from test.pylib.manager_client import ManagerClient
from test.cluster.util import get_topology_coordinator, trigger_stepdown
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
import pytest
import logging
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
coord3 = await get_topology_coordinator(manager)
if coord3:
break
@pytest.mark.asyncio
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
"""Verify that _tablet_load_stats_refresh is properly joined during
topology coordinator shutdown, even when a schema change notification
triggers a refresh between run() completing and stop() being called.
Reproduces the scenario using two injection points:
- topology_coordinator_pause_before_stop: pauses after run() finishes
but before stop() is called
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
so it's still in-flight during shutdown
Without the join in stop(), the refresh task outlives the coordinator
and accesses freed memory.
"""
servers = await manager.servers_add(3)
await manager.get_ready_cql(servers)
async with new_test_keyspace(manager,
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
coord = await get_topology_coordinator(manager)
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
coord_idx = host_ids.index(coord)
coord_server = servers[coord_idx]
log = await manager.server_open_log(coord_server.server_id)
mark = await log.mark()
# Injection B: pause between run() returning and stop() being called.
await manager.api.enable_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
# Stepdown causes the topology coordinator to abort and shut down.
logger.info("Triggering stepdown on coordinator")
await trigger_stepdown(manager, coord_server)
# Wait for injection B to fire. The coordinator has finished run() but
# the schema change listener is still registered.
mark, _ = await log.wait_for(
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
# Enable it now so it only catches the notification-triggered call.
await manager.api.enable_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
# CREATE TABLE fires on_create_column_family on the old coordinator which
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
# via with_scheduling_group on the gossip scheduling group.
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
# Release injection B: coordinator proceeds through stop().
# Without the fix, stop() returns quickly and run_topology_coordinator
# frees the topology_coordinator frame. With the fix, stop() blocks at
# _tablet_load_stats_refresh.join() until injection A is released.
logger.info("Releasing injection B: coordinator will stop")
await manager.api.message_injection(
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
# Release injection A: refresh_tablet_load_stats() resumes and accesses
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
# points to freed memory and ASan detects heap-use-after-free.
logger.info("Releasing injection A: refresh resumes")
await manager.api.message_injection(
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
# If the bug is present, the node crashed. read_barrier will fail.
await read_barrier(manager.api, coord_server.ip_addr)