From 0ceee3e4b3d53e0e384bbea356ad8ae2db50b596 Mon Sep 17 00:00:00 2001 From: Aleksandra Martyniuk Date: Wed, 29 Mar 2023 15:28:14 +0200 Subject: [PATCH] compaction: use compaction namespace in compaction_manager.cc --- compaction/compaction_manager.cc | 177 ++++++++++++++++--------------- 1 file changed, 89 insertions(+), 88 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index ee5ac613c3..ef54659c4d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -28,6 +28,7 @@ static logging::logger cmlog("compaction_manager"); using namespace std::chrono_literals; +using namespace compaction; class compacting_sstable_registration { compaction_manager& _cm; @@ -161,7 +162,7 @@ unsigned compaction_manager::current_compaction_fan_in_threshold() const { return std::min(unsigned(32), largest_fan_in); } -bool compaction_manager::can_register_compaction(compaction::table_state& t, int weight, unsigned fan_in) const { +bool compaction_manager::can_register_compaction(table_state& t, int weight, unsigned fan_in) const { // Only one weight is allowed if parallel compaction is disabled. if (!t.get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) { return false; @@ -197,21 +198,21 @@ void compaction_manager::deregister_weight(int weight) { reevaluate_postponed_compactions(); } -std::vector in_strategy_sstables(compaction::table_state& table_s) { +std::vector in_strategy_sstables(table_state& table_s) { auto sstables = table_s.main_sstable_set().all(); return boost::copy_range>(*sstables | boost::adaptors::filtered([] (const sstables::shared_sstable& sst) { return sstables::is_eligible_for_compaction(sst); })); } -std::vector compaction_manager::get_candidates(compaction::table_state& t) { +std::vector compaction_manager::get_candidates(table_state& t) { std::vector candidates; candidates.reserve(t.main_sstable_set().size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being // selected for compaction, which could potentially result in wrong behavior. auto partial_run_identifiers = boost::copy_range>(_tasks - | boost::adaptors::filtered(std::mem_fn(&compaction::compaction_task_executor::generating_output_run)) - | boost::adaptors::transformed(std::mem_fn(&compaction::compaction_task_executor::output_run_id))); + | boost::adaptors::filtered(std::mem_fn(&compaction_task_executor::generating_output_run)) + | boost::adaptors::transformed(std::mem_fn(&compaction_task_executor::output_run_id))); // Filter out sstables that are being compacted. for (auto& sst : in_strategy_sstables(t)) { @@ -260,7 +261,7 @@ private: virtual void replace_sstables(std::vector old_ssts, std::vector new_ssts) override {} }; -compaction_manager::compaction_state& compaction_manager::get_compaction_state(compaction::table_state* t) { +compaction_manager::compaction_state& compaction_manager::get_compaction_state(table_state* t) { try { return _compaction_state.at(t); } catch (std::out_of_range&) { @@ -269,7 +270,7 @@ compaction_manager::compaction_state& compaction_manager::get_compaction_state(c } } -compaction::compaction_task_executor::compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc) +compaction_task_executor::compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type type, sstring desc) : _cm(mgr) , _compacting_table(t) , _compaction_state(_cm.get_compaction_state(t)) @@ -278,7 +279,7 @@ compaction::compaction_task_executor::compaction_task_executor(compaction_manage , _description(std::move(desc)) {} -future compaction_manager::perform_task(shared_ptr task) { +future compaction_manager::perform_task(shared_ptr task) { _tasks.push_back(task); auto unregister_task = defer([this, task] { _tasks.remove(task); @@ -309,7 +310,7 @@ future compaction_manager::perform_tas co_return std::nullopt; } -future compaction::compaction_task_executor::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) { +future compaction_task_executor::compact_sstables_and_update_history(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) { if (!descriptor.sstables.size()) { // if there is nothing to compact, just return. co_return sstables::compaction_result{}; @@ -324,8 +325,8 @@ future compaction::compaction_task_executor::compac co_return res; } -future compaction::compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) { - compaction::table_state& t = *_compacting_table; +future compaction_task_executor::compact_sstables(sstables::compaction_descriptor descriptor, sstables::compaction_data& cdata, release_exhausted_func_t release_exhausted, compaction_manager::can_purge_tombstones can_purge) { + table_state& t = *_compacting_table; if (can_purge) { descriptor.enable_garbage_collection(t.main_sstable_set()); } @@ -346,7 +347,7 @@ future compaction::compaction_task_executor::compac co_return co_await sstables::compact_sstables(std::move(descriptor), cdata, t); } -future<> compaction::compaction_task_executor::update_history(compaction::table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { +future<> compaction_task_executor::update_history(table_state& t, const sstables::compaction_result& res, const sstables::compaction_data& cdata) { auto ended_at = std::chrono::duration_cast(res.stats.ended_at.time_since_epoch()); if (_cm._sys_ks) { @@ -378,7 +379,7 @@ protected: sstables::shared_sstable consume_sstable(); public: - explicit sstables_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector sstables) + explicit sstables_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type compaction_type, sstring desc, std::vector sstables) : compaction_task_executor(mgr, t, compaction_type, std::move(desc)) { set_sstables(std::move(sstables)); @@ -389,7 +390,7 @@ public: class major_compaction_task_executor : public compaction_task_executor { public: - major_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t) + major_compaction_task_executor(compaction_manager& mgr, table_state* t) : compaction_task_executor(mgr, t, sstables::compaction_type::Compaction, "Major compaction") {} @@ -409,7 +410,7 @@ protected: // candidates are sstables that aren't being operated on by other compaction types. // those are eligible for major compaction. - compaction::table_state* t = _compacting_table; + table_state* t = _compacting_table; sstables::compaction_strategy cs = t->get_compaction_strategy(); sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t)); auto compacting = compacting_sstable_registration(_cm, descriptor.sstables); @@ -437,11 +438,11 @@ protected: } -future<> compaction_manager::perform_major_compaction(compaction::table_state& t) { +future<> compaction_manager::perform_major_compaction(table_state& t) { if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t)).discard_result();; + return perform_task(make_shared(*this, &t)).discard_result();; } namespace compaction { @@ -450,7 +451,7 @@ class custom_compaction_task_executor : public compaction_task_executor { noncopyable_function(sstables::compaction_data&)> _job; public: - custom_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function(sstables::compaction_data&)> job) + custom_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type type, sstring desc, noncopyable_function(sstables::compaction_data&)> job) : compaction_task_executor(mgr, t, type, std::move(desc)) , _job(std::move(job)) {} @@ -480,12 +481,12 @@ protected: } -future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function(sstables::compaction_data&)> job) { +future<> compaction_manager::run_custom_job(table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function(sstables::compaction_data&)> job) { if (_state != state::enabled) { return make_ready_future<>(); } - return perform_task(make_shared(*this, &t, type, desc, std::move(job))).discard_result(); + return perform_task(make_shared(*this, &t, type, desc, std::move(job))).discard_result(); } future<> compaction_manager::update_static_shares(float static_shares) { @@ -493,7 +494,7 @@ future<> compaction_manager::update_static_shares(float static_shares) { return _compaction_controller.update_static_shares(static_shares); } -compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction::table_state& t) +compaction_manager::compaction_reenabler::compaction_reenabler(compaction_manager& cm, table_state& t) : _cm(cm) , _table(&t) , _compaction_state(cm.get_compaction_state(_table)) @@ -526,20 +527,20 @@ compaction_manager::compaction_reenabler::~compaction_reenabler() { } future -compaction_manager::stop_and_disable_compaction(compaction::table_state& t) { +compaction_manager::stop_and_disable_compaction(table_state& t) { compaction_reenabler cre(*this, t); co_await stop_ongoing_compactions("user-triggered operation", &t); co_return cre; } future<> -compaction_manager::run_with_compaction_disabled(compaction::table_state& t, std::function ()> func) { +compaction_manager::run_with_compaction_disabled(table_state& t, std::function ()> func) { compaction_reenabler cre = co_await stop_and_disable_compaction(t); co_await func(); } -std::string_view compaction::compaction_task_executor::to_string(state s) { +std::string_view compaction_task_executor::to_string(state s) { switch (s) { case state::none: return "none"; case state::pending: return "pending"; @@ -553,11 +554,11 @@ std::string_view compaction::compaction_task_executor::to_string(state s) { namespace compaction { -std::ostream& operator<<(std::ostream& os, compaction::compaction_task_executor::state s) { - return os << compaction::compaction_task_executor::to_string(s); +std::ostream& operator<<(std::ostream& os, compaction_task_executor::state s) { + return os << compaction_task_executor::to_string(s); } -std::ostream& operator<<(std::ostream& os, const compaction::compaction_task_executor& task) { +std::ostream& operator<<(std::ostream& os, const compaction_task_executor& task) { return os << task.describe(); } @@ -571,21 +572,21 @@ compaction_manager::compaction_state::~compaction_state() { compaction_done.broken(); } -std::string compaction::compaction_task_executor::describe() const { +std::string compaction_task_executor::describe() const { auto* t = _compacting_table; auto s = t->schema(); return fmt::format("{} task {} for table {}.{} [{}]", _description, fmt::ptr(this), s->ks_name(), s->cf_name(), fmt::ptr(t)); } -compaction::compaction_task_executor::~compaction_task_executor() { +compaction_task_executor::~compaction_task_executor() { switch_state(state::none); } -compaction::sstables_task_executor::~sstables_task_executor() { +sstables_task_executor::~sstables_task_executor() { _cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending); } -future compaction::compaction_task_executor::run() noexcept { +future compaction_task_executor::run() noexcept { try { _compaction_done = do_run(); return compaction_done(); @@ -594,7 +595,7 @@ future compaction::compaction_task_exe } } -compaction::compaction_task_executor::state compaction::compaction_task_executor::switch_state(state new_state) { +compaction_task_executor::state compaction_task_executor::switch_state(state new_state) { auto old_state = std::exchange(_state, new_state); switch (old_state) { case state::none: @@ -629,7 +630,7 @@ compaction::compaction_task_executor::state compaction::compaction_task_executor return old_state; } -void compaction::sstables_task_executor::set_sstables(std::vector new_sstables) { +void sstables_task_executor::set_sstables(std::vector new_sstables) { if (!_sstables.empty()) { on_internal_error(cmlog, format("sstables were already set")); } @@ -638,7 +639,7 @@ void compaction::sstables_task_executor::set_sstables(std::vector> compaction::compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) { +future> compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) { return seastar::get_units(sem, units, _compaction_data.abort).handle_exception_type([this] (const abort_requested_exception& e) { auto s = _compacting_table->schema(); return make_exception_future>( @@ -657,13 +658,13 @@ future> compaction::compactio }); } -void compaction::compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) { +void compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) { _compaction_data = _cm.create_compaction_data(); _output_run_identifier = output_run_id; switch_state(state::active); } -void compaction::compaction_task_executor::finish_compaction(state finish_state) noexcept { +void compaction_task_executor::finish_compaction(state finish_state) noexcept { switch_state(finish_state); _output_run_identifier = sstables::run_id::create_null_id(); if (finish_state != state::failed) { @@ -672,17 +673,17 @@ void compaction::compaction_task_executor::finish_compaction(state finish_state) _compaction_state.compaction_done.signal(); } -void compaction::compaction_task_executor::stop(sstring reason) noexcept { +void compaction_task_executor::stop(sstring reason) noexcept { _compaction_data.stop(std::move(reason)); } -sstables::compaction_stopped_exception compaction::compaction_task_executor::make_compaction_stopped_exception() const { +sstables::compaction_stopped_exception compaction_task_executor::make_compaction_stopped_exception() const { auto s = _compacting_table->schema(); return sstables::compaction_stopped_exception(s->ks_name(), s->cf_name(), _compaction_data.stop_requested); } compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task_manager& tm) - : _task_manager_module(make_shared(tm)) + : _task_manager_module(make_shared(tm)) , _cfg(std::move(cfg)) , _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), [this] () -> float { _last_backlog = backlog(); @@ -718,7 +719,7 @@ compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task } compaction_manager::compaction_manager(tasks::task_manager& tm) - : _task_manager_module(make_shared(tm)) + : _task_manager_module(make_shared(tm)) , _cfg(config{ .available_memory = 1 }) , _compaction_controller(make_compaction_controller(compaction_sg(), 1, [] () -> float { return 1.0; })) , _backlog_manager(_compaction_controller) @@ -803,7 +804,7 @@ future<> compaction_manager::postponed_compactions_reevaluation() { auto postponed = std::exchange(_postponed, {}); try { for (auto it = postponed.begin(); it != postponed.end();) { - compaction::table_state* t = *it; + table_state* t = *it; it = postponed.erase(it); // skip reevaluation of a table_state that became invalid post its removal if (!_compaction_state.contains(t)) { @@ -824,11 +825,11 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept { _postponed_reevaluation.signal(); } -void compaction_manager::postpone_compaction_for_table(compaction::table_state* t) { +void compaction_manager::postpone_compaction_for_table(table_state* t) { _postponed.insert(t); } -future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) { +future<> compaction_manager::stop_tasks(std::vector> tasks, sstring reason) { // To prevent compaction from being postponed while tasks are being stopped, // let's stop all tasks before the deferring point below. for (auto& t : tasks) { @@ -849,10 +850,10 @@ future<> compaction_manager::stop_tasks(std::vector compaction_manager::stop_ongoing_compactions(sstring reason, compaction::table_state* t, std::optional type_opt) noexcept { +future<> compaction_manager::stop_ongoing_compactions(sstring reason, table_state* t, std::optional type_opt) noexcept { try { auto ongoing_compactions = get_compactions(t).size(); - auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { + auto tasks = boost::copy_range>>(_tasks | boost::adaptors::filtered([t, type_opt] (auto& task) { return (!t || task->compacting_table() == t) && (!type_opt || task->type() == *type_opt); })); logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; @@ -922,11 +923,11 @@ void compaction_manager::do_stop() noexcept { } } -inline bool compaction_manager::can_proceed(compaction::table_state* t) const { +inline bool compaction_manager::can_proceed(table_state* t) const { return (_state == state::enabled) && _compaction_state.contains(t) && !_compaction_state.at(t).compaction_disabled(); } -inline bool compaction::compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const { +inline bool compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const { if (stopping()) { // Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run. if (do_throw_if_stopping) { @@ -937,7 +938,7 @@ inline bool compaction::compaction_task_executor::can_proceed(throw_if_stopping return _cm.can_proceed(_compacting_table); } -future compaction::compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) { +future compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) { try { std::rethrow_exception(err); } catch (sstables::compaction_stopped_exception& e) { @@ -974,7 +975,7 @@ namespace compaction { class regular_compaction_task_executor : public compaction_task_executor { public: - regular_compaction_task_executor(compaction_manager& mgr, compaction::table_state& t) + regular_compaction_task_executor(compaction_manager& mgr, table_state& t) : compaction_task_executor(mgr, &t, sstables::compaction_type::Compaction, "Compaction") {} protected: @@ -992,7 +993,7 @@ protected: co_return std::nullopt; } - compaction::table_state& t = *_compacting_table; + table_state& t = *_compacting_table; sstables::compaction_strategy cs = t.get_compaction_strategy(); sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control(), _cm.get_candidates(t)); int weight = calculate_weight(descriptor); @@ -1057,21 +1058,21 @@ protected: } -void compaction_manager::submit(compaction::table_state& t) { +void compaction_manager::submit(table_state& t) { if (_state != state::enabled || t.is_auto_compaction_disabled_by_user()) { return; } // OK to drop future. // waited via task->stop() - (void)perform_task(make_shared(*this, t)); + (void)perform_task(make_shared(*this, t)); } -bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) { +bool compaction_manager::can_perform_regular_compaction(table_state& t) { return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user(); } -future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) { +future<> compaction_manager::maybe_wait_for_sstable_count_reduction(table_state& t) { auto schema = t.schema(); if (!can_perform_regular_compaction(t)) { cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction", @@ -1116,7 +1117,7 @@ namespace compaction { class offstrategy_compaction_task_executor : public compaction_task_executor { bool _performed = false; public: - offstrategy_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t) + offstrategy_compaction_task_executor(compaction_manager& mgr, table_state* t) : compaction_task_executor(mgr, t, sstables::compaction_type::Reshape, "Offstrategy compaction") {} @@ -1136,7 +1137,7 @@ private: // by the fact that off-strategy is serialized across all tables, meaning that the // actual requirement is the size of the largest table's maintenance set. - compaction::table_state& t = *_compacting_table; + table_state& t = *_compacting_table; const auto& maintenance_sstables = t.maintenance_sstable_set(); // Filter out sstables that require view building, to avoid a race between off-strategy @@ -1232,7 +1233,7 @@ protected: std::exception_ptr ex; try { - compaction::table_state& t = *_compacting_table; + table_state& t = *_compacting_table; auto maintenance_sstables = t.maintenance_sstable_set().all(); cmlog.info("Starting off-strategy compaction for {}.{}, {} candidates were found", t.schema()->ks_name(), t.schema()->cf_name(), maintenance_sstables->size()); @@ -1256,11 +1257,11 @@ protected: } -future compaction_manager::perform_offstrategy(compaction::table_state& t) { +future compaction_manager::perform_offstrategy(table_state& t) { if (_state != state::enabled) { co_return false; } - auto task = make_shared(*this, &t); + auto task = make_shared(*this, &t); co_await perform_task(task); co_return task->performed(); } @@ -1273,7 +1274,7 @@ class rewrite_sstables_compaction_task_executor : public sstables_task_executor compaction_manager::can_purge_tombstones _can_purge; public: - rewrite_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options, + rewrite_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, std::vector sstables, compacting_sstable_registration compacting, compaction_manager::can_purge_tombstones can_purge) : sstables_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type())), std::move(sstables)) @@ -1343,8 +1344,8 @@ private: } template -requires std::derived_from -future compaction_manager::perform_task_on_all_files(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { +requires std::derived_from +future compaction_manager::perform_task_on_all_files(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) { if (_state != state::enabled) { co_return std::nullopt; } @@ -1371,15 +1372,15 @@ future compaction_manager::perform_tas co_return co_await perform_task(seastar::make_shared(*this, &t, std::move(options), std::move(sstables), std::move(compacting), std::forward(args)...)); } -future compaction_manager::rewrite_sstables(compaction::table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { - return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); +future compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) { + return perform_task_on_all_files(t, std::move(options), std::move(get_func), can_purge); } namespace compaction { class validate_sstables_compaction_task_executor : public sstables_task_executor { public: - validate_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, std::vector sstables) + validate_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, std::vector sstables) : sstables_task_executor(mgr, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables)) {} @@ -1433,20 +1434,20 @@ private: } -static std::vector get_all_sstables(compaction::table_state& t) { +static std::vector get_all_sstables(table_state& t) { auto s = boost::copy_range>(*t.main_sstable_set().all()); auto maintenance_set = t.maintenance_sstable_set().all(); s.insert(s.end(), maintenance_set->begin(), maintenance_set->end()); return s; } -future compaction_manager::perform_sstable_scrub_validate_mode(compaction::table_state& t) { +future compaction_manager::perform_sstable_scrub_validate_mode(table_state& t) { if (_state != state::enabled) { return make_ready_future(); } // All sstables must be included, even the ones being compacted, such that everything in table is validated. auto all_sstables = get_all_sstables(t); - return perform_task(seastar::make_shared(*this, &t, std::move(all_sstables))); + return perform_task(seastar::make_shared(*this, &t, std::move(all_sstables))); } namespace compaction { @@ -1456,7 +1457,7 @@ class cleanup_sstables_compaction_task_executor : public compaction_task_executo compacting_sstable_registration _compacting; std::vector _pending_cleanup_jobs; public: - cleanup_sstables_compaction_task_executor(compaction_manager& mgr, compaction::table_state* t, sstables::compaction_type_options options, + cleanup_sstables_compaction_task_executor(compaction_manager& mgr, table_state* t, sstables::compaction_type_options options, std::vector candidates, compacting_sstable_registration compacting) : compaction_task_executor(mgr, t, options.type(), sstring(sstables::to_string(options.type()))) , _cleanup_options(std::move(options)) @@ -1545,7 +1546,7 @@ bool needs_cleanup(const sstables::shared_sstable& sst, return true; } -future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t) { +future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, table_state& t) { auto check_for_cleanup = [this, &t] { return boost::algorithm::any_of(_tasks, [&t] (auto& task) { return task->compacting_table() == &t && task->type() == sstables::compaction_type::Cleanup; @@ -1569,12 +1570,12 @@ future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_range }); }; - co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), + co_await perform_task_on_all_files(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables)); } // Submit a table to be upgraded and wait for its termination. -future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction::table_state& t, bool exclude_current_version) { +future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, table_state& t, bool exclude_current_version) { auto get_sstables = [this, &t, exclude_current_version] { std::vector tables; @@ -1602,7 +1603,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own } // Submit a table to be scrubbed and wait for its termination. -future compaction_manager::perform_sstable_scrub(compaction::table_state& t, sstables::compaction_type_options::scrub opts) { +future compaction_manager::perform_sstable_scrub(table_state& t, sstables::compaction_type_options::scrub opts) { auto scrub_mode = opts.operation_mode; if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) { return perform_sstable_scrub_validate_mode(t); @@ -1633,7 +1634,7 @@ compaction_manager::compaction_state::compaction_state(table_state& t) { } -void compaction_manager::add(compaction::table_state& t) { +void compaction_manager::add(table_state& t) { auto [_, inserted] = _compaction_state.try_emplace(&t, t); if (!inserted) { auto s = t.schema(); @@ -1641,7 +1642,7 @@ void compaction_manager::add(compaction::table_state& t) { } } -future<> compaction_manager::remove(compaction::table_state& t) noexcept { +future<> compaction_manager::remove(table_state& t) noexcept { auto& c_state = get_compaction_state(&t); // We need to guarantee that a task being stopped will not retry to compact @@ -1675,8 +1676,8 @@ future<> compaction_manager::remove(compaction::table_state& t) noexcept { #endif } -const std::vector compaction_manager::get_compactions(compaction::table_state* t) const { - auto to_info = [] (const shared_ptr& task) { +const std::vector compaction_manager::get_compactions(table_state* t) const { + auto to_info = [] (const shared_ptr& task) { sstables::compaction_info ret; ret.compaction_uuid = task->compaction_data().compaction_uuid; ret.type = task->type(); @@ -1687,22 +1688,22 @@ const std::vector compaction_manager::get_compactions return ret; }; using ret = std::vector; - return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const shared_ptr& task) { + return boost::copy_range(_tasks | boost::adaptors::filtered([t] (const shared_ptr& task) { return (!t || task->compacting_table() == t) && task->compaction_running(); }) | boost::adaptors::transformed(to_info)); } -bool compaction_manager::has_table_ongoing_compaction(const compaction::table_state& t) const { - return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr& task) { +bool compaction_manager::has_table_ongoing_compaction(const table_state& t) const { + return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const shared_ptr& task) { return task->compacting_table() == &t && task->compaction_running(); }); }; -bool compaction_manager::compaction_disabled(compaction::table_state& t) const { +bool compaction_manager::compaction_disabled(table_state& t) const { return _compaction_state.contains(&t) && _compaction_state.at(&t).compaction_disabled(); } -future<> compaction_manager::stop_compaction(sstring type, compaction::table_state* table) { +future<> compaction_manager::stop_compaction(sstring type, table_state* table) { sstables::compaction_type target_type; try { target_type = sstables::to_compaction_type(type); @@ -1721,7 +1722,7 @@ future<> compaction_manager::stop_compaction(sstring type, compaction::table_sta return stop_ongoing_compactions("user request", table, target_type); } -void compaction_manager::propagate_replacement(compaction::table_state& t, +void compaction_manager::propagate_replacement(table_state& t, const std::vector& removed, const std::vector& added) { for (auto& task : _tasks) { if (task->compacting_table() == &t && task->compaction_running()) { @@ -1736,7 +1737,7 @@ public: explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {} bool has_ongoing_compaction(table_state& table_s) const noexcept override { - return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr& task) { + return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const shared_ptr& task) { return task->compaction_running() && task->compacting_table()->schema()->ks_name() == s->ks_name() && task->compacting_table()->schema()->cf_name() == s->cf_name(); @@ -1744,7 +1745,7 @@ public: } }; -compaction::strategy_control& compaction_manager::get_strategy_control() const noexcept { +strategy_control& compaction_manager::get_strategy_control() const noexcept { return *_strategy_control; } @@ -1890,13 +1891,13 @@ compaction_backlog_manager::~compaction_backlog_manager() { } } -void compaction_manager::register_backlog_tracker(compaction::table_state& t, compaction_backlog_tracker new_backlog_tracker) { +void compaction_manager::register_backlog_tracker(table_state& t, compaction_backlog_tracker new_backlog_tracker) { auto& cs = get_compaction_state(&t); cs.backlog_tracker = std::move(new_backlog_tracker); register_backlog_tracker(cs.backlog_tracker); } -compaction_backlog_tracker& compaction_manager::get_backlog_tracker(compaction::table_state& t) { +compaction_backlog_tracker& compaction_manager::get_backlog_tracker(table_state& t) { auto& cs = get_compaction_state(&t); return cs.backlog_tracker; }