compaction: wait for the sstable run count to drop before flushing more

Summary of what the hunks below do:
  * compaction_state gains a condition_variable, compaction_done. It is
    signalled from task::finish_compaction() and broken from
    ~compaction_state().
  * compaction_manager::maybe_wait_for_sstable_count_reduction() waits on
    that condition variable while the compaction strategy still selects more
    than max(max_compaction_threshold, 32) sstable runs for compaction, and
    stops waiting once regular compaction can no longer be performed.
  * table::try_flush_memtable_to_sstable() calls it while _async_gate is
    not closed.

NOTE(review): the threshold is converted to size_t so that it has the same
signedness as the run count it is compared against.
NOTE(review): this copy of the patch had its line structure collapsed; the
indentation below is reconstructed and may need adjusting to the tree.
NOTE(review): some template argument lists look stripped in this copy
("make_shared(*this, t)", "boost::copy_range>(", "lw_shared_ptr old",
"future perform_offstrategy"); they are left as found -- restore them from
the original change before applying.

diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc
index b5a254a85f..f9018f8ebe 100644
--- a/compaction/compaction_manager.cc
+++ b/compaction/compaction_manager.cc
@@ -513,6 +513,10 @@ inline compaction_controller make_compaction_controller(compaction_manager::sche
     return compaction_controller(csg, static_shares, 250ms, std::move(fn));
 }
 
+compaction_manager::compaction_state::~compaction_state() {
+    compaction_done.broken(); // pending waiters get broken_condition_variable
+}
+
 std::string compaction_manager::task::describe() const {
     auto* t = _compacting_table;
     auto s = t->schema();
@@ -611,6 +615,7 @@ void compaction_manager::task::finish_compaction(state finish_state) noexcept {
     if (finish_state != state::failed) {
         _compaction_retry.reset();
     }
+    _compaction_state.compaction_done.signal(); // lets a waiter re-check its predicate
 }
 
 void compaction_manager::task::stop(sstring reason) noexcept {
@@ -981,6 +986,50 @@ void compaction_manager::submit(compaction::table_state& t) {
     (void)perform_task(make_shared(*this, t));
 }
 
+bool compaction_manager::can_perform_regular_compaction(compaction::table_state& t) {
+    return can_proceed(&t) && !t.is_auto_compaction_disabled_by_user();
+}
+
+future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction::table_state& t) {
+    auto schema = t.schema();
+    if (!can_perform_regular_compaction(t)) {
+        cmlog.trace("maybe_wait_for_sstable_count_reduction in {}.{}: cannot perform regular compaction",
+                schema->ks_name(), schema->cf_name());
+        co_return;
+    }
+    auto num_runs_for_compaction = [&, this] {
+        auto& cs = t.get_compaction_strategy();
+        auto desc = cs.get_sstables_for_compaction(t, get_strategy_control(), get_candidates(t));
+        return boost::copy_range>(
+            desc.sstables
+            | boost::adaptors::transformed(std::mem_fn(&sstables::sstable::run_identifier))).size();
+    };
+    const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32)); // size_t: compared against the unsigned run count
+    auto count = num_runs_for_compaction();
+    if (count <= threshold) {
+        cmlog.trace("No need to wait for sstable count reduction in {}.{}: {} <= {}",
+                schema->ks_name(), schema->cf_name(), count, threshold);
+        co_return;
+    }
+    // Reduce the chances of falling into an endless wait, if compaction
+    // wasn't scheduled for the table due to a problem.
+    submit(t);
+    using namespace std::chrono_literals;
+    auto start = db_clock::now();
+    auto& cstate = get_compaction_state(&t);
+    try {
+        co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] {
+            return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t);
+        });
+    } catch (const broken_condition_variable&) {
+        co_return; // raised via ~compaction_state(); nothing left to wait for
+    }
+    auto end = db_clock::now();
+    auto elapsed_ms = (end - start) / 1ms;
+    cmlog.warn("Waited {}ms for compaction of {}.{} to catch up on {} sstable runs",
+            elapsed_ms, schema->ks_name(), schema->cf_name(), count);
+}
+
 class compaction_manager::offstrategy_compaction_task : public compaction_manager::task {
     bool _performed = false;
 public:
diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh
index 067399b3a7..b348e4e659 100644
--- a/compaction/compaction_manager.hh
+++ b/compaction/compaction_manager.hh
@@ -66,6 +66,13 @@ private:
         // Raised by any function running under run_with_compaction_disabled();
         long compaction_disabled_counter = 0;
 
+        // Signaled whenever a compaction task completes.
+        condition_variable compaction_done;
+
+        compaction_state() = default;
+        compaction_state(compaction_state&&) = default;
+        ~compaction_state(); // breaks compaction_done
+
         bool compaction_disabled() const noexcept {
             return compaction_disabled_counter > 0;
         }
@@ -379,6 +386,13 @@ public:
     // Submit a table to be compacted.
     void submit(compaction::table_state& t);
 
+    // Can regular compaction be performed in the given table
+    bool can_perform_regular_compaction(compaction::table_state& t);
+
+    // Maybe wait before adding more sstables
+    // if there are too many sstables.
+    future<> maybe_wait_for_sstable_count_reduction(compaction::table_state& t);
+
     // Submit a table to be off-strategy compacted.
     // Returns true iff off-strategy compaction was required and performed.
     future perform_offstrategy(compaction::table_state& t);
diff --git a/replica/table.cc b/replica/table.cc
index c0882b7f93..26bb6944fd 100644
--- a/replica/table.cc
+++ b/replica/table.cc
@@ -761,6 +761,9 @@ table::try_flush_memtable_to_sstable(lw_shared_ptr old, sstable_write_
                 _memtables->erase(old);
                 co_return;
             }
+            if (!_async_gate.is_closed()) {
+                co_await _compaction_manager.maybe_wait_for_sstable_count_reduction(as_table_state());
+            }
         } catch (...) {
             err = std::current_exception();
         }