diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index e614d2d634..ea3900085e 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -737,7 +737,7 @@ void compaction_manager::enable() { assert(_state == state::none || _state == state::disabled); _state = state::enabled; _compaction_submission_timer.arm(periodic_compaction_submission_interval()); - postponed_compactions_reevaluation(); + _waiting_reevalution = postponed_compactions_reevaluation(); } std::function compaction_manager::compaction_submission_callback() { @@ -748,26 +748,24 @@ std::function compaction_manager::compaction_submission_callback() { }; } -void compaction_manager::postponed_compactions_reevaluation() { - _waiting_reevalution = repeat([this] { - return _postponed_reevaluation.wait().then([this] { - if (_state != state::enabled) { - _postponed.clear(); - return stop_iteration::yes; +future<> compaction_manager::postponed_compactions_reevaluation() { + while (true) { + co_await _postponed_reevaluation.when(); + if (_state != state::enabled) { + _postponed.clear(); + co_return; + } + auto postponed = std::move(_postponed); + try { + for (auto& t : postponed) { + auto s = t->schema(); + cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); + submit(*t); } - auto postponed = std::move(_postponed); - try { - for (auto& t : postponed) { - auto s = t->schema(); - cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); - submit(*t); - } - } catch (...) { - _postponed = std::move(postponed); - } - return stop_iteration::no; - }); - }); + } catch (...) { + _postponed = std::move(postponed); + } + } } void compaction_manager::reevaluate_postponed_compactions() noexcept { diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 9ec90bec1a..ec5bee12da 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -350,7 +350,7 @@ private: // table still exists and compaction is not disabled for the table. inline bool can_proceed(compaction::table_state* t) const; - void postponed_compactions_reevaluation(); + future<> postponed_compactions_reevaluation(); void reevaluate_postponed_compactions() noexcept; // Postpone compaction for a table that couldn't be executed due to ongoing // similar-sized compaction.