From d2c44cba771defa02c47d0c503c5c817163d0842 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 5 Dec 2022 21:58:48 +0200 Subject: [PATCH 1/3] compaction_manager: make postponed_compactions_reevaluation() return a future postponed_compactions_reevaluation() runs until compaction_manager is stopped, checking if it needs to launch new compactions. Make it return a future instead of stashing its completion somewhere. This makes is easier to convert it to a coroutine. --- compaction/compaction_manager.cc | 6 +++--- compaction/compaction_manager.hh | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index e614d2d634..15f11823ad 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -737,7 +737,7 @@ void compaction_manager::enable() { assert(_state == state::none || _state == state::disabled); _state = state::enabled; _compaction_submission_timer.arm(periodic_compaction_submission_interval()); - postponed_compactions_reevaluation(); + _waiting_reevalution = postponed_compactions_reevaluation(); } std::function compaction_manager::compaction_submission_callback() { @@ -748,8 +748,8 @@ std::function compaction_manager::compaction_submission_callback() { }; } -void compaction_manager::postponed_compactions_reevaluation() { - _waiting_reevalution = repeat([this] { +future<> compaction_manager::postponed_compactions_reevaluation() { + return repeat([this] { return _postponed_reevaluation.wait().then([this] { if (_state != state::enabled) { _postponed.clear(); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 9ec90bec1a..ec5bee12da 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -350,7 +350,7 @@ private: // table still exists and compaction is not disabled for the table. inline bool can_proceed(compaction::table_state* t) const; - void postponed_compactions_reevaluation(); + future<> postponed_compactions_reevaluation(); void reevaluate_postponed_compactions() noexcept; // Postpone compaction for a table that couldn't be executed due to ongoing // similar-sized compaction. From 1669025736fb647d4e840340ea44867cda6eb7dc Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 5 Dec 2022 22:01:41 +0200 Subject: [PATCH 2/3] compaction_manager: coroutinize postponed_compactions_reevaluation() So much nicer. --- compaction/compaction_manager.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 15f11823ad..9c29a42ebe 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -749,11 +749,12 @@ std::function compaction_manager::compaction_submission_callback() { } future<> compaction_manager::postponed_compactions_reevaluation() { - return repeat([this] { - return _postponed_reevaluation.wait().then([this] { + while (true) { + co_await _postponed_reevaluation.when(); + { if (_state != state::enabled) { _postponed.clear(); - return stop_iteration::yes; + co_return; } auto postponed = std::move(_postponed); try { @@ -765,9 +766,8 @@ future<> compaction_manager::postponed_compactions_reevaluation() { } catch (...) { _postponed = std::move(postponed); } - return stop_iteration::no; - }); - }); + } + } } void compaction_manager::reevaluate_postponed_compactions() noexcept { From d2b1d2f69526d64da3c7688bfb7abfddeda09966 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Mon, 5 Dec 2022 22:02:27 +0200 Subject: [PATCH 3/3] compaction_manager: reindent postponed_compactions_reevaluation() --- compaction/compaction_manager.cc | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 9c29a42ebe..ea3900085e 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -751,21 +751,19 @@ std::function compaction_manager::compaction_submission_callback() { future<> compaction_manager::postponed_compactions_reevaluation() { while (true) { co_await _postponed_reevaluation.when(); - { - if (_state != state::enabled) { - _postponed.clear(); - co_return; - } - auto postponed = std::move(_postponed); - try { - for (auto& t : postponed) { - auto s = t->schema(); - cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); - submit(*t); - } - } catch (...) { - _postponed = std::move(postponed); + if (_state != state::enabled) { + _postponed.clear(); + co_return; + } + auto postponed = std::move(_postponed); + try { + for (auto& t : postponed) { + auto s = t->schema(); + cmlog.debug("resubmitting postponed compaction for table {}.{} [{}]", s->ks_name(), s->cf_name(), fmt::ptr(t)); + submit(*t); } + } catch (...) { + _postponed = std::move(postponed); } } }