From f1980f8dc6d0371c689e9ce3f7eeb02ffcbcba73 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 27 Jul 2022 14:00:52 +0200 Subject: [PATCH] scrub compaction: count validation errors and return status over the rest api Performing compaction scrub user did not know whether any validation errors were encountered. The number of validation errors per given compaction scrub is gathered and summed from each shard. Basing on that value return status over the rest api is set to 3 if any validation errors were encountered. --- api/storage_service.cc | 34 +++++++++++++++++++++++++------- compaction/compaction_manager.cc | 25 ++++++++++++----------- compaction/compaction_manager.hh | 10 +++++----- 3 files changed, 46 insertions(+), 23 deletions(-) diff --git a/api/storage_service.cc b/api/storage_service.cc index c44509c7c0..b2ed24bf2f 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -11,6 +11,7 @@ #include "db/config.hh" #include "db/schema_tables.hh" #include "utils/hash.hh" +#include #include #include #include @@ -1266,6 +1267,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded& snap_ctl) { ss::get_snapshot_details.set(r, [&snap_ctl](std::unique_ptr req) { return snap_ctl.local().get_snapshot_details().then([] (std::unordered_map>&& result) { @@ -1409,16 +1417,28 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ } else { throw httpd::bad_param_exception(fmt::format("Unknown argument for 'quarantine_mode' parameter: {}", quarantine_mode_str)); } - return f.then([&ctx, keyspace, column_families, opts] { - return ctx.db.invoke_on_all([=] (replica::database& db) { - return do_for_each(column_families, [=, &db](sstring cfname) { + + const auto& reduce_compaction_stats = [] (const compaction_manager::compaction_stats_opt& lhs, const compaction_manager::compaction_stats_opt& rhs) { + sstables::compaction_stats stats{}; + stats += lhs.value(); + stats += rhs.value(); + return stats; + }; + + return f.then([&ctx, keyspace, column_families, opts, &reduce_compaction_stats] { + return ctx.db.map_reduce0([=] (replica::database& db) { + return map_reduce(column_families, [=, &db] (sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); return cm.perform_sstable_scrub(cf.as_table_state(), opts); - }); - }); - }).then([]{ - return make_ready_future(0); + }, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats); + }, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats); + }).then([] (auto f) { + if (f->validation_errors) { + return make_ready_future(static_cast(scrub_status::validation_errors)); + } else { + return make_ready_future(static_cast(scrub_status::successful)); + } }); }); } diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index d243e7a13c..594990f8c8 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -277,7 +277,7 @@ compaction_manager::task::task(compaction_manager& mgr, compaction::table_state* , _description(std::move(desc)) {} -future<> compaction_manager::perform_task(shared_ptr task) { +future compaction_manager::perform_task(shared_ptr task) { _tasks.push_back(task); auto unregister_task = defer([this, task] { _tasks.remove(task); @@ -285,8 +285,9 @@ future<> compaction_manager::perform_task(shared_ptr t cmlog.debug("{}: started", *task); try { - co_await task->run(); + auto&& res = co_await task->run(); cmlog.debug("{}: done", *task); + co_return res; } catch (sstables::compaction_stopped_exception& e) { cmlog.info("{}: stopped, reason: {}", *task, e.what()); } catch (sstables::compaction_aborted_exception& e) { @@ -303,6 +304,8 @@ future<> compaction_manager::perform_task(shared_ptr t _stats.errors++; throw; } + + co_return std::nullopt; } future compaction_manager::task::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) { @@ -401,7 +404,7 @@ future<> compaction_manager::perform_major_compaction(compaction::table_state& t if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t)); + return perform_task(make_shared(*this, &t)).discard_result();; } class compaction_manager::custom_compaction_task : public compaction_manager::task { @@ -441,7 +444,7 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables return make_ready_future<>(); } - return perform_task(make_shared(*this, &t, type, desc, std::move(job))); + return perform_task(make_shared(*this, &t, type, desc, std::move(job))).discard_result(); } future<> compaction_manager::update_static_shares(float static_shares) { @@ -1252,9 +1255,9 @@ private: template requires std::derived_from -future<> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +future compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { if (_state != state::enabled) { - co_return; + co_return std::nullopt; } // since we might potentially have ongoing compactions, and we @@ -1276,10 +1279,10 @@ future<> compaction_manager::perform_task_on_all_files(compaction::table_state& return a->data_size() > b->data_size(); }); }); - co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future<> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { +future compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); } @@ -1344,9 +1347,9 @@ static std::vector get_all_sstables(compaction::table_ return s; } -future<> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { +future compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { if (_state != state::enabled) { - return make_ready_future<>(); + return make_ready_future(); } // All sstables must be included, even the ones being compacted, such that everything in table is validated. auto all_sstables = get_all_sstables(t); @@ -1505,7 +1508,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp } // Submit a table to be scrubbed and wait for its termination. -future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { +future compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { auto scrub_mode = opts.operation_mode; if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 04b6adf0da..31255a2972 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -299,7 +299,7 @@ private: class strategy_control; std::unique_ptr _strategy_control; private: - future<> perform_task(shared_ptr); + future perform_task(shared_ptr); future<> stop_tasks(std::vector> tasks, sstring reason); future<> update_throughput(uint32_t value_mbs); @@ -340,7 +340,7 @@ private: // similar-sized compaction. void postpone_compaction_for_table(compaction::table_state* t); - future<> perform_sstable_scrub_validate_mode(compaction::table_state& t); + future perform_sstable_scrub_validate_mode(compaction::table_state& t); future<> update_static_shares(float shares); using get_candidates_func = std::function>()>; @@ -349,9 +349,9 @@ private: // by retrieving set of candidates only after all compactions for table T were stopped, if any. template requires std::derived_from - future<> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); + future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); - future<> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); + future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); // Stop all fibers, without waiting. Safe to be called multiple times. void do_stop() noexcept; @@ -411,7 +411,7 @@ public: future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version); // Submit a table to be scrubbed and wait for its termination. - future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); + future perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); // Submit a table for major compaction. future<> perform_major_compaction(compaction::table_state& t);