replica: add compaction_group_for_logstor_segment

add the function table::compaction_group_for_logstor_segment that we use
when recovering a segment to find the compaction group for a segment
based on its token range, similarly to compaction_group_for_sstable for
sstables.

extract the common logic from compaction_group_for_sstable to a common
function compaction_group_for_token_range that finds a compaction group
for a token range.
This commit is contained in:
Michael Litvak
2026-03-19 19:44:28 +01:00
parent bf7bc5b410
commit d3db967802
4 changed files with 59 additions and 30 deletions

View File

@@ -484,6 +484,7 @@ public:
virtual compaction_group& compaction_group_for_token(dht::token token) const = 0;
virtual compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const = 0;
virtual compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const = 0;
virtual compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const = 0;
virtual size_t log2_storage_groups() const = 0;
virtual storage_group& storage_group_for_token(dht::token) const = 0;

View File

@@ -17,6 +17,7 @@
#include <seastar/core/when_all.hh>
#include "replica/global_table_ptr.hh"
#include "replica/logstor/compaction.hh"
#include "replica/logstor/types.hh"
#include "types/user.hh"
#include "utils/assert.hh"
#include "utils/hash.hh"
@@ -616,7 +617,7 @@ public:
sstables::offstrategy offstrategy = sstables::offstrategy::no);
future<> add_sstables_and_update_cache(const std::vector<sstables::shared_sstable>& ssts);
bool add_logstor_segment(logstor::segment_descriptor&, dht::token first_token, dht::token last_token);
bool add_logstor_segment(logstor::log_segment_id, logstor::segment_descriptor&, dht::token first_token, dht::token last_token);
logstor::separator_buffer& get_logstor_separator_buffer(dht::token token, size_t write_size);
@@ -734,6 +735,8 @@ private:
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const;
// Select a compaction group from a given sstable based on its token range.
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const;
// Select a compaction group from a given logstor segment based on its token range.
compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id, dht::token first_token, dht::token last_token) const;
// Safely iterate through compaction groups, while performing async operations on them.
future<> parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action);
void for_each_compaction_group(std::function<void(compaction_group&)> action);

View File

