From f26e655646ab40f3dfd8d8412ba51bd5168be93d Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sun, 3 Jul 2022 18:36:50 +0300 Subject: [PATCH] compaction_manager: add maybe_wait_for_sstable_count_reduction Called from try_flush_memtable_to_sstable, maybe_wait_for_sstable_count_reduction will wait for compaction to catch up with memtable flush if the bucket to compact is inflated, having too many sstables. In that case we don't want to add fuel to the fire by creating yet another sstable. Fixes #4116 Signed-off-by: Benny Halevy --- compaction/compaction_manager.cc | 49 ++++++++++++++++++++++++++++++++ compaction/compaction_manager.hh | 14 +++++++++ replica/table.cc | 3 ++ 3 files changed, 66 insertions(+) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index b5a254a85f..f9018f8ebe 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -513,6 +513,10 @@ inline compaction_controller make_compaction_controller(compaction_manager::sche return compaction_controller(csg, static_shares, 250ms, std::move(fn)); } +compaction_manager::compaction_state::~compaction_state() { + compaction_done.broken(); +} + std::string compaction_manager::task::describe() const { auto* t = _compacting_table; auto s = t->schema(); @@ -611,6 +615,7 @@ void compaction_manager::task::finish_compaction(state finish_state) noexcept { if (finish_state != state::failed) { _compaction_retry.reset(); } + _compaction_state.compaction_done.signal(); } void compaction_manager::task::stop(sstring reason) noexcept { @@ -981,6 +986,50 @@ void compaction_manager::submit(compaction::table_state& t) { (void)perform_task(make_shared(*this, t)); } +bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) { + return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user(); +} + +future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) { + auto schema = t.schema(); + if 
(!can_perform_regular_compaction(t)) { + cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction", + schema->ks_name(), schema->cf_name()); + co_return; + } + auto num_runs_for_compaction = [&, this] { + auto& cs = t.get_compaction_strategy(); + auto desc = cs.get_sstables_for_compaction(t, get_strategy_control(), get_candidates(t)); + return boost::copy_range>( + desc.sstables + | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size(); + }; + const auto threshold = std::max(schema->max_compaction_threshold(), 32); + auto count = num_runs_for_compaction(); + if (count <= threshold) { + cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}", + schema->ks_name(), schema->cf_name(), count, threshold); + co_return; + } + // Reduce the chances of falling into an endless wait, if compaction + // wasn't scheduled for the table due to a problem. + submit(t); + using namespace std::chrono_literals; + auto start = db_clock::now(); + auto& cstate = get_compaction_state(&t); + try { + co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] { + return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t); + }); + } catch (const broken_condition_variable&) { + co_return; + } + auto end = db_clock::now(); + auto elapsed_ms = (end - start) / 1ms; + cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs", + elapsed_ms, schema->ks_name(), schema->cf_name(), count); +} + class compaction_manager::offstrategy_compaction_task : public compaction_manager::task { bool _performed = false; public: diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 067399b3a7..b348e4e659 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -66,6 +66,13 @@ private: // Raised by any function running under run_with_compaction_disabled(); long compaction_disabled_counter = 0; 
+ // Signaled whenever a compaction task completes. + condition_variable compaction_done; + + compaction_state() = default; + compaction_state(compaction_state&&) = default; + ~compaction_state(); + bool compaction_disabled() const noexcept { return compaction_disabled_counter > 0; } @@ -379,6 +386,13 @@ public: // Submit a table to be compacted. void submit(compaction::table_state& t); + // Can regular compaction be performed in the given table + bool can_perform_regular_compaction(compaction::table_state& t); + + // Maybe wait before adding more sstables + // if there are too many sstables. + future<> maybe_wait_for_sstable_count_reduction(compaction::table_state& t); + // Submit a table to be off-strategy compacted. // Returns true iff off-strategy compaction was required and performed. future perform_offstrategy(compaction::table_state& t); diff --git a/replica/table.cc b/replica/table.cc index c0882b7f93..26bb6944fd 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -761,6 +761,9 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_ _memtables->erase(old); co_return; } + if (!_async_gate.is_closed()) { + co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(as_table_state()); + } } catch (...) { err = std::current_exception(); }