/* * Copyright (C) 2022-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include "api/api.hh" #include "api/storage_service.hh" #include "api/api-doc/tasks.json.hh" #include "api/api-doc/storage_service.json.hh" #include "compaction/compaction_manager.hh" #include "compaction/task_manager_module.hh" #include "service/storage_service.hh" #include "tasks/task_manager.hh" #include "replica/database.hh" using namespace seastar::httpd; extern logging::logger apilog; namespace api { namespace t = httpd::tasks_json; namespace ss = httpd::storage_service_json; using namespace json; using ks_cf_func = std::function(http_context&, std::unique_ptr, sstring, std::vector)>; static auto wrap_ks_cf(http_context &ctx, ks_cf_func f) { return [&ctx, f = std::move(f)](std::unique_ptr req) { auto [keyspace, table_infos] = parse_table_infos(ctx, *req); return f(ctx, std::move(req), std::move(keyspace), std::move(table_infos)); }; } static future> force_keyspace_compaction(http_context& ctx, std::unique_ptr req) { auto& db = ctx.db; auto [ keyspace, table_infos ] = parse_table_infos(ctx, *req, "cf"); auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true); auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false); apilog.info("force_keyspace_compaction: keyspace={} tables={}, flush={} consider_only_existing_data={}", keyspace, table_infos, flush, consider_only_existing_data); auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); std::optional fmopt; if (!flush && !consider_only_existing_data) { fmopt = compaction::flush_mode::skip; } return compaction_module.make_and_start_task({}, std::move(keyspace), tasks::task_id::create_null_id(), db, table_infos, fmopt, consider_only_existing_data); } static future> upgrade_sstables(http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector table_infos) { auto& db = ctx.db; bool exclude_current_version = req_param(*req, "exclude_current_version", false); apilog.info("upgrade_sstables: keyspace={} tables={} exclude_current_version={}", keyspace, table_infos, exclude_current_version); auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); return compaction_module.make_and_start_task({}, std::move(keyspace), db, table_infos, exclude_current_version); } static future> force_keyspace_cleanup(http_context& ctx, sharded& ss, std::unique_ptr req) { auto& db = ctx.db; auto [keyspace, table_infos] = parse_table_infos(ctx, *req); const auto& rs = db.local().find_keyspace(keyspace).get_replication_strategy(); if (rs.is_local() || !rs.is_vnode_based()) { auto reason = rs.is_local() ? "require" : "support"; apilog.info("Keyspace {} does not {} cleanup", keyspace, reason); co_return nullptr; } apilog.info("force_keyspace_cleanup: keyspace={} tables={}", keyspace, table_infos); if (!co_await ss.local().is_vnodes_cleanup_allowed(keyspace)) { auto msg = "Can not perform cleanup operation when topology changes"; apilog.warn("force_keyspace_cleanup: keyspace={} tables={}: {}", keyspace, table_infos, msg); co_await coroutine::return_exception(std::runtime_error(msg)); } auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); co_return co_await compaction_module.make_and_start_task( {}, std::move(keyspace), db, table_infos, compaction::flush_mode::all_tables, tasks::is_user_task::yes); } void set_tasks_compaction_module(http_context& ctx, routes& r, sharded& ss, sharded& snap_ctl) { t::force_keyspace_compaction_async.set(r, [&ctx](std::unique_ptr req) -> future { auto task = co_await force_keyspace_compaction(ctx, std::move(req)); co_return json::json_return_type(task->get_status().id.to_sstring()); }); ss::force_keyspace_compaction.set(r, [&ctx](std::unique_ptr req) -> future { auto task = co_await force_keyspace_compaction(ctx, std::move(req)); co_await task->done(); co_return json_void(); }); t::force_keyspace_cleanup_async.set(r, [&ctx, &ss](std::unique_ptr req) -> future { tasks::task_id id = tasks::task_id::create_null_id(); auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req)); if (task) { id = task->get_status().id; } co_return json::json_return_type(id.to_sstring()); }); ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr req) -> future { auto task = co_await force_keyspace_cleanup(ctx, ss, std::move(req)); if (task) { co_await task->done(); } co_return json::json_return_type(0); }); t::perform_keyspace_offstrategy_compaction_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector table_infos) -> future { apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos); auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module(); auto task = co_await compaction_module.make_and_start_task({}, std::move(keyspace), ctx.db, table_infos, nullptr); co_return json::json_return_type(task->get_status().id.to_sstring()); })); ss::perform_keyspace_offstrategy_compaction.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector table_infos) -> future { apilog.info("perform_keyspace_offstrategy_compaction: keyspace={} tables={}", keyspace, table_infos); bool res = false; auto& compaction_module = ctx.db.local().get_compaction_manager().get_task_manager_module(); auto task = co_await compaction_module.make_and_start_task({}, std::move(keyspace), ctx.db, table_infos, &res); co_await task->done(); co_return json::json_return_type(res); })); t::upgrade_sstables_async.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector table_infos) -> future { auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos)); co_return json::json_return_type(task->get_status().id.to_sstring()); })); ss::upgrade_sstables.set(r, wrap_ks_cf(ctx, [] (http_context& ctx, std::unique_ptr req, sstring keyspace, std::vector table_infos) -> future { auto task = co_await upgrade_sstables(ctx, std::move(req), std::move(keyspace), std::move(table_infos)); co_await task->done(); co_return json::json_return_type(0); })); t::scrub_async.set(r, [&ctx, &snap_ctl] (std::unique_ptr req) -> future { auto& db = ctx.db; auto info = parse_scrub_options(ctx, std::move(req)); if (!info.snapshot_tag.empty()) { db::snapshot_options opts = {.skip_flush = false}; co_await snap_ctl.local().take_column_family_snapshot(info.keyspace, info.column_families, info.snapshot_tag, opts); } auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); auto task = co_await compaction_module.make_and_start_task({}, std::move(info.keyspace), db, std::move(info.column_families), info.opts, nullptr); co_return json::json_return_type(task->get_status().id.to_sstring()); }); ss::force_compaction.set(r, [&ctx] (std::unique_ptr req) -> future { auto& db = ctx.db; auto flush = validate_bool_x(req->get_query_param("flush_memtables"), true); auto consider_only_existing_data = validate_bool_x(req->get_query_param("consider_only_existing_data"), false); apilog.info("force_compaction: flush={} consider_only_existing_data={}", flush, consider_only_existing_data); auto& compaction_module = db.local().get_compaction_manager().get_task_manager_module(); std::optional fmopt; if (!flush && !consider_only_existing_data) { fmopt = compaction::flush_mode::skip; } auto task = co_await compaction_module.make_and_start_task({}, db, fmopt, consider_only_existing_data); co_await task->done(); co_return json_void(); }); } void unset_tasks_compaction_module(http_context& ctx, httpd::routes& r) { t::force_keyspace_compaction_async.unset(r); ss::force_keyspace_compaction.unset(r); t::force_keyspace_cleanup_async.unset(r); ss::force_keyspace_cleanup.unset(r); t::perform_keyspace_offstrategy_compaction_async.unset(r); ss::perform_keyspace_offstrategy_compaction.unset(r); t::upgrade_sstables_async.unset(r); ss::upgrade_sstables.unset(r); t::scrub_async.unset(r); ss::force_compaction.unset(r); } }