Files
scylladb/tasks/task_manager.cc
Aleksandra Martyniuk 2d16083ba6 tasks: fix indentation
2026-03-18 15:37:24 +01:00

850 lines
29 KiB
C++

/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/when_all.hh>
#include <seastar/rpc/rpc_types.hh>
#include <seastar/util/defer.hh>
#include "db/timeout_clock.hh"
#include "message/messaging_service.hh"
#include "utils/assert.hh"
#include "utils/chunked_vector.hh"
#include "utils/overloaded_functor.hh"
#include "service/storage_service.hh"
#include "tasks/task_handler.hh"
#include "task_manager.hh"
#include "tasks/virtual_task_hint.hh"
#include "utils/error_injection.hh"
#include "idl/tasks.dist.hh"
using namespace std::chrono_literals;
template <typename T>
std::vector<T> concat(std::vector<T> a, std::vector<T>&& b) {
std::move(b.begin(), b.end(), std::back_inserter(a));
return a;
}
namespace tasks {
logging::logger tmlogger("task_manager");
bool task_manager::task::children::all_finished() const noexcept {
return _children.empty();
}
size_t task_manager::task::children::size() const noexcept {
return _children.size() + _finished_children.size();
}
future<> task_manager::task::children::add_child(foreign_task_ptr task) {
rwlock::holder exclusive_holder = co_await _lock.hold_write_lock();
auto id = task->id();
auto inserted = _children.emplace(id, std::move(task)).second;
SCYLLA_ASSERT(inserted);
}
future<> task_manager::task::children::mark_as_finished(task_id id, task_essentials essentials) const {
rwlock::holder exclusive_holder = co_await _lock.hold_write_lock();
auto it = _children.find(id);
_finished_children.push_back(essentials);
[&] () noexcept { // erase is expected to not throw
_children.erase(it);
}();
}
future<task_manager::task::progress> task_manager::task::children::get_progress(const std::string& progress_units) const {
rwlock::holder shared_holder = co_await _lock.hold_read_lock();
tasks::task_manager::task::progress progress{};
co_await coroutine::parallel_for_each(_children, [&] (const auto& child_entry) -> future<> {
const auto& child = child_entry.second;
auto local_progress = co_await smp::submit_to(child.get_owner_shard(), [&child, &progress_units] {
SCYLLA_ASSERT(child->get_status().progress_units == progress_units);
return child->get_progress();
});
progress += local_progress;
});
for (const auto& child: _finished_children) {
progress += child.task_progress;
}
co_return progress;
}
future<> task_manager::task::children::for_each_task(std::function<future<>(const foreign_task_ptr&)> f_children,
std::function<future<>(const task_essentials&)> f_finished_children) const {
rwlock::holder shared_holder = co_await _lock.hold_read_lock();
for (const auto& [_, child]: _children) {
co_await f_children(child);
}
for (const auto& child: _finished_children) {
co_await f_finished_children(child);
}
}
task_manager::task::impl::impl(module_ptr module, task_id id, uint64_t sequence_number, std::string scope, std::string keyspace, std::string table, std::string entity, task_id parent_id) noexcept
: _status({
.id = id,
.state = task_state::created,
.sequence_number = sequence_number,
.shard = this_shard_id(),
.scope = std::move(scope),
.keyspace = std::move(keyspace),
.table = std::move(table),
.entity = std::move(entity)
})
, _parent_id(parent_id)
, _module(module)
{
// Child tasks of regular tasks do not need to subscribe to abort source because they will be aborted recursively by their parents.
if (!parent_id) {
_shutdown_subscription = module->abort_source().subscribe([this] () noexcept {
abort();
});
}
}
future<std::optional<double>> task_manager::task::impl::expected_total_workload() const {
return make_ready_future<std::optional<double>>(std::nullopt);
}
future<task_manager::task::progress> task_manager::task::impl::get_progress() const {
if (is_complete()) {
co_return co_await _children.get_progress(_status.progress_units);
}
std::optional<double> expected_workload = co_await expected_total_workload();
if (!expected_workload && _children.size() == 0) {
co_return task_manager::task::progress{};
}
auto progress = co_await _children.get_progress(_status.progress_units);
progress.total = expected_workload.value_or(progress.total);
co_return progress;
}
is_abortable task_manager::task::impl::is_abortable() const noexcept {
return is_abortable::no;
}
is_internal task_manager::task::impl::is_internal() const noexcept {
return tasks::is_internal(_parent_id && _parent_kind != task_kind::cluster);
}
tasks::is_user_task task_manager::task::impl::is_user_task() const noexcept {
return tasks::is_user_task::no;
}
static future<> abort_children(task_manager::module_ptr module, task_id parent_id) noexcept {
co_await utils::get_local_injector().inject("tasks_abort_children", utils::wait_for_message(10s));
auto entered = module->async_gate().try_enter();
if (!entered) {
co_return;
}
auto leave_gate = defer([&module] () {
module->async_gate().leave();
});
co_await module->get_task_manager().container().invoke_on_all([parent_id] (task_manager& tm) {
for (auto& task : tm.get_local_tasks()) {
if (task.second->get_parent_id() == parent_id) {
task.second->abort();
}
}
});
}
void task_manager::task::impl::abort() noexcept {
if (!_as.abort_requested()) {
_as.request_abort();
(void)abort_children(_module, _status.id);
}
}
bool task_manager::task::impl::is_complete() const noexcept {
return _status.state == tasks::task_manager::task_state::done || _status.state == tasks::task_manager::task_state::failed;
}
bool task_manager::task::impl::is_done() const noexcept {
return _status.state == tasks::task_manager::task_state::done;
}
future<utils::chunked_vector<task_manager::task::task_essentials>> task_manager::task::impl::get_failed_children() const {
return _children.map_each_task<task_essentials>([] (const foreign_task_ptr&) { return std::nullopt; },
[] (const task_essentials& child) -> std::optional<task_essentials> {
if (child.task_status.state == task_state::failed || !child.failed_children.empty()) {
return child;
}
return std::nullopt;
}
);
}
void task_manager::task::impl::set_virtual_parent() noexcept {
_parent_kind = task_kind::cluster;
_shutdown_subscription = _module->abort_source().subscribe([this] () noexcept {
abort();
});
}
task_id task_manager::task::impl::id() const noexcept {
return _status.id;
}
task_manager::task::status& task_manager::task::impl::get_status() noexcept {
return _status;
}
future<> task_manager::task::impl::done() const noexcept {
return _done.get_shared_future();
}
void task_manager::task::impl::run_to_completion() {
(void)run().then([this] {
_as.check();
return finish();
}).handle_exception([this] (std::exception_ptr ex) {
return finish_failed(std::move(ex));
});
}
future<> task_manager::task::impl::maybe_fold_into_parent() const noexcept {
try {
if (is_internal() && _parent_id && _parent_kind == task_kind::node && _children.all_finished()) {
auto parent = co_await _module->get_task_manager().lookup_task_on_all_shards(_module->get_task_manager().container(), _parent_id);
task_essentials child{
.task_status = _status,
.task_progress = co_await get_progress(),
.parent_id = _parent_id,
.type = type(),
.abortable = is_abortable(),
.failed_children = co_await get_failed_children(),
};
co_await smp::submit_to(parent.get_owner_shard(), [&parent, id = _status.id, child = std::move(child)] () {
return parent->get_children().mark_as_finished(id, std::move(child));
});
}
} catch (...) {
// If folding fails, leave the subtree unfolded.
tasks::tmlogger.warn("Folding of task with id={} failed due to {}. Ignored", _status.id, std::current_exception());
}
}
future<> task_manager::task::impl::finish() noexcept {
if (!_done.available()) {
_status.end_time = db_clock::now();
_status.state = task_manager::task_state::done;
co_await maybe_fold_into_parent();
_done.set_value();
co_await release_resources();
}
}
future<> task_manager::task::impl::finish_failed(std::exception_ptr ex, std::string error) noexcept {
if (!_done.available()) {
_status.end_time = db_clock::now();
_status.state = task_manager::task_state::failed;
_status.error = std::move(error);
co_await maybe_fold_into_parent();
_done.set_exception(ex);
co_await release_resources();
}
}
future<> task_manager::task::impl::finish_failed(std::exception_ptr ex) noexcept {
std::string error;
try {
error = fmt::format("{}", ex);
} catch (...) {
error = "Failed to get error message";
}
return finish_failed(ex, std::move(error));
}
task_manager::task::task(task_impl_ptr&& impl, gate::holder gh) noexcept : _impl(std::move(impl)), _gate_holder(std::move(gh)) {
register_task();
}
task_id task_manager::task::id() {
return _impl->id();
}
std::string task_manager::task::type() const {
return _impl->type();
}
task_manager::task::status& task_manager::task::get_status() noexcept {
return _impl->get_status();
}
uint64_t task_manager::task::get_sequence_number() const noexcept {
return _impl->_status.sequence_number;
}
task_id task_manager::task::get_parent_id() const noexcept {
return _impl->_parent_id;
}
void task_manager::task::change_state(task_state state) noexcept {
_impl->_status.state = state;
}
future<> task_manager::task::add_child(foreign_task_ptr&& child) {
return _impl->_children.add_child(std::move(child));
}
void task_manager::task::start() {
if (_impl->_status.state != task_state::created) {
on_fatal_internal_error(tmlogger, seastar::format("{} task with id = {} was started twice", _impl->_module->get_name(), id()));
}
_impl->_status.start_time = db_clock::now();
try {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
// After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way.
auto module = _impl->_module;
auto user_task = is_user_task();
bool drop_after_complete = (get_parent_id() && _impl->_parent_kind == task_kind::node) || is_internal();
(void)done().finally([module, drop_after_complete, user_task] {
if (drop_after_complete) {
return make_ready_future<>();
}
return sleep_abortable(user_task ? module->get_task_manager().get_user_task_ttl() : module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id = id()] (auto f) {
f.ignore_ready_future();
module->unregister_task(id);
});
_impl->_as.check();
_impl->_status.state = task_manager::task_state::running;
_impl->run_to_completion();
} catch (...) {
(void)_impl->finish_failed(std::current_exception()).then([impl = _impl] {});
}
}
std::string task_manager::task::get_module_name() const noexcept {
return _impl->_module->get_name();
}
task_manager::module_ptr task_manager::task::get_module() const noexcept {
return _impl->_module;
}
future<task_manager::task::progress> task_manager::task::get_progress() const {
return _impl->get_progress();
}
is_abortable task_manager::task::is_abortable() const noexcept {
return _impl->is_abortable();
};
is_internal task_manager::task::is_internal() const noexcept {
return _impl->is_internal();
}
is_user_task task_manager::task::is_user_task() const noexcept {
return _impl->is_user_task();
}
void task_manager::task::abort() noexcept {
_impl->abort();
}
bool task_manager::task::abort_requested() const noexcept {
return _impl->_as.abort_requested();
}
future<> task_manager::task::done() const noexcept {
return _impl->done();
}
void task_manager::task::register_task() {
_impl->_module->register_task(shared_from_this());
}
void task_manager::task::unregister_task() noexcept {
_impl->_module->unregister_task(id());
}
const task_manager::task::children& task_manager::task::get_children() const noexcept {
return _impl->_children;
}
bool task_manager::task::is_complete() const noexcept {
return _impl->is_complete();
}
future<utils::chunked_vector<task_manager::task::task_essentials>> task_manager::task::get_failed_children() const {
return _impl->get_failed_children();
}
void task_manager::task::set_virtual_parent() noexcept {
_impl->set_virtual_parent();
}
task_manager::virtual_task::impl::impl(module_ptr module) noexcept
: _module(std::move(module))
{}
future<utils::chunked_vector<task_identity>> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) {
auto ms = module->get_task_manager()._messaging;
if (!ms) {
auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id);
co_return ids | std::views::transform([&tm = module->get_task_manager()] (auto id) {
return task_identity{
.host_id = tm.get_host_id(),
.task_id = id
};
}) | std::ranges::to<utils::chunked_vector<task_identity>>();
}
auto nodes = module->get_nodes();
co_await utils::get_local_injector().inject("tasks_vt_get_children", [] (auto& handler) -> future<> {
tmlogger.info("tasks_vt_get_children: waiting");
co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60});
});
co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future<utils::chunked_vector<task_identity>> {
return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) {
return resp | std::views::transform([host_id] (auto id) {
return task_identity{
.host_id = host_id,
.task_id = id
};
}) | std::ranges::to<utils::chunked_vector<task_identity>>();
}).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) {
tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex);
return utils::chunked_vector<task_identity>{};
});
}, utils::chunked_vector<task_identity>{}, [] (auto a, auto&& b) {
std::move(b.begin(), b.end(), std::back_inserter(a));
return a;
});
}
task_manager::module_ptr task_manager::virtual_task::impl::get_module() const noexcept {
return _module;
}
task_manager& task_manager::virtual_task::impl::get_task_manager() const noexcept {
return _module->get_task_manager();
}
future<tasks::is_abortable> task_manager::virtual_task::impl::is_abortable(virtual_task_hint hint) const {
return make_ready_future<tasks::is_abortable>(is_abortable::no);
}
task_manager::virtual_task::virtual_task(virtual_task_impl_ptr&& impl) noexcept
: _impl(std::move(impl))
{
SCYLLA_ASSERT(this_shard_id() == 0);
}
future<std::optional<tasks::virtual_task_hint>> task_manager::virtual_task::contains(tasks::task_id task_id) const {
return _impl->contains(task_id);
}
task_manager::module_ptr task_manager::virtual_task::get_module() const noexcept {
return _impl->get_module();
}
task_manager::task_group task_manager::virtual_task::get_group() const noexcept {
return _impl->get_group();
}
future<tasks::is_abortable> task_manager::virtual_task::is_abortable(virtual_task_hint hint) const {
return _impl->is_abortable(std::move(hint));
}
future<std::optional<task_status>> task_manager::virtual_task::get_status(task_id id, virtual_task_hint hint) {
return _impl->get_status(id, std::move(hint));
}
future<std::optional<task_status>> task_manager::virtual_task::wait(task_id id, virtual_task_hint hint) {
return _impl->wait(id, std::move(hint));
}
future<> task_manager::virtual_task::abort(task_id id, virtual_task_hint hint) noexcept {
return _impl->abort(id, std::move(hint));
}
future<std::vector<task_stats>> task_manager::virtual_task::get_stats() {
return _impl->get_stats();
}
task_manager::module::module(task_manager& tm, std::string name) noexcept : _tm(tm), _name(std::move(name)), _gate(fmt::format("task_manager::module[{}]", _name)) {
_abort_subscription = _tm.abort_source().subscribe([this] () noexcept {
abort_source().request_abort();
});
}
uint64_t task_manager::module::new_sequence_number() noexcept {
return ++_sequence_number;
}
task_manager& task_manager::module::get_task_manager() noexcept {
return _tm;
}
const task_manager& task_manager::module::get_task_manager() const noexcept {
return _tm;
}
abort_source& task_manager::module::abort_source() noexcept {
return _as;
}
named_gate& task_manager::module::async_gate() noexcept {
return _gate;
}
const std::string& task_manager::module::get_name() const noexcept {
return _name;
}
task_manager::task_map& task_manager::module::get_local_tasks() noexcept {
return _tasks._local_tasks;
}
const task_manager::task_map& task_manager::module::get_local_tasks() const noexcept {
return _tasks._local_tasks;
}
task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() noexcept {
return _tasks._virtual_tasks;
}
const task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() const noexcept {
return _tasks._virtual_tasks;
}
task_manager::tasks_collection& task_manager::module::get_tasks_collection() noexcept {
return _tasks;
}
const task_manager::tasks_collection& task_manager::module::get_tasks_collection() const noexcept {
return _tasks;
}
std::set<locator::host_id> task_manager::module::get_nodes() const {
return {_tm.get_host_id()};
}
future<utils::chunked_vector<task_stats>> task_manager::module::get_stats(is_internal internal, std::function<bool(std::string& keyspace, std::string& table)> filter) const {
utils::chunked_vector<task_stats> stats;
for (auto [_, task]: get_local_tasks()) {
if ((internal || !task->is_internal()) && filter(task->get_status().keyspace, task->get_status().table)) {
stats.push_back(task_stats{
.task_id = task->id(),
.type = task->type(),
.kind = task_kind::node,
.scope = task->get_status().scope,
.state = task->get_status().state,
.sequence_number = task->get_sequence_number(),
.keyspace = task->get_status().keyspace,
.table = task->get_status().table,
.entity = task->get_status().entity,
.shard = task->get_status().shard,
.start_time = task->get_status().start_time,
.end_time = task->get_status().end_time,
});
}
}
if (this_shard_id() == 0) {
auto virtual_tasks = get_virtual_tasks(); // Copy to make sure iterators are valid.
for (auto [_, vt]: virtual_tasks) {
auto vstats = co_await vt->get_stats();
for (auto&& s: vstats) {
if (filter(s.keyspace, s.table)) {
stats.push_back(std::move(s));
}
}
}
}
co_return stats;
}
void task_manager::module::register_task(task_ptr task) {
get_local_tasks()[task->id()] = task;
try {
_tm.register_task(task);
} catch (...) {
get_local_tasks().erase(task->id());
throw;
}
}
void task_manager::module::register_virtual_task(virtual_task_ptr task) {
SCYLLA_ASSERT(this_shard_id() == 0);
auto group = task->get_group();
get_virtual_tasks()[group] = task;
try {
_tm.register_virtual_task(task);
} catch (...) {
get_virtual_tasks().erase(group);
throw;
}
}
void task_manager::module::unregister_task(task_id id) noexcept {
get_local_tasks().erase(id);
_tm.unregister_task(id);
}
future<> task_manager::module::stop() noexcept {
tmlogger.info("Stopping module {}", _name);
abort_source().request_abort();
co_await _gate.close();
if (this_shard_id() == 0) {
for (auto& [group, _]: _tasks._virtual_tasks) {
_tm.unregister_virtual_task(group);
}
_tasks._virtual_tasks = {};
}
_tm.unregister_module(_name);
}
future<task_manager::task_ptr> task_manager::module::make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr), async_gate().hold());
bool abort = false;
if (parent_d) {
// Regular task as a parent.
auto sequence_number = co_await _tm.container().invoke_on(parent_d.shard, coroutine::lambda([id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable -> future<std::optional<uint64_t>> {
const auto& all_tasks = tm.get_local_tasks();
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
co_await it->second->add_child(std::move(task));
abort = it->second->abort_requested();
co_return it->second->get_sequence_number();
}
co_return std::nullopt;
}));
if (!sequence_number) { // Virtual task as a parent.
task->set_virtual_parent();
sequence_number = new_sequence_number();
}
task->get_status().sequence_number = sequence_number.value();
}
if (abort) {
task->abort();
}
co_return task;
}
task_manager::task_manager(config cfg, class abort_source& as) noexcept
: _cfg(std::move(cfg))
, _task_ttl(_cfg.task_ttl)
, _user_task_ttl(_cfg.user_task_ttl)
{
_abort_subscription = as.subscribe([this] () noexcept {
_as.request_abort();
});
tmlogger.debug("Started task manager (TTL={}) (USER TTL={})", get_task_ttl(), get_user_task_ttl());
}
task_manager::task_manager() noexcept
: _task_ttl(0)
, _user_task_ttl(0)
{}
locator::host_id task_manager::get_host_id() const noexcept {
return _host_id;
}
void task_manager::set_host_id(locator::host_id host_id) noexcept {
_host_id = host_id;
}
task_manager::modules& task_manager::get_modules() noexcept {
return _modules;
}
const task_manager::modules& task_manager::get_modules() const noexcept {
return _modules;
}
task_manager::task_map& task_manager::get_local_tasks() noexcept {
return _tasks._local_tasks;
}
const task_manager::task_map& task_manager::get_local_tasks() const noexcept {
return _tasks._local_tasks;
}
task_manager::virtual_task_map& task_manager::get_virtual_tasks() noexcept {
return _tasks._virtual_tasks;
}
const task_manager::virtual_task_map& task_manager::get_virtual_tasks() const noexcept {
return _tasks._virtual_tasks;
}
task_manager::tasks_collection& task_manager::get_tasks_collection() noexcept {
return _tasks;
}
const task_manager::tasks_collection& task_manager::get_tasks_collection() const noexcept {
return _tasks;
}
std::set<locator::host_id> task_manager::get_nodes(service::storage_service& ss) const {
return std::ranges::join_view(std::to_array({
std::views::all(ss._topology_state_machine._topology.normal_nodes),
std::views::all(ss._topology_state_machine._topology.transition_nodes)})
) | std::views::transform([] (auto& node) {
return locator::host_id{node.first.uuid()};
}) | std::views::filter([&ss] (locator::host_id host_id) {
return ss._gossiper.is_alive(host_id);
}) | std::ranges::to<std::set<locator::host_id>>();
}
future<std::vector<task_id>> task_manager::get_virtual_task_children(task_id parent_id) {
return container().map_reduce0([parent_id] (task_manager& tm) {
return tm.get_local_tasks() |
std::views::values |
std::views::filter([parent_id] (const auto& task) { return task->get_parent_id() == parent_id; }) |
std::views::transform([] (const auto& task) { return task->id(); }) |
std::ranges::to<std::vector>();
}, std::vector<task_id>{}, concat<task_id>);
}
task_manager::module_ptr task_manager::make_module(std::string name) {
auto m = seastar::make_shared<task_manager::module>(*this, name);
register_module(std::move(name), m);
return m;
}
task_manager::module_ptr task_manager::find_module(std::string module_name) {
auto it = _modules.find(module_name);
if (it == _modules.end()) {
throw std::runtime_error(seastar::format("module {} not found", module_name));
}
return it->second;
}
future<> task_manager::stop() noexcept {
if (!_modules.empty()) {
on_internal_error(tmlogger, "Tried to stop task manager while some modules were not unregistered");
}
return make_ready_future<>();
}
future<task_manager::foreign_task_ptr> task_manager::lookup_task_on_all_shards(sharded<task_manager>& tm, task_id tid) {
return task_manager::invoke_on_task(tm, tid, std::function([tid] (task_variant task_v, virtual_task_hint) {
return std::visit(overloaded_functor{
[] (tasks::task_manager::task_ptr task) {
return make_ready_future<task_manager::foreign_task_ptr>(make_foreign(task));
},
[tid] (tasks::task_manager::virtual_task_ptr task) -> future<tasks::task_manager::foreign_task_ptr> {
throw tasks::task_manager::task_not_found(tid); // The method is designed for regular tasks.
}
}, task_v);
}));
}
future<std::pair<task_manager::virtual_task_ptr, tasks::virtual_task_hint>> task_manager::lookup_virtual_task(task_manager& tm, task_id id) {
auto vts = tm.get_virtual_tasks();
for (auto [_, vt]: tm.get_virtual_tasks()) {
if (auto hint = co_await vt->contains(id); hint.has_value()) {
co_return std::make_pair(vt, std::move(hint.value()));
}
}
co_return std::make_pair<task_manager::virtual_task_ptr, tasks::virtual_task_hint>(nullptr, {});
}
future<> task_manager::invoke_on_task(sharded<task_manager>& tm, task_id id, std::function<future<> (task_manager::task_variant, virtual_task_hint)> func) {
co_await task_manager::invoke_on_task(tm, id, std::function([func = std::move(func)] (task_manager::task_variant task_v, virtual_task_hint vt_hint) -> future<bool> {
co_await func(task_v, std::move(vt_hint));
co_return true;
}));
}
abort_source& task_manager::abort_source() noexcept {
return _as;
}
std::chrono::seconds task_manager::get_task_ttl() const noexcept {
return std::chrono::seconds(_task_ttl);
}
std::chrono::seconds task_manager::get_user_task_ttl() const noexcept {
return std::chrono::seconds(_user_task_ttl);
}
void task_manager::register_module(std::string name, module_ptr module) {
_modules[name] = module;
tmlogger.info("Registered module {}", name);
}
void task_manager::unregister_module(std::string name) noexcept {
_modules.erase(name);
tmlogger.info("Unregistered module {}", name);
}
void task_manager::register_task(task_ptr task) {
_tasks._local_tasks[task->id()] = task;
}
void task_manager::register_virtual_task(virtual_task_ptr task) {
_tasks._virtual_tasks[task->get_group()] = task;
}
void task_manager::unregister_task(task_id id) noexcept {
_tasks._local_tasks.erase(id);
}
void task_manager::unregister_virtual_task(task_group group) noexcept {
_tasks._virtual_tasks.erase(group);
}
void task_manager::init_ms_handlers(netw::messaging_service& ms) {
_messaging = &ms;
ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future<tasks::get_children_response> {
return get_virtual_task_children(task_id{req.id});
});
}
future<> task_manager::uninit_ms_handlers() {
if (auto* ms = std::exchange(_messaging, nullptr)) {
return ser::tasks_rpc_verbs::unregister(ms).discard_result();
}
return make_ready_future();
}
locator::tablet_task_type virtual_task_hint::get_task_type() const {
if (!task_type.has_value()) {
on_internal_error(tmlogger, "tablet_virtual_task hint does not contain task type");
}
return task_type.value();
}
locator::tablet_id virtual_task_hint::get_tablet_id() const {
if (!tablet_id.has_value()) {
on_internal_error(tmlogger, "tablet_virtual_task hint does not contain tablet_id");
}
return tablet_id.value();
}
::table_id virtual_task_hint::get_table_id() const {
if (!table_id.has_value()) {
on_internal_error(tasks::tmlogger, "tablet_virtual_task hint does not contain table_id");
}
return table_id.value();
}
}