From c7826aa91004de9221d2386f4f20fddf1a199f0b Mon Sep 17 00:00:00 2001
From: "Raphael S. Carvalho" <raphaelsc@scylladb.com>
Date: Mon, 21 Mar 2022 13:28:59 -0300
Subject: [PATCH] compaction_manager: Wire cleanup task into the strategy
 cleanup method

As the cleanup process can now be driven by the compaction strategy,
let's move cleanup into a new task type that uses the new
compaction_strategy::get_cleanup_compaction_jobs(). By the time being,
all strategies are using the default method that returns one descriptor
for each sstable that needs clean up.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
---
 compaction/compaction_manager.cc | 85 ++++++++++++++++++++++++++++++--
 compaction/compaction_manager.hh |  7 +++
 2 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
index f25bddd289..336276d0ac 100644
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -1095,7 +1095,9 @@ private:
     }
 };
 
-future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
+template <typename TaskType, typename... Args>
+requires std::derived_from<TaskType, compaction_manager::task>
+future<> compaction_manager::perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, Args... args) {
     if (_state != state::enabled) {
         co_return;
     }
@@ -1119,7 +1121,11 @@ future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compa
             return a->data_size() > b->data_size();
         });
     });
-    co_await perform_task(seastar::make_shared<rewrite_sstables_compaction_task>(*this, t, std::move(options), std::move(sstables), std::move(compacting), can_purge));
+    co_await perform_task(seastar::make_shared<TaskType>(*this, t, std::move(options), std::move(sstables), std::move(compacting), std::forward<Args>(args)...));
+}
+
+future<> compaction_manager::rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func get_func, can_purge_tombstones can_purge) {
+    return perform_task_on_all_files<rewrite_sstables_compaction_task>(t, std::move(options), std::move(get_func), can_purge);
 }
 
 class compaction_manager::validate_sstables_compaction_task : public compaction_manager::sstables_task {
@@ -1177,6 +1183,74 @@ future<> compaction_manager::perform_sstable_scrub_validate_mode(replica::table*
     return perform_task(seastar::make_shared<validate_sstables_compaction_task>(*this, t, std::move(all_sstables)));
 }
 
+class compaction_manager::cleanup_sstables_compaction_task : public compaction_manager::task {
+    const sstables::compaction_type_options _cleanup_options;
+    compacting_sstable_registration _compacting;
+    std::vector<sstables::compaction_descriptor> _pending_cleanup_jobs;
+public:
+    cleanup_sstables_compaction_task(compaction_manager& mgr, replica::table* t, sstables::compaction_type_options options,
+                                     std::vector<sstables::shared_sstable> candidates, compacting_sstable_registration compacting)
+            : task(mgr, t, options.type(), sstring(sstables::to_string(options.type())))
+            , _cleanup_options(std::move(options))
+            , _compacting(std::move(compacting))
+            , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(t->as_table_state(), candidates))
+    {
+        // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs
+        // will have more space available released by previous jobs.
+        std::ranges::sort(_pending_cleanup_jobs, std::ranges::greater(), std::mem_fn(&sstables::compaction_descriptor::sstables_size));
+        _cm._stats.pending_tasks += _pending_cleanup_jobs.size();
+    }
+
+    virtual ~cleanup_sstables_compaction_task() {
+        _cm._stats.pending_tasks -= _pending_cleanup_jobs.size();
+    }
+protected:
+    virtual future<> do_run() override {
+        switch_state(state::pending);
+        auto maintenance_permit = co_await seastar::get_units(_cm._maintenance_ops_sem, 1);
+
+        while (!_pending_cleanup_jobs.empty() && can_proceed()) {
+            auto active_job = std::move(_pending_cleanup_jobs.back());
+            active_job.options = _cleanup_options;
+            co_await run_cleanup_job(std::move(active_job));
+            _pending_cleanup_jobs.pop_back();
+            _cm._stats.pending_tasks--;
+        }
+    }
+private:
+    // Releases reference to cleaned files such that respective used disk space can be freed.
+    void release_exhausted(std::vector<sstables::shared_sstable> exhausted_sstables) {
+        _compacting.release_compacting(exhausted_sstables);
+    }
+
+    future<> run_cleanup_job(sstables::compaction_descriptor descriptor) {
+        co_await coroutine::switch_to(_cm._compaction_controller.sg());
+
+        for (;;) {
+            compaction_backlog_tracker user_initiated(std::make_unique<user_initiated_backlog_tracker>(_cm._compaction_controller.backlog_of_shares(200), _cm._available_memory));
+            _cm.register_backlog_tracker(user_initiated);
+
+            std::exception_ptr ex;
+            try {
+                setup_new_compaction(descriptor.run_identifier);
+                co_await compact_sstables(descriptor, _compaction_data,
+                                          std::bind(&cleanup_sstables_compaction_task::release_exhausted, this, std::placeholders::_1));
+                finish_compaction();
+                _cm.reevaluate_postponed_compactions();
+                co_return; // done with current job
+            } catch (...) {
+                ex = std::current_exception();
+            }
+
+            finish_compaction(state::failed);
+            // retry current job or rethrows exception
+            if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) {
+                co_return;
+            }
+        }
+    }
+};
+
 bool needs_cleanup(const sstables::shared_sstable& sst,
                    const dht::token_range_vector& sorted_owned_ranges,
                    schema_ptr s) {
@@ -1208,8 +1282,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
         });
     };
     if (check_for_cleanup()) {
-        return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
-            t->schema()->ks_name(), t->schema()->cf_name())));
+        throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
+            t->schema()->ks_name(), t->schema()->cf_name()));
     }
 
     auto sorted_owned_ranges = db.get_keyspace_local_ranges(t->schema()->ks_name());
@@ -1226,7 +1300,8 @@ future<> compaction_manager::perform_cleanup(replica::database& db, replica::tab
         });
     };
 
-    return rewrite_sstables(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)), std::move(get_sstables));
+    co_await perform_task_on_all_files<cleanup_sstables_compaction_task>(t, sstables::compaction_type_options::make_cleanup(std::move(sorted_owned_ranges)),
+                                                                         std::move(get_sstables));
 }
 
 // Submit a table to be upgraded and wait for its termination.
diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh
index 32a81739a4..77d83dee26 100644
--- a/compaction/compaction_manager.hh
+++ b/compaction/compaction_manager.hh
@@ -222,6 +222,7 @@ public:
     class regular_compaction_task;
     class offstrategy_compaction_task;
     class rewrite_sstables_compaction_task;
+    class cleanup_sstables_compaction_task;
     class validate_sstables_compaction_task;
     class compaction_manager_test_task;
 
@@ -327,6 +328,12 @@ private:
 
     using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;
 
+    // Guarantees that a maintenance task, e.g. cleanup, will be performed on all files available at the time
+    // by retrieving set of candidates only after all compactions for table T were stopped, if any.
+    template <typename TaskType, typename... Args>
+    requires std::derived_from<TaskType, compaction_manager::task>
+    future<> perform_task_on_all_files(replica::table* t, sstables::compaction_type_options options, get_candidates_func, Args... args);
+
     future<> rewrite_sstables(replica::table* t, sstables::compaction_type_options options, get_candidates_func, can_purge_tombstones can_purge = can_purge_tombstones::yes);
 public:
     compaction_manager(compaction_scheduling_group csg, maintenance_scheduling_group msg, size_t available_memory, abort_source& as);