diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index 039bd2ef4a..7e410633d1 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -3295,7 +3295,7 @@ future system_keyspace::load_topology_state(const std::unorde
             supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
         }
 
-        if (row.has("topology_request")) {
+        if (row.has("topology_request") && nstate != service::node_state::left) {
             auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
             ret.requests.emplace(host_id, req);
             switch(req) {
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 2e277dc164..1ea9ceb817 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1299,7 +1299,7 @@ std::unordered_set storage_service::ignored_nodes_from_join_par
     return ignored_nodes;
 }
 
-utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp) {
+utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, utils::UUID old_request_id) {
     topology_mutation_builder builder(write_timestamp);
     auto ignored_nodes = ignored_nodes_from_join_params(params);
 
@@ -1339,7 +1339,21 @@ utils::chunked_vector storage_service::build_mutation_from_j
             .set("done", false);
     rtbuilder.set("request_type", params.replaced_id ? topology_request::replace : topology_request::join);
 
-    return {builder.build(), rtbuilder.build()};
+    utils::chunked_vector<canonical_mutation> muts = {builder.build(), rtbuilder.build()};
+
+    if (old_request_id) {
+        // If this is a replace operation, we need to mark the old request for the replaced node as done, if one exists.
+        // It should be safe to do so here: if the request is still pending, the topology coordinator has not started
+        // working on it yet, and if the coordinator picks it up in the meantime, the write of this new state will fail.
+        topology_mutation_builder builder(write_timestamp);
+        builder.with_node(*params.replaced_id).del("topology_request");
+        topology_request_tracking_mutation_builder rtbuilder(old_request_id, _feature_service.topology_requests_type_column);
+        rtbuilder.done("node was replaced before the request could start");
+        muts.push_back(builder.build());
+        muts.push_back(rtbuilder.build());
+    }
+
+    return muts;
 }
 
 class join_node_rpc_handshaker : public service::group0_handshaker {
@@ -7624,6 +7638,8 @@ future storage_service::join_node_request_handler(join
         co_return result;
     }
 
+    utils::UUID old_request_id;
+
     if (params.replaced_id) {
         auto rhid = locator::host_id{params.replaced_id->uuid()};
         if (is_me(rhid) || _gossiper.is_alive(rhid)) {
@@ -7661,9 +7677,13 @@ future storage_service::join_node_request_handler(join
             };
             co_return result;
         }
+
+        if (auto replaced_it = _topology_state_machine._topology.requests.find(*params.replaced_id); replaced_it != _topology_state_machine._topology.requests.end()) {
+            old_request_id = replaced_it->second.request_id;
+        }
     }
 
-    auto mutation = build_mutation_from_join_params(params, guard.write_timestamp());
+    auto mutation = build_mutation_from_join_params(params, guard.write_timestamp(), old_request_id);
     topology_change change{{std::move(mutation)}};
     group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,
diff --git a/service/storage_service.hh b/service/storage_service.hh
index 6f77d583b0..65f307b3c0 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -1120,7 +1120,8 @@ private:
     // raft_group0_client::_read_apply_mutex must be held
     future<> view_building_state_load();
 
-    utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp);
+    utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp,
+            utils::UUID old_request_id = utils::UUID{});
     std::unordered_set ignored_nodes_from_join_params(const join_node_request_params& params);
     future join_node_request_handler(join_node_request_params params);
 
diff --git a/test/cluster/test_decommission_kill_then_replace.py b/test/cluster/test_decommission_kill_then_replace.py
new file mode 100644
index 0000000000..0367e48fcf
--- /dev/null
+++ b/test/cluster/test_decommission_kill_then_replace.py
@@ -0,0 +1,94 @@
+#
+# Copyright (C) 2026-present ScyllaDB
+#
+# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
+#
+
+import asyncio
+import logging
+import time
+from contextlib import suppress
+
+import pytest
+
+from test.cluster.conftest import skip_mode
+from test.cluster.util import (
+    get_topology_coordinator,
+    find_server_by_host_id,
+    wait_for_token_ring_and_group0_consistency,
+)
+from test.pylib.manager_client import ManagerClient
+from test.pylib.scylla_cluster import ReplaceConfig
+from test.pylib.util import wait_for_first_completed
+
+logger = logging.getLogger(__name__)
+
+
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_decommission_kill_then_replace(manager: ManagerClient) -> None:
+    """
+    Boots a 3-node cluster, pauses the topology coordinator before it processes its backlog,
+    starts decommissioning a non-coordinator node, kills it before the decommission completes,
+    and replaces it. Finally, unpauses the coordinator and waits for topology convergence.
+ """ + # Start a 3-node cluster + servers = await manager.servers_add(3) + + # Identify the topology coordinator and enable the pause injection there + coord_host_id = await get_topology_coordinator(manager) + coord_srv = await find_server_by_host_id(manager, servers, coord_host_id) + inj = 'topology_coordinator_pause_before_processing_backlog' + await manager.api.enable_injection(coord_srv.ip_addr, inj, one_shot=True) + + # Pick a decommission target that's not the coordinator + decomm_target = next(s for s in servers if s.server_id != coord_srv.server_id) + logger.info(f"Starting decommission on {decomm_target}, coordinator is {coord_srv}") + + # Start decommission in the background + decomm_task = asyncio.create_task( + manager.decommission_node( + decomm_target.server_id, + ) + ) + + log = await manager.server_open_log(decomm_target.server_id) + await log.wait_for("api - decommission") + + # Kill the node before decommission completes to simulate failure mid-operation + await manager.server_stop_gracefully(decomm_target.server_id) + logger.info(f"Killed decommissioning node {decomm_target}") + + # Replace the killed node with a new one + replace_cfg = ReplaceConfig( + replaced_id=decomm_target.server_id, + reuse_ip_addr=False, + use_host_id=True, + ) + logger.info(f"Replacing killed node {decomm_target} with a new node (async)") + + # Start replacement asynchronously + replace_task = asyncio.create_task(manager.server_add(replace_cfg=replace_cfg)) + + # Wait until ANY node receives the join request from the replacing node + running = await manager.running_servers() + logs = [await manager.server_open_log(s.server_id) for s in running] + marks = [await log.mark() for log in logs] + await wait_for_first_completed([ + l.wait_for("received request to join from host_id", from_mark=m) + for l, m in zip(logs, marks) + ]) + + # Unpause the coordinator so backlog processing can proceed + await manager.api.message_injection(coord_srv.ip_addr, inj) + + # Wait for the replacement to finish once the coordinator is unpaused + new_server_info = await replace_task + logger.info(f"Replacement finished: {new_server_info}") + + # Wait for topology to converge: group0 and token ring match + await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60) + + # Make sure the background decommission task completes (expected to fail) + with suppress(Exception): + await decomm_task