replica: Futurize retrieval of sstable sets in compaction_group_view

This will allow upcoming work to gently (i.e. yielding to the
scheduler) produce an sstable set for each compaction group view,
e.g. repaired and unrepaired ones.

Locking strategy for compaction's sstable selection:
Since sstable retrieval path became futurized, tasks in compaction
manager will now hold the write lock (compaction_state::lock)
when retrieving the sstable list, feeding it into the compaction
strategy, and finally registering the selected sstables as compacting.
The last step prevents another concurrent task from picking the
same sstable. Previously, all those steps were atomic, but
we have seen reactor stalls in that area in large installations, so
futurization of that area was bound to happen sooner or later.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2025-06-23 20:57:51 -03:00
committed by Botond Dénes
parent 20c3301a1a
commit 9d3755f276
26 changed files with 216 additions and 184 deletions

View File

@@ -38,8 +38,8 @@ public:
// min threshold as defined by table.
virtual unsigned min_compaction_threshold() const noexcept = 0;
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
virtual const sstables::sstable_set& main_sstable_set() const = 0;
virtual const sstables::sstable_set& maintenance_sstable_set() const = 0;
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const = 0;
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const = 0;
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const = 0;
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;

View File

@@ -233,15 +233,17 @@ void compaction_manager::deregister_weight(int weight) {
reevaluate_postponed_compactions();
}
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction_group_view& table_s) {
auto sstables = table_s.main_sstable_set().all();
return *sstables | std::views::filter([] (const sstables::shared_sstable& sst) {
future<std::vector<sstables::shared_sstable>> in_strategy_sstables(compaction_group_view& table_s) {
auto set = co_await table_s.main_sstable_set();
auto sstables = set->all();
co_return *sstables | std::views::filter([] (const sstables::shared_sstable& sst) {
return sstables::is_eligible_for_compaction(sst);
}) | std::ranges::to<std::vector>();
}
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compaction_group_view& t) const {
return get_candidates(t, *t.main_sstable_set().all());
future<std::vector<sstables::shared_sstable>> compaction_manager::get_candidates(compaction_group_view& t) const {
auto main_set = co_await t.main_sstable_set();
co_return get_candidates(t, *main_set->all());
}
bool compaction_manager::eligible_for_compaction(const sstables::shared_sstable& sstable) const {
@@ -568,6 +570,8 @@ protected:
switch_state(state::pending);
auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem);
// Write lock is used to synchronize selection of sstables for compaction and their registration.
// Also used to synchronize with regular compaction, so major waits for regular to cease before selecting candidates.
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
if (!can_proceed()) {
co_return std::nullopt;
@@ -577,7 +581,7 @@ protected:
// those are eligible for major compaction.
compaction_group_view* t = _compacting_table;
sstables::compaction_strategy cs = t->get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t));
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, co_await _cm.get_candidates(*t));
descriptor.gc_check_only_compacting_sstables = _consider_only_existing_data;
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), descriptor.sstables);
auto on_replace = compacting.update_on_sstable_replacement();
@@ -928,12 +932,14 @@ public:
});
}
std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
return _cm.get_candidates(t, *t.main_sstable_set().all());
future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return _cm.get_candidates(t, *main_set->all());
}
std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
return _cm.get_candidates(t, t.main_sstable_set().all_sstable_runs());
future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return _cm.get_candidates(t, main_set->all_sstable_runs());
}
};
@@ -1289,15 +1295,15 @@ protected:
co_return std::nullopt;
}
switch_state(state::pending);
// take read lock for table, so major and regular compaction can't proceed in parallel.
auto lock_holder = co_await _compaction_state.lock.hold_read_lock();
// Write lock is used to synchronize selection of sstables for compaction and their registration.
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
if (!can_proceed()) {
co_return std::nullopt;
}
compaction_group_view& t = *_compacting_table;
sstables::compaction_strategy cs = t.get_compaction_strategy();
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control());
sstables::compaction_descriptor descriptor = co_await cs.get_sstables_for_compaction(t, _cm.get_strategy_control());
int weight = calculate_weight(descriptor);
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
@@ -1317,6 +1323,10 @@ protected:
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}",
fmt::ptr(this), descriptor.sstables.size(), weight, t);
// Finished selecting and registering compacting sstables, so write lock can be released.
lock_holder.return_all();
lock_holder = co_await _compaction_state.lock.hold_read_lock();
setup_new_compaction(descriptor.run_identifier);
_compaction_state.last_regular_compaction = gc_clock::now();
std::exception_ptr ex;
@@ -1384,15 +1394,15 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}: cannot perform regular compaction", t);
co_return;
}
auto num_runs_for_compaction = [&, this] {
auto num_runs_for_compaction = [&, this] -> future<size_t> {
auto& cs = t.get_compaction_strategy();
auto desc = cs.get_sstables_for_compaction(t, get_strategy_control());
return std::ranges::size(desc.sstables
auto desc = co_await cs.get_sstables_for_compaction(t, get_strategy_control());
co_return std::ranges::size(desc.sstables
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
| std::ranges::to<std::unordered_set>());
};
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
auto count = num_runs_for_compaction();
auto count = co_await num_runs_for_compaction();
if (count <= threshold) {
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
t, count, threshold);
@@ -1405,9 +1415,11 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
auto start = db_clock::now();
auto& cstate = get_compaction_state(&t);
try {
co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] {
return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t);
});
while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) {
co_await cstate.compaction_done.wait([this, &t] {
return !can_perform_regular_compaction(t);
});
}
} catch (const broken_condition_variable&) {
co_return;
}
@@ -1459,8 +1471,9 @@ private:
// Filter out sstables that require view building, to avoid a race between off-strategy
// and view building. Refs: #11882
auto get_reshape_candidates = [&t] () {
return *t.maintenance_sstable_set().all()
auto get_reshape_candidates = [&t] () -> future<std::vector<sstables::shared_sstable>> {
auto maintenance_set = co_await t.maintenance_sstable_set();
co_return *maintenance_set->all()
| std::views::filter([](const sstables::shared_sstable &sst) {
return !sst->requires_view_building();
})
@@ -1468,14 +1481,14 @@ private:
};
auto get_next_job = [&] () -> future<std::optional<sstables::compaction_descriptor>> {
auto candidates = get_reshape_candidates();
auto candidates = co_await get_reshape_candidates();
if (candidates.empty()) {
co_return std::nullopt;
}
// all sstables added to maintenance set share the same underlying storage.
auto& storage = candidates.front()->get_storage();
sstables::reshape_config cfg = co_await sstables::make_reshape_config(storage, sstables::reshape_mode::strict);
auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), cfg);
auto desc = t.get_compaction_strategy().get_reshaping_job(co_await get_reshape_candidates(), t.schema(), cfg);
co_return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
};
@@ -1502,7 +1515,7 @@ private:
// user has aborted off-strategy. So we can only integrate them into the main set, such that
// they become candidates for regular compaction. We cannot hold them forever in maintenance set,
// as that causes read and space amplification issues.
if (auto sstables = get_reshape_candidates(); sstables.size()) {
if (auto sstables = co_await get_reshape_candidates(); sstables.size()) {
auto completion_desc = sstables::compaction_completion_desc{
.old_sstables = sstables, // removes from maintenance set.
.new_sstables = sstables, // adds into main set.
@@ -1514,6 +1527,11 @@ private:
co_await coroutine::return_exception_ptr(std::move(err));
}
}
future<size_t> maintenance_set_size() const {
auto maintenance_set = co_await _compacting_table->maintenance_sstable_set();
co_return maintenance_set->size();
}
protected:
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
co_await coroutine::switch_to(_cm.maintenance_sg());
@@ -1532,7 +1550,7 @@ protected:
std::exception_ptr ex;
try {
compaction_group_view& t = *_compacting_table;
auto size = t.maintenance_sstable_set().size();
auto size = co_await maintenance_set_size();
if (!size) {
cmlog.debug("Skipping off-strategy compaction for {}, No candidates were found", t);
finish_compaction();
@@ -1827,11 +1845,13 @@ private:
}
static std::vector<sstables::shared_sstable> get_all_sstables(compaction_group_view& t) {
auto s = *t.main_sstable_set().all() | std::ranges::to<std::vector>();
auto maintenance_set = t.maintenance_sstable_set().all();
s.insert(s.end(), maintenance_set->begin(), maintenance_set->end());
return s;
static future<std::vector<sstables::shared_sstable>> get_all_sstables(compaction_group_view& t) {
auto main_set = co_await t.main_sstable_set();
auto maintenance_set = co_await t.maintenance_sstable_set();
auto s = *main_set->all() | std::ranges::to<std::vector>();
auto maintenance_sstables = maintenance_set->all();
s.insert(s.end(), maintenance_sstables->begin(), maintenance_sstables->end());
co_return s;
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(compaction_group_view& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables) {
@@ -1840,7 +1860,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
co_return compaction_stats_opt{};
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
auto all_sstables = co_await get_all_sstables(t);
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables);
}
@@ -2062,16 +2082,13 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
auto& cs = get_compaction_state(&t);
co_await run_with_compaction_disabled(t, [&] () -> future<> {
auto update_sstables_cleanup_state = [&] (const sstables::sstable_set& set) -> future<> {
// Hold on to the sstable set since it may be overwritten
// while we yield in this loop.
auto set_holder = set.shared_from_this();
co_await set.for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
auto update_sstables_cleanup_state = [&] (lw_shared_ptr<const sstables::sstable_set> set) -> future<> {
co_await set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
update_sstable_cleanup_state(t, sst, *sorted_owned_ranges);
});
};
co_await update_sstables_cleanup_state(t.main_sstable_set());
co_await update_sstables_cleanup_state(t.maintenance_sstable_set());
co_await update_sstables_cleanup_state(co_await t.main_sstable_set());
co_await update_sstables_cleanup_state(co_await t.maintenance_sstable_set());
// Some sstables may remain in sstables_requiring_cleanup
// for later processing if they can't be cleaned up right now.
@@ -2086,7 +2103,8 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
co_return;
}
auto found_maintenance_sstables = bool(t.maintenance_sstable_set().for_each_sstable_until([this, &t] (const sstables::shared_sstable& sst) {
auto maintenance_set = co_await t.maintenance_sstable_set();
auto found_maintenance_sstables = bool(maintenance_set->for_each_sstable_until([this, &t] (const sstables::shared_sstable& sst) {
return stop_iteration(requires_cleanup(t, sst));
}));
if (found_maintenance_sstables) {
@@ -2108,12 +2126,12 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
// Submit a table to be upgraded and wait for its termination.
future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction_group_view& t, bool exclude_current_version, tasks::task_info info) {
auto get_sstables = [this, &t, exclude_current_version] {
auto get_sstables = [this, &t, exclude_current_version] () -> future<std::vector<sstables::shared_sstable>> {
std::vector<sstables::shared_sstable> tables;
auto last_version = t.get_sstables_manager().get_highest_supported_format();
for (auto& sst : get_candidates(t)) {
for (auto& sst : co_await get_candidates(t)) {
// if we are a "normal" upgrade, we only care about
// tables with older versions, but potentially
// we are to actually rewrite everything. (-a)
@@ -2122,7 +2140,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
}
}
return make_ready_future<std::vector<sstables::shared_sstable>>(tables);
co_return std::move(tables);
};
// doing a "cleanup" is about as compacting as we need
@@ -2135,8 +2153,8 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_split_compaction(compaction_group_view& t, sstables::compaction_type_options::split opt, tasks::task_info info) {
auto get_sstables = [this, &t] {
return make_ready_future<std::vector<sstables::shared_sstable>>(get_candidates(t));
auto get_sstables = [this, &t] () -> future<std::vector<sstables::shared_sstable>> {
return get_candidates(t);
};
owned_ranges_ptr owned_ranges_ptr = {};
auto options = sstables::compaction_type_options::make_split(std::move(opt.classifier));
@@ -2176,8 +2194,8 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
}
owned_ranges_ptr owned_ranges_ptr = {};
sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode);
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] {
auto all_sstables = get_all_sstables(t);
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
auto all_sstables = co_await get_all_sstables(t);
std::vector<sstables::shared_sstable> sstables = all_sstables
| std::views::filter([&opts] (const sstables::shared_sstable& sst) {
if (sst->requires_view_building()) {
@@ -2194,7 +2212,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
on_internal_error(cmlog, "bad scrub quarantine mode");
})
| std::ranges::to<std::vector>();
return make_ready_future<std::vector<sstables::shared_sstable>>(std::move(sstables));
co_return std::vector<sstables::shared_sstable>(std::move(sstables));
}, info, can_purge_tombstones::no, std::move(option_desc));
}

View File

@@ -192,7 +192,7 @@ private:
void deregister_weight(int weight);
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
std::vector<sstables::shared_sstable> get_candidates(compaction::compaction_group_view& t) const;
future<std::vector<sstables::shared_sstable>> get_candidates(compaction::compaction_group_view& t) const;
bool eligible_for_compaction(const sstables::shared_sstable& sstable) const;
bool eligible_for_compaction(const sstables::frozen_sstable_run& sstable_run) const;
@@ -638,4 +638,4 @@ struct fmt::formatter<compaction::compaction_task_executor> {
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges);
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::compaction_group_view& table_s);
future<std::vector<sstables::shared_sstable>> in_strategy_sstables(compaction::compaction_group_view& table_s);

View File

@@ -27,7 +27,10 @@ struct compaction_state {
// and by any function running under run_with_compaction_disabled().
seastar::named_gate gate;
// Prevents table from running major and minor compaction at the same time.
// Used for synchronizing selection of sstable for compaction.
// Write lock is held when getting sstable list, feeding them into strategy, and registering compacting sstables.
// The lock prevents two concurrent compaction tasks from picking the same sstables. And it also helps major
// to synchronize with minor, such that major doesn't miss any sstable.
seastar::rwlock lock;
// Raised by any function running under run_with_compaction_disabled();

View File

@@ -581,8 +581,8 @@ struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
//
class null_compaction_strategy : public compaction_strategy_impl {
public:
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override {
return sstables::compaction_descriptor();
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override {
return make_ready_future<sstables::compaction_descriptor>();
}
virtual future<int64_t> estimated_pending_compactions(compaction_group_view& table_s) const override {
@@ -700,7 +700,7 @@ compaction_strategy_type compaction_strategy::type() const {
return _compaction_strategy_impl->type();
}
compaction_descriptor compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
future<compaction_descriptor> compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
return _compaction_strategy_impl->get_sstables_for_compaction(table_s, control);
}

View File

@@ -41,7 +41,7 @@ public:
compaction_strategy& operator=(compaction_strategy&&);
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control);
future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control);
compaction_descriptor get_major_compaction_job(compaction_group_view& table_s, std::vector<shared_sstable> candidates);

View File

@@ -45,7 +45,7 @@ protected:
uint64_t max_sstable_bytes = compaction_descriptor::default_max_sstable_bytes);
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) = 0;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) = 0;
virtual compaction_descriptor get_major_compaction_job(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) {
return make_major_compaction_job(std::move(candidates));
}

View File

@@ -318,9 +318,9 @@ incremental_compaction_strategy::find_garbage_collection_job(const compaction::c
return compaction_descriptor(runs_to_sstables(std::move(input)), 0, _fragment_size);
}
compaction_descriptor
future<compaction_descriptor>
incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) {
auto candidates = control.candidates_as_runs(t);
auto candidates = co_await control.candidates_as_runs(t);
// make local copies so they can't be changed out from under us mid-method
size_t min_threshold = t.min_compaction_threshold();
@@ -330,28 +330,28 @@ incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
if (is_any_bucket_interesting(buckets, min_threshold)) {
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
}
// If we are not enforcing min_threshold explicitly, try any pair of sstable runs in the same tier.
if (!t.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
}
// The cross-tier behavior is only triggered once we're done with all the pending same-tier compaction to
// increase overall efficiency.
if (control.has_ongoing_compaction(t)) {
return sstables::compaction_descriptor();
co_return sstables::compaction_descriptor();
}
auto desc = find_garbage_collection_job(t, buckets);
if (!desc.sstables.empty()) {
return desc;
co_return desc;
}
if (_space_amplification_goal) {
if (buckets.size() < 2) {
return sstables::compaction_descriptor();
co_return sstables::compaction_descriptor();
}
// Let S0 be the size of largest tier
// Let S1 be the size of second-largest tier,
@@ -383,12 +383,12 @@ incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
cross_tier_input.reserve(cross_tier_input.size() + s1.size());
std::move(s1.begin(), s1.end(), std::back_inserter(cross_tier_input));
return sstables::compaction_descriptor(runs_to_sstables(std::move(cross_tier_input)),
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(cross_tier_input)),
0, _fragment_size);
}
}
return sstables::compaction_descriptor();
co_return sstables::compaction_descriptor();
}
compaction_descriptor
@@ -404,7 +404,8 @@ future<int64_t> incremental_compaction_strategy::estimated_pending_compactions(c
size_t max_threshold = t.schema()->max_compaction_threshold();
int64_t n = 0;
for (auto& bucket : get_buckets(t.main_sstable_set().all_sstable_runs())) {
auto main_set = co_await t.main_sstable_set();
for (auto& bucket : get_buckets(main_set->all_sstable_runs())) {
if (bucket.size() >= min_threshold) {
n += (bucket.size() + max_threshold - 1) / max_threshold;
}

View File

@@ -82,7 +82,7 @@ public:
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) override;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) override;
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& t, std::vector<shared_sstable> candidates) const override;

