table: perform_cleanup_compaction: trim owned ranges on compaction_group boundaries

To cleanup tokens in sstables that are not owned
by the compaction group.  This may happen in the future
after a compaction group split if copying / linking
the sstables in the original compaction_group to
the split compaction_groups.

Fixes #12594

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
This commit is contained in:
Benny Halevy
2023-01-21 13:36:30 +02:00
parent 95a8e0b21d
commit 1123565eb0

View File

@@ -1225,8 +1225,45 @@ future<bool> table::perform_offstrategy_compaction() {
future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges) {
co_await flush();
co_await parallel_foreach_compaction_group([this, sorted_owned_ranges = std::move(sorted_owned_ranges)] (compaction_group& cg) {
return get_compaction_manager().perform_cleanup(sorted_owned_ranges, cg.as_table_state());
if (_compaction_groups.size() == 1) {
auto& cg = *_compaction_groups[0];
co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state());
}
// candidate ranges for the next compaction_group
std::deque<dht::token_range> candidates;
for (auto&& range : *sorted_owned_ranges) {
candidates.emplace_back(std::move(range));
}
auto cmp = dht::token_comparator();
dht::token_range_vector cg_ranges;
std::unordered_map<dht::token_range, compaction::owned_ranges_ptr> cg_ranges_map;
for (const auto& cg : _compaction_groups) {
const auto& cg_range = cg->token_range();
while (!candidates.empty()) {
auto range = std::move(candidates.front());
auto trimmed = range.intersection(cg_range, cmp);
if (!trimmed) {
assert(!cg_ranges.empty());
break;
}
cg_ranges.emplace_back(*trimmed);
candidates.pop_front();
if (!trimmed->contains(range, cmp)) {
auto remainder = range.subtract(*trimmed, cmp);
assert(remainder.size() == 1);
candidates.emplace_front(std::move(remainder[0]));
break;
}
}
cg_ranges_map[cg_range] = compaction::make_owned_ranges_ptr(std::move(cg_ranges));
co_await coroutine::maybe_yield();
}
co_await parallel_foreach_compaction_group([&] (compaction_group& cg) {
auto&& cg_ranges = std::move(cg_ranges_map.at(cg.token_range()));
return get_compaction_manager().perform_cleanup(std::move(cg_ranges), cg.as_table_state());
});
}