From bee5f63cb62a22930efb8f2520cd9e7d03559082 Mon Sep 17 00:00:00 2001 From: Gleb Natapov Date: Tue, 6 Jan 2026 14:29:47 +0200 Subject: [PATCH] topology coordinator: complete pending operation for a replaced node A replaced node may have pending operation on it. The replace operation will move the node into the 'left' state and the request will never be completed. More over the code does not expect left node to have a request. It will try to process the request and will crash because the node for the request will not be found. The patch checks is the replaced node has peening request and completes it with failure. It also changes topology loading code to skip requests for nodes that are in a left state. This is not strictly needed, but makes the code more robust. Fixes #27990 Closes scylladb/scylladb#28009 --- db/system_keyspace.cc | 2 +- service/storage_service.cc | 26 ++++- service/storage_service.hh | 3 +- .../test_decommission_kill_then_replace.py | 94 +++++++++++++++++++ 4 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 test/cluster/test_decommission_kill_then_replace.py diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 039bd2ef4a..7e410633d1 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -3295,7 +3295,7 @@ future system_keyspace::load_topology_state(const std::unorde supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features")); } - if (row.has("topology_request")) { + if (row.has("topology_request") && nstate != service::node_state::left) { auto req = service::topology_request_from_string(row.get_as("topology_request")); ret.requests.emplace(host_id, req); switch(req) { diff --git a/service/storage_service.cc b/service/storage_service.cc index 2e277dc164..1ea9ceb817 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -1299,7 +1299,7 @@ std::unordered_set storage_service::ignored_nodes_from_join_par return ignored_nodes; } -utils::chunked_vector storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp) { +utils::chunked_vector storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, utils::UUID old_request_id) { topology_mutation_builder builder(write_timestamp); auto ignored_nodes = ignored_nodes_from_join_params(params); @@ -1339,7 +1339,21 @@ utils::chunked_vector storage_service::build_mutation_from_j .set("done", false); rtbuilder.set("request_type", params.replaced_id ? topology_request::replace : topology_request::join); - return {builder.build(), rtbuilder.build()}; + utils::chunked_vector muts = {builder.build(), rtbuilder.build()}; + + if (old_request_id) { + // If this is a replace operation, we need to mark the old request for replaced node as done if exists + // It should be safe to do so here since if the request is still pending it means the topology coordinator does not + // work on it yet, and if the topology coordinator will pick it up meanwhile the write of this new state will fail + topology_mutation_builder builder(write_timestamp); + builder.with_node(*params.replaced_id).del("topology_request"); + topology_request_tracking_mutation_builder rtbuilder(old_request_id, _feature_service.topology_requests_type_column); + rtbuilder.done("node was replaced before the request could start"); + muts.push_back(builder.build()); + muts.push_back(rtbuilder.build()); + } + + return muts; } class join_node_rpc_handshaker : public service::group0_handshaker { @@ -7624,6 +7638,8 @@ future storage_service::join_node_request_handler(join co_return result; } + utils::UUID old_request_id; + if (params.replaced_id) { auto rhid = locator::host_id{params.replaced_id->uuid()}; if (is_me(rhid) || _gossiper.is_alive(rhid)) { @@ -7661,9 +7677,13 @@ future storage_service::join_node_request_handler(join }; co_return result; } + + if (_topology_state_machine._topology.requests.find(*params.replaced_id) != _topology_state_machine._topology.requests.end()) { + old_request_id = replaced_it->second.request_id; + } } - auto mutation = build_mutation_from_join_params(params, guard.write_timestamp()); + auto mutation = build_mutation_from_join_params(params, guard.write_timestamp(), old_request_id); topology_change change{{std::move(mutation)}}; group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, diff --git a/service/storage_service.hh b/service/storage_service.hh index 6f77d583b0..65f307b3c0 100644 --- a/service/storage_service.hh +++ b/service/storage_service.hh @@ -1120,7 +1120,8 @@ private: // raft_group0_client::_read_apply_mutex must be held future<> view_building_state_load(); - utils::chunked_vector build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp); + utils::chunked_vector build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, + utils::UUID old_request_id = utils::UUID{}); std::unordered_set ignored_nodes_from_join_params(const join_node_request_params& params); future join_node_request_handler(join_node_request_params params); diff --git a/test/cluster/test_decommission_kill_then_replace.py b/test/cluster/test_decommission_kill_then_replace.py new file mode 100644 index 0000000000..0367e48fcf --- /dev/null +++ b/test/cluster/test_decommission_kill_then_replace.py @@ -0,0 +1,94 @@ +# +# Copyright (C) 2026-present ScyllaDB +# +# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 +# + +import asyncio +import logging +import time +from contextlib import suppress + +import pytest + +from test.cluster.conftest import skip_mode +from test.cluster.util import ( + get_topology_coordinator, + find_server_by_host_id, + wait_for_token_ring_and_group0_consistency, +) +from test.pylib.manager_client import ManagerClient +from test.pylib.scylla_cluster import ReplaceConfig +from test.pylib.util import wait_for_first_completed + +logger = logging.getLogger(__name__) + + +@pytest.mark.asyncio +@skip_mode('release', 'error injections are not supported in release mode') +async def test_decommission_kill_then_replace(manager: ManagerClient) -> None: + """ + Boots a 3-node cluster, pauses the topology coordinator before processing backlog, + starts decommissioning a non-coordinator node, kills it before decommission completes, + and replaces it. Finally, unpauses the coordinator and waits for topology convergence. + """ + # Start a 3-node cluster + servers = await manager.servers_add(3) + + # Identify the topology coordinator and enable the pause injection there + coord_host_id = await get_topology_coordinator(manager) + coord_srv = await find_server_by_host_id(manager, servers, coord_host_id) + inj = 'topology_coordinator_pause_before_processing_backlog' + await manager.api.enable_injection(coord_srv.ip_addr, inj, one_shot=True) + + # Pick a decommission target that's not the coordinator + decomm_target = next(s for s in servers if s.server_id != coord_srv.server_id) + logger.info(f"Starting decommission on {decomm_target}, coordinator is {coord_srv}") + + # Start decommission in the background + decomm_task = asyncio.create_task( + manager.decommission_node( + decomm_target.server_id, + ) + ) + + log = await manager.server_open_log(decomm_target.server_id) + await log.wait_for("api - decommission") + + # Kill the node before decommission completes to simulate failure mid-operation + await manager.server_stop_gracefully(decomm_target.server_id) + logger.info(f"Killed decommissioning node {decomm_target}") + + # Replace the killed node with a new one + replace_cfg = ReplaceConfig( + replaced_id=decomm_target.server_id, + reuse_ip_addr=False, + use_host_id=True, + ) + logger.info(f"Replacing killed node {decomm_target} with a new node (async)") + + # Start replacement asynchronously + replace_task = asyncio.create_task(manager.server_add(replace_cfg=replace_cfg)) + + # Wait until ANY node receives the join request from the replacing node + running = await manager.running_servers() + logs = [await manager.server_open_log(s.server_id) for s in running] + marks = [await log.mark() for log in logs] + await wait_for_first_completed([ + l.wait_for("received request to join from host_id", from_mark=m) + for l, m in zip(logs, marks) + ]) + + # Unpause the coordinator so backlog processing can proceed + await manager.api.message_injection(coord_srv.ip_addr, inj) + + # Wait for the replacement to finish once the coordinator is unpaused + new_server_info = await replace_task + logger.info(f"Replacement finished: {new_server_info}") + + # Wait for topology to converge: group0 and token ring match + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60) + + # Make sure the background decommission task completes (expected to fail) + with suppress(Exception): + await decomm_task