View File

@@ -17,9 +17,9 @@ leveled_compaction_strategy_state& leveled_compaction_strategy::get_state(compac
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state>();
}
compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
auto& state = get_state(table_s);
auto candidates = control.candidates(table_s);
auto candidates = co_await control.candidates(table_s);
// NOTE: leveled_manifest creation may be slightly expensive, so later on,
// we may want to store it in the strategy itself. However, the sstable
// lists managed by the manifest may become outdated. For example, one
@@ -32,12 +32,13 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
auto candidate = manifest.get_compaction_candidates(*state.last_compacted_keys, state.compaction_counter);
if (!candidate.sstables.empty()) {
leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", candidate.sstables.size(), table_s.main_sstable_set().all()->size());
return candidate;
auto main_set = co_await table_s.main_sstable_set();
leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", candidate.sstables.size(), main_set->size());
co_return candidate;
}
if (!table_s.tombstone_gc_enabled()) {
return compaction_descriptor();
co_return compaction_descriptor();
}
// if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio
@@ -59,9 +60,9 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
auto ratio_j = j->estimate_droppable_tombstone_ratio(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
return ratio_i < ratio_j;
});
return sstables::compaction_descriptor({ sst }, sst->get_sstable_level());
co_return sstables::compaction_descriptor({ sst }, sst->get_sstable_level());
}
return {};
co_return compaction_descriptor();
}
compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) {
@@ -134,7 +135,8 @@ void leveled_compaction_strategy::generate_last_compacted_keys(leveled_compactio
future<int64_t> leveled_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
std::vector<sstables::shared_sstable> sstables;
auto all_sstables = table_s.main_sstable_set().all();
auto main_set = co_await table_s.main_sstable_set();
auto all_sstables = main_set->all();
sstables.reserve(all_sstables->size());
for (auto& entry : *all_sstables) {
sstables.push_back(entry);

View File

@@ -49,7 +49,7 @@ public:
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
leveled_compaction_strategy(const std::map<sstring, sstring>& options);
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;

View File

@@ -207,13 +207,13 @@ size_tiered_compaction_strategy::most_interesting_bucket(std::vector<std::vector
return std::move(max);
}
compaction_descriptor
future<compaction_descriptor>
size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
// make local copies so they can't be changed out from under us mid-method
int min_threshold = table_s.min_compaction_threshold();
int max_threshold = table_s.schema()->max_compaction_threshold();
auto compaction_time = gc_clock::now();
auto candidates = control.candidates(table_s);
auto candidates = co_await control.candidates(table_s);
// TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables).
@@ -221,17 +221,17 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
if (is_any_bucket_interesting(buckets, min_threshold)) {
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
return sstables::compaction_descriptor(std::move(most_interesting));
co_return sstables::compaction_descriptor(std::move(most_interesting));
}
// If we are not enforcing min_threshold explicitly, try any pair of SStables in the same tier.
if (!table_s.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
return sstables::compaction_descriptor(std::move(most_interesting));
co_return sstables::compaction_descriptor(std::move(most_interesting));
}
if (!table_s.tombstone_gc_enabled()) {
return compaction_descriptor();
co_return compaction_descriptor();
}
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
@@ -250,9 +250,9 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
auto it = std::min_element(sstables.begin(), sstables.end(), [] (auto& i, auto& j) {
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
});
return sstables::compaction_descriptor({ *it });
co_return sstables::compaction_descriptor({ *it });
}
return sstables::compaction_descriptor();
co_return sstables::compaction_descriptor();
}
int64_t size_tiered_compaction_strategy::estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
@@ -271,7 +271,8 @@ future<int64_t> size_tiered_compaction_strategy::estimated_pending_compactions(c
int max_threshold = table_s.schema()->max_compaction_threshold();
std::vector<sstables::shared_sstable> sstables;
auto all_sstables = table_s.main_sstable_set().all();
auto main_set = co_await table_s.main_sstable_set();
auto all_sstables = main_set->all();
sstables.reserve(all_sstables->size());
for (auto& entry : *all_sstables) {
sstables.push_back(entry);

View File

@@ -75,7 +75,7 @@ public:
explicit size_tiered_compaction_strategy(const size_tiered_compaction_strategy_options& options);
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;

View File

@@ -19,8 +19,8 @@ class strategy_control {
public:
virtual ~strategy_control() {}
virtual bool has_ongoing_compaction(compaction_group_view& table_s) const noexcept = 0;
virtual std::vector<sstables::shared_sstable> candidates(compaction_group_view&) const = 0;
virtual std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view&) const = 0;
virtual future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view&) const = 0;
virtual future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view&) const = 0;
};
}

