mirror of
https://github.com/scylladb/scylladb.git
synced 2026-04-19 16:15:07 +00:00
logstor: tablet merge
implement tablet merge with logstor. disable compaction for the new compaction group, then merge the merging compaction groups by merging their logstor segments set into the new cg - simply merging the segment histogram.
This commit is contained in:
@@ -219,6 +219,9 @@ public:
|
||||
// Merges all sstables from another group into this one.
|
||||
future<> merge_sstables_from(compaction_group& group);
|
||||
|
||||
// Merges all logstor segments from another group into this one.
|
||||
future<> merge_logstor_segments_from(compaction_group& group);
|
||||
|
||||
const lw_shared_ptr<sstables::sstable_set>& main_sstables() const noexcept;
|
||||
sstables::sstable_set make_main_sstable_set() const;
|
||||
void set_main_sstables(lw_shared_ptr<sstables::sstable_set> new_main_sstables);
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "utils/chunked_vector.hh"
|
||||
#include "write_buffer.hh"
|
||||
#include "utils/log_heap.hh"
|
||||
#include <seastar/coroutine/maybe_yield.hh>
|
||||
|
||||
namespace replica::logstor {
|
||||
|
||||
@@ -68,6 +69,18 @@ struct segment_set {
|
||||
segment_descriptor_hist _segments;
|
||||
size_t _segment_count{0};
|
||||
|
||||
future<> merge(segment_set& other) {
|
||||
while (!other._segments.empty()) {
|
||||
auto& desc = other._segments.one_of_largest();
|
||||
other._segments.erase(desc);
|
||||
--other._segment_count;
|
||||
desc.owner = this;
|
||||
_segments.push(desc);
|
||||
++_segment_count;
|
||||
co_await coroutine::maybe_yield();
|
||||
}
|
||||
}
|
||||
|
||||
void add_segment(segment_descriptor& desc) {
|
||||
if (desc.owner) {
|
||||
on_internal_error(logstor_logger, "add_segment called for segment that has an owner");
|
||||
@@ -204,6 +217,7 @@ public:
|
||||
virtual future<> stop_ongoing_compactions(replica::compaction_group&) = 0;
|
||||
|
||||
virtual future<compaction_reenabler> disable_compaction(replica::compaction_group&) = 0;
|
||||
virtual compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -420,6 +420,7 @@ private:
|
||||
void submit(compaction_group&) override;
|
||||
future<> stop_ongoing_compactions(compaction_group&) override;
|
||||
future<compaction_reenabler> disable_compaction(replica::compaction_group&) override;
|
||||
compaction_reenabler disable_compaction_no_wait(replica::compaction_group&) override;
|
||||
|
||||
std::vector<log_segment_id> select_segments_for_compaction(const segment_descriptor_hist&);
|
||||
future<> do_compact(compaction_group&, abort_source&);
|
||||
@@ -1351,6 +1352,23 @@ future<compaction_reenabler> compaction_manager_impl::disable_compaction(compact
|
||||
});
|
||||
}
|
||||
|
||||
compaction_reenabler compaction_manager_impl::disable_compaction_no_wait(compaction_group& cg) {
|
||||
auto& state_ptr = _groups[&cg];
|
||||
if (!state_ptr) {
|
||||
state_ptr = std::make_unique<group_compaction_state>();
|
||||
}
|
||||
auto& state = *state_ptr;
|
||||
|
||||
++state.compaction_disabled_counter;
|
||||
|
||||
return compaction_reenabler([this, &cg] {
|
||||
auto it = _groups.find(&cg);
|
||||
if (it != _groups.end()) {
|
||||
--it->second->compaction_disabled_counter;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<log_segment_id> compaction_manager_impl::select_segments_for_compaction(const segment_descriptor_hist& segments) {
|
||||
size_t accum_net_data_size = 0;
|
||||
size_t accum_record_count = 0;
|
||||
|
||||
@@ -20,9 +20,11 @@
|
||||
#include <seastar/json/json_elements.hh>
|
||||
|
||||
#include "dht/decorated_key.hh"
|
||||
#include "readers/mutation_reader.hh"
|
||||
#include "replica/database.hh"
|
||||
#include "replica/data_dictionary_impl.hh"
|
||||
#include "replica/compaction_group.hh"
|
||||
#include "replica/logstor/compaction.hh"
|
||||
#include "replica/query_state.hh"
|
||||
#include "sstables/shared_sstable.hh"
|
||||
#include "sstables/sstable_set.hh"
|
||||
@@ -797,6 +799,7 @@ class tablet_storage_group_manager final : public storage_group_manager {
|
||||
std::optional<utils::phased_barrier::operation> _pending_merge_fiber_work;
|
||||
// Holds compaction reenabler which disables compaction temporarily during tablet merge
|
||||
std::vector<background_merge_guard> _compaction_reenablers_for_merging;
|
||||
std::vector<logstor::compaction_reenabler> _compaction_reenablers_for_logstor_merging;
|
||||
private:
|
||||
const schema_ptr& schema() const {
|
||||
return _t.schema();
|
||||
@@ -2339,6 +2342,15 @@ compaction_group::merge_sstables_from(compaction_group& group) {
|
||||
_t.rebuild_statistics();
|
||||
}
|
||||
|
||||
future<>
|
||||
compaction_group::merge_logstor_segments_from(compaction_group& group) {
|
||||
if (!_t.uses_logstor()) {
|
||||
co_return;
|
||||
}
|
||||
auto permit = co_await _t.get_sstable_list_permit();
|
||||
co_await _logstor_segments->merge(*group._logstor_segments);
|
||||
}
|
||||
|
||||
future<>
|
||||
compaction_group::update_sstable_sets_on_compaction_completion(compaction::compaction_completion_desc desc) {
|
||||
// Build a new list of _sstables: We remove from the existing list the
|
||||
@@ -3341,6 +3353,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
auto cf_name = schema()->cf_name();
|
||||
// Enable compaction after merge is done.
|
||||
auto cres = std::exchange(_compaction_reenablers_for_merging, {});
|
||||
auto logstor_cres = std::exchange(_compaction_reenablers_for_logstor_merging, {});
|
||||
co_await for_each_storage_group_gently([ks_name, cf_name] (storage_group& sg) -> future<> {
|
||||
auto main_group = sg.main_compaction_group();
|
||||
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} started",
|
||||
@@ -3359,6 +3372,7 @@ future<> tablet_storage_group_manager::merge_completion_fiber() {
|
||||
co_await sleep(std::chrono::seconds(60));
|
||||
}
|
||||
co_await main_group->merge_sstables_from(*group);
|
||||
co_await main_group->merge_logstor_segments_from(*group);
|
||||
}
|
||||
co_await sg.remove_empty_merging_groups();
|
||||
tlogger.debug("Merge compaction groups for table={}.{} group_id={} range={} finished",
|
||||
@@ -3404,6 +3418,9 @@ void tablet_storage_group_manager::handle_tablet_merge_completion(locator::effec
|
||||
auto cre = _t.get_compaction_manager().stop_and_disable_compaction_no_wait(*view, "tablet merging");
|
||||
_compaction_reenablers_for_merging.push_back(background_merge_guard{std::move(cre), old_erm});
|
||||
}
|
||||
if (_t.uses_logstor()) {
|
||||
_compaction_reenablers_for_logstor_merging.push_back(_t.get_logstor_compaction_manager().disable_compaction_no_wait(*new_cg));
|
||||
}
|
||||
auto new_sg = make_lw_shared<storage_group>(std::move(new_cg));
|
||||
|
||||
for (unsigned i = 0; i < merge_size; i++) {
|
||||
|
||||
Reference in New Issue
Block a user