diff --git a/node_ops/task_manager_module.cc b/node_ops/task_manager_module.cc index 2f6d0b29d6..e6ccefdcd3 100644 --- a/node_ops/task_manager_module.cc +++ b/node_ops/task_manager_module.cc @@ -101,7 +101,7 @@ future> node_ops_virtual_task::get_status_help .entity = "", .progress_units = "", .progress = tasks::task_manager::task::progress{}, - .children = started ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector{} + .children = started ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector{} }; } diff --git a/service/task_manager_module.cc b/service/task_manager_module.cc index 686a3f6d13..dcf9e9a4a9 100644 --- a/service/task_manager_module.cc +++ b/service/task_manager_module.cc @@ -168,9 +168,9 @@ future> tablet_virtual_task::wait(tasks::task_ } else if (is_resize_task(task_type)) { auto new_tablet_count = _ss.get_token_metadata().tablets().get_tablet_map(table).tablet_count(); res->status.state = new_tablet_count == tablet_count ? tasks::task_manager::task_state::suspended : tasks::task_manager::task_state::done; - res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector{}; + res->status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector{}; } else { - res->status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())); + res->status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()); } res->status.end_time = db_clock::now(); // FIXME: Get precise end time. co_return res->status; @@ -276,7 +276,7 @@ future> tablet_virtual_task::get_status_helper(task } return make_ready_future(); }); - res.status.children = co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())); + res.status.children = co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()); } else if (is_migration_task(task_type)) { // Migration task. auto tablet_id = hint.get_tablet_id(); res.pending_replica = tmap.get_tablet_transition_info(tablet_id)->pending_replica; @@ -290,7 +290,7 @@ future> tablet_virtual_task::get_status_helper(task if (task_info.tablet_task_id.uuid() == id.uuid()) { update_status(task_info, res.status, sched_nr); res.status.state = tasks::task_manager::task_state::running; - res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, std::bind_front(&gms::gossiper::is_alive, &_ss.gossiper())) : utils::chunked_vector{}; + res.status.children = task_type == locator::tablet_task_type::split ? co_await get_children(get_module(), id, _ss.get_token_metadata_ptr()) : utils::chunked_vector{}; co_return res; } } diff --git a/tasks/task_manager.cc b/tasks/task_manager.cc index 1b4064ebdf..d40012d102 100644 --- a/tasks/task_manager.cc +++ b/tasks/task_manager.cc @@ -400,7 +400,7 @@ task_manager::virtual_task::impl::impl(module_ptr module) noexcept : _module(std::move(module)) {} -future> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, std::function is_host_alive) { +future> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) { auto ms = module->get_task_manager()._messaging; if (!ms) { auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id); @@ -417,19 +417,18 @@ future> task_manager::virtual_task::impl::g tmlogger.info("tasks_vt_get_children: waiting"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{10}); }); - co_return co_await map_reduce(nodes, [ms, parent_id, is_host_alive = std::move(is_host_alive)] (auto host_id) -> future> { - if (is_host_alive(host_id)) { - return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) { - return resp | std::views::transform([host_id] (auto id) { - return task_identity{ - .host_id = host_id, - .task_id = id - }; - }) | std::ranges::to>(); - }); - } else { - return make_ready_future>(); - } + co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future> { + return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) { + return resp | std::views::transform([host_id] (auto id) { + return task_identity{ + .host_id = host_id, + .task_id = id + }; + }) | std::ranges::to>(); + }).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) { + tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex); + return utils::chunked_vector{}; + }); }, utils::chunked_vector{}, [] (auto a, auto&& b) { std::move(b.begin(), b.end(), std::back_inserter(a)); return a; diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index fb51ed04a5..a29c4f991d 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -19,6 +19,7 @@ #include "db_clock.hh" #include "utils/log.hh" #include "locator/host_id.hh" +#include "locator/token_metadata_fwd.hh" #include "schema/schema_fwd.hh" #include "tasks/types.hh" #include "utils/chunked_vector.hh" @@ -281,7 +282,7 @@ public: impl& operator=(impl&&) = delete; virtual ~impl() = default; protected: - static future> get_children(module_ptr module, task_id parent_id, std::function is_host_alive); + static future> get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr); public: virtual task_group get_group() const noexcept = 0; // Returns std::nullopt if an operation with task_id isn't tracked by this virtual_task. diff --git a/test/cluster/tasks/test_node_ops_tasks.py b/test/cluster/tasks/test_node_ops_tasks.py index d5cf182294..2c116301d9 100644 --- a/test/cluster/tasks/test_node_ops_tasks.py +++ b/test/cluster/tasks/test_node_ops_tasks.py @@ -255,27 +255,3 @@ async def test_node_ops_task_wait(manager: ManagerClient): await decommission_task await waiting_task - -@pytest.mark.asyncio -async def test_get_children(manager: ManagerClient): - module_name = "node_ops" - tm = TaskManagerClient(manager.api) - servers = [await manager.server_add(cmdline=cmdline) for _ in range(2)] - - injection = "tasks_vt_get_children" - handler = await inject_error_one_shot(manager.api, servers[0].ip_addr, injection) - - log = await manager.server_open_log(servers[0].server_id) - mark = await log.mark() - - bootstrap_task = [task for task in await tm.list_tasks(servers[0].ip_addr, module_name) if task.kind == "cluster"][0] - - async def _decommission(): - await log.wait_for('tasks_vt_get_children: waiting', from_mark=mark) - await manager.decommission_node(servers[1].server_id) - await handler.message() - - async def _get_status(): - await tm.get_task_status(servers[0].ip_addr, bootstrap_task.task_id) - - await asyncio.gather(*(_decommission(), _get_status()))