mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-28 20:27:03 +00:00
Merge 'topology_coordinator: join tablet load stats refresh in stop()' from Andrzej Jackowski
Commit2b7aa32(topology_coordinator: Refresh load stats after table is created or altered) registered topology_coordinator as a schema change listener and added on_create_column_family which fire-and-forgets _tablet_load_stats_refresh.trigger(). The triggered task runs on the gossip scheduling group via with_scheduling_group and accesses the topology_coordinator via 'this'. stop() unregisters the listener but does not wait for any in-flight refresh task. If a notification fires between _tablet_load_stats_refresh.join() in run() and unregister_listener in stop(), the scheduled task can outlive the topology_coordinator and access freed memory after run_topology_coordinator's coroutine frame is destroyed. Wait for the refresh to complete in stop() after unregistering the listener, ensuring no task can fire after destruction. Fixes SCYLLADB-1728 Backport to 2026.1 and 2026.2, because the issue was introduced in2b7aa32Closes scylladb/scylladb#29653 * https://github.com/scylladb/scylladb: test: tablet_stats: reproduce shutdown refresh race topology_coordinator: join tablet load stats refresh in stop()
This commit is contained in:
@@ -4237,6 +4237,7 @@ public:
|
||||
, _topology_cmd_rpc_tracker(topology_cmd_rpc_tracker)
|
||||
, _async_gate("topology_coordinator")
|
||||
{
|
||||
_lifecycle_notifier.register_subscriber(this);
|
||||
_db.get_notifier().register_listener(this);
|
||||
// When the delay_cdc_stream_finalization error injection is disabled
|
||||
// (test releases it), wake the topology coordinator so it retries
|
||||
@@ -4400,6 +4401,7 @@ future<bool> topology_coordinator::maybe_retry_failed_rf_change_tablet_rebuilds(
|
||||
}
|
||||
|
||||
future<> topology_coordinator::refresh_tablet_load_stats() {
|
||||
co_await utils::get_local_injector().inject("refresh_tablet_load_stats_pause", utils::wait_for_message(5min));
|
||||
auto tm = get_token_metadata_ptr();
|
||||
|
||||
locator::load_stats stats;
|
||||
@@ -4723,7 +4725,6 @@ future<> topology_coordinator::run() {
|
||||
|
||||
co_await _async_gate.close();
|
||||
co_await std::move(tablet_load_stats_refresher);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
co_await std::move(cdc_generation_publisher);
|
||||
co_await std::move(cdc_streams_gc);
|
||||
co_await std::move(gossiper_orphan_remover);
|
||||
@@ -4736,6 +4737,8 @@ future<> topology_coordinator::stop() {
|
||||
co_await _db.get_notifier().unregister_listener(this);
|
||||
utils::get_local_injector().unregister_on_disable("delay_cdc_stream_finalization");
|
||||
_topo_sm.on_tablet_split_ready = nullptr;
|
||||
co_await _lifecycle_notifier.unregister_subscriber(this);
|
||||
co_await _tablet_load_stats_refresh.join();
|
||||
|
||||
// if topology_coordinator::run() is aborted either because we are not a
|
||||
// leader anymore, or we are shutting down as a leader, we have to handle
|
||||
@@ -4797,7 +4800,6 @@ future<> run_topology_coordinator(
|
||||
topology_cmd_rpc_tracker};
|
||||
|
||||
std::exception_ptr ex;
|
||||
lifecycle_notifier.register_subscriber(&coordinator);
|
||||
try {
|
||||
rtlogger.info("start topology coordinator fiber");
|
||||
co_await with_scheduling_group(group0.get_scheduling_group(), [&] {
|
||||
@@ -4818,7 +4820,7 @@ future<> run_topology_coordinator(
|
||||
}
|
||||
on_fatal_internal_error(rtlogger, format("unhandled exception in topology_coordinator::run: {}", ex));
|
||||
}
|
||||
co_await lifecycle_notifier.unregister_subscriber(&coordinator);
|
||||
co_await utils::get_local_injector().inject("topology_coordinator_pause_before_stop", utils::wait_for_message(5min));
|
||||
co_await coordinator.stop();
|
||||
}
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.1
|
||||
#
|
||||
from test.pylib.manager_client import ManagerClient
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown
|
||||
from test.cluster.util import get_topology_coordinator, trigger_stepdown, new_test_keyspace, new_test_table
|
||||
|
||||
import pytest
|
||||
import logging
|
||||
@@ -83,3 +83,78 @@ async def test_load_stats_on_coordinator_failover(manager: ManagerClient):
|
||||
coord3 = await get_topology_coordinator(manager)
|
||||
if coord3:
|
||||
break
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skip_mode(mode='release', reason='error injections are not supported in release mode')
|
||||
async def test_load_stats_refresh_during_shutdown(manager: ManagerClient):
|
||||
"""Verify that _tablet_load_stats_refresh is properly joined during
|
||||
topology coordinator shutdown, even when a schema change notification
|
||||
triggers a refresh between run() completing and stop() being called.
|
||||
|
||||
Reproduces the scenario using two injection points:
|
||||
- topology_coordinator_pause_before_stop: pauses after run() finishes
|
||||
but before stop() is called
|
||||
- refresh_tablet_load_stats_pause: holds refresh_tablet_load_stats()
|
||||
so it's still in-flight during shutdown
|
||||
|
||||
Without the join in stop(), the refresh task outlives the coordinator
|
||||
and accesses freed memory.
|
||||
"""
|
||||
servers = await manager.servers_add(3)
|
||||
await manager.get_ready_cql(servers)
|
||||
|
||||
async with new_test_keyspace(manager,
|
||||
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1}") as ks:
|
||||
coord = await get_topology_coordinator(manager)
|
||||
host_ids = [await manager.get_host_id(s.server_id) for s in servers]
|
||||
coord_idx = host_ids.index(coord)
|
||||
coord_server = servers[coord_idx]
|
||||
|
||||
log = await manager.server_open_log(coord_server.server_id)
|
||||
mark = await log.mark()
|
||||
|
||||
# Injection B: pause between run() returning and stop() being called.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop", one_shot=True)
|
||||
|
||||
# Stepdown causes the topology coordinator to abort and shut down.
|
||||
logger.info("Triggering stepdown on coordinator")
|
||||
await trigger_stepdown(manager, coord_server)
|
||||
|
||||
# Wait for injection B to fire. The coordinator has finished run() but
|
||||
# the schema change listener is still registered.
|
||||
mark, _ = await log.wait_for(
|
||||
"topology_coordinator_pause_before_stop: waiting", from_mark=mark)
|
||||
|
||||
# Injection A: block refresh_tablet_load_stats() before it accesses _shared_tm.
|
||||
# Enable it now so it only catches the notification-triggered call.
|
||||
await manager.api.enable_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause", one_shot=True)
|
||||
|
||||
# CREATE TABLE fires on_create_column_family on the old coordinator which
|
||||
# fire-and-forgets _tablet_load_stats_refresh.trigger() scheduling a task
|
||||
# via with_scheduling_group on the gossip scheduling group.
|
||||
logger.info("Issuing CREATE TABLE while coordinator is paused before stop()")
|
||||
async with new_test_table(manager, ks, "pk int PRIMARY KEY", reuse_tables=False):
|
||||
# Wait for injection A: refresh_tablet_load_stats() is now blocked before
|
||||
# accessing _shared_tm. The topology_coordinator is still alive (paused at B).
|
||||
await log.wait_for("refresh_tablet_load_stats_pause: waiting", from_mark=mark)
|
||||
|
||||
# Release injection B: coordinator proceeds through stop().
|
||||
# Without the fix, stop() returns quickly and run_topology_coordinator
|
||||
# frees the topology_coordinator frame. With the fix, stop() blocks at
|
||||
# _tablet_load_stats_refresh.join() until injection A is released.
|
||||
logger.info("Releasing injection B: coordinator will stop")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "topology_coordinator_pause_before_stop")
|
||||
|
||||
# Release injection A: refresh_tablet_load_stats() resumes and accesses
|
||||
# this->_shared_tm via get_token_metadata_ptr(). Without the fix, 'this'
|
||||
# points to freed memory and ASan detects heap-use-after-free.
|
||||
logger.info("Releasing injection A: refresh resumes")
|
||||
await manager.api.message_injection(
|
||||
coord_server.ip_addr, "refresh_tablet_load_stats_pause")
|
||||
|
||||
# If the bug is present, the node crashed. read_barrier will fail.
|
||||
await read_barrier(manager.api, coord_server.ip_addr)
|
||||
|
||||
Reference in New Issue
Block a user