api: storage_service: tasks: force_keyspace_cleanup

Currently, all apis that start a compaction have two versions:
synchronous and asynchronous. They share most of the implementation,
but some checks and params have diverged.

Unify the handlers of /storage_service/keyspace_cleanup/{keyspace}
and /tasks/compaction/keyspace_cleanup/{keyspace}.

(cherry picked from commit 044b001bb4)
This commit is contained in:
Aleksandra Martyniuk
2025-10-28 15:30:58 +01:00
parent 998d186709
commit 6a72fd4bb4
3 changed files with 30 additions and 30 deletions

View File

@@ -765,25 +765,10 @@ rest_force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request>
static
future<json::json_return_type>
rest_force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
if (rs.get_type() == locator::replication_strategy_type::local || !rs.is_vnode_based()) {
auto reason = rs.get_type() == locator::replication_strategy_type::local ? "require" : "support";
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
co_return json::json_return_type(0);
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
co_await task->done();
}
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>(
{}, std::move(keyspace), db, table_infos, flush_mode::all_tables, tasks::is_user_task::yes);
co_await task->done();
co_return json::json_return_type(0);
}

View File

@@ -51,6 +51,27 @@ future<tasks::task_manager::task_ptr> force_keyspace_compaction(http_context& ct
return compaction_module.make_and_start_task<compaction::major_keyspace_compaction_task_impl>({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data);
}
future<tasks::task_manager::task_ptr> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req) {
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy();
if (rs.get_type() == locator::replication_strategy_type::local || !rs.is_vnode_based()) {
auto reason = rs.get_type() == locator::replication_strategy_type::local ? "require" : "support";
apilog.info("Keyspace {} does not {} cleanup", keyspace, reason);
co_return nullptr;
}
apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
co_return co_await compaction_module.make_and_start_task<compaction::cleanup_keyspace_compaction_task_impl>(
{}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes);
}
void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::storage_service>& ss, sharded<db::snapshot_ctl>& snap_ctl) {
t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto task = co_await force_keyspace_compaction(ctx, std::move(req));
@@ -58,19 +79,12 @@ void set_tasks_compaction_module(http_context& ctx, routes& r, sharded<service::
});
t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
auto& db = ctx.db;
auto [keyspace, table_infos] = parse_table_infos(ctx, *req);
apilog.info("force_keyspace_cleanup_async: keyspace={} tables={}", keyspace, table_infos);
if (!co_await ss.local().is_cleanup_allowed(keyspace)) {
auto msg = "Can not perform cleanup operation when topology changes";
apilog.warn("force_keyspace_cleanup_async: keyspace={} tables={}: {}", keyspace, table_infos, msg);
co_await coroutine::return_exception(std::runtime_error(msg));
tasks::task_id id = tasks::task_id::create_null_id();
auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req));
if (task) {
id = task->get_status().id;
}
auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module();
auto task = co_await compaction_module.make_and_start_task<cleanup_keyspace_compaction_task_impl>({}, std::move(keyspace), db, table_infos, flush_mode::all_tables, tasks::is_user_task::yes);
co_return json::json_return_type(task->get_status().id.to_sstring());
co_return json::json_return_type(id.to_sstring());
});
t::perform_keyspace_offstrategy_compaction_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr<http::request> req, sstring keyspace, std::vector<table_info> table_infos) -> future<json::json_return_type> {

View File

@@ -30,5 +30,6 @@ void set_tasks_compaction_module(http_context& ctx, httpd::routes& r, sharded<se
void unset_tasks_compaction_module(http_context& ctx, httpd::routes& r);
future<tasks::task_manager::task_ptr> force_keyspace_compaction(http_context& ctx, std::unique_ptr<http::request> req);
future<tasks::task_manager::task_ptr> force_keyspace_cleanup(http_context& ctx, sharded<service::storage_service>& ss, std::unique_ptr<http::request> req);
}