topology coordinator: complete pending operation for a replaced node

A replaced node may have a pending operation on it. The replace operation
will move the node into the 'left' state and the request will never be
completed. Moreover, the code does not expect a left node to have a
request. It will try to process the request and will crash because the
node for the request will not be found.

The patch checks if the replaced node has a pending request and completes
it with failure. It also changes the topology loading code to skip requests
for nodes that are in the left state. This is not strictly needed, but
makes the code more robust.

Fixes #27990

Closes scylladb/scylladb#28009
This commit is contained in:
Gleb Natapov
2026-01-06 14:29:47 +02:00
committed by Patryk Jędrzejczak
parent 551eecab63
commit bee5f63cb6
4 changed files with 120 additions and 5 deletions

View File

@@ -3295,7 +3295,7 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
supported_features = decode_features(deserialize_set_column(*topology(), row, "supported_features"));
}
if (row.has("topology_request")) {
if (row.has("topology_request") && nstate != service::node_state::left) {
auto req = service::topology_request_from_string(row.get_as<sstring>("topology_request"));
ret.requests.emplace(host_id, req);
switch(req) {

View File

@@ -1299,7 +1299,7 @@ std::unordered_set<raft::server_id> storage_service::ignored_nodes_from_join_par
return ignored_nodes;
}
utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp) {
utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp, utils::UUID old_request_id) {
topology_mutation_builder builder(write_timestamp);
auto ignored_nodes = ignored_nodes_from_join_params(params);
@@ -1339,7 +1339,21 @@ utils::chunked_vector<canonical_mutation> storage_service::build_mutation_from_j
.set("done", false);
rtbuilder.set("request_type", params.replaced_id ? topology_request::replace : topology_request::join);
return {builder.build(), rtbuilder.build()};
utils::chunked_vector<canonical_mutation> muts = {builder.build(), rtbuilder.build()};
if (old_request_id) {
// If this is a replace operation, we need to mark the old request for replaced node as done if exists
// It should be safe to do so here since if the request is still pending it means the topology coordinator does not
// work on it yet, and if the topology coordinator will pick it up meanwhile the write of this new state will fail
topology_mutation_builder builder(write_timestamp);
builder.with_node(*params.replaced_id).del("topology_request");
topology_request_tracking_mutation_builder rtbuilder(old_request_id, _feature_service.topology_requests_type_column);
rtbuilder.done("node was replaced before the request could start");
muts.push_back(builder.build());
muts.push_back(rtbuilder.build());
}
return muts;
}
class join_node_rpc_handshaker : public service::group0_handshaker {
@@ -7624,6 +7638,8 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
co_return result;
}
utils::UUID old_request_id;
if (params.replaced_id) {
auto rhid = locator::host_id{params.replaced_id->uuid()};
if (is_me(rhid) || _gossiper.is_alive(rhid)) {
@@ -7661,9 +7677,13 @@ future<join_node_request_result> storage_service::join_node_request_handler(join
};
co_return result;
}
if (_topology_state_machine._topology.requests.find(*params.replaced_id) != _topology_state_machine._topology.requests.end()) {
old_request_id = replaced_it->second.request_id;
}
}
auto mutation = build_mutation_from_join_params(params, guard.write_timestamp());
auto mutation = build_mutation_from_join_params(params, guard.write_timestamp(), old_request_id);
topology_change change{{std::move(mutation)}};
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard,

View File

@@ -1120,7 +1120,8 @@ private:
// raft_group0_client::_read_apply_mutex must be held
future<> view_building_state_load();
utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp);
utils::chunked_vector<canonical_mutation> build_mutation_from_join_params(const join_node_request_params& params, api::timestamp_type write_timestamp,
utils::UUID old_request_id = utils::UUID{});
std::unordered_set<raft::server_id> ignored_nodes_from_join_params(const join_node_request_params& params);
future<join_node_request_result> join_node_request_handler(join_node_request_params params);

View File

@@ -0,0 +1,94 @@
#
# Copyright (C) 2026-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import asyncio
import logging
import time
from contextlib import suppress
import pytest
from test.cluster.conftest import skip_mode
from test.cluster.util import (
get_topology_coordinator,
find_server_by_host_id,
wait_for_token_ring_and_group0_consistency,
)
from test.pylib.manager_client import ManagerClient
from test.pylib.scylla_cluster import ReplaceConfig
from test.pylib.util import wait_for_first_completed
logger = logging.getLogger(__name__)
@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_decommission_kill_then_replace(manager: ManagerClient) -> None:
    """
    Boots a 3-node cluster, pauses the topology coordinator before it processes
    its backlog, starts a decommission of a non-coordinator node, kills that
    node mid-decommission, and then replaces it. Finally unpauses the
    coordinator and waits for the topology to converge.
    """
    # Bring up a 3-node cluster.
    servers = await manager.servers_add(3)

    # Locate the topology coordinator and arm the pause injection on it.
    coordinator_host_id = await get_topology_coordinator(manager)
    coordinator = await find_server_by_host_id(manager, servers, coordinator_host_id)
    injection_name = 'topology_coordinator_pause_before_processing_backlog'
    await manager.api.enable_injection(coordinator.ip_addr, injection_name, one_shot=True)

    # Choose a decommission victim that is not the coordinator itself.
    victim = next(s for s in servers if s.server_id != coordinator.server_id)
    logger.info(f"Starting decommission on {victim}, coordinator is {coordinator}")

    # Kick off the decommission in the background.
    decomm_task = asyncio.create_task(manager.decommission_node(victim.server_id))

    # Wait until the victim has actually received the decommission API call...
    victim_log = await manager.server_open_log(victim.server_id)
    await victim_log.wait_for("api - decommission")

    # ...then kill it before the decommission can finish, simulating a
    # failure in the middle of the operation.
    await manager.server_stop_gracefully(victim.server_id)
    logger.info(f"Killed decommissioning node {victim}")

    # Replace the killed node with a fresh one.
    replace_cfg = ReplaceConfig(
        replaced_id=victim.server_id,
        reuse_ip_addr=False,
        use_host_id=True,
    )
    logger.info(f"Replacing killed node {victim} with a new node (async)")
    # The replacement is started asynchronously as well.
    replace_task = asyncio.create_task(manager.server_add(replace_cfg=replace_cfg))

    # Block until ANY live node has seen the join request from the replacement.
    running = await manager.running_servers()
    join_waiters = []
    for srv in running:
        srv_log = await manager.server_open_log(srv.server_id)
        mark = await srv_log.mark()
        join_waiters.append(
            srv_log.wait_for("received request to join from host_id", from_mark=mark))
    await wait_for_first_completed(join_waiters)

    # Release the coordinator so it can work through its backlog.
    await manager.api.message_injection(coordinator.ip_addr, injection_name)

    # The replacement should now be able to complete.
    new_server_info = await replace_task
    logger.info(f"Replacement finished: {new_server_info}")

    # Wait until group0 and the token ring agree.
    await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60)

    # Reap the background decommission task; it is expected to fail.
    with suppress(Exception):
        await decomm_task