View File

@@ -332,14 +332,14 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
return compaction_descriptor();
}
compaction_descriptor
future<compaction_descriptor>
time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
auto& state = get_state(table_s);
auto compaction_time = gc_clock::now();
auto candidates = control.candidates(table_s);
auto candidates = co_await control.candidates(table_s);
if (candidates.empty()) {
return compaction_descriptor();
co_return compaction_descriptor();
}
auto now = db_clock::now();
@@ -350,7 +350,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
auto expired = table_s.fully_expired_sstables(candidates, compaction_time);
if (!expired.empty()) {
clogger.debug("[{}] Going to compact {} expired sstables", fmt::ptr(this), expired.size());
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()));
co_return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()));
}
// Keep checking for fully_expired_sstables until we don't find
// any among the candidates, meaning they are either already compacted
@@ -362,7 +362,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
return compaction_descriptor(std::move(compaction_candidates));
co_return compaction_descriptor(std::move(compaction_candidates));
}
time_window_compaction_strategy::bucket_compaction_mode
@@ -519,7 +519,8 @@ future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(c
auto& state = get_state(table_s);
auto min_threshold = table_s.min_compaction_threshold();
auto max_threshold = table_s.schema()->max_compaction_threshold();
auto candidate_sstables = *table_s.main_sstable_set().all() | std::ranges::to<std::vector>();
auto main_set = co_await table_s.main_sstable_set();
auto candidate_sstables = *main_set->all() | std::ranges::to<std::vector>();
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
int64_t n = 0;

View File

@@ -81,7 +81,7 @@ public:
enum class bucket_compaction_mode { none, size_tiered, major };
public:
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;

View File

@@ -2467,11 +2467,11 @@ public:
bool compaction_enforce_min_threshold() const noexcept override {
return _t.get_config().compaction_enforce_min_threshold || _t._is_bootstrap_or_replace;
}
const sstables::sstable_set& main_sstable_set() const override {
return *_cg.main_sstables();
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
co_return _cg.main_sstables();
}
const sstables::sstable_set& maintenance_sstable_set() const override {
return *_cg.maintenance_sstables();
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
co_return _cg.maintenance_sstables();
}
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
return _t.sstable_set_for_tombstone_gc(_cg);

View File

@@ -101,9 +101,9 @@ public:
virtual const schema_ptr& schema() const noexcept override { return _schema; }
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
virtual const sstables::sstable_set& main_sstable_set() const override { return _main_set; }
virtual const sstables::sstable_set& maintenance_sstable_set() const override { return _maintenance_set; }
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(main_sstable_set()); }
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
@@ -164,8 +164,8 @@ SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
auto ret = cm.perform_split_compaction(*compaction_group, sstables::compaction_type_options::split{classifier}, tasks::task_info{}).get();
BOOST_REQUIRE_EQUAL(ret->start_size, expected_compaction_size);
BOOST_REQUIRE(compaction_group->main_sstable_set().size() == expected_output);
compaction_group->main_sstable_set().for_each_sstable([&] (const sstables::shared_sstable& sst) {
BOOST_REQUIRE(compaction_group->main_sstable_set().get()->size() == expected_output);
compaction_group->main_sstable_set().get()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
BOOST_REQUIRE(!sstable_needs_split(sst));
validate(sst);
});

