Merge 'tasks: do not fail the wait request if rpc fails' from Aleksandra Martyniuk

During decommission, we first mark a topology request as done, then shut
down a node and in the following steps we remove node from the topology.
Thus,  finished request does not imply that a node is removed from
the topology.

Due to that, in node_ops_virtual_task::wait, while gathering children
from the whole cluster, we may hit the connection exception - because
a node is still in topology, even though it is down.

Modify the get_children method to ignore the exception and warn
about the failure instead.

Keep token_metadata_ptr in get_children to prevent topology from changing.

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-867

Needs backports to all versions

Closes scylladb/scylladb#29035

* github.com:scylladb/scylladb:
  tasks: fix indentation
  tasks: do not fail the wait request if rpc fails
  tasks: pass token_metadata_ptr to task_manager::virtual_task::impl::get_children

(cherry picked from commit 2e47fd9f56)

Closes scylladb/scylladb#29531
This commit is contained in:
Botond Dénes
2026-03-19 10:03:18 +02:00
parent 42edeee977
commit 2eda427b96
5 changed files with 18 additions and 42 deletions

View File

@@ -91,7 +91,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
.entity = "",
.progress_units = "",
.progress = tasks::task_manager::task::progress{},
.children = started ? co_await get_children(get_module(), id, [&gossiper = _ss.gossiper()] (gms::inet_address addr) { return gossiper.is_alive(addr); }) : std::vector<tasks::task_identity>{}
.children = started ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : std::vector<tasks::task_identity>{}
};
}

View File

@@ -156,7 +156,7 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
} else if (is_resize_task(task_type)) {
auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, [&gossiper = _ss.gossiper()] (gms::inet_address addr) { return gossiper.is_alive(addr); }) : std::vector<tasks::task_identity>{};
res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : std::vector<tasks::task_identity>{};
}
res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
co_return res->status;
@@ -262,7 +262,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
if (task_info.tablet_task_id.uuid() == id.uuid()) {
update_status(task_info, res.status, sched_nr);
res.status.state = tasks::task_manager::task_state::running;
res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, [&gossiper = _ss.gossiper()] (gms::inet_address addr) { return gossiper.is_alive(addr); }) : std::vector<tasks::task_identity>{};
res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : std::vector<tasks::task_identity>{};
co_return res;
}
}

View File

@@ -389,7 +389,7 @@ task_manager::virtual_task::impl::impl(module_ptr module) noexcept
: _module(std::move(module))
{}
future<std::vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, std::function<bool(gms::inet_address)> is_host_alive) {
future<std::vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) {
auto ms = module->get_task_manager()._messaging;
if (!ms) {
auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
@@ -406,19 +406,18 @@ future<std::vector<task_identity>> task_manager::virtual_task::impl::get_childre
tmlogger.info("tasks_vt_get_children: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{10});
});
co_return co_await map_reduce(nodes, [ms, parent_id, is_host_alive = std::move(is_host_alive)] (auto addr) -> future<std::vector<task_identity>> {
if (is_host_alive(addr)) {
return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) {
return resp | std::views::transform([addr] (auto id) {
return task_identity{
.node = addr,
.task_id = id
};
}) | std::ranges::to<std::vector<task_identity>>();
});
} else {
return make_ready_future<std::vector<task_identity>>();
}
co_return co_await map_reduce(nodes, [ms, parent_id] (auto addr) -> future<std::vector<task_identity>> {
return ms->send_tasks_get_children(netw::msg_addr{addr}, parent_id).then([addr] (auto resp) {
return resp | std::views::transform([addr] (auto id) {
return task_identity{
.node = addr,
.task_id = id
};
}) | std::ranges::to<std::vector<task_identity>>();
}).handle_exception_type([addr, parent_id] (const rpc::closed_error& ex) {
tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, addr, ex);
return std::vector<task_identity>{};
});
}, std::vector<task_identity>{}, concat<task_identity>);
}

View File

@@ -19,6 +19,7 @@
#include "db_clock.hh"
#include "utils/log.hh"
#include "gms/inet_address.hh"
#include "locator/token_metadata_fwd.hh"
#include "schema/schema_fwd.hh"
#include "tasks/types.hh"
#include "utils/chunked_vector.hh"
@@ -279,7 +280,7 @@ public:
impl& operator=(impl&&) = delete;
virtual ~impl() = default;
protected:
static future<std::vector<task_identity>> get_children(module_ptr module, task_id parent_id, std::function<bool(gms::inet_address)> is_host_alive);
static future<std::vector<task_identity>> get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr);
public:
virtual task_group get_group() const noexcept = 0;
// Returns std::nullopt if an operation with task_id isn't tracked by this virtual_task.

View File

@@ -258,27 +258,3 @@ async def test_node_ops_task_wait(manager: ManagerClient):
await decommission_task
await waiting_task
@pytest.mark.asyncio
async def test_get_children(manager: ManagerClient):
module_name = "node_ops"
tm = TaskManagerClient(manager.api)
servers = [await manager.server_add() for _ in range(2)]
injection = "tasks_vt_get_children"
handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, injection)
log = await manager.server_open_log(servers[0].server_id)
mark = await log.mark()
bootstrap_task = [task for task in await tm.list_tasks(servers[0].ip_addr, module_name) if task.kind == "cluster"][0]
async def _decommission():
await log.wait_for('tasks_vt_get_children: waiting', from_mark=mark)
await manager.decommission_node(servers[1].server_id)
await handler.message()
async def _get_status():
await tm.get_task_status(servers[0].ip_addr, bootstrap_task.task_id)
await asyncio.gather(*(_decommission(), _get_status()))