diff --git a/replica/table.cc b/replica/table.cc index a8e6fe08d5..722d0041b5 100644 --- a/replica/table.cc +++ b/replica/table.cc @@ -1225,8 +1225,45 @@ future table::perform_offstrategy_compaction() { future<> table::perform_cleanup_compaction(compaction::owned_ranges_ptr sorted_owned_ranges) { co_await flush(); - co_await parallel_foreach_compaction_group([this, sorted_owned_ranges = std::move(sorted_owned_ranges)] (compaction_group& cg) { - return get_compaction_manager().perform_cleanup(sorted_owned_ranges, cg.as_table_state()); + + if (_compaction_groups.size() == 1) { + auto& cg = *_compaction_groups[0]; + co_return co_await get_compaction_manager().perform_cleanup(std::move(sorted_owned_ranges), cg.as_table_state()); + } + + // candidate ranges for the next compaction_group + std::deque candidates; + for (auto&& range : *sorted_owned_ranges) { + candidates.emplace_back(std::move(range)); + } + + auto cmp = dht::token_comparator(); + dht::token_range_vector cg_ranges; + std::unordered_map cg_ranges_map; + for (const auto& cg : _compaction_groups) { + const auto& cg_range = cg->token_range(); + while (!candidates.empty()) { + auto range = std::move(candidates.front()); + auto trimmed = range.intersection(cg_range, cmp); + if (!trimmed) { + assert(!cg_ranges.empty()); + break; + } + cg_ranges.emplace_back(*trimmed); + candidates.pop_front(); + if (!trimmed->contains(range, cmp)) { + auto remainder = range.subtract(*trimmed, cmp); + assert(remainder.size() == 1); + candidates.emplace_front(std::move(remainder[0])); + break; + } + } + cg_ranges_map[cg_range] = compaction::make_owned_ranges_ptr(std::move(cg_ranges)); + co_await coroutine::maybe_yield(); + } + co_await parallel_foreach_compaction_group([&] (compaction_group& cg) { + auto&& cg_ranges = std::move(cg_ranges_map.at(cg.token_range())); + return get_compaction_manager().perform_cleanup(std::move(cg_ranges), cg.as_table_state()); }); }