Merge 'tasks: do not fail the wait request if rpc fails' from Aleksandra Martyniuk

During decommission, we first mark the topology request as done, then shut
down the node, and only in subsequent steps remove it from the topology.
Thus, a finished request does not imply that the node has been removed
from the topology.

Because of that, in node_ops_virtual_task::wait, while gathering children
from the whole cluster, we may hit a connection exception: the node is
still in the topology, even though it is down.

Modify the get_children method to ignore the exception and warn
about the failure instead.

Keep a token_metadata_ptr in get_children to prevent the topology from
changing while children are gathered.
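For context, token_metadata_ptr is a shared pointer to an immutable topology
snapshot, so holding it pins the node set for the duration of the call. A
minimal standalone sketch of that snapshot pattern (the types below are
hypothetical stand-ins, not ScyllaDB's actual definitions):

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-ins for locator::token_metadata / token_metadata_ptr.
struct token_metadata {
    std::vector<int> normal_nodes;   // host ids known to this snapshot
};
using token_metadata_ptr = std::shared_ptr<const token_metadata>;

// The callee holds a copy of the shared pointer, so even if the publisher
// swaps in a new snapshot concurrently, the node list enumerated here
// cannot change underneath us.
std::vector<int> nodes_to_query(token_metadata_ptr tmptr) {
    return tmptr->normal_nodes;
}

int main() {
    auto tm = std::make_shared<const token_metadata>(token_metadata{{1, 2, 3}});
    auto nodes = nodes_to_query(tm);   // stable view: {1, 2, 3}
    return nodes.size() == 3 ? 0 : 1;
}
```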

Fixes: https://scylladb.atlassian.net/browse/SCYLLADB-867

Needs backports to all versions

Closes scylladb/scylladb#29035

* github.com:scylladb/scylladb:
  tasks: fix indentation
  tasks: do not fail the wait request if rpc fails
  tasks: pass token_metadata_ptr to task_manager::virtual_task::impl::get_children

(cherry picked from commit 2e47fd9f56)

Closes scylladb/scylladb#29193
commit 41e2c2d1c4
parent 2d1fdce790
Author: Botond Dénes
Date: 2026-03-19 10:03:18 +02:00
Committed by: Avi Kivity

5 changed files with 20 additions and 44 deletions


@@ -101,7 +101,7 @@ future<std::optional<tasks::task_status>> node_ops_virtual_task::get_status_help
         .entity = "",
         .progress_units = "",
         .progress = tasks::task_manager::task::progress{},
-        .children = started ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{}
+        .children = started ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{}
     };
 }


@@ -168,9 +168,9 @@ future<std::optional<tasks::task_status>> tablet_virtual_task::wait(tasks::task_
     } else if (is_resize_task(task_type)) {
         auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count();
         res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done;
-        res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
+        res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
     } else {
-        res->status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
+        res->status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
     }
     res->status.end_time = db_clock::now(); // FIXME: Get precise end time.
     co_return res->status;
@@ -276,7 +276,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
             }
             return make_ready_future();
         });
-        res.status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper()));
+        res.status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr());
     } else if (is_migration_task(task_type)) { // Migration task.
         auto tablet_id = hint.get_tablet_id();
         res.pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica;
@@ -290,7 +290,7 @@ future<std::optional<status_helper>> tablet_virtual_task::get_status_helper(task
         if (task_info.tablet_task_id.uuid() == id.uuid()) {
             update_status(task_info, res.status, sched_nr);
             res.status.state = tasks::task_manager::task_state::running;
-            res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector<tasks::task_identity>{};
+            res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector<tasks::task_identity>{};
             co_return res;
         }
     }


@@ -400,7 +400,7 @@ task_manager::virtual_task::impl::impl(module_ptr module) noexcept
     : _module(std::move(module))
 {}
 
-future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive) {
+future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) {
     auto ms = module->get_task_manager()._messaging;
     if (!ms) {
         auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
@@ -417,19 +417,18 @@ future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::g
         tmlogger.info("tasks_vt_get_children: waiting");
         co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{10});
     });
-    co_return co_await map_reduce(nodes, [ms, parent_id, is_host_alive = std::move(is_host_alive)] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
-        if (is_host_alive(host_id)) {
-            return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
-                return resp | std::views::transform([host_id] (auto id) {
-                    return task_identity{
-                        .host_id = host_id,
-                        .task_id = id
-                    };
-                }) | std::ranges::to<utils::chunked_vector<task_identity>>();
-            });
-        } else {
-            return make_ready_future<utils::chunked_vector<task_identity>>();
-        }
+    co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
+        return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
+            return resp | std::views::transform([host_id] (auto id) {
+                return task_identity{
+                    .host_id = host_id,
+                    .task_id = id
+                };
+            }) | std::ranges::to<utils::chunked_vector<task_identity>>();
+        }).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) {
+            tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex);
+            return utils::chunked_vector<task_identity>{};
+        });
     }, utils::chunked_vector<task_identity>{}, [] (auto a, auto&& b) {
         std::move(b.begin(), b.end(), std::back_inserter(a));
         return a;

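The per-node branch above swallows only rpc::closed_error, so a single
unreachable node degrades to a warning while the reduction still merges
results from the live nodes. A minimal Seastar-style sketch of the same
map_reduce shape (fetch_children is a hypothetical stand-in for the
send_tasks_get_children verb, with ints standing in for the real id types):

```cpp
#include <seastar/core/future.hh>
#include <seastar/core/map_reduce.hh>
#include <seastar/util/log.hh>
#include <vector>

static seastar::logger tlog("get_children_sketch");

// Hypothetical per-node RPC; its future fails if the node is unreachable.
seastar::future<std::vector<int>> fetch_children(int host_id);

seastar::future<std::vector<int>> gather_children(std::vector<int> nodes) {
    return seastar::map_reduce(nodes.begin(), nodes.end(),
        [] (int host_id) {
            return fetch_children(host_id).handle_exception(
                [host_id] (std::exception_ptr) {
                    // One dead node must not fail the whole gather:
                    // warn and contribute an empty partial result.
                    tlog.warn("failed to fetch children from node {}", host_id);
                    return std::vector<int>{};
                });
        },
        std::vector<int>{},                          // initial accumulator
        [] (std::vector<int> acc, std::vector<int> part) {
            // Reduction step: concatenate each node's partial result.
            acc.insert(acc.end(), part.begin(), part.end());
            return acc;
        });
}
```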

@@ -19,6 +19,7 @@
 #include "db_clock.hh"
 #include "utils/log.hh"
 #include "locator/host_id.hh"
+#include "locator/token_metadata_fwd.hh"
 #include "schema/schema_fwd.hh"
 #include "tasks/types.hh"
 #include "utils/chunked_vector.hh"
@@ -281,7 +282,7 @@ public:
     impl& operator=(impl&&) = delete;
     virtual ~impl() = default;
 protected:
-    static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, std::function<bool(locator::host_id)> is_host_alive);
+    static future<utils::chunked_vector<task_identity>> get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr);
 public:
     virtual task_group get_group() const noexcept = 0;
     // Returns std::nullopt if an operation with task_id isn't tracked by this virtual_task.


@@ -255,27 +255,3 @@ async def test_node_ops_task_wait(manager: ManagerClient):
 
     await decommission_task
     await waiting_task
-
-@pytest.mark.asyncio
-async def test_get_children(manager: ManagerClient):
-    module_name = "node_ops"
-    tm = TaskManagerClient(manager.api)
-    servers = [await manager.server_add(cmdline=cmdline) for _ in range(2)]
-
-    injection = "tasks_vt_get_children"
-    handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, injection)
-    log = await manager.server_open_log(servers[0].server_id)
-    mark = await log.mark()
-
-    bootstrap_task = [task for task in await tm.list_tasks(servers[0].ip_addr, module_name) if task.kind == "cluster"][0]
-
-    async def _decommission():
-        await log.wait_for('tasks_vt_get_children: waiting', from_mark=mark)
-        await manager.decommission_node(servers[1].server_id)
-        await handler.message()
-
-    async def _get_status():
-        await tm.get_task_status(servers[0].ip_addr, bootstrap_task.task_id)
-
-    await asyncio.gather(*(_decommission(), _get_status()))