Files
scylladb/tasks/task_handler.cc
Aleksandra Martyniuk 55fde70f8d api: tasks: task_manager: keep children identities in chunked_{array,vector}
task_status contains a vector of children identities. If the number
of children is large, we may hit oversized allocation.

Change all types of children-related containers to chunked_vector.
Modify the children type returned from task manager API.

Fixes: scylladb#25795.

Closes scylladb/scylladb#25923
2025-09-15 08:44:16 +03:00

230 lines
9.2 KiB
C++

/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "db/timeout_clock.hh"
#include "tasks/task_handler.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/overloaded_functor.hh"
#include <seastar/core/with_timeout.hh>
#include <queue>
namespace tasks {
using task_status_variant = std::variant<tasks::task_manager::foreign_task_ptr, tasks::task_manager::task::task_essentials>;
static future<task_status> get_task_status(task_manager::task_ptr task) {
auto host_id = task->get_module()->get_task_manager().get_host_id();
auto local_task_status = task->get_status();
auto status = task_status{
.task_id = local_task_status.id,
.type = task->type(),
.kind = task_kind::node,
.scope = local_task_status.scope,
.state = local_task_status.state,
.is_abortable = task->is_abortable(),
.start_time = local_task_status.start_time,
.end_time = local_task_status.end_time,
.error = local_task_status.error,
.parent_id = task->get_parent_id(),
.sequence_number = local_task_status.sequence_number,
.shard = local_task_status.shard,
.keyspace = local_task_status.keyspace,
.table = local_task_status.table,
.entity = local_task_status.entity,
.progress_units = local_task_status.progress_units,
.progress = co_await task->get_progress(),
.children = co_await task->get_children().map_each_task<task_identity>(
[host_id] (const task_manager::foreign_task_ptr& task) {
// There is no race because id does not change for the whole task lifetime.
return task_identity{
.host_id = host_id,
.task_id = task->id()
};
},
[host_id] (const task_manager::task::task_essentials& task) {
return task_identity{
.host_id = host_id,
.task_id = task.task_status.id
};
})
};
co_return status;
}
static future<task_status> get_foreign_task_status(const task_manager::foreign_task_ptr& task) {
co_return co_await smp::submit_to(task.get_owner_shard(), [t = co_await task.copy()] () mutable {
return get_task_status(t.release());
});
}
template<typename T>
static T get_virtual_task_info(task_id id, std::optional<T> info) {
// A service associated with the virtual task may have stopped tracking the operation
// before its info got retrieved.
if (!info) {
throw task_manager::task_not_found(id);
}
return *info;
}
// Prolongs task life to make sure all its children will be accessible.
struct status_helper {
task_status status;
task_manager::foreign_task_ptr task;
};
future<status_helper> task_handler::get_status_helper() {
return task_manager::invoke_on_task(_tm.container(), _id, std::function(
[id = _id] (task_manager::task_variant task_v, tasks::virtual_task_hint hint) -> future<status_helper> {
return std::visit(overloaded_functor{
[] (task_manager::task_ptr task) -> future<status_helper> {
co_return status_helper{
.status = co_await get_task_status(task),
.task = task
};
},
[id, hint = std::move(hint)] (task_manager::virtual_task_ptr task) -> future<status_helper> {
auto id_ = id;
auto status = co_await task->get_status(id_, std::move(hint));
co_return status_helper{
.status = get_virtual_task_info(id_, status),
.task = nullptr
};
}
}, task_v);
}));
}
future<task_status> task_handler::get_status() {
auto s = co_await get_status_helper();
co_return s.status;
}
future<task_status> task_handler::wait_for_task(std::optional<std::chrono::seconds> timeout) {
auto wait = task_manager::invoke_on_task(_tm.container(), _id, std::function([id = _id] (task_manager::task_variant task_v, tasks::virtual_task_hint hint) -> future<task_status> {
return std::visit(overloaded_functor{
[] (task_manager::task_ptr task) {
return task->done().then_wrapped([task] (auto f) {
// done() is called only because we want the task to be complete before getting its status.
// The future should be ignored here as the result does not matter.
f.ignore_ready_future();
return get_task_status(std::move(task));
});
},
[id, hint = std::move(hint)] (task_manager::virtual_task_ptr task) -> future<task_status> {
auto id_ = id;
auto status = co_await task->wait(id_, std::move(hint));
co_return get_virtual_task_info(id_, status);
}
}, task_v);
}));
return with_timeout(timeout ? db::timeout_clock::now() + timeout.value() : db::no_timeout, std::move(wait));
}
future<utils::chunked_vector<task_status>> task_handler::get_status_recursively(bool local) {
auto sh = co_await get_status_helper();
std::queue<task_status_variant> q;
utils::chunked_vector<task_status> res;
if (sh.task) { // task
q.push(co_await sh.task.copy()); // Task cannot be moved since we need it to be alive during whole loop execution.
} else { // virtual task
res.push_back(sh.status);
for (auto ident: sh.status.children) {
if (ident.host_id != _tm.get_host_id()) {
// FIXME: add non-local version
continue;
}
try {
auto child = co_await _tm.lookup_task_on_all_shards(_tm.container(), ident.task_id);
q.push(std::move(child));
} catch (const task_manager::task_not_found& e) {
continue; // Virtual task's children may get unregistered.
}
}
}
// Push children's statuses in BFS order.
while (!q.empty()) {
auto& current = q.front();
co_await std::visit(overloaded_functor {
[&] (const tasks::task_manager::foreign_task_ptr& task) -> future<> {
res.push_back(co_await get_foreign_task_status(task));
co_await task->get_children().for_each_task([&q] (const tasks::task_manager::foreign_task_ptr& child) -> future<> {
q.push(co_await child.copy());
}, [&] (const tasks::task_manager::task::task_essentials& child) {
q.push(child);
return make_ready_future();
});
},
[&] (const tasks::task_manager::task::task_essentials& task) -> future<> {
auto status = task_status{
.task_id = task.task_status.id,
.type = task.type,
.kind = task_kind::node,
.scope = task.task_status.scope,
.state = task.task_status.state,
.is_abortable = task.abortable,
.start_time = task.task_status.start_time,
.end_time = task.task_status.end_time,
.error = task.task_status.error,
.parent_id = task.parent_id,
.sequence_number = task.task_status.sequence_number,
.shard = task.task_status.shard,
.keyspace = task.task_status.keyspace,
.table = task.task_status.table,
.entity = task.task_status.entity,
.progress_units = task.task_status.progress_units,
.progress = task.task_progress,
.children = task.failed_children | std::views::transform([&tm = _tm] (auto& child) {
return task_identity{
.host_id = tm.get_host_id(),
.task_id = child.task_status.id
};
}) | std::ranges::to<utils::chunked_vector<task_identity>>()
};
res.push_back(status);
for (auto& child: task.failed_children) {
q.push(child);
}
return make_ready_future();
}
}, current);
q.pop();
}
co_return res;
}
future<> task_handler::abort() {
co_await task_manager::invoke_on_task(_tm.container(), _id, [id = _id] (task_manager::task_variant task_v, tasks::virtual_task_hint hint) -> future<> {
return std::visit(overloaded_functor{
[id] (task_manager::task_ptr task) -> future<> {
if (!task->is_abortable()) {
co_await coroutine::return_exception(task_not_abortable(id));
}
task->abort();
},
[id, hint = std::move(hint)] (task_manager::virtual_task_ptr task) -> future<> {
auto id_ = id;
auto hint_ = std::move(hint);
if (!co_await task->is_abortable(hint_)) {
co_await coroutine::return_exception(task_not_abortable(id_));
}
co_await task->abort(id_, std::move(hint_));
}
}, task_v);
});
}
}