diff --git a/api/storage_service.cc b/api/storage_service.cc index c44509c7c0..b2ed24bf2f 100644 --- a/api/storage_service.cc +++ b/api/storage_service.cc @@ -11,6 +11,7 @@ #include "db/config.hh" #include "db/schema_tables.hh" #include "utils/hash.hh" +#include #include #include #include @@ -1266,6 +1267,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded& snap_ctl) { ss::get_snapshot_details.set(r, [&snap_ctl](std::unique_ptr req) { return snap_ctl.local().get_snapshot_details().then([] (std::unordered_map>&& result) { @@ -1409,16 +1417,28 @@ void set_snapshot(http_context& ctx, routes& r, sharded& snap_ } else { throw httpd::bad_param_exception(fmt::format("Unknown argument for 'quarantine_mode' parameter: {}", quarantine_mode_str)); } - return f.then([&ctx, keyspace, column_families, opts] { - return ctx.db.invoke_on_all([=] (replica::database& db) { - return do_for_each(column_families, [=, &db](sstring cfname) { + + const auto& reduce_compaction_stats = [] (const compaction_manager::compaction_stats_opt& lhs, const compaction_manager::compaction_stats_opt& rhs) { + sstables::compaction_stats stats{}; + stats += lhs.value(); + stats += rhs.value(); + return stats; + }; + + return f.then([&ctx, keyspace, column_families, opts, &reduce_compaction_stats] { + return ctx.db.map_reduce0([=] (replica::database& db) { + return map_reduce(column_families, [=, &db] (sstring cfname) { auto& cm = db.get_compaction_manager(); auto& cf = db.find_column_family(keyspace, cfname); return cm.perform_sstable_scrub(cf.as_table_state(), opts); - }); - }); - }).then([]{ - return make_ready_future(0); + }, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats); + }, std::make_optional(sstables::compaction_stats{}), reduce_compaction_stats); + }).then([] (auto f) { + if (f->validation_errors) { + return make_ready_future(static_cast(scrub_status::validation_errors)); + } else { + return make_ready_future(static_cast(scrub_status::successful)); + } }); }); } diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index d243e7a13c..594990f8c8 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -277,7 +277,7 @@ compaction_manager::task::task(compaction_manager& mgr, compaction::table_state* , _description(std::move(desc)) {} -future<> compaction_manager::perform_task(shared_ptr task) { +future compaction_manager::perform_task(shared_ptr task) { _tasks.push_back(task); auto unregister_task = defer([this, task] { _tasks.remove(task); @@ -285,8 +285,9 @@ future<> compaction_manager::perform_task(shared_ptr t cmlog.debug("{}: started", *task); try { - co_await task->run(); + auto&& res = co_await task->run(); cmlog.debug("{}: done", *task); + co_return res; } catch (sstables::compaction_stopped_exception& e) { cmlog.info("{}: stopped, reason: {}", *task, e.what()); } catch (sstables::compaction_aborted_exception& e) { @@ -303,6 +304,8 @@ future<> compaction_manager::perform_task(shared_ptr t _stats.errors++; throw; } + + co_return std::nullopt; } future compaction_manager::task::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, can_purge_tombstones can_purge) { @@ -401,7 +404,7 @@ future<> compaction_manager::perform_major_compaction(compaction::table_state& t if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t)); + return perform_task(make_shared(*this, &t)).discard_result();; } class compaction_manager::custom_compaction_task : public compaction_manager::task { @@ -441,7 +444,7 @@ future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables return make_ready_future<>(); } - return perform_task(make_shared(*this, &t, type, desc, std::move(job))); + return perform_task(make_shared(*this, &t, type, desc, std::move(job))).discard_result(); } future<> compaction_manager::update_static_shares(float static_shares) { @@ -1252,9 +1255,9 @@ private: template requires std::derived_from -future<> compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +future compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { if (_state != state::enabled) { - co_return; + co_return std::nullopt; } // since we might potentially have ongoing compactions, and we @@ -1276,10 +1279,10 @@ future<> compaction_manager::perform_task_on_all_files(compaction::table_state& return a->data_size() > b->data_size(); }); }); - co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); + co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future<> compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { +future compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); } @@ -1344,9 +1347,9 @@ static std::vector get_all_sstables(compaction::table_ return s; } -future<> compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { +future compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { if (_state != state::enabled) { - return make_ready_future<>(); + return make_ready_future(); } // All sstables must be included, even the ones being compacted, such that everything in table is validated. auto all_sstables = get_all_sstables(t); @@ -1505,7 +1508,7 @@ future<> compaction_manager::perform_sstable_upgrade(replica::database& db, comp } // Submit a table to be scrubbed and wait for its termination. -future<> compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { +future compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { auto scrub_mode = opts.operation_mode; if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 04b6adf0da..31255a2972 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -299,7 +299,7 @@ private: class strategy_control; std::unique_ptr _strategy_control; private: - future<> perform_task(shared_ptr); + future perform_task(shared_ptr); future<> stop_tasks(std::vector> tasks, sstring reason); future<> update_throughput(uint32_t value_mbs); @@ -340,7 +340,7 @@ private: // similar-sized compaction. void postpone_compaction_for_table(compaction::table_state* t); - future<> perform_sstable_scrub_validate_mode(compaction::table_state& t); + future perform_sstable_scrub_validate_mode(compaction::table_state& t); future<> update_static_shares(float shares); using get_candidates_func = std::function>()>; @@ -349,9 +349,9 @@ private: // by retrieving set of candidates only after all compactions for table T were stopped, if any. template requires std::derived_from - future<> perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); + future perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, Args... args); - future<> rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); + future rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes); // Stop all fibers, without waiting. Safe to be called multiple times. void do_stop() noexcept; @@ -411,7 +411,7 @@ public: future<> perform_sstable_upgrade(replica::database& db, compaction::table_state& t, bool exclude_current_version); // Submit a table to be scrubbed and wait for its termination. - future<> perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); + future perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts); // Submit a table for major compaction. future<> perform_major_compaction(compaction::table_state& t);