diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 4cbce9784e..02ab48f7c7 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -219,6 +219,9 @@ public: // Merges all sstables from another group into this one. future<> merge_sstables_from(compaction_group& group); + // Merges all logstor segments from another group into this one. + future<> merge_logstor_segments_from(compaction_group& group); + const lw_shared_ptr& main_sstables() const noexcept; sstables::sstable_set make_main_sstable_set() const; void set_main_sstables(lw_shared_ptr new_main_sstables); diff --git a/replica/logstor/compaction.hh b/replica/logstor/compaction.hh index 9e99f0e828..60540d2fc0 100644 --- a/replica/logstor/compaction.hh +++ b/replica/logstor/compaction.hh @@ -11,6 +11,7 @@ #include "utils/chunked_vector.hh" #include "write_buffer.hh" #include "utils/log_heap.hh" +#include namespace replica::logstor { @@ -68,6 +69,18 @@ struct segment_set { segment_descriptor_hist _segments; size_t _segment_count{0}; + future<> merge(segment_set& other) { + while (!other._segments.empty()) { + auto& desc = other._segments.one_of_largest(); + other._segments.erase(desc); + --other._segment_count; + desc.owner = this; + _segments.push(desc); + ++_segment_count; + co_await coroutine::maybe_yield(); + } + } + void add_segment(segment_descriptor& desc) { if (desc.owner) { on_internal_error(logstor_logger, "add_segment called for segment that has an owner"); @@ -204,6 +217,7 @@ public: virtual future<> stop_ongoing_compactions(replica::compaction_group&) = 0; virtual future disable_compaction(replica::compaction_group&) = 0; + virtual compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) = 0; }; } diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index ad45824e00..c3165572a4 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -420,6 +420,7 @@ private: void submit(compaction_group&) override; future<> stop_ongoing_compactions(compaction_group&) override; future disable_compaction(replica::compaction_group&) override; + compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) override; std::vector select_segments_for_compaction(const segment_descriptor_hist&); future<> do_compact(compaction_group&, abort_source&); @@ -1351,6 +1352,23 @@ future compaction_manager_impl::disable_compaction(compact }); } +compaction_reenabler compaction_manager_impl::disable_compaction_no_wait(compaction_group& cg) { + auto& state_ptr = _groups[&cg]; + if (!state_ptr) { + state_ptr = std::make_unique(); + } + auto& state = *state_ptr; + + ++state.compaction_disabled_counter; + + return compaction_reenabler([this, &cg] { + auto it = _groups.find(&cg); + if (it != _groups.end()) { + --it->second->compaction_disabled_counter; + } + }); +} + std::vector compaction_manager_impl::select_segments_for_compaction(const segment_descriptor_hist& segments) { size_t accum_net_data_size = 0; size_t accum_record_count = 0; diff --git a/replica/table.cc b/replica/table.cc index 44c1e63637..e4253d7719 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -20,9 +20,11 @@ #include #include "dht/decorated_key.hh" +#include "readers/mutation_reader.hh" #include "replica/database.hh" #include "replica/data_dictionary_impl.hh" #include "replica/compaction_group.hh" +#include "replica/logstor/compaction.hh" #include "replica/query_state.hh" #include "sstables/shared_sstable.hh" #include "sstables/sstable_set.hh" @@ -797,6 +799,7 @@ class tablet_storage_group_manager final : public storage_group_manager { std::optional _pending_merge_fiber_work; // Holds compaction reenabler which disables compaction temporarily during tablet merge std::vector _compaction_reenablers_for_merging; + std::vector _compaction_reenablers_for_logstor_merging; private: const schema_ptr& schema() const { return _t.schema(); @@ -2339,6 +2342,15 @@ compaction_group::merge_sstables_from(compaction_group& group) { _t.rebuild_statistics(); } +future<> +compaction_group::merge_logstor_segments_from(compaction_group& group) { + if (!_t.uses_logstor()) { + co_return; + } + auto permit = co_await _t.get_sstable_list_permit(); + co_await _logstor_segments->merge(*group._logstor_segments); +} + future<> compaction_group::update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc) { // Build a new list of _sstables: We remove from the existing list the @@ -3341,6 +3353,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() { auto cf_name = schema()->cf_name(); // Enable compaction after merge is done. auto cres = std::exchange(_compaction_reenablers_for_merging, {}); + auto logstor_cres = std::exchange(_compaction_reenablers_for_logstor_merging, {}); co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> { auto main_group = sg.main_compaction_group(); tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started", @@ -3359,6 +3372,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() { co_await sleep(std::chrono::seconds(60)); } co_await main_group->merge_sstables_from(*group); + co_await main_group->merge_logstor_segments_from(*group); } co_await sg.remove_empty_merging_groups(); tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished", @@ -3404,6 +3418,9 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(locator::effec auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging"); _compaction_reenablers_for_merging.push_back(background_merge_guard{std::move(cre), old_erm}); } + if (_t.uses_logstor()) { + _compaction_reenablers_for_logstor_merging.push_back(_t.get_logstor_compaction_manager().disable_compaction_no_wait(*new_cg)); + } auto new_sg = make_lw_shared(std::move(new_cg)); for (unsigned i = 0; i < merge_size; i++) {