diff --git a/compaction/compaction_manager.cc b/compaction/compaction_manager.cc index 5d9879a329..fdb9dbcb2d 100644 --- a/compaction/compaction_manager.cc +++ b/compaction/compaction_manager.cc @@ -1128,7 +1128,10 @@ void compaction_manager::enable() { _compaction_submission_timer.cancel(); _compaction_submission_timer.arm_periodic(periodic_compaction_submission_interval()); - _waiting_reevalution = postponed_compactions_reevaluation(); + if (_waiting_reevaluation) { + on_internal_error(cmlog, "postponed compactions reevaluation is already running when enabling compaction manager"); + } + _waiting_reevaluation.emplace(postponed_compactions_reevaluation()); cmlog.info("Enabled"); } @@ -1176,6 +1179,16 @@ void compaction_manager::reevaluate_postponed_compactions() noexcept { _postponed_reevaluation.signal(); } +future<> compaction_manager::stop_postponed_compactions() noexcept { + auto waiting_reevaluation = std::exchange(_waiting_reevaluation, std::nullopt); + if (!waiting_reevaluation) { + return make_ready_future(); + } + // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber + reevaluate_postponed_compactions(); + return std::move(*waiting_reevaluation); +} + void compaction_manager::postpone_compaction_for_table(compaction_group_view* t) { _postponed.insert(t); } @@ -1259,8 +1272,7 @@ future<> compaction_manager::drain() { _compaction_submission_timer.cancel(); // Stop ongoing compactions, if the request has not been sent already and wait for them to stop. co_await stop_ongoing_compactions("drain"); - // Trigger a signal to properly exit from postponed_compactions_reevaluation() fiber - reevaluate_postponed_compactions(); + co_await stop_postponed_compactions(); cmlog.info("Drained"); } @@ -1304,8 +1316,7 @@ future<> compaction_manager::really_do_stop() noexcept { if (!_tasks.empty()) { on_fatal_internal_error(cmlog, format("{} tasks still exist after being stopped", _tasks.size())); } - reevaluate_postponed_compactions(); - co_await std::move(_waiting_reevalution); + co_await stop_postponed_compactions(); co_await _sys_ks.close(); _weight_tracker.clear(); _compaction_submission_timer.cancel(); diff --git a/compaction/compaction_manager.hh b/compaction/compaction_manager.hh index 92f215ecb6..0b776e6625 100644 --- a/compaction/compaction_manager.hh +++ b/compaction/compaction_manager.hh @@ -127,7 +127,7 @@ private: // a sstable from being compacted twice. std::unordered_set _compacting_sstables; - future<> _waiting_reevalution = make_ready_future<>(); + std::optional> _waiting_reevaluation; condition_variable _postponed_reevaluation; // tables that wait for compaction but had its submission postponed due to ongoing compaction. std::unordered_set _postponed; @@ -237,6 +237,7 @@ private: future<> postponed_compactions_reevaluation(); void reevaluate_postponed_compactions() noexcept; + future<> stop_postponed_compactions() noexcept; // Postpone compaction for a table that couldn't be executed due to ongoing // similar-sized compaction. void postpone_compaction_for_table(compaction::compaction_group_view* t);