From 752edc2205151d441d508e07cb98d16a62ca36d9 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Mon, 31 Oct 2022 14:57:11 +0100 Subject: [PATCH 1/3] tasks: add abort_source to task_manager::task::impl task_manager::task can be aborted with impl's abort_source. By default abort request is propagated to all task's descendants. --- tasks/task_manager.hh | 36 ++++++++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index c6a859f730..87e5fa2624 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -8,6 +8,7 @@ #pragma once +#include #include #include #include @@ -95,6 +96,7 @@ public: foreign_task_vector _children; shared_promise<> _done; module_ptr _module; + abort_source _as; public: impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id) : _status({ @@ -126,7 +128,24 @@ public: } virtual future<> abort() noexcept { - return make_ready_future<>(); + if (!_as.abort_requested()) { + _as.request_abort(); + + std::vector children_info{_children.size()}; + boost::transform(_children, children_info.begin(), [] (const auto& child) { + return task_info{child->id(), child.get_owner_shard()}; + }); + + co_await coroutine::parallel_for_each(children_info, [this] (auto info) { + return smp::submit_to(info.shard, [info, &tm = _module->get_task_manager().container()] { + auto& tasks = tm.local().get_all_tasks(); + if (auto it = tasks.find(info.id); it != tasks.end()) { + return it->second->abort(); + } + return make_ready_future<>(); + }); + }); + } } protected: virtual future<> run() = 0; @@ -245,6 +264,10 @@ public: return _impl->abort(); } + bool abort_requested() const noexcept { + return _impl->_as.abort_requested(); + } + future<> done() const noexcept { return _impl->_done.get_shared_future(); } @@ -337,17 +360,22 @@ public: future make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) { auto task = make_lw_shared(std::move(task_impl_ptr)); foreign_task_ptr parent; + bool abort = false; if (parent_d) { - task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable { + task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable { const auto& all_tasks = tm.get_all_tasks(); if (auto it = all_tasks.find(id); it != all_tasks.end()) { it->second->add_child(std::move(task)); - return make_ready_future(it->second->get_sequence_number()); + abort = it->second->abort_requested(); + return it->second->get_sequence_number(); } else { - return make_exception_future(task_manager::task_not_found(id)); + throw task_manager::task_not_found(id); } }); } + if (abort) { + co_await task->abort(); + } co_return task; } }; From ebffca7ea5dffafbcf8b08ef65ce784d2dc8e27d Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 16 Nov 2022 18:07:57 +0100 Subject: [PATCH 2/3] tasks: delete unused variable --- tasks/task_manager.hh | 1 - 1 file changed, 1 deletion(-) diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index 87e5fa2624..90a4cd1dbf 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -359,7 +359,6 @@ public: // from a parent and set. Otherwise, it must be set by caller. future make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) { auto task = make_lw_shared(std::move(task_impl_ptr)); - foreign_task_ptr parent; bool abort = false; if (parent_d) { task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable { From 4250bd9458e1e01297af9ba3ca35536f80c8d77f Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 16 Nov 2022 12:06:52 +0100 Subject: [PATCH 3/3] tasks: do not run tasks that are aborted Currently in start() method a task is run even if it was already aborted. When start() is called on an aborted task, its state is set to task_manager::task_state::failed and it doesn't run. --- tasks/task_manager.hh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index 90a4cd1dbf..37370ffb2b 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -221,7 +221,6 @@ public: void start() { _impl->_status.start_time = db_clock::now(); - _impl->_status.state = task_manager::task_state::running; try { // Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground. @@ -234,6 +233,8 @@ public: module->unregister_task(id); }); }); + _impl->_as.check(); + _impl->_status.state = task_manager::task_state::running; _impl->run_to_completion(); } catch (...) { _impl->finish_failed(std::current_exception());