/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include #include #include "compaction_manager.hh" #include "compaction/compaction_manager.hh" #include "api/api.hh" #include "api/api-doc/compaction_manager.json.hh" #include "api/api-doc/storage_service.json.hh" #include "db/compaction_history_entry.hh" #include "db/system_keyspace.hh" #include "column_family.hh" #include "unimplemented.hh" #include "storage_service.hh" #include namespace api { namespace cm = httpd::compaction_manager_json; namespace ss = httpd::storage_service_json; using namespace json; using namespace seastar::httpd; static future get_cm_stats(sharded& cm, int64_t compaction::compaction_manager::stats::*f) { return cm.map_reduce0([f](compaction::compaction_manager& cm) { return cm.get_stats().*f; }, int64_t(0), std::plus()).then([](const int64_t& res) { return make_ready_future(res); }); } static std::unordered_map, uint64_t, utils::tuple_hash> sum_pending_tasks(std::unordered_map, uint64_t, utils::tuple_hash>&& a, const std::unordered_map, uint64_t, utils::tuple_hash>& b) { for (auto&& i : b) { if (i.second) { a[i.first] += i.second; } } return std::move(a); } void set_compaction_manager(http_context& ctx, routes& r, sharded& cm) { cm::get_compactions.set(r, [&cm] (std::unique_ptr req) { return cm.map_reduce0([](compaction::compaction_manager& cm) { std::vector summaries; for (const auto& c : cm.get_compactions()) { cm::summary s; s.id = fmt::to_string(c.compaction_uuid); s.ks = c.ks_name; s.cf = c.cf_name; s.unit = "keys"; s.task_type = compaction::compaction_name(c.type); s.completed = c.total_keys_written; s.total = c.total_partitions; summaries.push_back(std::move(s)); } return summaries; }, std::vector(), concat).then([](const std::vector& res) { return make_ready_future(res); }); }); cm::get_pending_tasks_by_table.set(r, [&ctx] (std::unique_ptr req) { return ctx.db.map_reduce0([](replica::database& db) { return do_with(std::unordered_map, uint64_t, utils::tuple_hash>(), [&db](std::unordered_map, uint64_t, utils::tuple_hash>& tasks) { return db.get_tables_metadata().for_each_table_gently([&tasks] (table_id, lw_shared_ptr table) -> future<> { replica::table& cf = *table.get(); tasks[std::make_pair(cf.schema()->ks_name(), cf.schema()->cf_name())] = co_await cf.estimate_pending_compactions(); }).then([&tasks] { return std::move(tasks); }); }); }, std::unordered_map, uint64_t, utils::tuple_hash>(), sum_pending_tasks).then( [](const std::unordered_map, uint64_t, utils::tuple_hash>& task_map) { std::vector res; res.reserve(task_map.size()); for (auto i : task_map) { cm::pending_compaction task; task.ks = i.first.first; task.cf = i.first.second; task.task = i.second; res.emplace_back(std::move(task)); } return make_ready_future(res); }); }); cm::force_user_defined_compaction.set(r, [] (std::unique_ptr req) { //TBD // FIXME warn(unimplemented::cause::API); return make_ready_future(json_void()); }); cm::stop_compaction.set(r, [&cm] (std::unique_ptr req) { auto type = req->get_query_param("type"); return cm.invoke_on_all([type] (compaction::compaction_manager& cm) { return cm.stop_compaction(type); }).then([] { return make_ready_future(json_void()); }); }); cm::stop_keyspace_compaction.set(r, [&ctx, &cm] (std::unique_ptr req) -> future { auto [ks_name, tables] = parse_table_infos(ctx, *req, "tables"); auto type = req->get_query_param("type"); co_await cm.invoke_on_all([&] (compaction::compaction_manager& cm) { return parallel_for_each(tables, [&] (const table_info& ti) { return cm.stop_compaction(type, [id = ti.id] (const compaction::compaction_group_view* x) { return x->schema()->id() == id; }); }); }); co_return json_void(); }); cm::get_pending_tasks.set(r, [&ctx] (std::unique_ptr req) { return map_reduce_cf(ctx.db, int64_t(0), [](replica::column_family& cf) { return cf.estimate_pending_compactions(); }, std::plus()); }); cm::get_completed_tasks.set(r, [&cm] (std::unique_ptr req) { return get_cm_stats(cm, &compaction::compaction_manager::stats::completed_tasks); }); cm::get_total_compactions_completed.set(r, [] (std::unique_ptr req) { // FIXME // We are currently dont have an API for compaction // so returning a 0 as the number of total compaction is ok return make_ready_future(0); }); cm::get_bytes_compacted.set(r, [] (std::unique_ptr req) { //TBD // FIXME warn(unimplemented::cause::API); return make_ready_future(0); }); cm::get_compaction_history.set(r, [&cm] (std::unique_ptr req) { noncopyable_function(output_stream&&)> f = [&cm] (output_stream&& out) -> future<> { auto s = std::move(out); bool first = true; std::exception_ptr ex; try { co_await s.write("["); co_await cm.local().get_compaction_history([&s, &first](const db::compaction_history_entry& entry) mutable -> future<> { cm::history h; h.id = fmt::to_string(entry.id); h.shard_id = entry.shard_id; h.ks = std::move(entry.ks); h.cf = std::move(entry.cf); h.compaction_type = entry.compaction_type; h.started_at = entry.started_at; h.compacted_at = entry.compacted_at; h.bytes_in = entry.bytes_in; h.bytes_out = entry.bytes_out; std::map items(entry.rows_merged.begin(), entry.rows_merged.end()); for (auto it : items) { httpd::compaction_manager_json::row_merged e; e.key = it.first; e.value = it.second; h.rows_merged.push(std::move(e)); } for (const auto& data : entry.sstables_in) { httpd::compaction_manager_json::sstableinfo sstable; sstable.generation = fmt::to_string(data.generation), sstable.origin = data.origin, sstable.size = data.size, h.sstables_in.push(std::move(sstable)); } for (const auto& data : entry.sstables_out) { httpd::compaction_manager_json::sstableinfo sstable; sstable.generation = fmt::to_string(data.generation), sstable.origin = data.origin, sstable.size = data.size, h.sstables_out.push(std::move(sstable)); } h.total_tombstone_purge_attempt = entry.total_tombstone_purge_attempt; h.total_tombstone_purge_failure_due_to_overlapping_with_memtable = entry.total_tombstone_purge_failure_due_to_overlapping_with_memtable; h.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable = entry.total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable; if (!first) { co_await s.write(", "); } first = false; co_await formatter::write(s, h); }); co_await s.write("]"); co_await s.flush(); } catch (...) { ex = std::current_exception(); } co_await s.close(); if (ex) { co_await coroutine::return_exception_ptr(std::move(ex)); } }; return make_ready_future(std::move(f)); }); cm::get_compaction_info.set(r, [] (std::unique_ptr req) { //TBD // FIXME warn(unimplemented::cause::API); std::vector res; return make_ready_future(res); }); ss::get_compaction_throughput_mb_per_sec.set(r, [&cm](std::unique_ptr req) { int value = cm.local().throughput_mbs(); return make_ready_future(value); }); } void unset_compaction_manager(http_context& ctx, routes& r) { cm::get_compactions.unset(r); cm::get_pending_tasks_by_table.unset(r); cm::force_user_defined_compaction.unset(r); cm::stop_compaction.unset(r); cm::stop_keyspace_compaction.unset(r); cm::get_pending_tasks.unset(r); cm::get_completed_tasks.unset(r); cm::get_total_compactions_completed.unset(r); cm::get_bytes_compacted.unset(r); cm::get_compaction_history.unset(r); cm::get_compaction_info.unset(r); ss::get_compaction_throughput_mb_per_sec.unset(r); } }