logstor: add compaction reenabler

add a function that stops and disabled compaction for a compaction group
and returns a compaction reenabler object, similarly to the normal
compaction manager.

this will be useful for disabling compaction while doing operations on
the compaction group's logstor segment set.
This commit is contained in:
Michael Litvak
2026-03-18 18:41:38 +01:00
parent 1d7c2e4f52
commit 684ce8de71
2 changed files with 39 additions and 1 deletions

View File

@@ -177,6 +177,20 @@ struct separator_buffer {
}
};
class compaction_reenabler {
std::function<void()> _release;
public:
compaction_reenabler() = default;
explicit compaction_reenabler(std::function<void()> release)
: _release(std::move(release)) {}
~compaction_reenabler() { if (_release) _release(); }
compaction_reenabler(compaction_reenabler&&) = default;
compaction_reenabler& operator=(compaction_reenabler&&) = default;
compaction_reenabler(const compaction_reenabler&) = delete;
compaction_reenabler& operator=(const compaction_reenabler&) = delete;
};
class compaction_manager {
public:
virtual ~compaction_manager() = default;
@@ -188,6 +202,8 @@ public:
virtual void submit(replica::compaction_group&) = 0;
virtual future<> stop_ongoing_compactions(replica::compaction_group&) = 0;
virtual future<compaction_reenabler> disable_compaction(replica::compaction_group&) = 0;
};
}

View File

@@ -392,6 +392,7 @@ private:
bool running{false};
shared_future<> completion{make_ready_future<>()};
abort_source as;
int compaction_disabled_counter{0};
};
absl::flat_hash_map<compaction_group*, std::unique_ptr<group_compaction_state>> _groups;
@@ -418,6 +419,7 @@ private:
void submit(compaction_group&) override;
future<> stop_ongoing_compactions(compaction_group&) override;
future<compaction_reenabler> disable_compaction(replica::compaction_group&) override;
std::vector<log_segment_id> select_segments_for_compaction(const segment_descriptor_hist&);
future<> do_compact(compaction_group&, abort_source&);
@@ -1304,7 +1306,7 @@ void compaction_manager_impl::submit(compaction_group& cg) {
state_ptr = std::make_unique<group_compaction_state>();
}
auto& state = *state_ptr;
if (state.running) {
if (state.running || state.compaction_disabled_counter > 0) {
return;
}
state.running = true;
@@ -1329,6 +1331,26 @@ future<> compaction_manager_impl::stop_ongoing_compactions(compaction_group& cg)
_groups.erase(it);
}
future<compaction_reenabler> compaction_manager_impl::disable_compaction(compaction_group& cg) {
auto& state_ptr = _groups[&cg];
if (!state_ptr) {
state_ptr = std::make_unique<group_compaction_state>();
}
auto& state = *state_ptr;
++state.compaction_disabled_counter;
// Wait for any ongoing compaction to finish before disabling
co_await state.completion.get_future();
co_return compaction_reenabler([this, &cg] {
auto it = _groups.find(&cg);
if (it != _groups.end()) {
--it->second->compaction_disabled_counter;
}
});
}
std::vector<log_segment_id> compaction_manager_impl::select_segments_for_compaction(const segment_descriptor_hist& segments) {
size_t accum_net_data_size = 0;
size_t accum_record_count = 0;