View File

@@ -1404,7 +1404,7 @@ SEASTAR_TEST_CASE(populate_from_quarantine_works) {
auto& cf = db.find_column_family("ks", "cf");
bool found = false;
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
auto sstables = in_strategy_sstables(ts);
auto sstables = co_await in_strategy_sstables(ts);
if (sstables.empty()) {
co_return;
}
@@ -1453,7 +1453,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> {
auto& cf = db.find_column_family("ks", "cf");
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
auto sstables = in_strategy_sstables(ts);
auto sstables = co_await in_strategy_sstables(ts);
if (sstables.empty()) {
co_return;
}
@@ -1689,7 +1689,7 @@ SEASTAR_TEST_CASE(test_drop_quarantined_sstables) {
auto& cf = _db.find_column_family("ks", "cf");
std::atomic<size_t> quarantined_on_shard = 0;
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
auto sstables = in_strategy_sstables(ts);
auto sstables = co_await in_strategy_sstables(ts);
if (sstables.empty()) {
co_return;
}

View File

@@ -51,11 +51,13 @@ public:
bool has_ongoing_compaction(compaction_group_view& table_s) const noexcept override {
return _has_ongoing_compaction;
}
virtual std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
return boost::copy_range<std::vector<sstables::shared_sstable>>(*t.main_sstable_set().all());
virtual future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return boost::copy_range<std::vector<sstables::shared_sstable>>(*main_set->all());
}
virtual std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
return t.main_sstable_set().all_sstable_runs();
virtual future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return main_set->all_sstable_runs();
}
};
@@ -135,7 +137,7 @@ SEASTAR_TEST_CASE(incremental_compaction_test) {
auto do_compaction = [&] (size_t expected_input, size_t expected_output) -> std::vector<shared_sstable> {
auto control = make_strategy_control_for_test(false);
auto desc = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto desc = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
// nothing to compact, move on.
if (desc.sstables.empty()) {
@@ -252,7 +254,7 @@ SEASTAR_THREAD_TEST_CASE(incremental_compaction_sag_test) {
auto& table_s = _cf.as_compaction_group_view();
auto control = make_strategy_control_for_test(false);
for (;;) {
auto desc = _ics.get_sstables_for_compaction(table_s, *control);
auto desc = _ics.get_sstables_for_compaction(table_s, *control).get();
// no more jobs, bailing out...
if (desc.sstables.empty()) {
break;
@@ -362,7 +364,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
options.emplace("tombstone_compaction_interval", "1");
sleep(2s).get();
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
BOOST_REQUIRE(descriptor.sstables.front() == sst);
}
@@ -372,7 +374,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
std::map<sstring, sstring> options;
options.emplace("tombstone_threshold", "0.5f");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
BOOST_REQUIRE(descriptor.sstables.size() == 0);
}
// sstable which was recently created won't be included due to min interval
@@ -381,7 +383,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
options.emplace("tombstone_compaction_interval", "3600");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
sstables::test(sst).set_data_file_write_time(db_clock::now());
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
BOOST_REQUIRE(descriptor.sstables.size() == 0);
}
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
@@ -393,7 +395,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
options.emplace("unchecked_tombstone_compaction", "true");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
}
});
@@ -520,7 +522,7 @@ SEASTAR_TEST_CASE(gc_tombstone_with_grace_seconds_test) {
forward_jump_clocks(std::chrono::seconds{1});
auto control = make_strategy_control_for_test(false);
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
BOOST_REQUIRE_EQUAL(descriptor.sstables.size(), 1);
BOOST_REQUIRE_EQUAL(descriptor.sstables.front(), sst);
});

