diff --git a/tasks/task_manager.hh b/tasks/task_manager.hh index 3a2c558909..c6a859f730 100644 --- a/tasks/task_manager.hh +++ b/tasks/task_manager.hh @@ -318,37 +318,37 @@ public: co_await _gate.close(); _tm.unregister_module(_name); } - + public: template requires std::is_base_of_v future make_task(unsigned shard, task_id id = task_id::create_null_id(), std::string keyspace = "", std::string table = "", std::string type = "", std::string entity = "", task_info parent_d = task_info{}) { + return _tm.container().invoke_on(shard, [id, module = _name, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) { + auto module_ptr = tm.find_module(module); + auto task_impl_ptr = std::make_unique(module_ptr, id ? id : task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id); + return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) { + return task->id(); + }); + }); + } + + // Must be called on target shard. + // If task has a parent, data concerning its children is updated and sequence number is inherited + // from a parent and set. Otherwise, it must be set by caller. + future make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) { + auto task = make_lw_shared(std::move(task_impl_ptr)); foreign_task_ptr parent; - uint64_t sequence_number = 0; if (parent_d) { - parent = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id] (task_manager& tm) mutable -> future { + task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable { const auto& all_tasks = tm.get_all_tasks(); if (auto it = all_tasks.find(id); it != all_tasks.end()) { - co_return it->second; + it->second->add_child(std::move(task)); + return make_ready_future(it->second->get_sequence_number()); } else { - co_return coroutine::return_exception(task_manager::task_not_found(id)); + return make_exception_future(task_manager::task_not_found(id)); } }); - sequence_number = parent->get_sequence_number(); } - - auto task = co_await _tm.container().invoke_on(shard, [id, module = _name, sequence_number, keyspace = std::move(keyspace), table = std::move(table), type = std::move(type), entity = std::move(entity), parent_d] (task_manager& tm) { - auto module_ptr = tm.find_module(module); - auto task_impl_ptr = std::make_unique(module_ptr, id ? id : task_id::create_random_id(), parent_d ? sequence_number : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(type), std::move(entity), parent_d.id); - return make_ready_future(make_lw_shared(std::move(task_impl_ptr))); - }); - id = task->id(); - - if (parent_d) { - co_await _tm.container().invoke_on(parent.get_owner_shard(), [task = std::move(parent), child = std::move(task)] (task_manager& tm) mutable { - task->add_child(std::move(child)); - }); - } - co_return id; + co_return task; } }; public: