diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index 98074a1b9b..9e99f0e828 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -177,6 +177,20 @@ struct separator_buffer { } }; +class compaction_reenabler { + std::function _release; +public: + compaction_reenabler() = default; + explicit compaction_reenabler(std::function release) + : _release(std::move(release)) {} + ~compaction_reenabler() { if (_release) _release(); } + + compaction_reenabler(compaction_reenabler&&) = default; + compaction_reenabler& operator=(compaction_reenabler&&) = default; + compaction_reenabler(const compaction_reenabler&) = delete; + compaction_reenabler& operator=(const compaction_reenabler&) = delete; +}; + class compaction_manager { public: virtual ~compaction_manager() = default; @@ -188,6 +202,8 @@ public: virtual void submit(replica::compaction_group&) = 0; virtual future<> stop_ongoing_compactions(replica::compaction_group&) = 0; + + virtual future disable_compaction(replica::compaction_group&) = 0; }; } diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index ad7f1770dc..ad45824e00 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -392,6 +392,7 @@ private: bool running{false}; shared_future<> completion{make_ready_future<>()}; abort_source as; + int compaction_disabled_counter{0}; }; absl::flat_hash_map> _groups; @@ -418,6 +419,7 @@ private: void submit(compaction_group&) override; future<> stop_ongoing_compactions(compaction_group&) override; + future disable_compaction(replica::compaction_group&) override; std::vector select_segments_for_compaction(const segment_descriptor_hist&); future<> do_compact(compaction_group&, abort_source&); @@ -1304,7 +1306,7 @@ void compaction_manager_impl::submit(compaction_group& cg) { state_ptr = std::make_unique(); } auto& state = *state_ptr; - if (state.running) { + if (state.running || state.compaction_disabled_counter > 0) { return; } state.running = true; @@ -1329,6 +1331,26 @@ future<> compaction_manager_impl::stop_ongoing_compactions(compaction_group& cg) _groups.erase(it); } +future compaction_manager_impl::disable_compaction(compaction_group& cg) { + auto& state_ptr = _groups[&cg]; + if (!state_ptr) { + state_ptr = std::make_unique(); + } + auto& state = *state_ptr; + + ++state.compaction_disabled_counter; + + // Wait for any ongoing compaction to finish before disabling + co_await state.completion.get_future(); + + co_return compaction_reenabler([this, &cg] { + auto it = _groups.find(&cg); + if (it != _groups.end()) { + --it->second->compaction_disabled_counter; + } + }); +} + std::vector compaction_manager_impl::select_segments_for_compaction(const segment_descriptor_hist& segments) { size_t accum_net_data_size = 0; size_t accum_record_count = 0;