From d3db967802afc047b4a77991112c0c42e2e7fd3c Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 19 Mar 2026 19:44:28 +0100 Subject: [PATCH] replica: add compaction_group_for_logstor_segment Add the function table::compaction_group_for_logstor_segment, which is used when recovering a segment to find the compaction group for that segment based on its token range, similarly to what compaction_group_for_sstable does for sstables. Extract the common logic from compaction_group_for_sstable into a shared function, compaction_group_for_token_range, that finds the compaction group for a token range. --- replica/compaction_group.hh | 1 + replica/database.hh | 5 +- replica/logstor/segment_manager.cc | 2 +- replica/table.cc | 81 +++++++++++++++++++----------- 4 files changed, 59 insertions(+), 30 deletions(-) diff --git a/replica/compaction_group.hh b/replica/compaction_group.hh index 080b5fcfea..4cbce9784e 100644 --- a/replica/compaction_group.hh +++ b/replica/compaction_group.hh @@ -484,6 +484,7 @@ public: virtual compaction_group& compaction_group_for_token(dht::token token) const = 0; virtual compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const = 0; virtual compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const = 0; + virtual compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const = 0; virtual size_t log2_storage_groups() const = 0; virtual storage_group& storage_group_for_token(dht::token) const = 0; diff --git a/replica/database.hh b/replica/database.hh index 4203d0242a..43d4438c3b 100644 --- a/replica/database.hh +++ b/replica/database.hh @@ -17,6 +17,7 @@ #include #include "replica/global_table_ptr.hh" #include "replica/logstor/compaction.hh" +#include "replica/logstor/types.hh" #include "types/user.hh" #include "utils/assert.hh" #include "utils/hash.hh" @@ -616,7 +617,7 @@ public: sstables::offstrategy offstrategy = 
sstables::offstrategy::no); future<> add_sstables_and_update_cache(const std::vector& ssts); - bool add_logstor_segment(logstor::segment_descriptor&, dht::token first_token, dht::token last_token); + bool add_logstor_segment(logstor::log_segment_id, logstor::segment_descriptor&, dht::token first_token, dht::token last_token); logstor::separator_buffer& get_logstor_separator_buffer(dht::token token, size_t write_size); @@ -734,6 +735,8 @@ private: compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const; // Select a compaction group from a given sstable based on its token range. compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const; + // Select a compaction group from a given logstor segment based on its token range. + compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id, dht::token first_token, dht::token last_token) const; // Safely iterate through compaction groups, while performing async operations on them. 
future<> parallel_foreach_compaction_group(std::function(compaction_group&)> action); void for_each_compaction_group(std::function action); diff --git a/replica/logstor/segment_manager.cc b/replica/logstor/segment_manager.cc index bc34abecde..cf4df55f18 100644 --- a/replica/logstor/segment_manager.cc +++ b/replica/logstor/segment_manager.cc @@ -1763,7 +1763,7 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database } else { auto tid = std::get(*segment_table); auto& t = db.find_column_family(tid); - if (t.add_logstor_segment(desc, *first_token, *last_token)) { + if (t.add_logstor_segment(seg_id, desc, *first_token, *last_token)) { // all record belong to a single compaction group and the segment was added to the compaction group logstor_logger.debug("Add segment {} with {} record with tokens [{},{}] to table", seg_id, live_record_count, *first_token, *last_token); } else { diff --git a/replica/table.cc b/replica/table.cc index 70287bbf54..03f00a3c83 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -738,6 +738,9 @@ public: compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override { return get_compaction_group(); } + compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const override { + return get_compaction_group(); + } size_t log2_storage_groups() const override { return 0; } @@ -918,7 +921,9 @@ public: compaction_group& compaction_group_for_token(dht::token token) const override; utils::chunked_vector storage_groups_for_token_range(dht::token_range tr) const override; compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override; + compaction_group& compaction_group_for_token_range(sstring desc, dht::token first_token, dht::token last_token) const; compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override; + compaction_group& 
compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const override; size_t log2_storage_groups() const override { return log2ceil(tablet_map().tablet_count()); @@ -1320,9 +1325,38 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const return _sg_manager->compaction_group_for_key(key, s); } +compaction_group& tablet_storage_group_manager::compaction_group_for_token_range(sstring desc, dht::token first_token, dht::token last_token) const { + auto first_id = storage_group_of(first_token); + auto last_id = storage_group_of(last_token); + + auto tablet_desc = [this] (locator::tablet_id id) { + return format("{} (replica set: {})", id, tablet_map().get_tablet_info(id).replicas); + }; + + if (first_id != last_id) { + on_internal_error(tlogger, format("Unable to load {} that belongs to tablets {} and {}", + desc, + tablet_desc(locator::tablet_id(first_id)), + tablet_desc(locator::tablet_id(last_id)))); + } + + try { + auto& sg = storage_group_for_id(first_id); + return *sg.select_compaction_group( + first_token, + last_token, + tablet_map()); + } catch (std::out_of_range& e) { + on_internal_error(tlogger, format("Unable to load {} of tablet {}, due to {}", + desc, + tablet_desc(locator::tablet_id(first_id)), + e.what())); + } +} + compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(const sstables::shared_sstable& sst) const { - auto first_id = storage_group_of(sst->get_first_decorated_key().token()); - auto last_id = storage_group_of(sst->get_last_decorated_key().token()); + auto first_token = sst->get_first_decorated_key().token(); + auto last_token = sst->get_last_decorated_key().token(); auto sstable_desc = [] (const sstables::shared_sstable& sst) { auto& identifier_opt = sst->sstable_identifier(); @@ -1332,35 +1366,23 @@ compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(con identifier_opt ? 
identifier_opt->to_sstring() : "unknown", originating_host_id_opt ? originating_host_id_opt->to_sstring() : "unknown"); }; - auto tablet_desc = [this] (locator::tablet_id id) { - return format("{} (replica set: {})", id, tablet_map().get_tablet_info(id).replicas); - }; - if (first_id != last_id) { - on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}", - sstable_desc(sst), - tablet_desc(locator::tablet_id(first_id)), - tablet_desc(locator::tablet_id(last_id)))); - } + return compaction_group_for_token_range(sstable_desc(sst), first_token, last_token); +} - try { - auto& sg = storage_group_for_id(first_id); - return *sg.select_compaction_group( - sst->get_first_decorated_key().token(), - sst->get_last_decorated_key().token(), - tablet_map()); - } catch (std::out_of_range& e) { - on_internal_error(tlogger, format("Unable to load SSTable {} of tablet {}, due to {}", - sstable_desc(sst), - tablet_desc(locator::tablet_id(first_id)), - e.what())); - } +compaction_group& tablet_storage_group_manager::compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const { + auto desc = format("logstor segment {}", seg_id); + return compaction_group_for_token_range(std::move(desc), first_token, last_token); } compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const { return _sg_manager->compaction_group_for_sstable(sst); } +compaction_group& table::compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const { + return _sg_manager->compaction_group_for_logstor_segment(seg_id, first_token, last_token); +} + future<> table::parallel_foreach_compaction_group(std::function(compaction_group&)> action) { co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> { co_await utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& 
handler) -> future<> { @@ -1630,13 +1652,16 @@ table::update_cache(compaction_group& cg, lw_shared_ptr m, std::vector } } -bool table::add_logstor_segment(logstor::segment_descriptor& seg_desc, dht::token first_token, dht::token last_token) { - auto& cg = compaction_group_for_token(first_token); - if (&cg != &compaction_group_for_token(last_token)) { +bool table::add_logstor_segment(logstor::log_segment_id seg_id, logstor::segment_descriptor& seg_desc, dht::token first_token, dht::token last_token) { + dht::token_range tr(first_token, last_token); + if (storage_groups_for_token_range(tr).size() == 1) { + auto& cg = compaction_group_for_logstor_segment(seg_id, first_token, last_token); + cg.add_logstor_segment(seg_desc); + return true; + } else { + // the segment doesn't fit in a single storage group. need to write to separator. return false; } - cg.add_logstor_segment(seg_desc); - return true; } logstor::separator_buffer& table::get_logstor_separator_buffer(dht::token token, size_t write_size) {