@@ -1763,7 +1763,7 @@ future<> segment_manager_impl::add_segment_to_compaction_group(replica::database
} else {
auto tid = std::get<table_id>(*segment_table);
auto& t = db.find_column_family(tid);
if (t.add_logstor_segment(desc, *first_token, *last_token)) {
if (t.add_logstor_segment(seg_id, desc, *first_token, *last_token)) {
// all record belong to a single compaction group and the segment was added to the compaction group
logstor_logger.debug("Add segment {} with {} record with tokens [{},{}] to table", seg_id, live_record_count, *first_token, *last_token);
} else {

View File

@@ -738,6 +738,9 @@ public:
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override {
return get_compaction_group();
}
compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const override {
return get_compaction_group();
}
size_t log2_storage_groups() const override {
return 0;
}
@@ -918,7 +921,9 @@ public:
compaction_group& compaction_group_for_token(dht::token token) const override;
utils::chunked_vector<storage_group_ptr> storage_groups_for_token_range(dht::token_range tr) const override;
compaction_group& compaction_group_for_key(partition_key_view key, const schema_ptr& s) const override;
compaction_group& compaction_group_for_token_range(sstring desc, dht::token first_token, dht::token last_token) const;
compaction_group& compaction_group_for_sstable(const sstables::shared_sstable& sst) const override;
compaction_group& compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const override;
size_t log2_storage_groups() const override {
return log2ceil(tablet_map().tablet_count());
@@ -1320,9 +1325,38 @@ compaction_group& table::compaction_group_for_key(partition_key_view key, const
return _sg_manager->compaction_group_for_key(key, s);
}
compaction_group& tablet_storage_group_manager::compaction_group_for_token_range(sstring desc, dht::token first_token, dht::token last_token) const {
auto first_id = storage_group_of(first_token);
auto last_id = storage_group_of(last_token);
auto tablet_desc = [this] (locator::tablet_id id) {
return format("{} (replica set: {})", id, tablet_map().get_tablet_info(id).replicas);
};
if (first_id != last_id) {
on_internal_error(tlogger, format("Unable to load {} that belongs to tablets {} and {}",
desc,
tablet_desc(locator::tablet_id(first_id)),
tablet_desc(locator::tablet_id(last_id))));
}
try {
auto& sg = storage_group_for_id(first_id);
return *sg.select_compaction_group(
first_token,
last_token,
tablet_map());
} catch (std::out_of_range& e) {
on_internal_error(tlogger, format("Unable to load {} of tablet {}, due to {}",
desc,
tablet_desc(locator::tablet_id(first_id)),
e.what()));
}
}
compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
auto first_id = storage_group_of(sst->get_first_decorated_key().token());
auto last_id = storage_group_of(sst->get_last_decorated_key().token());
auto first_token = sst->get_first_decorated_key().token();
auto last_token = sst->get_last_decorated_key().token();
auto sstable_desc = [] (const sstables::shared_sstable& sst) {
auto& identifier_opt = sst->sstable_identifier();
@@ -1332,35 +1366,23 @@ compaction_group& tablet_storage_group_manager::compaction_group_for_sstable(con
identifier_opt ? identifier_opt->to_sstring() : "unknown",
originating_host_id_opt ? originating_host_id_opt->to_sstring() : "unknown");
};
auto tablet_desc = [this] (locator::tablet_id id) {
return format("{} (replica set: {})", id, tablet_map().get_tablet_info(id).replicas);
};
if (first_id != last_id) {
on_internal_error(tlogger, format("Unable to load SSTable {} that belongs to tablets {} and {}",
sstable_desc(sst),
tablet_desc(locator::tablet_id(first_id)),
tablet_desc(locator::tablet_id(last_id))));
}
return compaction_group_for_token_range(sstable_desc(sst), first_token, last_token);
}
try {
auto& sg = storage_group_for_id(first_id);
return *sg.select_compaction_group(
sst->get_first_decorated_key().token(),
sst->get_last_decorated_key().token(),
tablet_map());
} catch (std::out_of_range& e) {
on_internal_error(tlogger, format("Unable to load SSTable {} of tablet {}, due to {}",
sstable_desc(sst),
tablet_desc(locator::tablet_id(first_id)),
e.what()));
}
compaction_group& tablet_storage_group_manager::compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const {
auto desc = format("logstor segment {}", seg_id);
return compaction_group_for_token_range(std::move(desc), first_token, last_token);
}
compaction_group& table::compaction_group_for_sstable(const sstables::shared_sstable& sst) const {
return _sg_manager->compaction_group_for_sstable(sst);
}
compaction_group& table::compaction_group_for_logstor_segment(logstor::log_segment_id seg_id, dht::token first_token, dht::token last_token) const {
return _sg_manager->compaction_group_for_logstor_segment(seg_id, first_token, last_token);
}
future<> table::parallel_foreach_compaction_group(std::function<future<>(compaction_group&)> action) {
co_await _sg_manager->parallel_foreach_storage_group([&] (storage_group& sg) -> future<> {
co_await utils::get_local_injector().inject("foreach_compaction_group_wait", [this, &sg] (auto& handler) -> future<> {
@@ -1630,13 +1652,16 @@ table::update_cache(compaction_group& cg, lw_shared_ptr<memtable> m, std::vector
}
}
bool table::add_logstor_segment(logstor::segment_descriptor& seg_desc, dht::token first_token, dht::token last_token) {
auto& cg = compaction_group_for_token(first_token);
if (&cg != &compaction_group_for_token(last_token)) {
bool table::add_logstor_segment(logstor::log_segment_id seg_id, logstor::segment_descriptor& seg_desc, dht::token first_token, dht::token last_token) {
dht::token_range tr(first_token, last_token);
if (storage_groups_for_token_range(tr).size() == 1) {
auto& cg = compaction_group_for_logstor_segment(seg_id, first_token, last_token);
cg.add_logstor_segment(seg_desc);
return true;
} else {
// the segment doesn't fit in a single storage group. need to write to separator.
return false;
}
cg.add_logstor_segment(seg_desc);
return true;
}
logstor::separator_buffer& table::get_logstor_separator_buffer(dht::token token, size_t write_size) {