mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-01 04:26:48 +00:00
Merge 'Do not run aborted tasks' from Aleksandra Martyniuk
task_manager::task::impl contains an abort source which can be used to check whether it is aborted and an abort method which aborts the task (request_abort on abort_source) and all its descendants recursively. When the start method is called after the task was aborted, then its state is set to failed and the task does not run. Fixes: #11995 Closes #11996 * github.com:scylladb/scylladb: tasks: do not run tasks that are aborted tasks: delete unused variable tasks: add abort_source to task_manager::task::impl
This commit is contained in:
@@ -8,6 +8,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/range/algorithm/transform.hpp>
|
||||
#include <seastar/core/gate.hh>
|
||||
#include <seastar/core/sharded.hh>
|
||||
#include <seastar/core/sleep.hh>
|
||||
@@ -95,6 +96,7 @@ public:
|
||||
foreign_task_vector _children;
|
||||
shared_promise<> _done;
|
||||
module_ptr _module;
|
||||
abort_source _as;
|
||||
public:
|
||||
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id)
|
||||
: _status({
|
||||
@@ -126,7 +128,24 @@ public:
|
||||
}
|
||||
|
||||
virtual future<> abort() noexcept {
|
||||
return make_ready_future<>();
|
||||
if (!_as.abort_requested()) {
|
||||
_as.request_abort();
|
||||
|
||||
std::vector<task_info> children_info{_children.size()};
|
||||
boost::transform(_children, children_info.begin(), [] (const auto& child) {
|
||||
return task_info{child->id(), child.get_owner_shard()};
|
||||
});
|
||||
|
||||
co_await coroutine::parallel_for_each(children_info, [this] (auto info) {
|
||||
return smp::submit_to(info.shard, [info, &tm = _module->get_task_manager().container()] {
|
||||
auto& tasks = tm.local().get_all_tasks();
|
||||
if (auto it = tasks.find(info.id); it != tasks.end()) {
|
||||
return it->second->abort();
|
||||
}
|
||||
return make_ready_future<>();
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
protected:
|
||||
virtual future<> run() = 0;
|
||||
@@ -202,7 +221,6 @@ public:
|
||||
|
||||
void start() {
|
||||
_impl->_status.start_time = db_clock::now();
|
||||
_impl->_status.state = task_manager::task_state::running;
|
||||
|
||||
try {
|
||||
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
|
||||
@@ -215,6 +233,8 @@ public:
|
||||
module->unregister_task(id);
|
||||
});
|
||||
});
|
||||
_impl->_as.check();
|
||||
_impl->_status.state = task_manager::task_state::running;
|
||||
_impl->run_to_completion();
|
||||
} catch (...) {
|
||||
_impl->finish_failed(std::current_exception());
|
||||
@@ -245,6 +265,10 @@ public:
|
||||
return _impl->abort();
|
||||
}
|
||||
|
||||
bool abort_requested() const noexcept {
|
||||
return _impl->_as.abort_requested();
|
||||
}
|
||||
|
||||
future<> done() const noexcept {
|
||||
return _impl->_done.get_shared_future();
|
||||
}
|
||||
@@ -336,18 +360,22 @@ public:
|
||||
// from a parent and set. Otherwise, it must be set by caller.
|
||||
future<task_ptr> make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) {
|
||||
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
|
||||
foreign_task_ptr parent;
|
||||
bool abort = false;
|
||||
if (parent_d) {
|
||||
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable {
|
||||
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable {
|
||||
const auto& all_tasks = tm.get_all_tasks();
|
||||
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
|
||||
it->second->add_child(std::move(task));
|
||||
return make_ready_future<uint64_t>(it->second->get_sequence_number());
|
||||
abort = it->second->abort_requested();
|
||||
return it->second->get_sequence_number();
|
||||
} else {
|
||||
return make_exception_future<uint64_t>(task_manager::task_not_found(id));
|
||||
throw task_manager::task_not_found(id);
|
||||
}
|
||||
});
|
||||
}
|
||||
if (abort) {
|
||||
co_await task->abort();
|
||||
}
|
||||
co_return task;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user