/* * Copyright (C) 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "build_mode.hh" #ifndef SCYLLA_BUILD_MODE_RELEASE #include #include "task_manager_test.hh" #include "api/api.hh" #include "api/api-doc/task_manager_test.json.hh" #include "tasks/test_module.hh" #include "tasks/virtual_task_hint.hh" #include "utils/overloaded_functor.hh" namespace api { namespace tmt = httpd::task_manager_test_json; using namespace json; using namespace seastar::httpd; static future make_test_task(tasks::task_manager& task_manager, sstring module_name, unsigned shard, tasks::task_id id, std::string keyspace, std::string table, std::string entity, tasks::task_info parent_d, tasks::is_user_task user_task) { return task_manager.container().invoke_on(shard, [id, module = std::move(module_name), keyspace = std::move(keyspace), table = std::move(table), entity = std::move(entity), parent_d, user_task] (tasks::task_manager& tm) { auto module_ptr = tm.find_module(module); auto task_impl_ptr = seastar::make_shared(module_ptr, id ? id : tasks::task_id::create_random_id(), parent_d ? 0 : module_ptr->new_sequence_number(), std::move(keyspace), std::move(table), std::move(entity), parent_d.id, user_task); return module_ptr->make_task(std::move(task_impl_ptr), parent_d).then([] (auto task) { return task->id(); }); }); } void set_task_manager_test(http_context& ctx, routes& r, sharded& tm) { tmt::register_test_module.set(r, [&tm] (std::unique_ptr req) -> future { co_await tm.invoke_on_all([] (tasks::task_manager& tm) { auto m = make_shared(tm); tm.register_module("test", m); }); co_return json_void(); }); tmt::unregister_test_module.set(r, [&tm] (std::unique_ptr req) -> future { co_await tm.invoke_on_all([] (tasks::task_manager& tm) -> future<> { auto module_name = "test"; auto module = tm.find_module(module_name); co_await module->stop(); }); co_return json_void(); }); tmt::register_test_task.set(r, [&tm] (std::unique_ptr req) -> future { sharded& tms = tm; const auto id_param = req->get_query_param("task_id"); auto id = !id_param.empty() ? tasks::task_id{utils::UUID{id_param}} : tasks::task_id::create_null_id(); const auto shard_param = req->get_query_param("shard"); unsigned shard = shard_param.empty() ? 0 : boost::lexical_cast(shard_param); std::string keyspace = req->get_query_param("keyspace"); std::string table = req->get_query_param("table"); std::string entity = req->get_query_param("entity"); tasks::task_info data; if (auto parent_id = req->get_query_param("parent_id"); !parent_id.empty()) { data.id = tasks::task_id{utils::UUID{parent_id}}; auto parent_ptr = co_await tasks::task_manager::lookup_task_on_all_shards(tm, data.id); data.shard = parent_ptr->get_status().shard; } auto user_task = tasks::is_user_task{req_param(*req, "user_task", false)}; auto module = tms.local().find_module("test"); id = co_await make_test_task(module->get_task_manager(), module->get_name(), shard, id, keyspace, table, entity, data, user_task); co_await tms.invoke_on(shard, [id] (tasks::task_manager& tm) { auto it = tm.get_local_tasks().find(id); if (it != tm.get_local_tasks().end()) { it->second->start(); } }); co_return id.to_sstring(); }); tmt::unregister_test_task.set(r, [&tm] (std::unique_ptr req) -> future { auto id = tasks::task_id{utils::UUID{req->get_query_param("task_id")}}; try { co_await tasks::task_manager::invoke_on_task(tm, id, [] (tasks::task_manager::task_variant task_v, tasks::virtual_task_hint) -> future<> { return std::visit(overloaded_functor{ [] (tasks::task_manager::task_ptr task) -> future<> { tasks::test_task test_task{task}; co_await test_task.unregister_task(); }, [] (tasks::task_manager::virtual_task_ptr task) { return make_ready_future(); } }, task_v); }); } catch (tasks::task_manager::task_not_found& e) { throw bad_param_exception(e.what()); } co_return json_void(); }); tmt::finish_test_task.set(r, [&tm] (std::unique_ptr req) -> future { auto id = tasks::task_id{utils::UUID{req->get_path_param("task_id")}}; std::string error = req->get_query_param("error"); bool fail = !error.empty(); try { co_await tasks::task_manager::invoke_on_task(tm, id, [fail, error = std::move(error)] (tasks::task_manager::task_variant task_v, tasks::virtual_task_hint) -> future<> { return std::visit(overloaded_functor{ [fail, error = std::move(error)] (tasks::task_manager::task_ptr task) -> future<> { tasks::test_task test_task{task}; if (fail) { co_await test_task.finish_failed(std::make_exception_ptr(std::runtime_error(error))); } else { co_await test_task.finish(); } }, [] (tasks::task_manager::virtual_task_ptr task) { return make_ready_future(); } }, task_v); }); } catch (tasks::task_manager::task_not_found& e) { throw bad_param_exception(e.what()); } co_return json_void(); }); } void unset_task_manager_test(http_context& ctx, routes& r) { tmt::register_test_module.unset(r); tmt::unregister_test_module.unset(r); tmt::register_test_task.unset(r); tmt::unregister_test_task.unset(r); tmt::finish_test_task.unset(r); } } #endif