/* * Copyright (C) 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include #include #include #include #include #include "db/timeout_clock.hh" #include "message/messaging_service.hh" #include "utils/assert.hh" #include "utils/chunked_vector.hh" #include "utils/overloaded_functor.hh" #include "service/storage_service.hh" #include "tasks/task_handler.hh" #include "task_manager.hh" #include "tasks/virtual_task_hint.hh" #include "utils/error_injection.hh" #include "idl/tasks.dist.hh" using namespace std::chrono_literals; template std::vector concat(std::vector a, std::vector&& b) { std::move(b.begin(), b.end(), std::back_inserter(a)); return a; } namespace tasks { logging::logger tmlogger("task_manager"); bool task_manager::task::children::all_finished() const noexcept { return _children.empty(); } size_t task_manager::task::children::size() const noexcept { return _children.size() + _finished_children.size(); } future<> task_manager::task::children::add_child(foreign_task_ptr task) { rwlock::holder exclusive_holder = co_await _lock.hold_write_lock(); auto id = task->id(); auto inserted = _children.emplace(id, std::move(task)).second; SCYLLA_ASSERT(inserted); } future<> task_manager::task::children::mark_as_finished(task_id id, task_essentials essentials) const { rwlock::holder exclusive_holder = co_await _lock.hold_write_lock(); auto it = _children.find(id); _finished_children.push_back(essentials); [&] () noexcept { // erase is expected to not throw _children.erase(it); }(); } future task_manager::task::children::get_progress(const std::string& progress_units) const { rwlock::holder shared_holder = co_await _lock.hold_read_lock(); tasks::task_manager::task::progress progress{}; co_await coroutine::parallel_for_each(_children, [&] (const auto& child_entry) -> future<> { const auto& child = child_entry.second; auto local_progress = co_await smp::submit_to(child.get_owner_shard(), [&child, &progress_units] { SCYLLA_ASSERT(child->get_status().progress_units == progress_units); return child->get_progress(); }); progress += local_progress; }); for (const auto& child: _finished_children) { progress += child.task_progress; } co_return progress; } future<> task_manager::task::children::for_each_task(std::function(const foreign_task_ptr&)> f_children, std::function(const task_essentials&)> f_finished_children) const { rwlock::holder shared_holder = co_await _lock.hold_read_lock(); for (const auto& [_, child]: _children) { co_await f_children(child); } for (const auto& child: _finished_children) { co_await f_finished_children(child); } } task_manager::task::impl::impl(module_ptr module, task_id id, uint64_t sequence_number, std::string scope, std::string keyspace, std::string table, std::string entity, task_id parent_id) noexcept : _status({ .id = id, .state = task_state::created, .sequence_number = sequence_number, .shard = this_shard_id(), .scope = std::move(scope), .keyspace = std::move(keyspace), .table = std::move(table), .entity = std::move(entity) }) , _parent_id(parent_id) , _module(module) { // Child tasks of regular tasks do not need to subscribe to abort source because they will be aborted recursively by their parents. if (!parent_id) { _shutdown_subscription = module->abort_source().subscribe([this] () noexcept { abort(); }); } } future> task_manager::task::impl::expected_total_workload() const { return make_ready_future>(std::nullopt); } future task_manager::task::impl::get_progress() const { if (is_complete()) { co_return co_await _children.get_progress(_status.progress_units); } std::optional expected_workload = co_await expected_total_workload(); if (!expected_workload && _children.size() == 0) { co_return task_manager::task::progress{}; } auto progress = co_await _children.get_progress(_status.progress_units); progress.total = expected_workload.value_or(progress.total); co_return progress; } is_abortable task_manager::task::impl::is_abortable() const noexcept { return is_abortable::no; } is_internal task_manager::task::impl::is_internal() const noexcept { return tasks::is_internal(_parent_id && _parent_kind != task_kind::cluster); } tasks::is_user_task task_manager::task::impl::is_user_task() const noexcept { return tasks::is_user_task::no; } static future<> abort_children(task_manager::module_ptr module, task_id parent_id) noexcept { co_await utils::get_local_injector().inject("tasks_abort_children", utils::wait_for_message(10s)); auto entered = module->async_gate().try_enter(); if (!entered) { co_return; } auto leave_gate = defer([&module] () { module->async_gate().leave(); }); co_await module->get_task_manager().container().invoke_on_all([parent_id] (task_manager& tm) { for (auto& task : tm.get_local_tasks()) { if (task.second->get_parent_id() == parent_id) { task.second->abort(); } } }); } void task_manager::task::impl::abort() noexcept { if (!_as.abort_requested()) { _as.request_abort(); (void)abort_children(_module, _status.id); } } bool task_manager::task::impl::is_complete() const noexcept { return _status.state == tasks::task_manager::task_state::done || _status.state == tasks::task_manager::task_state::failed; } bool task_manager::task::impl::is_done() const noexcept { return _status.state == tasks::task_manager::task_state::done; } future> task_manager::task::impl::get_failed_children() const { return _children.map_each_task([] (const foreign_task_ptr&) { return std::nullopt; }, [] (const task_essentials& child) -> std::optional { if (child.task_status.state == task_state::failed || !child.failed_children.empty()) { return child; } return std::nullopt; } ); } void task_manager::task::impl::set_virtual_parent() noexcept { _parent_kind = task_kind::cluster; _shutdown_subscription = _module->abort_source().subscribe([this] () noexcept { abort(); }); } task_id task_manager::task::impl::id() const noexcept { return _status.id; } task_manager::task::status& task_manager::task::impl::get_status() noexcept { return _status; } future<> task_manager::task::impl::done() const noexcept { return _done.get_shared_future(); } void task_manager::task::impl::run_to_completion() { (void)run().then([this] { _as.check(); return finish(); }).handle_exception([this] (std::exception_ptr ex) { return finish_failed(std::move(ex)); }); } future<> task_manager::task::impl::maybe_fold_into_parent() const noexcept { try { if (is_internal() && _parent_id && _parent_kind == task_kind::node && _children.all_finished()) { auto parent = co_await _module->get_task_manager().lookup_task_on_all_shards(_module->get_task_manager().container(), _parent_id); task_essentials child{ .task_status = _status, .task_progress = co_await get_progress(), .parent_id = _parent_id, .type = type(), .abortable = is_abortable(), .failed_children = co_await get_failed_children(), }; co_await smp::submit_to(parent.get_owner_shard(), [&parent, id = _status.id, child = std::move(child)] () { return parent->get_children().mark_as_finished(id, std::move(child)); }); } } catch (...) { // If folding fails, leave the subtree unfolded. tasks::tmlogger.warn("Folding of task with id={} failed due to {}. Ignored", _status.id, std::current_exception()); } } future<> task_manager::task::impl::finish() noexcept { if (!_done.available()) { _status.end_time = db_clock::now(); _status.state = task_manager::task_state::done; co_await maybe_fold_into_parent(); _done.set_value(); co_await release_resources(); } } future<> task_manager::task::impl::finish_failed(std::exception_ptr ex, std::string error) noexcept { if (!_done.available()) { _status.end_time = db_clock::now(); _status.state = task_manager::task_state::failed; _status.error = std::move(error); co_await maybe_fold_into_parent(); _done.set_exception(ex); co_await release_resources(); } } future<> task_manager::task::impl::finish_failed(std::exception_ptr ex) noexcept { std::string error; try { error = fmt::format("{}", ex); } catch (...) { error = "Failed to get error message"; } return finish_failed(ex, std::move(error)); } task_manager::task::task(task_impl_ptr&& impl, gate::holder gh) noexcept : _impl(std::move(impl)), _gate_holder(std::move(gh)) { register_task(); } task_id task_manager::task::id() { return _impl->id(); } std::string task_manager::task::type() const { return _impl->type(); } task_manager::task::status& task_manager::task::get_status() noexcept { return _impl->get_status(); } uint64_t task_manager::task::get_sequence_number() const noexcept { return _impl->_status.sequence_number; } task_id task_manager::task::get_parent_id() const noexcept { return _impl->_parent_id; } void task_manager::task::change_state(task_state state) noexcept { _impl->_status.state = state; } future<> task_manager::task::add_child(foreign_task_ptr&& child) { return _impl->_children.add_child(std::move(child)); } void task_manager::task::start() { if (_impl->_status.state != task_state::created) { on_fatal_internal_error(tmlogger, seastar::format("{} task with id = {} was started twice", _impl->_module->get_name(), id())); } _impl->_status.start_time = db_clock::now(); try { // Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground. // After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way. auto module = _impl->_module; auto user_task = is_user_task(); bool drop_after_complete = (get_parent_id() && _impl->_parent_kind == task_kind::node) || is_internal(); (void)done().finally([module, drop_after_complete, user_task] { if (drop_after_complete) { return make_ready_future<>(); } return sleep_abortable(user_task ? module->get_task_manager().get_user_task_ttl() : module->get_task_manager().get_task_ttl(), module->abort_source()); }).then_wrapped([module, id = id()] (auto f) { f.ignore_ready_future(); module->unregister_task(id); }); _impl->_as.check(); _impl->_status.state = task_manager::task_state::running; _impl->run_to_completion(); } catch (...) { (void)_impl->finish_failed(std::current_exception()).then([impl = _impl] {}); } } std::string task_manager::task::get_module_name() const noexcept { return _impl->_module->get_name(); } task_manager::module_ptr task_manager::task::get_module() const noexcept { return _impl->_module; } future task_manager::task::get_progress() const { return _impl->get_progress(); } is_abortable task_manager::task::is_abortable() const noexcept { return _impl->is_abortable(); }; is_internal task_manager::task::is_internal() const noexcept { return _impl->is_internal(); } is_user_task task_manager::task::is_user_task() const noexcept { return _impl->is_user_task(); } void task_manager::task::abort() noexcept { _impl->abort(); } bool task_manager::task::abort_requested() const noexcept { return _impl->_as.abort_requested(); } future<> task_manager::task::done() const noexcept { return _impl->done(); } void task_manager::task::register_task() { _impl->_module->register_task(shared_from_this()); } void task_manager::task::unregister_task() noexcept { _impl->_module->unregister_task(id()); } const task_manager::task::children& task_manager::task::get_children() const noexcept { return _impl->_children; } bool task_manager::task::is_complete() const noexcept { return _impl->is_complete(); } future> task_manager::task::get_failed_children() const { return _impl->get_failed_children(); } void task_manager::task::set_virtual_parent() noexcept { _impl->set_virtual_parent(); } task_manager::virtual_task::impl::impl(module_ptr module) noexcept : _module(std::move(module)) {} future> task_manager::virtual_task::impl::get_children(module_ptr module, task_id parent_id, locator::token_metadata_ptr tmptr) { auto ms = module->get_task_manager()._messaging; if (!ms) { auto ids = co_await module->get_task_manager().get_virtual_task_children(parent_id); co_return ids | std::views::transform([&tm = module->get_task_manager()] (auto id) { return task_identity{ .host_id = tm.get_host_id(), .task_id = id }; }) | std::ranges::to>(); } auto nodes = module->get_nodes(); co_await utils::get_local_injector().inject("tasks_vt_get_children", [] (auto& handler) -> future<> { tmlogger.info("tasks_vt_get_children: waiting"); co_await handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{60}); }); co_return co_await map_reduce(nodes, [ms, parent_id] (auto host_id) -> future> { return ser::tasks_rpc_verbs::send_tasks_get_children(ms, host_id, parent_id).then([host_id] (auto resp) { return resp | std::views::transform([host_id] (auto id) { return task_identity{ .host_id = host_id, .task_id = id }; }) | std::ranges::to>(); }).handle_exception_type([host_id, parent_id] (const rpc::closed_error& ex) { tmlogger.warn("Failed to get children of virtual task with id={} from node {}: {}", parent_id, host_id, ex); return utils::chunked_vector{}; }); }, utils::chunked_vector{}, [] (auto a, auto&& b) { std::move(b.begin(), b.end(), std::back_inserter(a)); return a; }); } task_manager::module_ptr task_manager::virtual_task::impl::get_module() const noexcept { return _module; } task_manager& task_manager::virtual_task::impl::get_task_manager() const noexcept { return _module->get_task_manager(); } future task_manager::virtual_task::impl::is_abortable(virtual_task_hint hint) const { return make_ready_future(is_abortable::no); } task_manager::virtual_task::virtual_task(virtual_task_impl_ptr&& impl) noexcept : _impl(std::move(impl)) { SCYLLA_ASSERT(this_shard_id() == 0); } future> task_manager::virtual_task::contains(tasks::task_id task_id) const { return _impl->contains(task_id); } task_manager::module_ptr task_manager::virtual_task::get_module() const noexcept { return _impl->get_module(); } task_manager::task_group task_manager::virtual_task::get_group() const noexcept { return _impl->get_group(); } future task_manager::virtual_task::is_abortable(virtual_task_hint hint) const { return _impl->is_abortable(std::move(hint)); } future> task_manager::virtual_task::get_status(task_id id, virtual_task_hint hint) { return _impl->get_status(id, std::move(hint)); } future> task_manager::virtual_task::wait(task_id id, virtual_task_hint hint) { return _impl->wait(id, std::move(hint)); } future<> task_manager::virtual_task::abort(task_id id, virtual_task_hint hint) noexcept { return _impl->abort(id, std::move(hint)); } future> task_manager::virtual_task::get_stats() { return _impl->get_stats(); } task_manager::module::module(task_manager& tm, std::string name) noexcept : _tm(tm), _name(std::move(name)), _gate(fmt::format("task_manager::module[{}]", _name)) { _abort_subscription = _tm.abort_source().subscribe([this] () noexcept { abort_source().request_abort(); }); } uint64_t task_manager::module::new_sequence_number() noexcept { return ++_sequence_number; } task_manager& task_manager::module::get_task_manager() noexcept { return _tm; } const task_manager& task_manager::module::get_task_manager() const noexcept { return _tm; } abort_source& task_manager::module::abort_source() noexcept { return _as; } named_gate& task_manager::module::async_gate() noexcept { return _gate; } const std::string& task_manager::module::get_name() const noexcept { return _name; } task_manager::task_map& task_manager::module::get_local_tasks() noexcept { return _tasks._local_tasks; } const task_manager::task_map& task_manager::module::get_local_tasks() const noexcept { return _tasks._local_tasks; } task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() noexcept { return _tasks._virtual_tasks; } const task_manager::virtual_task_map& task_manager::module::get_virtual_tasks() const noexcept { return _tasks._virtual_tasks; } task_manager::tasks_collection& task_manager::module::get_tasks_collection() noexcept { return _tasks; } const task_manager::tasks_collection& task_manager::module::get_tasks_collection() const noexcept { return _tasks; } std::set task_manager::module::get_nodes() const { return {_tm.get_host_id()}; } future> task_manager::module::get_stats(is_internal internal, std::function filter) const { utils::chunked_vector stats; for (auto [_, task]: get_local_tasks()) { if ((internal || !task->is_internal()) && filter(task->get_status().keyspace, task->get_status().table)) { stats.push_back(task_stats{ .task_id = task->id(), .type = task->type(), .kind = task_kind::node, .scope = task->get_status().scope, .state = task->get_status().state, .sequence_number = task->get_sequence_number(), .keyspace = task->get_status().keyspace, .table = task->get_status().table, .entity = task->get_status().entity, .shard = task->get_status().shard, .start_time = task->get_status().start_time, .end_time = task->get_status().end_time, }); } } if (this_shard_id() == 0) { auto virtual_tasks = get_virtual_tasks(); // Copy to make sure iterators are valid. for (auto [_, vt]: virtual_tasks) { auto vstats = co_await vt->get_stats(); for (auto&& s: vstats) { if (filter(s.keyspace, s.table)) { stats.push_back(std::move(s)); } } } } co_return stats; } void task_manager::module::register_task(task_ptr task) { get_local_tasks()[task->id()] = task; try { _tm.register_task(task); } catch (...) { get_local_tasks().erase(task->id()); throw; } } void task_manager::module::register_virtual_task(virtual_task_ptr task) { SCYLLA_ASSERT(this_shard_id() == 0); auto group = task->get_group(); get_virtual_tasks()[group] = task; try { _tm.register_virtual_task(task); } catch (...) { get_virtual_tasks().erase(group); throw; } } void task_manager::module::unregister_task(task_id id) noexcept { get_local_tasks().erase(id); _tm.unregister_task(id); } future<> task_manager::module::stop() noexcept { tmlogger.info("Stopping module {}", _name); abort_source().request_abort(); co_await _gate.close(); if (this_shard_id() == 0) { for (auto& [group, _]: _tasks._virtual_tasks) { _tm.unregister_virtual_task(group); } _tasks._virtual_tasks = {}; } _tm.unregister_module(_name); } future task_manager::module::make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d) { auto task = make_lw_shared(std::move(task_impl_ptr), async_gate().hold()); bool abort = false; if (parent_d) { // Regular task as a parent. auto sequence_number = co_await _tm.container().invoke_on(parent_d.shard, coroutine::lambda([id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable -> future> { const auto& all_tasks = tm.get_local_tasks(); if (auto it = all_tasks.find(id); it != all_tasks.end()) { co_await it->second->add_child(std::move(task)); abort = it->second->abort_requested(); co_return it->second->get_sequence_number(); } co_return std::nullopt; })); if (!sequence_number) { // Virtual task as a parent. task->set_virtual_parent(); sequence_number = new_sequence_number(); } task->get_status().sequence_number = sequence_number.value(); } if (abort) { task->abort(); } co_return task; } task_manager::task_manager(config cfg, class abort_source& as) noexcept : _cfg(std::move(cfg)) , _task_ttl(_cfg.task_ttl) , _user_task_ttl(_cfg.user_task_ttl) { _abort_subscription = as.subscribe([this] () noexcept { _as.request_abort(); }); tmlogger.debug("Started task manager (TTL={}) (USER TTL={})", get_task_ttl(), get_user_task_ttl()); } task_manager::task_manager() noexcept : _task_ttl(0) , _user_task_ttl(0) {} locator::host_id task_manager::get_host_id() const noexcept { return _host_id; } void task_manager::set_host_id(locator::host_id host_id) noexcept { _host_id = host_id; } task_manager::modules& task_manager::get_modules() noexcept { return _modules; } const task_manager::modules& task_manager::get_modules() const noexcept { return _modules; } task_manager::task_map& task_manager::get_local_tasks() noexcept { return _tasks._local_tasks; } const task_manager::task_map& task_manager::get_local_tasks() const noexcept { return _tasks._local_tasks; } task_manager::virtual_task_map& task_manager::get_virtual_tasks() noexcept { return _tasks._virtual_tasks; } const task_manager::virtual_task_map& task_manager::get_virtual_tasks() const noexcept { return _tasks._virtual_tasks; } task_manager::tasks_collection& task_manager::get_tasks_collection() noexcept { return _tasks; } const task_manager::tasks_collection& task_manager::get_tasks_collection() const noexcept { return _tasks; } std::set task_manager::get_nodes(service::storage_service& ss) const { return std::ranges::join_view(std::to_array({ std::views::all(ss._topology_state_machine._topology.normal_nodes), std::views::all(ss._topology_state_machine._topology.transition_nodes)}) ) | std::views::transform([] (auto& node) { return locator::host_id{node.first.uuid()}; }) | std::views::filter([&ss] (locator::host_id host_id) { return ss._gossiper.is_alive(host_id); }) | std::ranges::to>(); } future> task_manager::get_virtual_task_children(task_id parent_id) { return container().map_reduce0([parent_id] (task_manager& tm) { return tm.get_local_tasks() | std::views::values | std::views::filter([parent_id] (const auto& task) { return task->get_parent_id() == parent_id; }) | std::views::transform([] (const auto& task) { return task->id(); }) | std::ranges::to(); }, std::vector{}, concat); } task_manager::module_ptr task_manager::make_module(std::string name) { auto m = seastar::make_shared(*this, name); register_module(std::move(name), m); return m; } task_manager::module_ptr task_manager::find_module(std::string module_name) { auto it = _modules.find(module_name); if (it == _modules.end()) { throw std::runtime_error(seastar::format("module {} not found", module_name)); } return it->second; } future<> task_manager::stop() noexcept { if (!_modules.empty()) { on_internal_error(tmlogger, "Tried to stop task manager while some modules were not unregistered"); } return make_ready_future<>(); } future task_manager::lookup_task_on_all_shards(sharded& tm, task_id tid) { return task_manager::invoke_on_task(tm, tid, std::function([tid] (task_variant task_v, virtual_task_hint) { return std::visit(overloaded_functor{ [] (tasks::task_manager::task_ptr task) { return make_ready_future(make_foreign(task)); }, [tid] (tasks::task_manager::virtual_task_ptr task) -> future { throw tasks::task_manager::task_not_found(tid); // The method is designed for regular tasks. } }, task_v); })); } future> task_manager::lookup_virtual_task(task_manager& tm, task_id id) { auto vts = tm.get_virtual_tasks(); for (auto [_, vt]: tm.get_virtual_tasks()) { if (auto hint = co_await vt->contains(id); hint.has_value()) { co_return std::make_pair(vt, std::move(hint.value())); } } co_return std::make_pair(nullptr, {}); } future<> task_manager::invoke_on_task(sharded& tm, task_id id, std::function (task_manager::task_variant, virtual_task_hint)> func) { co_await task_manager::invoke_on_task(tm, id, std::function([func = std::move(func)] (task_manager::task_variant task_v, virtual_task_hint vt_hint) -> future { co_await func(task_v, std::move(vt_hint)); co_return true; })); } abort_source& task_manager::abort_source() noexcept { return _as; } std::chrono::seconds task_manager::get_task_ttl() const noexcept { return std::chrono::seconds(_task_ttl); } std::chrono::seconds task_manager::get_user_task_ttl() const noexcept { return std::chrono::seconds(_user_task_ttl); } void task_manager::register_module(std::string name, module_ptr module) { _modules[name] = module; tmlogger.info("Registered module {}", name); } void task_manager::unregister_module(std::string name) noexcept { _modules.erase(name); tmlogger.info("Unregistered module {}", name); } void task_manager::register_task(task_ptr task) { _tasks._local_tasks[task->id()] = task; } void task_manager::register_virtual_task(virtual_task_ptr task) { _tasks._virtual_tasks[task->get_group()] = task; } void task_manager::unregister_task(task_id id) noexcept { _tasks._local_tasks.erase(id); } void task_manager::unregister_virtual_task(task_group group) noexcept { _tasks._virtual_tasks.erase(group); } void task_manager::init_ms_handlers(netw::messaging_service& ms) { _messaging = &ms; ser::tasks_rpc_verbs::register_tasks_get_children(_messaging, [this] (const rpc::client_info& cinfo, tasks::get_children_request req) -> future { return get_virtual_task_children(task_id{req.id}); }); } future<> task_manager::uninit_ms_handlers() { if (auto* ms = std::exchange(_messaging, nullptr)) { return ser::tasks_rpc_verbs::unregister(ms).discard_result(); } return make_ready_future(); } locator::tablet_task_type virtual_task_hint::get_task_type() const { if (!task_type.has_value()) { on_internal_error(tmlogger, "tablet_virtual_task hint does not contain task type"); } return task_type.value(); } locator::tablet_id virtual_task_hint::get_tablet_id() const { if (!tablet_id.has_value()) { on_internal_error(tmlogger, "tablet_virtual_task hint does not contain tablet_id"); } return tablet_id.value(); } ::table_id virtual_task_hint::get_table_id() const { if (!table_id.has_value()) { on_internal_error(tasks::tmlogger, "tablet_virtual_task hint does not contain table_id"); } return table_id.value(); } }