/* * Copyright (C) 2015-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ #include "compaction_manager.hh" #include "compaction_descriptor.hh" #include "compaction_strategy.hh" #include "compaction_backlog_manager.hh" #include "compaction_weight_registration.hh" #include "sstables/sstables.hh" #include "sstables/sstables_manager.hh" #include #include #include #include #include #include #include #include #include "sstables/sstable_directory.hh" #include "utils/assert.hh" #include "utils/error_injection.hh" #include "utils/UUID_gen.hh" #include "db/compaction_history_entry.hh" #include "db/system_keyspace.hh" #include "db/config.hh" #include "tombstone_gc-internals.hh" #include #include "utils/labels.hh" static logging::logger cmlog("compaction_manager"); using namespace std::chrono_literals; namespace compaction { class compacting_sstable_registration { compaction_manager& _cm; compaction::compaction_state& _cs; std::unordered_set _compacting; public: explicit compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs) noexcept : _cm(cm) , _cs(cs) { } compacting_sstable_registration(compaction_manager& cm, compaction::compaction_state& cs, const std::vector& compacting) : compacting_sstable_registration(cm, cs) { register_compacting(compacting); } compacting_sstable_registration& operator=(const compacting_sstable_registration&) = delete; compacting_sstable_registration(const compacting_sstable_registration&) = delete; compacting_sstable_registration& operator=(compacting_sstable_registration&& other) noexcept { if (this != &other) { this->~compacting_sstable_registration(); new (this) compacting_sstable_registration(std::move(other)); } return *this; } compacting_sstable_registration(compacting_sstable_registration&& other) noexcept : _cm(other._cm) , _cs(other._cs) , _compacting(std::move(other._compacting)) { } ~compacting_sstable_registration() { // _compacting might be empty, but this should be just fine // for deregister_compacting_sstables. _cm.deregister_compacting_sstables(_compacting); } void register_compacting(const std::vector& sstables) { _compacting.reserve(_compacting.size() + sstables.size()); _compacting.insert(sstables.begin(), sstables.end()); _cm.register_compacting_sstables(sstables); } // Explicitly release compacting sstables void release_compacting(const std::vector& sstables) { _cm.deregister_compacting_sstables(sstables); for (const auto& sst : sstables) { _compacting.erase(sst); _cs.sstables_requiring_cleanup.erase(sst); } if (_cs.sstables_requiring_cleanup.empty()) { _cs.owned_ranges_ptr = nullptr; } } void release_all() noexcept { _cm.deregister_compacting_sstables(_compacting); _compacting = {}; } class update_me : public compaction_task_executor::on_replacement { compacting_sstable_registration& _registration; public: update_me(compacting_sstable_registration& registration) : _registration{registration} {} void on_removal(const std::vector& sstables) override { _registration.release_compacting(sstables); } void on_addition(const std::vector& sstables) override { _registration.register_compacting(sstables); } }; auto update_on_sstable_replacement() { return update_me(*this); } }; compaction_data compaction_manager::create_compaction_data() { compaction_data cdata = {}; cdata.compaction_uuid = utils::UUID_gen::get_time_UUID(); return cdata; } compaction_weight_registration::compaction_weight_registration(compaction_manager* cm, int weight) : _cm(cm) , _weight(weight) { _cm->register_weight(_weight); } compaction_weight_registration& compaction_weight_registration::operator=(compaction_weight_registration&& other) noexcept { if (this != &other) { this->~compaction_weight_registration(); new (this) compaction_weight_registration(std::move(other)); } return *this; } compaction_weight_registration::compaction_weight_registration(compaction_weight_registration&& other) noexcept : _cm(other._cm) , _weight(other._weight) { other._cm = nullptr; other._weight = 0; } compaction_weight_registration::~compaction_weight_registration() { if (_cm) { _cm->deregister_weight(_weight); } } void compaction_weight_registration::deregister() { _cm->deregister_weight(_weight); _cm = nullptr; } int compaction_weight_registration::weight() const { return _weight; } // Calculate weight of compaction job. static inline int calculate_weight(uint64_t total_size) { // At the moment, '4' is being used as log base for determining the weight // of a compaction job. With base of 4, what happens is that when you have // a 40-second compaction in progress, and a tiny 10-second compaction // comes along, you do them in parallel. // TODO: Find a possibly better log base through experimentation. static constexpr int WEIGHT_LOG_BASE = 4; // Fixed tax is added to size before taking the log, to make sure all jobs // smaller than the tax (i.e. 1MB) will be serialized. static constexpr int fixed_size_tax = 1024*1024; // computes the logarithm (base WEIGHT_LOG_BASE) of total_size. return int(std::log(total_size + fixed_size_tax) / std::log(WEIGHT_LOG_BASE)); } static inline int calculate_weight(const compaction_descriptor& descriptor) { // Use weight 0 for compactions that are comprised solely of completely expired sstables. // We want these compactions to be in a separate weight class because they are very lightweight, fast and efficient. if (descriptor.sstables.empty() || descriptor.has_only_fully_expired) { return 0; } return calculate_weight(descriptor.sstables_size()); } unsigned compaction_manager::current_compaction_fan_in_threshold() const { if (_tasks.empty()) { return 0; } auto largest_fan_in = std::ranges::max(_tasks | std::views::transform([] (auto& task) { return task.compaction_running() ? task.compaction_data().compaction_fan_in : 0; })); // conservatively limit fan-in threshold to 32, such that tons of small sstables won't accumulate if // running major on a leveled table, which can even have more than one thousand files. return std::min(unsigned(32), largest_fan_in); } bool compaction_manager::can_register_compaction(compaction_group_view& t, int weight, unsigned fan_in) const { // Only one weight is allowed if parallel compaction is disabled. if (!t.get_compaction_strategy().parallel_compaction() && has_table_ongoing_compaction(t)) { return false; } // Weightless compaction doesn't have to be serialized, and won't dillute overall efficiency. if (!weight) { return true; } // TODO: Maybe allow only *smaller* compactions to start? That can be done // by returning true only if weight is not in the set and is lower than any // entry in the set. if (_weight_tracker.contains(weight)) { // If reached this point, it means that there is an ongoing compaction // with the weight of the compaction job. return false; } // A compaction cannot proceed until its fan-in is greater than or equal to the current largest fan-in. // That's done to prevent a less efficient compaction from "diluting" a more efficient one. // Compactions with the same efficiency can run in parallel as long as they aren't similar sized, // i.e. an efficient small-sized job can proceed in parallel to an efficient big-sized one. if (fan_in < current_compaction_fan_in_threshold()) { return false; } return true; } void compaction_manager::register_weight(int weight) { _weight_tracker.insert(weight); } void compaction_manager::deregister_weight(int weight) { _weight_tracker.erase(weight); reevaluate_postponed_compactions(); } future> in_strategy_sstables(compaction_group_view& table_s) { auto set = co_await table_s.main_sstable_set(); auto sstables = set->all(); co_return *sstables | std::views::filter([] (const sstables::shared_sstable& sst) { return is_eligible_for_compaction(sst); }) | std::ranges::to(); } future> compaction_manager::get_candidates(compaction_group_view& t) const { auto main_set = co_await t.main_sstable_set(); co_return get_candidates(t, *main_set->all()); } bool compaction_manager::eligible_for_compaction(const sstables::shared_sstable& sstable) const { return is_eligible_for_compaction(sstable) && !_compacting_sstables.contains(sstable); } bool compaction_manager::eligible_for_compaction(const sstables::frozen_sstable_run& sstable_run) const { return std::ranges::all_of(sstable_run->all(), [this] (const sstables::shared_sstable& sstable) { return eligible_for_compaction(sstable); }); } template requires std::convertible_to, sstables::shared_sstable> || std::convertible_to, sstables::frozen_sstable_run> std::vector> compaction_manager::get_candidates(compaction_group_view& t, const Range& sstables) const { using range_candidates_t = std::ranges::range_value_t; std::vector candidates; candidates.reserve(sstables.size()); // prevents sstables that belongs to a partial run being generated by ongoing compaction from being // selected for compaction, which could potentially result in wrong behavior. auto partial_run_identifiers = _tasks | std::views::filter(std::mem_fn(&compaction_task_executor::generating_output_run)) | std::views::transform(std::mem_fn(&compaction_task_executor::output_run_id)) | std::ranges::to(); // Filter out sstables that are being compacted. for (const auto& sst : sstables) { if (!eligible_for_compaction(sst)) { continue; } if (partial_run_identifiers.contains(sst->run_identifier())) { continue; } candidates.push_back(sst); } return candidates; } template requires std::same_as, sstables::shared_sstable> void compaction_manager::register_compacting_sstables(const Range& sstables) { // make all required allocations in advance to merge // so it should not throw _compacting_sstables.reserve(_compacting_sstables.size() + std::ranges::size(sstables)); try { _compacting_sstables.insert(std::ranges::begin(sstables), std::ranges::end(sstables)); } catch (...) { cmlog.error("Unexpected error when registering compacting SSTables: {}. Ignored...", std::current_exception()); } } template requires std::same_as, sstables::shared_sstable> void compaction_manager::deregister_compacting_sstables(const Range& sstables) { // Remove compacted sstables from the set of compacting sstables. for (auto& sstable : sstables) { _compacting_sstables.erase(sstable); } } class user_initiated_backlog_tracker final : public compaction_backlog_tracker::impl { public: explicit user_initiated_backlog_tracker(float added_backlog, size_t available_memory) : _added_backlog(added_backlog), _available_memory(available_memory) {} private: float _added_backlog; size_t _available_memory; virtual double backlog(const compaction_backlog_tracker::ongoing_writes& ow, const compaction_backlog_tracker::ongoing_compactions& oc) const override { return _added_backlog * _available_memory; } virtual void replace_sstables(const std::vector& old_ssts, const std::vector& new_ssts) override {} }; compaction::compaction_state& compaction_manager::get_compaction_state(compaction_group_view* t) { try { return _compaction_state.at(t); } catch (std::out_of_range&) { // Note: don't dereference t as it might not exist throw std::out_of_range(format("Compaction state for table [{}] not found", fmt::ptr(t))); } } compaction_task_executor::compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, ::compaction::compaction_type type, sstring desc) : _cm(mgr) , _compacting_table(t) , _compaction_state(_cm.get_compaction_state(t)) , _do_throw_if_stopping(do_throw_if_stopping) , _type(type) , _description(std::move(desc)) {} future compaction_manager::perform_task(shared_ptr task, throw_if_stopping do_throw_if_stopping) { cmlog.debug("{}: started", *task); try { auto&& res = co_await task->run_compaction(); cmlog.debug("{}: done", *task); co_return res; } catch (compaction_stopped_exception& e) { cmlog.info("{}: stopped, reason: {}", *task, e.what()); if (do_throw_if_stopping) { throw; } } catch (compaction_aborted_exception& e) { cmlog.error("{}: aborted, reason: {}", *task, e.what()); _stats.errors++; throw; } catch (storage_io_error& e) { _stats.errors++; cmlog.error("{}: failed due to storage io error: {}: stopping", *task, e.what()); do_stop(); throw; } catch (...) { cmlog.error("{}: failed, reason {}: stopping", *task, std::current_exception()); _stats.errors++; throw; } co_return std::nullopt; } future<> compaction_manager::on_compaction_completion(compaction_group_view& t, compaction_completion_desc desc, sstables::offstrategy offstrategy) { auto& cs = get_compaction_state(&t); auto new_sstables = desc.new_sstables | std::ranges::to(); for (const auto& sst : desc.old_sstables) { if (!new_sstables.contains(sst)) { cs.sstables_requiring_cleanup.erase(sst); } } if (cs.sstables_requiring_cleanup.empty()) { cs.owned_ranges_ptr = nullptr; } return t.on_compaction_completion(std::move(desc), offstrategy); } future compaction_task_executor::compact_sstables_and_update_history(compaction_descriptor descriptor, ::compaction::compaction_data& cdata, on_replacement& on_replace, compaction_manager::can_purge_tombstones can_purge) { if (!descriptor.sstables.size()) { // if there is nothing to compact, just return. co_return compaction_result{}; } bool should_update_history = this->should_update_history(descriptor.options.type()); compaction_result res = co_await compact_sstables(std::move(descriptor), cdata, on_replace, std::move(can_purge)); if (should_update_history) { co_await update_history(*_compacting_table, compaction_result(res), cdata); } co_return res; } future compaction_task_executor::sstable_set_for_tombstone_gc(compaction_group_view& t) { auto compound_set = t.sstable_set_for_tombstone_gc(); // Compound set will be linearized into a single set, since compaction might add or remove sstables // to it for incremental compaction to work. auto new_set = sstables::make_partitioned_sstable_set(t.schema(), t.token_range()); co_await compound_set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) { auto inserted = new_set.insert(sst); if (!inserted) { on_internal_error(cmlog, format("Unable to insert SSTable {} into set used for tombstone GC", sst->get_filename())); } }); co_return std::move(new_set); } future compaction_task_executor::compact_sstables(compaction_descriptor descriptor, ::compaction::compaction_data& cdata, on_replacement& on_replace, compaction_manager::can_purge_tombstones can_purge, sstables::offstrategy offstrategy) { compaction_group_view& t = *_compacting_table; if (can_purge) { descriptor.enable_garbage_collection(co_await sstable_set_for_tombstone_gc(t)); } descriptor.creator = [&t] (shard_id) { // All compaction types going through this path will work on normal input sstables only. // Off-strategy, for example, waits until the sstables move out of staging state. return t.make_sstable(sstables::sstable_state::normal); }; descriptor.replacer = [this, &t, &on_replace, offstrategy] (compaction_completion_desc desc) { t.get_compaction_strategy().notify_completion(t, desc.old_sstables, desc.new_sstables); _cm.propagate_replacement(t, desc.old_sstables, desc.new_sstables); // on_replace updates the compacting registration with the old and new // sstables. while on_compaction_completion() removes the old sstables // from the table's sstable set, and adds the new ones to the sstable // set. // since the regular compactions exclude the sstables in the sstable // set which are currently being compacted, if we want to ensure the // exclusive access of compactions to an sstable we should guard it // with the registration when adding/removing it to/from the sstable // set. otherwise, the regular compaction would pick it up in the time // window, where the sstables: // - are still in the main set // - are not being compacted. on_replace.on_addition(desc.new_sstables); auto old_sstables = desc.old_sstables; _cm.on_compaction_completion(t, std::move(desc), offstrategy).get(); on_replace.on_removal(old_sstables); }; // retrieve owned_ranges if_required if (!descriptor.owned_ranges) { std::vector sstables_requiring_cleanup; const auto& cs = _cm.get_compaction_state(_compacting_table); for (const auto& sst : descriptor.sstables) { if (cs.sstables_requiring_cleanup.contains(sst)) { sstables_requiring_cleanup.emplace_back(sst); } } if (!sstables_requiring_cleanup.empty()) { cmlog.info("The following SSTables require cleanup in this compaction: {}", sstables_requiring_cleanup); if (!cs.owned_ranges_ptr) { on_internal_error_noexcept(cmlog, "SSTables require cleanup but compaction state has null owned ranges"); } descriptor.owned_ranges = cs.owned_ranges_ptr; } } co_return co_await ::compaction::compact_sstables(std::move(descriptor), cdata, t, _progress_monitor); } future<> compaction_task_executor::update_history(compaction_group_view& t, compaction_result&& res, const ::compaction::compaction_data& cdata) { auto started_at = std::chrono::duration_cast(res.stats.started_at.time_since_epoch()); auto ended_at = std::chrono::duration_cast(res.stats.ended_at.time_since_epoch()); if (auto sys_ks = _cm._sys_ks.get_permit()) { co_await utils::get_local_injector().inject("update_history_wait", utils::wait_for_message(120s)); std::unordered_map rows_merged; for (size_t id=0; idks_name(), .cf = t.schema()->cf_name(), .compaction_type = fmt::to_string(res.type), .started_at = started_at.count(), .compacted_at = ended_at.count(), .bytes_in = res.stats.start_size, .bytes_out = res.stats.end_size, .rows_merged = std::move(rows_merged), .sstables_in = std::move(res.sstables_in), .sstables_out = std::move(res.sstables_out), .total_tombstone_purge_attempt = res.stats.tombstone_purge_stats.attempts, .total_tombstone_purge_failure_due_to_overlapping_with_memtable = res.stats.tombstone_purge_stats.failures_due_to_overlapping_with_memtable, .total_tombstone_purge_failure_due_to_overlapping_with_uncompacting_sstable = res.stats.tombstone_purge_stats.failures_due_to_overlapping_with_uncompacting_sstable, }; co_await sys_ks->update_compaction_history(std::move(entry)); } } future<> compaction_manager::get_compaction_history(compaction_history_consumer&& f) { if (auto sys_ks = _sys_ks.get_permit()) { co_await sys_ks->get_compaction_history(std::move(f)); } } } template Executor> struct fmt::formatter : fmt::formatter {}; namespace compaction { class sstables_task_executor : public compaction_task_executor, public sstables_compaction_task_impl { protected: std::vector _sstables; void set_sstables(std::vector new_sstables); sstables::shared_sstable consume_sstable(); public: explicit sstables_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, ::compaction::compaction_type compaction_type, sstring desc, std::vector sstables, tasks::task_id parent_id, sstring entity = "") : compaction_task_executor(mgr, do_throw_if_stopping, t, compaction_type, std::move(desc)) , sstables_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), 0, "compaction group", t->schema()->ks_name(), t->schema()->cf_name(), std::move(entity), parent_id) { _status.progress_units = "bytes"; set_sstables(std::move(sstables)); } virtual ~sstables_task_executor() = default; virtual future<> release_resources() noexcept override; virtual future get_progress() const override { return compaction_task_impl::get_progress(_compaction_data, _progress_monitor); } virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } }; class major_compaction_task_executor : public compaction_task_executor, public major_compaction_task_impl { public: major_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, bool consider_only_existing_data) : compaction_task_executor(mgr, do_throw_if_stopping, t, compaction_type::Major, "Major compaction") , major_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), 0, "compaction group", t->schema()->ks_name(), t->schema()->cf_name(), "", parent_id, flush_mode::compacted_tables, consider_only_existing_data) { _status.progress_units = "bytes"; } virtual future get_progress() const override { return compaction_task_impl::get_progress(_compaction_data, _progress_monitor); } virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } // first take major compaction semaphore, then exclusely take compaction lock for table. // it cannot be the other way around, or minor compaction for this table would be // prevented while an ongoing major compaction doesn't release the semaphore. virtual future do_run() override { co_await coroutine::switch_to(_cm.maintenance_sg()); switch_state(state::pending); auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem); // Write lock is used to synchronize selection of sstables for compaction and their registration. // Also used to synchronize with regular compaction, so major waits for regular to cease before selecting candidates. auto lock_holder = co_await _compaction_state.lock.hold_write_lock(); if (!can_proceed()) { co_return std::nullopt; } // candidates are sstables that aren't being operated on by other compaction types. // those are eligible for major compaction. compaction_group_view* t = _compacting_table; compaction_strategy cs = t->get_compaction_strategy(); compaction_descriptor descriptor = cs.get_major_compaction_job(*t, co_await _cm.get_candidates(*t)); descriptor.gc_check_only_compacting_sstables = _consider_only_existing_data; auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), descriptor.sstables); auto on_replace = compacting.update_on_sstable_replacement(); setup_new_compaction(descriptor.run_identifier); cmlog.info0("User initiated compaction started on behalf of {}", *t); // Now that the sstables for major compaction are registered // and the user_initiated_backlog_tracker is set up // the exclusive lock can be freed to let regular compaction run in parallel to major lock_holder.return_all(); co_await utils::get_local_injector().inject("major_compaction_wait", [this] (auto& handler) -> future<> { cmlog.info("major_compaction_wait: waiting"); while (!handler.poll_for_message() && !_compaction_data.is_stop_requested()) { co_await sleep(std::chrono::milliseconds(5)); } cmlog.info("major_compaction_wait: released"); }); co_await compact_sstables_and_update_history(std::move(descriptor), _compaction_data, on_replace); finish_compaction(); co_return std::nullopt; } }; template requires std::is_base_of_v && std::is_base_of_v && requires (compaction_manager& cm, throw_if_stopping do_throw_if_stopping, Args&&... args) { {TaskExecutor(cm, do_throw_if_stopping, std::forward(args)...)} -> std::same_as; } future compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, tasks::task_info parent_info, Args&&... args) { auto task_executor = seastar::make_shared(*this, do_throw_if_stopping, std::forward(args)...); _tasks.push_back(*task_executor); auto unregister_task = defer([task_executor] { task_executor->unlink(); task_executor->switch_state(compaction_task_executor::state::none); }); auto task = co_await get_task_manager_module().make_task(task_executor, parent_info); task->start(); co_await task->done(); co_return task_executor->get_stats(); } std::optional compaction_manager::start_compaction(compaction_group_view& t) { if (is_disabled()) { return std::nullopt; } auto it = _compaction_state.find(&t); if (it == _compaction_state.end() || it->second.gate.is_closed()) { return std::nullopt; } return it->second.gate.hold(); } future<> compaction_manager::perform_major_compaction(compaction_group_view& t, tasks::task_info info, bool consider_only_existing_data) { auto gh = start_compaction(t); if (!gh) { co_return; } co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, consider_only_existing_data).discard_result(); } class custom_compaction_task_executor : public compaction_task_executor, public compaction_task_impl { noncopyable_function(::compaction::compaction_data&, compaction_progress_monitor&)> _job; public: custom_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, ::compaction::compaction_type type, sstring desc, noncopyable_function(::compaction::compaction_data&, compaction_progress_monitor&)> job) : compaction_task_executor(mgr, do_throw_if_stopping, t, type, std::move(desc)) , compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), 0, "compaction group", t->schema()->ks_name(), t->schema()->cf_name(), "", parent_id) , _job(std::move(job)) { _status.progress_units = "bytes"; } virtual std::string type() const override { return fmt::format("{} compaction", compaction_type()); } virtual future get_progress() const override { return compaction_task_impl::get_progress(_compaction_data, _progress_monitor); } virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } virtual future do_run() override { if (!can_proceed(throw_if_stopping::yes)) { co_return std::nullopt; } switch_state(state::pending); auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem); if (!can_proceed(throw_if_stopping::yes)) { co_return std::nullopt; } setup_new_compaction(); // NOTE: // no need to register shared sstables because they're excluded from non-resharding // compaction and some of them may not even belong to current shard. co_await _job(compaction_data(), _progress_monitor); finish_compaction(); co_return std::nullopt; } }; future<> compaction_manager::run_custom_job(compaction_group_view& t, compaction_type type, const char* desc, noncopyable_function(compaction_data&, compaction_progress_monitor&)> job, tasks::task_info info, throw_if_stopping do_throw_if_stopping) { auto gh = start_compaction(t); if (!gh) { co_return; } co_return co_await perform_compaction(do_throw_if_stopping, info, &t, info.id, type, desc, std::move(job)).discard_result(); } future<> compaction_manager::update_static_shares(float static_shares) { cmlog.info("Updating static shares to {}", static_shares); return _compaction_controller.update_static_shares(static_shares); } compaction_reenabler::compaction_reenabler(compaction_manager& cm, compaction_group_view& t) : _cm(cm) , _table(&t) , _compaction_state(cm.get_compaction_state(_table)) , _holder(_compaction_state.gate.hold()) { _compaction_state.compaction_disabled_counter++; cmlog.debug("Temporarily disabled compaction for {}. compaction_disabled_counter={}", t, _compaction_state.compaction_disabled_counter); } compaction_reenabler::compaction_reenabler(compaction_reenabler&& o) noexcept : _cm(o._cm) , _table(std::exchange(o._table, nullptr)) , _compaction_state(o._compaction_state) , _holder(std::move(o._holder)) {} compaction_reenabler::~compaction_reenabler() { // submit compaction request if we're the last holder of the gate which is still opened. if (_table && --_compaction_state.compaction_disabled_counter == 0 && !_compaction_state.gate.is_closed()) { cmlog.debug("Reenabling compaction for {}", *_table); try { _cm.submit(*_table); } catch (...) { cmlog.warn("compaction_reenabler could not reenable compaction for {}: {}", *_table, std::current_exception()); } } } future<> compaction_manager::await_ongoing_compactions(compaction_group_view* t) { auto name = t ? t->schema()->ks_name() + "." + t->schema()->cf_name() : "ALL"; try { auto tasks = _tasks | std::views::filter([t] (const auto& task) { return (!t || task.compacting_table() == t); }) | std::views::transform([] (auto& task) { return task.shared_from_this(); }) | std::ranges::to>>(); auto sz = tasks.size(); cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={}", name, sz); bool task_stopped = false; co_await await_tasks(std::move(tasks), task_stopped); cmlog.debug("Awaiting ongoing unrepaired compactions table={} tasks={} done", name, sz); } catch (...) { cmlog.error("Awaiting ongoing unrepaired compactions table={} failed: {}", name, std::current_exception()); throw; } } future compaction_manager::get_incremental_repair_read_lock(compaction::compaction_group_view& t, const sstring& reason) { if (!reason.empty()) { cmlog.debug("Get get_incremental_repair_read_lock for {} started", reason); } compaction::compaction_state& cs = get_compaction_state(&t); auto gh = cs.gate.hold(); auto ret = co_await cs.incremental_repair_lock.hold_read_lock(); if (!reason.empty()) { cmlog.debug("Get get_incremental_repair_read_lock for {} done", reason); } co_return ret; } future compaction_manager::get_incremental_repair_write_lock(compaction::compaction_group_view& t, const sstring& reason) { if (!reason.empty()) { cmlog.debug("Get get_incremental_repair_write_lock for {} started", reason); } compaction::compaction_state& cs = get_compaction_state(&t); auto gh = cs.gate.hold(); auto ret = co_await cs.incremental_repair_lock.hold_write_lock(); if (!reason.empty()) { cmlog.debug("Get get_incremental_repair_write_lock for {} done", reason); } co_return ret; } future compaction_manager::await_and_disable_compaction(compaction_group_view& t) { compaction_reenabler cre(*this, t); co_await await_ongoing_compactions(&t); co_return cre; } compaction_reenabler compaction_manager::stop_and_disable_compaction_no_wait(compaction_group_view& t, sstring reason) { compaction_reenabler cre(*this, t); try { do_stop_ongoing_compactions(std::move(reason), [&t] (const compaction_group_view* x) { return x == &t; } , {}); } catch (...) { cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception()); } return cre; } future compaction_manager::stop_and_disable_compaction(sstring reason, compaction_group_view& t) { compaction_reenabler cre(*this, t); co_await stop_ongoing_compactions(std::move(reason), &t); co_return cre; } future<> compaction_manager::run_with_compaction_disabled(compaction_group_view& t, std::function ()> func, sstring reason) { compaction_reenabler cre = co_await stop_and_disable_compaction(std::move(reason), t); co_await func(); } } auto fmt::formatter::format(compaction::compaction_task_executor::state s, fmt::format_context& ctx) const -> decltype(ctx.out()) { std::string_view name; switch (s) { using enum compaction::compaction_task_executor::state; case none: name = "none"; break; case pending: name = "pending"; break; case active: name = "active"; break; case done: name = "done"; break; case postponed: name = "postponed"; break; case failed: name = "failed"; break; } return fmt::format_to(ctx.out(), "{}", name); } auto fmt::formatter::format(const compaction::compaction_task_executor& ex, fmt::format_context& ctx) const -> decltype(ctx.out()) { auto* t = ex._compacting_table; return fmt::format_to(ctx.out(), "{} task {} for table {} [{}]", ex._description, fmt::ptr(&ex), *t, fmt::ptr(t)); } namespace compaction { inline compaction_controller make_compaction_controller(const compaction_manager::scheduling_group& csg, uint64_t static_shares, std::optional max_shares, std::function fn) { return compaction_controller(csg, static_shares, max_shares, 250ms, std::move(fn)); } compaction::compaction_state::~compaction_state() { compaction_done.broken(); } future<> sstables_task_executor::release_resources() noexcept { _cm._stats.pending_tasks -= _sstables.size() - (_state == state::pending); _sstables = {}; return make_ready_future(); } future compaction_task_executor::run_compaction() noexcept { try { _compaction_done = stopping() ? make_exception_future(make_compaction_stopped_exception()) : do_run(); return compaction_done(); } catch (...) { return current_exception_as_future(); } } compaction_task_executor::state compaction_task_executor::switch_state(state new_state) { auto old_state = std::exchange(_state, new_state); switch (old_state) { case state::none: case state::done: case state::postponed: case state::failed: break; case state::pending: --_cm._stats.pending_tasks; break; case state::active: --_cm._stats.active_tasks; break; } switch (new_state) { case state::none: case state::postponed: case state::failed: break; case state::pending: ++_cm._stats.pending_tasks; break; case state::active: ++_cm._stats.active_tasks; break; case state::done: ++_cm._stats.completed_tasks; break; } cmlog.debug("{}: switch_state: {} -> {}: pending={} active={} done={} errors={}", *this, old_state, new_state, _cm._stats.pending_tasks, _cm._stats.active_tasks, _cm._stats.completed_tasks, _cm._stats.errors); return old_state; } void sstables_task_executor::set_sstables(std::vector new_sstables) { if (!_sstables.empty()) { on_internal_error(cmlog, format("sstables were already set")); } _sstables = std::move(new_sstables); cmlog.debug("{}: set_sstables: {} sstable{}", *this, _sstables.size(), _sstables.size() > 1 ? "s" : ""); _cm._stats.pending_tasks += _sstables.size() - (_state == state::pending); } sstables::shared_sstable sstables_task_executor::consume_sstable() { if (_sstables.empty()) { on_internal_error(cmlog, format("no more sstables")); } auto sst = _sstables.back(); _sstables.pop_back(); --_cm._stats.pending_tasks; // from this point on, switch_state(pending|active) works the same way as any other task cmlog.debug("consumed {}", sst->get_filename()); return sst; } future> compaction_task_executor::acquire_semaphore(named_semaphore& sem, size_t units) { return seastar::get_units(sem, units, _compaction_data.abort).handle_exception_type([this] (const abort_requested_exception& e) { auto s = _compacting_table->schema(); return make_exception_future>( compaction_stopped_exception(s->ks_name(), s->cf_name(), e.what())); }); } void compaction_task_executor::setup_new_compaction(sstables::run_id output_run_id) { _compaction_data = _cm.create_compaction_data(); _output_run_identifier = output_run_id; switch_state(state::active); } void compaction_task_executor::finish_compaction(state finish_state) noexcept { switch_state(finish_state); _output_run_identifier = sstables::run_id::create_null_id(); if (finish_state != state::failed) { _compaction_retry.reset(); } _compaction_state.compaction_done.signal(); } void compaction_task_executor::abort(abort_source& as) noexcept { if (!as.abort_requested()) { as.request_abort(); stop_compaction("user requested abort"); } } void compaction_task_executor::stop_compaction(sstring reason) noexcept { _compaction_data.stop(std::move(reason)); } compaction_stopped_exception compaction_task_executor::make_compaction_stopped_exception() const { auto s = _compacting_table->schema(); return compaction_stopped_exception(s->ks_name(), s->cf_name(), _compaction_data.stop_requested); } class compaction_manager::strategy_control : public compaction::strategy_control { compaction_manager& _cm; public: explicit strategy_control(compaction_manager& cm) noexcept : _cm(cm) {} bool has_ongoing_compaction(compaction_group_view& table_s) const noexcept override { return std::any_of(_cm._tasks.begin(), _cm._tasks.end(), [&s = table_s.schema()] (const compaction_task_executor& task) { return task.compaction_running() && task.compacting_table()->schema()->ks_name() == s->ks_name() && task.compacting_table()->schema()->cf_name() == s->cf_name(); }); } future> candidates(compaction_group_view& t) const override { auto main_set = co_await t.main_sstable_set(); co_return _cm.get_candidates(t, *main_set->all()); } future> candidates_as_runs(compaction_group_view& t) const override { auto main_set = co_await t.main_sstable_set(); co_return _cm.get_candidates(t, main_set->all_sstable_runs()); } }; compaction_manager::compaction_manager(config cfg, abort_source& as, tasks::task_manager& tm) : _task_manager_module(make_shared(tm)) , _sys_ks("compaction_manager::system_keyspace") , _cfg(std::move(cfg)) , _compaction_submission_timer(compaction_sg(), compaction_submission_callback()) , _compaction_controller(make_compaction_controller(compaction_sg(), static_shares(), _cfg.max_shares.get(), [this] () -> float { _last_backlog = backlog(); auto b = _last_backlog / available_memory(); // This means we are using an unimplemented strategy if (compaction_controller::backlog_disabled(b)) { // returning the normalization factor means that we'll return the maximum // output in the _control_points. We can get rid of this when we implement // all strategies. return compaction_controller::normalization_factor; } return b; })) , _backlog_manager(_compaction_controller) , _early_abort_subscription(as.subscribe([this] () noexcept { do_stop(); })) , _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); })) , _update_compaction_static_shares_action([this] { return update_static_shares(static_shares()); }) , _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer())) , _compaction_max_shares_observer(_cfg.max_shares.observe([this] (const float& max_shares) { cmlog.info("Updating max shares to {}", max_shares); _compaction_controller.set_max_shares(max_shares); })) , _strategy_control(std::make_unique(*this)) { tm.register_module(_task_manager_module->get_name(), _task_manager_module); register_metrics(); // Bandwidth throttling is node-wide, updater is needed on single shard if (this_shard_id() == 0) { _throughput_option_observer.emplace(_cfg.throughput_mb_per_sec.observe(_throughput_updater.make_observer())); // Start throttling (if configured) right at once. Any boot-time compaction // jobs (reshape/reshard) run in unlimited streaming group (void)_throughput_updater.trigger_later(); } } compaction_manager::compaction_manager(tasks::task_manager& tm) : _task_manager_module(make_shared(tm)) , _sys_ks("compaction_manager::system_keyspace") , _cfg(config{ .available_memory = 1 }) , _compaction_submission_timer(compaction_sg(), compaction_submission_callback()) , _compaction_controller(make_compaction_controller(compaction_sg(), 1, std::nullopt, [] () -> float { return 1.0; })) , _backlog_manager(_compaction_controller) , _throughput_updater(serialized_action([this] { return update_throughput(throughput_mbs()); })) , _update_compaction_static_shares_action([] { return make_ready_future<>(); }) , _compaction_static_shares_observer(_cfg.static_shares.observe(_update_compaction_static_shares_action.make_observer())) , _compaction_max_shares_observer(_cfg.max_shares.observe([] (const float& max_shares) {})) , _strategy_control(std::make_unique(*this)) { tm.register_module(_task_manager_module->get_name(), _task_manager_module); // No metric registration because this constructor is supposed to be used only by the testing // infrastructure. } compaction_manager::~compaction_manager() { // Assert that compaction manager was explicitly stopped, if started. // Otherwise, fiber(s) will be alive after the object is stopped. SCYLLA_ASSERT(_state == state::none || _state == state::stopped); } future<> compaction_manager::update_throughput(uint32_t value_mbs) { uint64_t bps = ((uint64_t)(value_mbs != 0 ? value_mbs : std::numeric_limits::max())) << 20; return compaction_sg().update_io_bandwidth(bps).then_wrapped([value_mbs] (auto f) { if (f.failed()) { cmlog.warn("Couldn't update compaction bandwidth: {}", f.get_exception()); } else if (value_mbs != 0) { cmlog.info("Set compaction bandwidth to {}MB/s", value_mbs); } else { cmlog.info("Set unlimited compaction bandwidth"); } }); } void compaction_manager::register_metrics() { namespace sm = seastar::metrics; _metrics.add_group("compaction_manager", { sm::make_gauge("compactions", [this] { return _stats.active_tasks; }, sm::description("Holds the number of currently active compactions."))(basic_level), sm::make_gauge("pending_compactions", [this] { return _stats.pending_tasks; }, sm::description("Holds the number of compaction tasks waiting for an opportunity to run.")), sm::make_counter("completed_compactions", [this] { return _stats.completed_tasks; }, sm::description("Holds the number of completed compaction tasks.")), sm::make_counter("failed_compactions", [this] { return _stats.errors; }, sm::description("Holds the number of failed compaction tasks.")), sm::make_gauge("postponed_compactions", [this] { return _postponed.size(); }, sm::description("Holds the number of tables with postponed compaction.")), sm::make_gauge("backlog", [this] { return _last_backlog; }, sm::description("Holds the sum of compaction backlog for all tables in the system.")), sm::make_gauge("normalized_backlog", [this] { return _last_backlog / available_memory(); }, sm::description("Holds the sum of normalized compaction backlog for all tables in the system. Backlog is normalized by dividing backlog by shard's available memory.")), sm::make_counter("validation_errors", [this] { return _validation_errors; }, sm::description("Holds the number of encountered validation errors.")), }); } void compaction_manager::enable() { SCYLLA_ASSERT(_state == state::none || _state == state::running); cmlog.info("Asked to enable"); if (_state == state::none) { _state = state::running; SCYLLA_ASSERT(_disabled_state_count == 0); } else if (_disabled_state_count > 0 && --_disabled_state_count > 0) { cmlog.debug("Compaction manager is still disabled, requires {} more call(s) to enable()", _disabled_state_count); return; } _compaction_submission_timer.cancel(); _compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval()); _waiting_reevalution = postponed_compactions_reevaluation(); cmlog.info("Enabled"); } std::function compaction_manager::compaction_submission_callback() { return [this] () mutable { auto now = gc_clock::now(); for (auto& [table, state] : _compaction_state) { if (now - state.last_regular_compaction > periodic_compaction_submission_interval()) { postpone_compaction_for_table(table); } } reevaluate_postponed_compactions(); }; } future<> compaction_manager::postponed_compactions_reevaluation() { while (true) { co_await _postponed_reevaluation.when(); if (is_disabled()) { _postponed.clear(); co_return; } // A task_state being reevaluated can re-insert itself into postponed list, which is the reason // for moving the list to be processed into a local. auto postponed = std::exchange(_postponed, {}); try { for (auto it = postponed.begin(); it != postponed.end();) { compaction_group_view* t = *it; it = postponed.erase(it); // skip reevaluation of a compaction_group_view that became invalid post its removal if (!_compaction_state.contains(t)) { continue; } cmlog.debug("resubmitting postponed compaction for table {} [{}]", *t, fmt::ptr(t)); submit(*t); co_await coroutine::maybe_yield(); } } catch (...) { _postponed.insert(postponed.begin(), postponed.end()); } } } void compaction_manager::reevaluate_postponed_compactions() noexcept { _postponed_reevaluation.signal(); } void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) { _postponed.insert(t); } void compaction_manager::stop_tasks(const std::vector>& tasks, sstring reason) noexcept { // To prevent compaction from being postponed while tasks are being stopped, // let's stop all tasks before the deferring point below. for (auto& t : tasks) { cmlog.debug("Stopping {}", *t); t->stop_compaction(reason); } } future<> compaction_manager::await_tasks(std::vector> tasks, bool task_stopped) const noexcept { co_await coroutine::parallel_for_each(tasks, [task_stopped] (auto& task) -> future<> { auto unlink_task = deferred_action([task, task_stopped] { if (task_stopped) { task->unlink(); } }); try { co_await task->compaction_done(); } catch (compaction_stopped_exception&) { // swallow stop exception if a given procedure decides to propagate it to the caller, // as it happens with reshard and reshape. } catch (...) { // just log any other errors as the callers have nothing to do with them. cmlog.debug("Awaiting {}: task returned error: {}", *task, std::current_exception()); co_return; } cmlog.debug("Awaiting {}: done", *task); }); } std::vector> compaction_manager::do_stop_ongoing_compactions(sstring reason, std::function filter, std::optional type_opt) noexcept { auto tasks = _tasks | std::views::filter([&filter, type_opt] (const auto& task) { return filter(task.compacting_table()) && (!type_opt || task.compaction_type() == *type_opt); }) | std::views::transform([] (auto& task) { return task.shared_from_this(); }) | std::ranges::to>>(); logging::log_level level = tasks.empty() ? log_level::debug : log_level::info; if (cmlog.is_enabled(level)) { auto ongoing_compactions = get_compactions(filter).size(); std::string scope = ""; if (!tasks.empty()) { const compaction_group_view* t = tasks.front()->compacting_table(); if (std::find_if(tasks.begin(), tasks.end(), [t] (auto& x) { return x->compacting_table() != t; }) == tasks.end()) { scope = fmt::format(" for table {}", *t); } } if (type_opt) { scope += fmt::format(" {} type={}", scope.size() ? "and" : "for", *type_opt); } cmlog.log(level, "Stopping {} tasks for {} ongoing compactions{} due to {}", tasks.size(), ongoing_compactions, scope, reason); } stop_tasks(tasks, std::move(reason)); return tasks; } future<> compaction_manager::stop_ongoing_compactions(sstring reason, compaction_group_view* t, std::optional type_opt) noexcept { return stop_ongoing_compactions(std::move(reason), [t] (const compaction_group_view* x) { return !t || x == t; }, type_opt); } future<> compaction_manager::stop_ongoing_compactions(sstring reason, std::function filter, std::optional type_opt) noexcept { try { auto tasks = do_stop_ongoing_compactions(std::move(reason), std::move(filter), type_opt); bool task_stopped = true; co_await await_tasks(std::move(tasks), task_stopped); } catch (...) { cmlog.error("Stopping ongoing compactions failed: {}. Ignored", std::current_exception()); } co_return; } future<> compaction_manager::drain() { cmlog.info("Asked to drain"); if (_state == state::none) { _state = state::running; } ++_disabled_state_count; _compaction_submission_timer.cancel(); // Stop ongoing compactions, if the request has not been sent already and wait for them to stop. co_await stop_ongoing_compactions("drain"); // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber reevaluate_postponed_compactions(); cmlog.info("Drained"); } future<> compaction_manager::start(const db::config& cfg, utils::disk_space_monitor* dsm) { if (dsm && (this_shard_id() == 0)) { _out_of_space_subscription = dsm->subscribe(cfg.critical_disk_utilization_level, [this] (auto threshold_reached) { if (threshold_reached) { return container().invoke_on_all([] (compaction_manager& cm) { cm._in_critical_disk_utilization_mode = true; return cm.drain(); }); } return container().invoke_on_all([] (compaction_manager& cm) { cm._in_critical_disk_utilization_mode = false; cm.enable(); }); }); } return make_ready_future<>(); } future<> compaction_manager::stop() { do_stop(); if (_stop_future) { co_await std::exchange(*_stop_future, make_ready_future()); } } future<> compaction_manager::really_do_stop() noexcept { cmlog.info("Asked to stop"); // Reset the metrics registry _metrics.clear(); co_await stop_ongoing_compactions("shutdown"); co_await _task_manager_module->stop(); co_await coroutine::parallel_for_each(_compaction_state | std::views::values, [] (compaction_state& cs) -> future<> { if (!cs.gate.is_closed()) { co_await cs.gate.close(); } }); if (!_tasks.empty()) { on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size())); } reevaluate_postponed_compactions(); co_await std::move(_waiting_reevalution); co_await _sys_ks.close(); _weight_tracker.clear(); _compaction_submission_timer.cancel(); co_await _compaction_controller.shutdown(); co_await _throughput_updater.join(); co_await _update_compaction_static_shares_action.join(); cmlog.info("Stopped"); } // Should return immediately when _state == state::none. void compaction_manager::do_stop() noexcept { if (_state == state::none || _stop_future) { return; } try { _state = state::stopped; _stop_future = really_do_stop(); } catch (...) { cmlog.error("Failed to stop the manager: {}", std::current_exception()); } } inline bool compaction_manager::can_proceed(compaction_group_view* t) const { if (is_disabled()) { return false; } auto found = _compaction_state.find(t); if (found == _compaction_state.end()) { return false; } return !found->second.compaction_disabled(); } future<> compaction_task_executor::perform() { _stats = co_await _cm.perform_task(shared_from_this(), _do_throw_if_stopping); } inline bool compaction_task_executor::can_proceed(throw_if_stopping do_throw_if_stopping) const { if (stopping()) { // Allow caller to know that task (e.g. reshape) was asked to stop while waiting for a chance to run. if (do_throw_if_stopping) { throw make_compaction_stopped_exception(); } return false; } return _cm.can_proceed(_compacting_table); } future compaction_task_executor::maybe_retry(std::exception_ptr err, bool throw_on_abort) { try { std::rethrow_exception(err); } catch (compaction_stopped_exception& e) { cmlog.info("{}: {}: stopping", *this, e.what()); } catch (compaction_aborted_exception& e) { cmlog.error("{}: {}: stopping", *this, e.what()); _cm._stats.errors++; if (throw_on_abort) { throw; } } catch (storage_io_error& e) { cmlog.error("{}: failed due to storage io error: {}: stopping", *this, e.what()); _cm._stats.errors++; _cm.do_stop(); throw; } catch (...) { if (can_proceed()) { _cm._stats.errors++; cmlog.error("{}: failed: {}. Will retry in {} seconds", *this, std::current_exception(), std::chrono::duration_cast(_compaction_retry.sleep_time()).count()); switch_state(state::pending); return _compaction_retry.retry(_compaction_data.abort).handle_exception_type([this] (sleep_aborted&) { return make_exception_future<>(make_compaction_stopped_exception()); }).then([] { return make_ready_future(false); }); } throw; } return make_ready_future(true); } class regular_compaction_task_executor : public compaction_task_executor, public regular_compaction_task_impl { public: regular_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view& t) : compaction_task_executor(mgr, do_throw_if_stopping, &t, compaction_type::Compaction, "Compaction") , regular_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), mgr._task_manager_module->new_sequence_number(), t.schema()->ks_name(), t.schema()->cf_name(), "", tasks::task_id::create_null_id()) {} virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } virtual future do_run() override { if (!is_system_keyspace(_status.keyspace)) { co_await utils::get_local_injector().inject("compaction_regular_compaction_task_executor_do_run", utils::wait_for_message(10s)); } co_await coroutine::switch_to(_cm.compaction_sg()); for (;;) { auto uuid = utils::make_random_uuid(); if (!can_proceed()) { co_return std::nullopt; } switch_state(state::pending); // Write lock is used to synchronize selection of sstables for compaction and their registration. auto lock_holder = co_await _compaction_state.lock.hold_write_lock(); if (!can_proceed()) { co_return std::nullopt; } compaction_group_view& t = *_compacting_table; compaction_strategy cs = t.get_compaction_strategy(); compaction_descriptor descriptor = co_await cs.get_sstables_for_compaction(t, _cm.get_strategy_control()); int weight = calculate_weight(descriptor); bool debug_enabled = cmlog.is_enabled(log_level::debug); if (debug_enabled) { cmlog.debug("Started minor compaction sstables={} sstables_reapired_at={} range={} uuid={} compaction_uuid={}", descriptor.sstables, compacting_table()->get_sstables_repaired_at(), compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid); } sstring old_sstables; if (debug_enabled) { old_sstables = ::format("{}", descriptor.sstables); } if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) { cmlog.debug("{}: sstables={} can_proceed={} auto_compaction={}", *this, descriptor.sstables.size(), can_proceed(), t.is_auto_compaction_disabled_by_user()); co_return std::nullopt; } if (!_cm.can_register_compaction(t, weight, descriptor.fan_in())) { cmlog.debug("Refused compaction job ({} sstable(s)) of weight {} for {}, postponing it...", descriptor.sstables.size(), weight, t); switch_state(state::postponed); _cm.postpone_compaction_for_table(&t); co_return std::nullopt; } auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), descriptor.sstables); auto weight_r = compaction_weight_registration(&_cm, weight); auto on_replace = compacting.update_on_sstable_replacement(); cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}", fmt::ptr(this), descriptor.sstables.size(), weight, t); // Finished selecting and registering compacting sstables, so write lock can be released. lock_holder.return_all(); lock_holder = co_await _compaction_state.lock.hold_read_lock(); setup_new_compaction(descriptor.run_identifier); _compaction_state.last_regular_compaction = gc_clock::now(); std::exception_ptr ex; try { bool should_update_history = this->should_update_history(descriptor.options.type()); compaction_result res = co_await compact_sstables(std::move(descriptor), _compaction_data, on_replace); if (debug_enabled) { cmlog.debug("Finished minor compaction old_sstables={} new_sstables={} sstables_reapired_at={} range={} uuid={} compaction_uuid={}", old_sstables, res.new_sstables, compacting_table()->get_sstables_repaired_at(), compacting_table()->token_range(), uuid, _compaction_data.compaction_uuid); } finish_compaction(); if (should_update_history) { // update_history can take a long time compared to // compaction, as a call issued on shard S1 can be // handled on shard S2. If the other shard is under // heavy load, we may unnecessarily block kicking off a // new compaction. Normally it isn't a problem, but there were // edge cases where the described behaviour caused // compaction to fail to keep up with excessive // flushing, leading to too many sstables on disk and // OOM during a read. There is no need to wait with // next compaction until history is updated, so release // the weight earlier to remove unnecessary // serialization. weight_r.deregister(); co_await update_history(*_compacting_table, std::move(res), _compaction_data); } _cm.reevaluate_postponed_compactions(); continue; } catch (...) { ex = std::current_exception(); } finish_compaction(state::failed); if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { co_return std::nullopt; } } co_return std::nullopt; } }; void compaction_manager::submit(compaction_group_view& t) { if (t.is_auto_compaction_disabled_by_user()) { return; } auto gh = start_compaction(t); if (!gh) { return; } // OK to drop future. // waited via compaction_task_executor::compaction_done() (void)perform_compaction(throw_if_stopping::no, tasks::task_info{}, t).then_wrapped([gh = std::move(gh)] (auto f) { f.ignore_ready_future(); }); } bool compaction_manager::can_perform_regular_compaction(compaction_group_view& t) { return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user(); } future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_group_view& t) { auto schema = t.schema(); if (!can_perform_regular_compaction(t)) { cmlog.trace("maybe_wait_for_sstable_count_reduction in {}: cannot perform regular compaction", t); co_return; } auto num_runs_for_compaction = [&, this] -> future { auto cs = t.get_compaction_strategy(); auto desc = co_await cs.get_sstables_for_compaction(t, get_strategy_control()); co_return std::ranges::size(desc.sstables | std::views::transform(std::mem_fn(&sstables::sstable::run_identifier)) | std::ranges::to()); }; const auto injected_threshold = utils::get_local_injector().inject_parameter("set_sstable_count_reduction_threshold"); const auto threshold = injected_threshold.value_or(size_t(std::max(schema->max_compaction_threshold(), 32))); auto count = co_await num_runs_for_compaction(); if (count <= threshold) { cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}", t, count, threshold); co_return; } // Reduce the chances of falling into an endless wait, if compaction // wasn't scheduled for the table due to a problem. submit(t); using namespace std::chrono_literals; auto start = db_clock::now(); auto& cstate = get_compaction_state(&t); try { while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) { co_await cstate.compaction_done.when(); } } catch (const broken_condition_variable&) { co_return; } auto end = db_clock::now(); auto elapsed_ms = (end - start) / 1ms; cmlog.warn("Waited {}ms for compaction of {} to catch up on {} sstable runs", elapsed_ms, t, count); } class offstrategy_compaction_task_executor : public compaction_task_executor, public offstrategy_compaction_task_impl { bool& _performed; public: offstrategy_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, bool& performed) : compaction_task_executor(mgr, do_throw_if_stopping, t, compaction_type::Reshape, "Offstrategy compaction") , offstrategy_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), parent_id ? 0 : mgr._task_manager_module->new_sequence_number(), "compaction group", t->schema()->ks_name(), t->schema()->cf_name(), "", parent_id) , _performed(performed) { _status.progress_units = "bytes"; _performed = false; } bool performed() const noexcept { return _performed; } virtual future get_progress() const override { return compaction_task_impl::get_progress(_compaction_data, _progress_monitor); } virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } private: future<> run_offstrategy_compaction(::compaction::compaction_data& cdata) { // Incrementally reshape the SSTables in maintenance set. The output of each reshape // round is merged into the main set. The common case is that off-strategy input // is mostly disjoint, e.g. repair-based node ops, then all the input will be // reshaped in a single round. The incremental approach allows us to be space // efficient (avoiding a 100% overhead) as we will incrementally replace input // SSTables from maintenance set by output ones into main set. compaction_group_view& t = *_compacting_table; // Filter out sstables that require view building, to avoid a race between off-strategy // and view building. Refs: #11882 auto get_reshape_candidates = [&t] () -> future> { auto maintenance_set = co_await t.maintenance_sstable_set(); co_return *maintenance_set->all() | std::views::filter([](const sstables::shared_sstable &sst) { return !sst->requires_view_building(); }) | std::ranges::to(); }; auto get_next_job = [&] () -> future> { auto candidates = co_await get_reshape_candidates(); if (candidates.empty()) { co_return std::nullopt; } // all sstables added to maintenance set share the same underlying storage. auto& storage = candidates.front()->get_storage(); reshape_config cfg = co_await make_reshape_config(storage, reshape_mode::strict); auto desc = t.get_compaction_strategy().get_reshaping_job(co_await get_reshape_candidates(), t.schema(), cfg); co_return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt; }; std::exception_ptr err; while (auto desc = co_await get_next_job()) { auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(&t), desc->sstables); auto on_replace = compacting.update_on_sstable_replacement(); try { compaction_result _ = co_await compact_sstables(std::move(*desc), _compaction_data, on_replace, compaction_manager::can_purge_tombstones::no, sstables::offstrategy::yes); } catch (compaction_stopped_exception&) { // If off-strategy compaction stopped on user request, let's not discard the partial work. // Therefore, both un-reshaped and reshaped data will be integrated into main set, allowing // regular compaction to continue from where off-strategy left off. err = std::current_exception(); break; } _performed = true; } // There might be some remaining sstables in maintenance set that didn't require reshape, or the // user has aborted off-strategy. So we can only integrate them into the main set, such that // they become candidates for regular compaction. We cannot hold them forever in maintenance set, // as that causes read and space amplification issues. if (auto sstables = co_await get_reshape_candidates(); sstables.size()) { auto completion_desc = compaction_completion_desc{ .old_sstables = sstables, // removes from maintenance set. .new_sstables = sstables, // adds into main set. }; co_await _cm.on_compaction_completion(t, std::move(completion_desc), sstables::offstrategy::yes); } if (err) { co_await coroutine::return_exception_ptr(std::move(err)); } } future maintenance_set_size() const { auto maintenance_set = co_await _compacting_table->maintenance_sstable_set(); co_return maintenance_set->size(); } protected: virtual future do_run() override { co_await coroutine::switch_to(_cm.maintenance_sg()); for (;;) { if (!can_proceed()) { co_return std::nullopt; } switch_state(state::pending); auto units = co_await acquire_semaphore(_cm._off_strategy_sem); if (!can_proceed()) { co_return std::nullopt; } setup_new_compaction(); std::exception_ptr ex; try { compaction_group_view& t = *_compacting_table; auto size = co_await maintenance_set_size(); if (!size) { cmlog.debug("Skipping off-strategy compaction for {}, No candidates were found", t); finish_compaction(); co_return std::nullopt; } cmlog.info("Starting off-strategy compaction for {}, {} candidates were found", t, size); co_await run_offstrategy_compaction(_compaction_data); finish_compaction(); cmlog.info("Done with off-strategy compaction for {}", t); co_return std::nullopt; } catch (...) { ex = std::current_exception(); } finish_compaction(state::failed); if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { co_return std::nullopt; } } co_return std::nullopt; } }; future compaction_manager::perform_offstrategy(compaction_group_view& t, tasks::task_info info) { auto gh = start_compaction(t); if (!gh) { co_return false; } bool performed; co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, performed); co_return performed; } class rewrite_sstables_compaction_task_executor : public sstables_task_executor { compaction_type_options _options; owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; compaction_manager::can_purge_tombstones _can_purge; public: rewrite_sstables_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector sstables, compacting_sstable_registration compacting, compaction_manager::can_purge_tombstones can_purge, sstring type_options_desc = "") : sstables_task_executor(mgr, do_throw_if_stopping, t, options.type(), sstring(to_string(options.type())), std::move(sstables), parent_id, std::move(type_options_desc)) , _options(std::move(options)) , _owned_ranges_ptr(std::move(owned_ranges_ptr)) , _compacting(std::move(compacting)) , _can_purge(can_purge) {} virtual future<> release_resources() noexcept override { _compacting.release_all(); _owned_ranges_ptr = nullptr; co_await sstables_task_executor::release_resources(); } protected: virtual future do_run() override { compaction_stats stats{}; switch_state(state::pending); auto maintenance_permit = co_await acquire_semaphore(_cm._maintenance_ops_sem); while (!_sstables.empty() && can_proceed()) { auto sst = consume_sstable(); auto res = co_await rewrite_sstable(std::move(sst)); _cm._validation_errors += res.stats.validation_errors; stats += res.stats; } co_return stats; } static compaction_descriptor make_descriptor(const sstables::shared_sstable& sst, const compaction_type_options& opt, owned_ranges_ptr owned_ranges = {}) { auto sstable_level = sst->get_sstable_level(); auto run_identifier = sst->run_identifier(); return compaction_descriptor({ sst }, sstable_level, compaction_descriptor::default_max_sstable_bytes, run_identifier, opt, owned_ranges); } virtual compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const { return make_descriptor(sst, _options, _owned_ranges_ptr); } virtual future rewrite_sstable(const sstables::shared_sstable sst) { co_await coroutine::switch_to(_cm.maintenance_sg()); for (;;) { switch_state(state::active); auto descriptor = make_descriptor(sst); // Releases reference to cleaned sstable such that respective used disk space can be freed. auto on_replace = _compacting.update_on_sstable_replacement(); setup_new_compaction(descriptor.run_identifier); std::exception_ptr ex; try { compaction_result res = co_await compact_sstables_and_update_history(std::move(descriptor), _compaction_data, on_replace, _can_purge); finish_compaction(); _cm.reevaluate_postponed_compactions(); co_return res; // done with current sstable } catch (...) { ex = std::current_exception(); } finish_compaction(state::failed); // retry current sstable or rethrows exception if ((co_await maybe_retry(std::move(ex), true)) == stop_iteration::yes) { co_return compaction_result{}; } } } }; class rewrite_sstables_component_compaction_task_executor final : public rewrite_sstables_compaction_task_executor { std::unordered_map& _rewritten_sstables; public: rewrite_sstables_component_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, compaction_type_options options, std::vector sstables, compacting_sstable_registration compacting, std::unordered_map& rewritten_sstables) : rewrite_sstables_compaction_task_executor(mgr, do_throw_if_stopping, t, parent_id, options, {}, std::move(sstables), std::move(compacting), compaction_manager::can_purge_tombstones::no, "component_rewrite"), _rewritten_sstables(rewritten_sstables) {} protected: virtual future do_run() override { compaction_stats stats{}; switch_state(state::pending); auto maintenance_permit = co_await acquire_semaphore(_cm._maintenance_ops_sem); while (!_sstables.empty()) { auto sst = consume_sstable(); auto it = _rewritten_sstables.emplace(sst, sstables::shared_sstable{}).first; auto res = co_await rewrite_sstable(std::move(sst)); _cm._validation_errors += res.stats.validation_errors; stats += res.stats; it->second = std::move(res.new_sstables.front()); } co_return stats; } }; class split_compaction_task_executor final : public rewrite_sstables_compaction_task_executor { compaction_type_options::split _opt; public: split_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, compaction_type_options options, owned_ranges_ptr owned_ranges, std::vector sstables, compacting_sstable_registration compacting) : rewrite_sstables_compaction_task_executor(mgr, do_throw_if_stopping, t, parent_id, options, std::move(owned_ranges), std::move(sstables), std::move(compacting), compaction_manager::can_purge_tombstones::yes) , _opt(options.as()) { if (utils::get_local_injector().is_enabled("split_sstable_rewrite")) { _do_throw_if_stopping = throw_if_stopping::yes; } } static bool sstable_needs_split(const sstables::shared_sstable& sst, const compaction_type_options::split& opt) { return opt.classifier(sst->get_first_decorated_key().token()) != opt.classifier(sst->get_last_decorated_key().token()); } static compaction_descriptor make_descriptor(const sstables::shared_sstable& sst, const compaction_type_options::split& split_opt) { auto opt = compaction_type_options::make_split(split_opt.classifier); return rewrite_sstables_compaction_task_executor::make_descriptor(sst, std::move(opt)); } private: bool sstable_needs_split(const sstables::shared_sstable& sst) const { return sstable_needs_split(sst, _opt); } protected: compaction_descriptor make_descriptor(const sstables::shared_sstable& sst) const override { return make_descriptor(sst, _opt); } future do_rewrite_sstable(const sstables::shared_sstable sst) { if (sstable_needs_split(sst)) { return rewrite_sstables_compaction_task_executor::rewrite_sstable(std::move(sst)); } // SSTable that doesn't require split can bypass compaction and the table will be able to place // it into the correct compaction group. Similar approach is done in off-strategy compaction for // sstables that don't require reshape and are ready to be moved across sets. compaction_completion_desc desc { .old_sstables = {sst}, .new_sstables = {sst} }; return _compacting_table->on_compaction_completion(std::move(desc), sstables::offstrategy::no).then([] { // It's fine to return empty results (zeroed stats) as compaction was bypassed. return compaction_result{}; }); } future rewrite_sstable(const sstables::shared_sstable sst) override { co_await utils::get_local_injector().inject("split_sstable_rewrite", [this] (auto& handler) -> future<> { cmlog.info("split_sstable_rewrite: waiting"); while (!handler.poll_for_message() && !_compaction_data.is_stop_requested()) { co_await sleep(std::chrono::milliseconds(5)); } cmlog.info("split_sstable_rewrite: released"); if (_compaction_data.is_stop_requested()) { throw make_compaction_stopped_exception(); } }, false); if (utils::get_local_injector().is_enabled("split_sstable_force_stop_exception")) { throw make_compaction_stopped_exception(); } co_return co_await do_rewrite_sstable(std::move(sst)); } }; template requires std::derived_from && std::derived_from future compaction_manager::perform_task_on_all_files(sstring reason, tasks::task_info info, compaction_group_view& t, compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, throw_if_stopping do_throw_if_stopping, Args... args) { auto gh = start_compaction(t); if (!gh) { co_return std::nullopt; } // since we might potentially have ongoing compactions, and we // must ensure that all sstables created before we run are included // in the re-write, we need to barrier out any previously running // compaction. std::vector sstables; compacting_sstable_registration compacting(*this, get_compaction_state(&t)); co_await run_with_compaction_disabled(t, [&sstables, &compacting, get_func = std::move(get_func)] () -> future<> { // Getting sstables and registering them as compacting must be atomic, to avoid a race condition where // regular compaction runs in between and picks the same files. sstables = co_await get_func(); compacting.register_compacting(sstables); // sort sstables by size in descending order, such that the smallest files will be rewritten first // (as sstable to be rewritten is popped off from the back of container), so rewrite will have higher // chance to succeed when the biggest files are reached. std::sort(sstables.begin(), sstables.end(), [](sstables::shared_sstable& a, sstables::shared_sstable& b) { return a->data_size() > b->data_size(); }); }, std::move(reason)); if (sstables.empty()) { co_return std::nullopt; } co_return co_await perform_compaction(do_throw_if_stopping, info, &t, info.id, std::move(options), std::move(owned_ranges_ptr), std::move(sstables), std::move(compacting), std::forward(args)...); } future compaction_manager::rewrite_sstables(compaction_group_view& t, compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, tasks::task_info info, can_purge_tombstones can_purge, sstring options_desc) { return perform_task_on_all_files("rewrite", info, t, std::move(options), std::move(owned_ranges_ptr), std::move(get_func), throw_if_stopping::no, can_purge, std::move(options_desc)); } future compaction_manager::rewrite_sstables_component(compaction_group_view& t, std::vector& sstables, compaction_type_options options, std::unordered_map& rewritten_sstables, tasks::task_info info) { auto gh = start_compaction(t); if (!gh) { co_return std::nullopt; } if (sstables.empty()) { co_return std::nullopt; } compacting_sstable_registration compacting(*this, get_compaction_state(&t)); compacting.register_compacting(sstables); co_return co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, std::move(options), std::move(sstables), std::move(compacting), rewritten_sstables); } class validate_sstables_compaction_task_executor : public sstables_task_executor { compaction_manager::quarantine_invalid_sstables _quarantine_sstables; public: validate_sstables_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, std::vector sstables, compaction_manager::quarantine_invalid_sstables quarantine_sstables) : sstables_task_executor(mgr, do_throw_if_stopping, t, compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables), parent_id) , _quarantine_sstables(quarantine_sstables) {} protected: virtual future do_run() override { compaction_stats stats{}; while (!_sstables.empty() && can_proceed()) { auto sst = consume_sstable(); auto res = co_await validate_sstable(std::move(sst)); _cm._validation_errors += res.stats.validation_errors; stats += res.stats; } co_return stats; } private: future validate_sstable(const sstables::shared_sstable& sst) { co_await coroutine::switch_to(_cm.maintenance_sg()); switch_state(state::active); std::exception_ptr ex; try { auto desc = compaction_descriptor( { sst }, sst->get_sstable_level(), compaction_descriptor::default_max_sstable_bytes, sst->run_identifier(), compaction_type_options::make_scrub(compaction_type_options::scrub::mode::validate, _quarantine_sstables)); co_return co_await ::compaction::compact_sstables(std::move(desc), _compaction_data, *_compacting_table, _progress_monitor); } catch (compaction_stopped_exception&) { // ignore, will be handled by can_proceed() } catch (storage_io_error& e) { cmlog.error("{}: failed due to storage io error: {}: stopping", *this, e.what()); _cm._stats.errors++; _cm.do_stop(); throw; } catch (...) { // We are validating potentially corrupt sstables, errors are // expected, just continue with the other sstables when seeing // one. _cm._stats.errors++; cmlog.error("Scrubbing in validate mode {} failed due to {}, continuing.", sst->get_filename(), std::current_exception()); } co_return compaction_result{}; } }; static future> get_all_sstables(compaction_group_view& t) { auto main_set = co_await t.main_sstable_set(); auto maintenance_set = co_await t.maintenance_sstable_set(); auto s = *main_set->all() | std::ranges::to(); auto maintenance_sstables = maintenance_set->all(); s.insert(s.end(), maintenance_sstables->begin(), maintenance_sstables->end()); co_return s; } future compaction_manager::perform_sstable_scrub_validate_mode(compaction_group_view& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables) { auto gh = start_compaction(t); if (!gh) { co_return compaction_stats_opt{}; } // Collect and register all sstables as compacting while compaction is disabled, to avoid a race condition where // regular compaction runs in between and picks the same files. std::vector all_sstables; compacting_sstable_registration compacting(*this, get_compaction_state(&t)); co_await run_with_compaction_disabled(t, [&all_sstables, &compacting, &t] () -> future<> { // All sstables must be included. all_sstables = co_await get_all_sstables(t); compacting.register_compacting(all_sstables); }, "disabling compaction to run scrub validate"); if (all_sstables.empty()) { co_return compaction_stats_opt{}; } co_return co_await perform_compaction(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables); } class cleanup_sstables_compaction_task_executor : public compaction_task_executor, public cleanup_compaction_task_impl { const compaction_type_options _cleanup_options; owned_ranges_ptr _owned_ranges_ptr; compacting_sstable_registration _compacting; std::vector _pending_cleanup_jobs; public: cleanup_sstables_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, compaction_group_view* t, tasks::task_id parent_id, compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, std::vector candidates, compacting_sstable_registration compacting) : compaction_task_executor(mgr, do_throw_if_stopping, t, options.type(), sstring(to_string(options.type()))) , cleanup_compaction_task_impl(mgr._task_manager_module, tasks::task_id::create_random_id(), 0, "compaction group", t->schema()->ks_name(), t->schema()->cf_name(), "", parent_id) , _cleanup_options(std::move(options)) , _owned_ranges_ptr(std::move(owned_ranges_ptr)) , _compacting(std::move(compacting)) , _pending_cleanup_jobs(t->get_compaction_strategy().get_cleanup_compaction_jobs(*t, std::move(candidates))) { // Cleanup is made more resilient under disk space pressure, by cleaning up smaller jobs first, so larger jobs // will have more space available released by previous jobs. std::ranges::sort(_pending_cleanup_jobs, std::ranges::greater(), std::mem_fn(&compaction_descriptor::sstables_size)); _cm._stats.pending_tasks += _pending_cleanup_jobs.size(); _status.progress_units = "bytes"; } virtual ~cleanup_sstables_compaction_task_executor() = default; virtual future<> release_resources() noexcept override { _cm._stats.pending_tasks -= _pending_cleanup_jobs.size(); _pending_cleanup_jobs = {}; _compacting.release_all(); _owned_ranges_ptr = nullptr; return make_ready_future(); } virtual future get_progress() const override { return compaction_task_impl::get_progress(_compaction_data, _progress_monitor); } virtual void abort() noexcept override { return compaction_task_executor::abort(_as); } protected: virtual future<> run() override { return perform(); } virtual future do_run() override { switch_state(state::pending); auto maintenance_permit = co_await acquire_semaphore(_cm._maintenance_ops_sem); while (!_pending_cleanup_jobs.empty() && can_proceed()) { auto active_job = std::move(_pending_cleanup_jobs.back()); active_job.options = _cleanup_options; active_job.owned_ranges = _owned_ranges_ptr; co_await run_cleanup_job(std::move(active_job)); _pending_cleanup_jobs.pop_back(); _cm._stats.pending_tasks--; } co_return std::nullopt; } private: future<> run_cleanup_job(compaction_descriptor descriptor) { co_await coroutine::switch_to(_cm.maintenance_sg()); // Releases reference to cleaned files such that respective used disk space can be freed. using update_registration = compacting_sstable_registration::update_me; class release_exhausted : public update_registration { compaction_descriptor& _desc; public: release_exhausted(compacting_sstable_registration& registration, compaction_descriptor& desc) : update_registration{registration} , _desc{desc} {} void on_removal(const std::vector& sstables) override { auto exhausted = sstables | std::ranges::to(); std::erase_if(_desc.sstables, [&] (const sstables::shared_sstable& sst) { return exhausted.contains(sst); }); update_registration::on_removal(sstables); } }; release_exhausted on_replace{_compacting, descriptor}; for (;;) { std::exception_ptr ex; try { setup_new_compaction(descriptor.run_identifier); co_await utils::get_local_injector().inject("sstable_cleanup_wait", utils::wait_for_message(std::chrono::seconds(60))); co_await compact_sstables_and_update_history(descriptor, _compaction_data, on_replace); finish_compaction(); _cm.reevaluate_postponed_compactions(); co_return; // done with current job } catch (...) { ex = std::current_exception(); } finish_compaction(state::failed); if (_compaction_data.is_stop_requested()) { throw make_compaction_stopped_exception(); } // retry current job or rethrows exception if ((co_await maybe_retry(std::move(ex))) == stop_iteration::yes) { co_return; } } } }; bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) { // Finish early if the keyspace has no owned token ranges (in this data center) if (sorted_owned_ranges.empty()) { return true; } auto first_token = sst->get_first_decorated_key().token(); auto last_token = sst->get_last_decorated_key().token(); dht::token_range sst_token_range = dht::token_range::make(first_token, last_token); auto r = std::lower_bound(sorted_owned_ranges.begin(), sorted_owned_ranges.end(), first_token, [] (const interval& a, const dht::token& b) { // check that range a is before token b. return a.after(b, dht::token_comparator()); }); // return true iff sst partition range isn't fully contained in any of the owned ranges. if (r != sorted_owned_ranges.end()) { if (r->contains(sst_token_range, dht::token_comparator())) { return false; } } return true; } bool compaction_manager::update_sstable_cleanup_state(compaction_group_view& t, const sstables::shared_sstable& sst, const dht::token_range_vector& sorted_owned_ranges) { auto& cs = get_compaction_state(&t); if (sst->is_shared()) { throw std::runtime_error(format("Shared SSTable {} cannot be marked as requiring cleanup, as it can only be processed by resharding", sst->get_filename())); } if (needs_cleanup(sst, sorted_owned_ranges)) { cs.sstables_requiring_cleanup.insert(sst); return true; } else { cs.sstables_requiring_cleanup.erase(sst); return false; } } bool compaction_manager::erase_sstable_cleanup_state(compaction_group_view& t, const sstables::shared_sstable& sst) { auto& cs = get_compaction_state(&t); return cs.sstables_requiring_cleanup.erase(sst); } bool compaction_manager::requires_cleanup(compaction_group_view& t, const sstables::shared_sstable& sst) const { const auto& cs = get_compaction_state(&t); return cs.sstables_requiring_cleanup.contains(sst); } const std::unordered_set& compaction_manager::sstables_requiring_cleanup(compaction_group_view& t) const { const auto& cs = get_compaction_state(&t); return cs.sstables_requiring_cleanup; } future<> compaction_manager::perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction_group_view& t, tasks::task_info info) { auto gh = start_compaction(t); if (!gh) { co_return; } constexpr auto sleep_duration = std::chrono::seconds(10); constexpr auto max_idle_duration = std::chrono::seconds(300); auto& cs = get_compaction_state(&t); co_await try_perform_cleanup(sorted_owned_ranges, t, info); auto last_idle = seastar::lowres_clock::now(); while (!cs.sstables_requiring_cleanup.empty()) { auto idle = seastar::lowres_clock::now() - last_idle; if (idle >= max_idle_duration) { auto msg = ::format("Cleanup timed out after {} seconds of no progress", std::chrono::duration_cast(idle).count()); cmlog.warn("{}", msg); co_await coroutine::return_exception(std::runtime_error(msg)); } auto has_sstables_eligible_for_compaction = [&] { for (auto& sst : cs.sstables_requiring_cleanup) { if (eligible_for_compaction(sst)) { return true; } } return false; }; cmlog.debug("perform_cleanup: waiting for sstables to become eligible for cleanup"); try { co_await t.get_staging_done_condition().when(sleep_duration, [&] { return has_sstables_eligible_for_compaction(); }); } catch (const seastar::condition_variable_timed_out&) { // Ignored. Keep retrying for max_idle_duration } if (!has_sstables_eligible_for_compaction()) { continue; } co_await try_perform_cleanup(sorted_owned_ranges, t, info); last_idle = seastar::lowres_clock::now(); } } future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_ranges, compaction_group_view& t, tasks::task_info info) { auto check_for_cleanup = [this, &t] { return std::ranges::any_of(_tasks, [&t] (auto& task) { return task.compacting_table() == &t && task.compaction_type() == compaction_type::Cleanup; }); }; if (check_for_cleanup()) { throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}", t)); } auto& cs = get_compaction_state(&t); co_await run_with_compaction_disabled(t, [&] () -> future<> { auto update_sstables_cleanup_state = [&] (lw_shared_ptr set) -> future<> { co_await set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) { update_sstable_cleanup_state(t, sst, *sorted_owned_ranges); }); }; // No need to treat repaired and unrepaired sstables separately here, // since it only inserts or deletes sstables into or from // sstables_requiring_cleanup. co_await update_sstables_cleanup_state(co_await t.main_sstable_set()); co_await update_sstables_cleanup_state(co_await t.maintenance_sstable_set()); // Some sstables may remain in sstables_requiring_cleanup // for later processing if they can't be cleaned up right now. // They are erased from sstables_requiring_cleanup by compacting.release_compacting if (!cs.sstables_requiring_cleanup.empty()) { cs.owned_ranges_ptr = std::move(sorted_owned_ranges); } }, "cleanup"); if (cs.sstables_requiring_cleanup.empty()) { cmlog.debug("perform_cleanup for {} found no sstables requiring cleanup", t); co_return; } auto maintenance_set = co_await t.maintenance_sstable_set(); auto found_maintenance_sstables = bool(maintenance_set->for_each_sstable_until([this, &t] (const sstables::shared_sstable& sst) { return stop_iteration(requires_cleanup(t, sst)); })); if (found_maintenance_sstables) { co_await perform_offstrategy(t, info); } if (utils::get_local_injector().enter("major_compaction_before_cleanup")) { co_await perform_major_compaction(t, info); } // Called with compaction_disabled auto get_sstables = [this, &t] () -> future> { auto& cs = get_compaction_state(&t); co_return get_candidates(t, cs.sstables_requiring_cleanup); }; co_await perform_task_on_all_files("cleanup", info, t, compaction_type_options::make_cleanup(), std::move(sorted_owned_ranges), std::move(get_sstables), throw_if_stopping::yes); } // Submit a table to be upgraded and wait for its termination. future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction_group_view& t, bool exclude_current_version, tasks::task_info info) { auto get_sstables = [this, &t, exclude_current_version] () -> future> { std::vector tables; auto last_version = t.get_sstables_manager().get_preferred_sstable_version(); for (auto& sst : co_await get_candidates(t)) { // if we are a "normal" upgrade, we only care about // tables with other versions, but potentially // we are to actually rewrite everything. (-a) if (!exclude_current_version || sst->get_version() < last_version) { tables.emplace_back(sst); } } co_return std::move(tables); }; // doing a "cleanup" is about as compacting as we need // to be, provided we get to decide the tables to process, // and ignoring any existing operations. // Note that we potentially could be doing multiple // upgrades here in parallel, but that is really the users // problem. co_await rewrite_sstables(t, compaction_type_options::make_upgrade(), std::move(sorted_owned_ranges), std::move(get_sstables), info).discard_result(); } future compaction_manager::perform_split_compaction(compaction_group_view& t, compaction_type_options::split opt, tasks::task_info info) { auto get_sstables = [this, &t] () -> future> { return get_candidates(t); }; owned_ranges_ptr owned_ranges_ptr = {}; auto options = compaction_type_options::make_split(std::move(opt.classifier)); return perform_task_on_all_files("split", info, t, std::move(options), std::move(owned_ranges_ptr), std::move(get_sstables), throw_if_stopping::no); } std::exception_ptr compaction_manager::make_disabled_exception(compaction::compaction_group_view& cg) { std::exception_ptr ex; if (_in_critical_disk_utilization_mode) { ex = std::make_exception_ptr(std::runtime_error("critical disk utilization")); } else { ex = std::make_exception_ptr(compaction_stopped_exception(cg.schema()->ks_name(), cg.schema()->cf_name(), "compaction disabled")); } return ex; } future> compaction_manager::maybe_split_new_sstable(sstables::shared_sstable sst, compaction_group_view& t, compaction_type_options::split opt) { if (!split_compaction_task_executor::sstable_needs_split(sst, opt)) { co_return std::vector{sst}; } // Throw an error if split cannot be performed due to e.g. out of space prevention. // We don't want to prevent split because compaction is temporarily disabled on a view only for synchronization, // which is unneeded against new sstables that aren't part of any set yet, so never use can_proceed(&t) here. if (is_disabled()) { co_return coroutine::exception(make_disabled_exception(t)); } std::vector ret; auto gate = get_compaction_state(&t).gate.hold(); compaction_progress_monitor monitor; compaction_data info = create_compaction_data(); compaction_descriptor desc = split_compaction_task_executor::make_descriptor(sst, opt); desc.creator = [&t, sst] (shard_id _) { // NOTE: preserves the sstable state, since we want the output to be on the same state as the original. // For example, if base table has views, it's important that sstable produced by repair will be // in the staging state. return t.make_sstable(sst->state()); }; desc.replacer = [&] (compaction_completion_desc d) { std::move(d.new_sstables.begin(), d.new_sstables.end(), std::back_inserter(ret)); }; co_await compact_sstables(std::move(desc), info, t, monitor); co_await sst->unlink(); co_return ret; } future> compaction_manager::perform_component_rewrite(compaction::compaction_group_view& t, tasks::task_info info, std::vector sstables, sstables::component_type component, std::function modifier, compaction_type_options::component_rewrite::update_sstable_id update_id) { std::unordered_map rewritten_sstables; rewritten_sstables.reserve(sstables.size()); co_await rewrite_sstables_component(t, sstables, compaction_type_options::make_component_rewrite(component, std::move(modifier), update_id), rewritten_sstables, info); co_return rewritten_sstables; } // Submit a table to be scrubbed and wait for its termination. future compaction_manager::perform_sstable_scrub(compaction_group_view& t, compaction_type_options::scrub opts, tasks::task_info info) { auto scrub_mode = opts.operation_mode; if (scrub_mode == compaction_type_options::scrub::mode::validate) { co_return co_await perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables); } owned_ranges_ptr owned_ranges_ptr = {}; sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode); co_return co_await rewrite_sstables(t, compaction_type_options::make_scrub(scrub_mode, opts.quarantine_sstables, opts.drop_unfixable), std::move(owned_ranges_ptr), [&t, opts] -> future> { auto all_sstables = co_await get_all_sstables(t); std::vector sstables = all_sstables | std::views::filter([&opts] (const sstables::shared_sstable& sst) { if (sst->requires_view_building()) { return false; } switch (opts.quarantine_operation_mode) { case compaction_type_options::scrub::quarantine_mode::include: return true; case compaction_type_options::scrub::quarantine_mode::exclude: return !sst->is_quarantined(); case compaction_type_options::scrub::quarantine_mode::only: return sst->is_quarantined(); } on_internal_error(cmlog, "bad scrub quarantine mode"); }) | std::ranges::to(); co_return std::vector(std::move(sstables)); }, info, can_purge_tombstones::no, std::move(option_desc)); } compaction::compaction_state::compaction_state(compaction_group_view& t) : gate(format("compaction_state for table {}.{}", t.schema()->ks_name(), t.schema()->cf_name())) { } void compaction_manager::add(compaction_group_view& t) { auto [_, inserted] = _compaction_state.try_emplace(&t, t); if (!inserted) { on_internal_error(cmlog, format("compaction_state for table {} [{}] already exists", t, fmt::ptr(&t))); } } compaction_reenabler compaction_manager::add_with_compaction_disabled(compaction_group_view& view) { add(view); return compaction_reenabler(*this, view); } future<> compaction_manager::remove(compaction_group_view& t, sstring reason) noexcept { auto& c_state = get_compaction_state(&t); auto erase_state = defer([&t, this] () noexcept { t.get_backlog_tracker().disable(); _compaction_state.erase(&t); }); // We need to guarantee that a task being stopped will not retry to compact // a table being removed. // The requirement above is provided by stop_ongoing_compactions(). _postponed.erase(&t); // Wait for all compaction tasks running under gate to terminate // and prevent new tasks from entering the gate. if (!c_state.gate.is_closed()) { auto close_gate = c_state.gate.close(); co_await stop_ongoing_compactions(reason, &t); // Wait for users of incremental repair lock (can be either repair itself or maintenance compactions). co_await c_state.incremental_repair_lock.write_lock(); co_await std::move(close_gate); } #ifdef DEBUG auto found = false; sstring msg; for (auto& task : _tasks) { if (task.compacting_table() == &t) { if (!msg.empty()) { msg += "\n"; } msg += format("Found {} after remove", task); found = true; } } if (found) { on_internal_error_noexcept(cmlog, msg); } #endif } const std::vector compaction_manager::get_compactions(std::function filter) const { auto to_info = [] (const compaction_task_executor& task) { compaction_info ret; ret.compaction_uuid = task.compaction_data().compaction_uuid; ret.type = task.compaction_type(); ret.ks_name = task.compacting_table()->schema()->ks_name(); ret.cf_name = task.compacting_table()->schema()->cf_name(); ret.total_partitions = task.compaction_data().total_partitions; ret.total_keys_written = task.compaction_data().total_keys_written; return ret; }; return _tasks | std::views::filter([&filter] (const compaction_task_executor& task) { return filter(task.compacting_table()); }) | std::views::transform(to_info) | std::ranges::to(); } bool compaction_manager::has_table_ongoing_compaction(const compaction_group_view& t) const { return std::any_of(_tasks.begin(), _tasks.end(), [&t] (const compaction_task_executor& task) { return task.compacting_table() == &t && task.compaction_running(); }); }; bool compaction_manager::compaction_disabled(compaction_group_view& t) const { if (auto it = _compaction_state.find(&t); it != _compaction_state.end()) { return it->second.compaction_disabled(); } else { cmlog.debug("compaction_disabled: {}:{} not in compaction_state", t.schema()->id(), t.get_group_id()); // Compaction is not strictly disabled, but it is not enabled either. // The callers actually care about if it's enabled or not, not about the actual state of // compaction_state::compaction_disabled() return true; } } future<> compaction_manager::stop_compaction(sstring type, std::function filter) { compaction_type target_type; try { target_type = to_compaction_type(type); } catch (...) { throw std::runtime_error(format("Compaction of type {} cannot be stopped by compaction manager: {}", type.c_str(), std::current_exception())); } switch (target_type) { case compaction_type::Validation: case compaction_type::Index_build: throw std::runtime_error(format("Compaction type {} is unsupported", type.c_str())); case compaction_type::Reshard: throw std::runtime_error(format("Stopping compaction of type {} is disallowed", type.c_str())); default: break; } return stop_ongoing_compactions("user request", std::move(filter), target_type); } void compaction_manager::propagate_replacement(compaction_group_view& t, const std::vector& removed, const std::vector& added) { for (auto& task : _tasks) { if (task.compacting_table() == &t && task.compaction_running()) { task.compaction_data().pending_replacements.push_back({ removed, added }); } } } strategy_control& compaction_manager::get_strategy_control() const noexcept { return *_strategy_control; } void compaction_manager::plug_system_keyspace(db::system_keyspace& sys_ks) noexcept { _sys_ks.plug(sys_ks.shared_from_this()); } future<> compaction_manager::unplug_system_keyspace() noexcept { co_await _sys_ks.unplug(); } double compaction_backlog_tracker::backlog() const { return disabled() ? compaction_controller::disable_backlog : _impl->backlog(_ongoing_writes, _ongoing_compactions); } void compaction_backlog_tracker::replace_sstables(const std::vector& old_ssts, const std::vector& new_ssts) { if (disabled()) { return; } auto filter_and_revert_charges = [this] (const std::vector& ssts) { std::vector ret; for (auto& sst : ssts) { if (sstable_belongs_to_tracker(sst)) { revert_charges(sst); ret.push_back(sst); } } return ret; }; // FIXME: propagate exception to caller once all replace_sstables implementations provide strong exception safety guarantees. try { _impl->replace_sstables(filter_and_revert_charges(old_ssts), filter_and_revert_charges(new_ssts)); } catch (...) { cmlog.error("Disabling backlog tracker due to exception {}", std::current_exception()); // FIXME: tracker should be able to recover from a failure, e.g. OOM, by having its state reset. More details on https://github.com/scylladb/scylla/issues/10297. disable(); } } bool compaction_backlog_tracker::sstable_belongs_to_tracker(const sstables::shared_sstable& sst) { return is_eligible_for_compaction(sst); } void compaction_backlog_tracker::register_partially_written_sstable(sstables::shared_sstable sst, backlog_write_progress_manager& wp) { if (disabled()) { return; } try { _ongoing_writes.emplace(sst, &wp); } catch (...) { // We can potentially recover from adding ongoing compactions or writes when the process // ends. The backlog will just be temporarily wrong. If we are are suffering from something // more serious like memory exhaustion we will soon fail again in either add / remove and // then we'll disable the tracker. For now, try our best. cmlog.warn("backlog tracker couldn't register partially written SSTable to exception {}", std::current_exception()); } } void compaction_backlog_tracker::register_compacting_sstable(sstables::shared_sstable sst, backlog_read_progress_manager& rp) { if (disabled()) { return; } try { _ongoing_compactions.emplace(sst, &rp); } catch (...) { cmlog.warn("backlog tracker couldn't register partially compacting SSTable to exception {}", std::current_exception()); } } void compaction_backlog_tracker::copy_ongoing_charges(compaction_backlog_tracker& new_bt, bool move_read_charges) const { for (auto&& w : _ongoing_writes) { new_bt.register_partially_written_sstable(w.first, *w.second); } if (move_read_charges) { for (auto&& w : _ongoing_compactions) { new_bt.register_compacting_sstable(w.first, *w.second); } } } void compaction_backlog_tracker::revert_charges(sstables::shared_sstable sst) { _ongoing_writes.erase(sst); _ongoing_compactions.erase(sst); } compaction_backlog_tracker::compaction_backlog_tracker(compaction_backlog_tracker&& other) : _impl(std::move(other._impl)) , _ongoing_writes(std::move(other._ongoing_writes)) , _ongoing_compactions(std::move(other._ongoing_compactions)) { if (other._manager) { on_internal_error(cmlog, "compaction_backlog_tracker is moved while registered"); } } compaction_backlog_tracker::~compaction_backlog_tracker() { if (_manager) { _manager->remove_backlog_tracker(this); } } void compaction_backlog_manager::remove_backlog_tracker(compaction_backlog_tracker* tracker) { _backlog_trackers.erase(tracker); } double compaction_backlog_manager::backlog() const { try { double backlog = 0; for (auto& tracker: _backlog_trackers) { backlog += tracker->backlog(); } if (compaction_controller::backlog_disabled(backlog)) { return compaction_controller::disable_backlog; } else { return backlog; } } catch (...) { return _compaction_controller->backlog_of_shares(1000); } } void compaction_backlog_manager::register_backlog_tracker(compaction_backlog_tracker& tracker) { tracker._manager = this; _backlog_trackers.insert(&tracker); } compaction_backlog_manager::~compaction_backlog_manager() { for (auto* tracker : _backlog_trackers) { tracker->_manager = nullptr; } } compaction_backlog_tracker& compaction_manager::get_backlog_tracker(compaction_group_view& t) { return t.get_backlog_tracker(); } }