View File

@@ -125,12 +125,14 @@ public:
return _has_ongoing_compaction;
}
std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
return _candidates_opt.value_or(*t.main_sstable_set().all() | std::ranges::to<std::vector>());
future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return _candidates_opt.value_or(*main_set->all() | std::ranges::to<std::vector>());
}
std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
return t.main_sstable_set().all_sstable_runs();
future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
auto main_set = co_await t.main_sstable_set();
co_return main_set->all_sstable_runs();
}
};
@@ -140,11 +142,11 @@ static std::unique_ptr<strategy_control> make_strategy_control_for_test(bool has
template <typename CompactionStrategy>
requires requires(CompactionStrategy cs, compaction_group_view& t, strategy_control& c) {
{ cs.get_sstables_for_compaction(t, c) } -> std::same_as<sstables::compaction_descriptor>;
{ cs.get_sstables_for_compaction(t, c) } -> std::same_as<future<sstables::compaction_descriptor>>;
}
static compaction_descriptor get_sstables_for_compaction(CompactionStrategy& cs, compaction_group_view& t, std::vector<shared_sstable> candidates) {
static future<compaction_descriptor> get_sstables_for_compaction(CompactionStrategy& cs, compaction_group_view& t, std::vector<shared_sstable> candidates) {
auto control = make_strategy_control_for_test(false, std::move(candidates));
return cs.get_sstables_for_compaction(t, *control);
co_return co_await cs.get_sstables_for_compaction(t, *control);
}
static void assert_table_sstable_count(table_for_tests& t, size_t expected_count) {
@@ -1958,7 +1960,7 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
sstables::test(sst).set_data_file_size(1);
candidates.push_back(std::move(sst));
}
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates));
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates)).get();
BOOST_REQUIRE(desc.sstables.size() == size_t(max_threshold));
});
}
@@ -2029,7 +2031,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
// that's needed because sstable with expired data should be old enough.
sstables::test(sst).set_data_file_write_time(db_clock::time_point::min());
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
BOOST_REQUIRE(descriptor.sstables.front() == sst);
@@ -2039,7 +2041,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
auto close_lcs_table = deferred_stop(lcs_table);
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, options);
sst->set_sstable_level(1);
descriptor = get_sstables_for_compaction(cs, lcs_table.as_compaction_group_view(), { sst });
descriptor = get_sstables_for_compaction(cs, lcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
BOOST_REQUIRE(descriptor.sstables.front() == sst);
// make sure sstable picked for tombstone compaction removal won't be promoted or demoted.
@@ -2049,10 +2051,10 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
auto twcs_table = env.make_table_for_tests(make_schema("twcs", sstables::compaction_strategy_type::time_window));
auto close_twcs_table = deferred_stop(twcs_table);
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, {});
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst });
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 0);
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, options);
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst });
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
BOOST_REQUIRE(descriptor.sstables.front() == sst);
@@ -2061,7 +2063,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
std::map<sstring, sstring> options;
options.emplace("tombstone_threshold", "0.5f");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 0);
}
// sstable which was recently created won't be included due to min interval
@@ -2070,7 +2072,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
options.emplace("tombstone_compaction_interval", "3600");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
sstables::test(sst).set_data_file_write_time(db_clock::now());
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 0);
}
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
@@ -2082,7 +2084,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
options.emplace("unchecked_tombstone_compaction", "true");
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
BOOST_REQUIRE(descriptor.sstables.size() == 1);
}
});
@@ -2378,7 +2380,7 @@ public:
bool found_sstable = false;
foreach_compaction_group_view_with_thread(table, [&] (compaction::compaction_group_view& ts) {
auto sstables = in_strategy_sstables(ts);
auto sstables = in_strategy_sstables(ts).get();
if (sstables.empty()) {
return;
}
@@ -2429,7 +2431,7 @@ public:
bool found_sstable = false;
foreach_compaction_group_view_with_thread(table, [&] (compaction::compaction_group_view& ts) {
auto sstables = in_strategy_sstables(ts);
auto sstables = in_strategy_sstables(ts).get();
if (sstables.empty()) {
return;
}
@@ -2467,7 +2469,7 @@ void scrub_validate_corrupted_content(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
});
}
@@ -2503,7 +2505,7 @@ void scrub_validate_corrupted_file(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
});
}
@@ -2541,7 +2543,7 @@ void scrub_validate_corrupted_digest(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
});
}
@@ -2568,8 +2570,8 @@ void scrub_validate_no_digest(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
BOOST_REQUIRE(!sst->get_checksum());
// Corrupt the data to cause an invalid checksum.
@@ -2586,7 +2588,7 @@ void scrub_validate_no_digest(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
});
}
@@ -2609,8 +2611,8 @@ void scrub_validate_valid(compress_sstable compress) {
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
});
}
@@ -2695,8 +2697,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_multiple_instances_unc
when_all_succeed(std::move(scrub1), std::move(scrub2)).get();
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
// Checksum component released after scrub instances terminate.
BOOST_REQUIRE(sst->get_checksum() == nullptr);
@@ -2776,8 +2778,8 @@ void scrub_validate_cassandra_compat(const compression_parameters& cp, sstring s
BOOST_REQUIRE_GT(stats->validation_errors, 0);
}
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
BOOST_REQUIRE(!sst->get_checksum());
});
}
@@ -2988,8 +2990,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort;
BOOST_REQUIRE_THROW(table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get(), sstables::compaction_aborted_exception);
BOOST_REQUIRE(in_strategy_sstables(ts).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(ts).front() == sst);
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() == 1);
BOOST_REQUIRE(in_strategy_sstables(ts).get().front() == sst);
});
}
@@ -3034,10 +3036,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
opts.operation_mode = sstables::compaction_type_options::scrub::mode::skip;
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(in_strategy_sstables(ts).size() == 1);
BOOST_REQUIRE(in_strategy_sstables(ts).front() != sst);
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() == 1);
BOOST_REQUIRE(in_strategy_sstables(ts).get().front() != sst);
verify_fragments(in_strategy_sstables(ts), test.env().make_reader_permit(), scrubbed_fragments);
verify_fragments(in_strategy_sstables(ts).get(), test.env().make_reader_permit(), scrubbed_fragments);
});
}
@@ -3075,9 +3077,9 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate;
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).size());
BOOST_REQUIRE(in_strategy_sstables(ts).size() > 1);
verify_fragments(in_strategy_sstables(ts), test.env().make_reader_permit(), explode(test.env().make_reader_permit(), muts));
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).get().size());
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() > 1);
verify_fragments(in_strategy_sstables(ts).get(), test.env().make_reader_permit(), explode(test.env().make_reader_permit(), muts));
});
}
@@ -3117,7 +3119,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate;
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
BOOST_REQUIRE(sst->is_quarantined());
verify_fragments({sst}, permit, corrupt_fragments);
@@ -3132,13 +3134,13 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
case sstables::compaction_type_options::scrub::quarantine_mode::include:
case sstables::compaction_type_options::scrub::quarantine_mode::only:
// The sstable should be found and scrubbed when scrub::quarantine_mode is scrub::quarantine_mode::{include,only}
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).size());
BOOST_REQUIRE(in_strategy_sstables(ts).size() > 1);
verify_fragments(in_strategy_sstables(ts), permit, scrubbed_fragments);
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).get().size());
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() > 1);
verify_fragments(in_strategy_sstables(ts).get(), permit, scrubbed_fragments);
break;
case sstables::compaction_type_options::scrub::quarantine_mode::exclude:
// The sstable should not be found when scrub::quarantine_mode is scrub::quarantine_mode::exclude
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
BOOST_REQUIRE(sst->is_quarantined());
verify_fragments({sst}, permit, corrupt_fragments);
break;
@@ -3404,24 +3406,24 @@ SEASTAR_TEST_CASE(scrubbed_sstable_removal_test) {
// add the sstable to cf's maintenance set
cf->add_sstable_and_update_cache(sst, sstables::offstrategy::yes).get();
auto& cf_ts = cf.as_compaction_group_view();
auto maintenance_sst_set = cf_ts.maintenance_sstable_set();
BOOST_REQUIRE_EQUAL(maintenance_sst_set.size(), 1);
BOOST_REQUIRE_EQUAL(*maintenance_sst_set.all()->begin(), sst);
auto maintenance_sst_set = cf_ts.maintenance_sstable_set().get();
BOOST_REQUIRE_EQUAL(maintenance_sst_set->size(), 1);
BOOST_REQUIRE_EQUAL(*maintenance_sst_set->all()->begin(), sst);
// confirm main sstable_set is empty
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 0);
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 0);
// Perform scrub on the table
cf->get_compaction_manager().perform_sstable_scrub(cf_ts, {}, {}).get();
// main set should have the resultant sst and the maintenance set should be empty now
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 1);
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().size(), 0);
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 1);
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().get()->size(), 0);
// Now that there is an sstable in main set, perform scrub on the table
// again to verify that the result ends up again in main sstable_set
cf->get_compaction_manager().perform_sstable_scrub(cf_ts, {}, {}).get();
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 1);
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().size(), 0);
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 1);
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().get()->size(), 0);
});
}
@@ -3488,7 +3490,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
auto do_compaction = [&] (size_t expected_input, size_t expected_output) mutable -> std::vector<shared_sstable> {
auto input_ssts = std::vector<shared_sstable>(sstables.begin(), sstables.end());
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(input_ssts));
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(input_ssts)).get();
// nothing to compact, move on.
if (desc.sstables.empty()) {

View File

@@ -14,9 +14,11 @@ namespace sstables {
sstable_run_based_compaction_strategy_for_tests::sstable_run_based_compaction_strategy_for_tests() = default;
compaction_descriptor sstable_run_based_compaction_strategy_for_tests::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
future<compaction_descriptor>
sstable_run_based_compaction_strategy_for_tests::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
// Get unique runs from all uncompacting sstables
std::vector<frozen_sstable_run> runs = table_s.main_sstable_set().all_sstable_runs();
auto main_set = co_await table_s.main_sstable_set();
std::vector<frozen_sstable_run> runs = main_set->all_sstable_runs();
// Group similar sized runs into a bucket.
std::map<uint64_t, std::vector<frozen_sstable_run>> similar_sized_runs;
@@ -43,9 +45,9 @@ compaction_descriptor sstable_run_based_compaction_strategy_for_tests::get_sstab
| std::views::transform([] (auto& run) -> auto& { return run->all(); })
| std::views::join
| std::ranges::to<std::vector>();
return sstables::compaction_descriptor(std::move(all), 0, static_fragment_size_for_run);
co_return sstables::compaction_descriptor(std::move(all), 0, static_fragment_size_for_run);
}
return sstables::compaction_descriptor();
co_return sstables::compaction_descriptor();
}
future<int64_t> sstable_run_based_compaction_strategy_for_tests::estimated_pending_compactions(compaction_group_view& table_s) const {

View File

@@ -24,7 +24,7 @@ class sstable_run_based_compaction_strategy_for_tests : public compaction_strate
public:
sstable_run_based_compaction_strategy_for_tests();
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
virtual future<int64_t> estimated_pending_compactions(compaction_group_view& table_s) const override;

View File

@@ -113,7 +113,7 @@ future<compaction_result> compact_sstables(test_env& env, sstables::compaction_d
};
descriptor.replacer = std::move(replacer);
if (can_purge) {
descriptor.enable_garbage_collection(table_s.main_sstable_set());
descriptor.enable_garbage_collection(*co_await table_s.main_sstable_set());
}
sstables::compaction_result ret;
co_await run_compaction_task(env, descriptor.run_identifier, table_s, [&] (sstables::compaction_data& cdata) {

View File

@@ -63,14 +63,14 @@ public:
bool compaction_enforce_min_threshold() const noexcept override {
return true;
}
const sstables::sstable_set& main_sstable_set() const override {
return table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
co_return co_await table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
}
const sstables::sstable_set& maintenance_sstable_set() const override {
return table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
co_return co_await table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
}
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
return make_lw_shared<const sstables::sstable_set>(main_sstable_set());
return table().try_get_compaction_group_with_static_sharding()->main_sstables();
}
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
return sstables::get_fully_expired_sstables(*this, sstables, query_time);

View File

@@ -953,9 +953,9 @@ public:
virtual const schema_ptr& schema() const noexcept override { return _schema; }
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
virtual const sstables::sstable_set& main_sstable_set() const override { return _main_set; }
virtual const sstables::sstable_set& maintenance_sstable_set() const override { return _maintenance_set; }
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(main_sstable_set()); }
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }