replica: Futurize retrieval of sstable sets in compaction_group_view
This will allow upcoming work to gently produce a sstable set for each compaction group view. Example: repaired and unrepaired. Locking strategy for compaction's sstable selection: Since sstable retrieval path became futurized, tasks in compaction manager will now hold the write lock (compaction_state::lock) when retrieving the sstable list, feeding them into compaction strategy, and finally registering selected sstables as compacting. The last step prevents another concurrent task from picking the same sstable. Previously, all those steps were atomic, but we have seen stall in that area in large installations, so futurization of that area would come sooner or later. Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
committed by
Botond Dénes
parent
20c3301a1a
commit
9d3755f276
@@ -38,8 +38,8 @@ public:
|
||||
// min threshold as defined by table.
|
||||
virtual unsigned min_compaction_threshold() const noexcept = 0;
|
||||
virtual bool compaction_enforce_min_threshold() const noexcept = 0;
|
||||
virtual const sstables::sstable_set& main_sstable_set() const = 0;
|
||||
virtual const sstables::sstable_set& maintenance_sstable_set() const = 0;
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const = 0;
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const = 0;
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const = 0;
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const = 0;
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept = 0;
|
||||
|
||||
@@ -233,15 +233,17 @@ void compaction_manager::deregister_weight(int weight) {
|
||||
reevaluate_postponed_compactions();
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction_group_view& table_s) {
|
||||
auto sstables = table_s.main_sstable_set().all();
|
||||
return *sstables | std::views::filter([] (const sstables::shared_sstable& sst) {
|
||||
future<std::vector<sstables::shared_sstable>> in_strategy_sstables(compaction_group_view& table_s) {
|
||||
auto set = co_await table_s.main_sstable_set();
|
||||
auto sstables = set->all();
|
||||
co_return *sstables | std::views::filter([] (const sstables::shared_sstable& sst) {
|
||||
return sstables::is_eligible_for_compaction(sst);
|
||||
}) | std::ranges::to<std::vector>();
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> compaction_manager::get_candidates(compaction_group_view& t) const {
|
||||
return get_candidates(t, *t.main_sstable_set().all());
|
||||
future<std::vector<sstables::shared_sstable>> compaction_manager::get_candidates(compaction_group_view& t) const {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return get_candidates(t, *main_set->all());
|
||||
}
|
||||
|
||||
bool compaction_manager::eligible_for_compaction(const sstables::shared_sstable& sstable) const {
|
||||
@@ -568,6 +570,8 @@ protected:
|
||||
|
||||
switch_state(state::pending);
|
||||
auto units = co_await acquire_semaphore(_cm._maintenance_ops_sem);
|
||||
// Write lock is used to synchronize selection of sstables for compaction and their registration.
|
||||
// Also used to synchronize with regular compaction, so major waits for regular to cease before selecting candidates.
|
||||
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
|
||||
if (!can_proceed()) {
|
||||
co_return std::nullopt;
|
||||
@@ -577,7 +581,7 @@ protected:
|
||||
// those are eligible for major compaction.
|
||||
compaction_group_view* t = _compacting_table;
|
||||
sstables::compaction_strategy cs = t->get_compaction_strategy();
|
||||
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, _cm.get_candidates(*t));
|
||||
sstables::compaction_descriptor descriptor = cs.get_major_compaction_job(*t, co_await _cm.get_candidates(*t));
|
||||
descriptor.gc_check_only_compacting_sstables = _consider_only_existing_data;
|
||||
auto compacting = compacting_sstable_registration(_cm, _cm.get_compaction_state(t), descriptor.sstables);
|
||||
auto on_replace = compacting.update_on_sstable_replacement();
|
||||
@@ -928,12 +932,14 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
|
||||
return _cm.get_candidates(t, *t.main_sstable_set().all());
|
||||
future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return _cm.get_candidates(t, *main_set->all());
|
||||
}
|
||||
|
||||
std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
|
||||
return _cm.get_candidates(t, t.main_sstable_set().all_sstable_runs());
|
||||
future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return _cm.get_candidates(t, main_set->all_sstable_runs());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1289,15 +1295,15 @@ protected:
|
||||
co_return std::nullopt;
|
||||
}
|
||||
switch_state(state::pending);
|
||||
// take read lock for table, so major and regular compaction can't proceed in parallel.
|
||||
auto lock_holder = co_await _compaction_state.lock.hold_read_lock();
|
||||
// Write lock is used to synchronize selection of sstables for compaction and their registration.
|
||||
auto lock_holder = co_await _compaction_state.lock.hold_write_lock();
|
||||
if (!can_proceed()) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
|
||||
compaction_group_view& t = *_compacting_table;
|
||||
sstables::compaction_strategy cs = t.get_compaction_strategy();
|
||||
sstables::compaction_descriptor descriptor = cs.get_sstables_for_compaction(t, _cm.get_strategy_control());
|
||||
sstables::compaction_descriptor descriptor = co_await cs.get_sstables_for_compaction(t, _cm.get_strategy_control());
|
||||
int weight = calculate_weight(descriptor);
|
||||
|
||||
if (descriptor.sstables.empty() || !can_proceed() || t.is_auto_compaction_disabled_by_user()) {
|
||||
@@ -1317,6 +1323,10 @@ protected:
|
||||
cmlog.debug("Accepted compaction job: task={} ({} sstable(s)) of weight {} for {}",
|
||||
fmt::ptr(this), descriptor.sstables.size(), weight, t);
|
||||
|
||||
// Finished selecting and registering compacting sstables, so write lock can be released.
|
||||
lock_holder.return_all();
|
||||
lock_holder = co_await _compaction_state.lock.hold_read_lock();
|
||||
|
||||
setup_new_compaction(descriptor.run_identifier);
|
||||
_compaction_state.last_regular_compaction = gc_clock::now();
|
||||
std::exception_ptr ex;
|
||||
@@ -1384,15 +1394,15 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
|
||||
cmlog.trace("maybe_wait_for_sstable_count_reduction in {}: cannot perform regular compaction", t);
|
||||
co_return;
|
||||
}
|
||||
auto num_runs_for_compaction = [&, this] {
|
||||
auto num_runs_for_compaction = [&, this] -> future<size_t> {
|
||||
auto& cs = t.get_compaction_strategy();
|
||||
auto desc = cs.get_sstables_for_compaction(t, get_strategy_control());
|
||||
return std::ranges::size(desc.sstables
|
||||
auto desc = co_await cs.get_sstables_for_compaction(t, get_strategy_control());
|
||||
co_return std::ranges::size(desc.sstables
|
||||
| std::views::transform(std::mem_fn(&sstables::sstable::run_identifier))
|
||||
| std::ranges::to<std::unordered_set>());
|
||||
};
|
||||
const auto threshold = size_t(std::max(schema->max_compaction_threshold(), 32));
|
||||
auto count = num_runs_for_compaction();
|
||||
auto count = co_await num_runs_for_compaction();
|
||||
if (count <= threshold) {
|
||||
cmlog.trace("No need to wait for sstable count reduction in {}: {} <= {}",
|
||||
t, count, threshold);
|
||||
@@ -1405,9 +1415,11 @@ future<> compaction_manager::maybe_wait_for_sstable_count_reduction(compaction_g
|
||||
auto start = db_clock::now();
|
||||
auto& cstate = get_compaction_state(&t);
|
||||
try {
|
||||
co_await cstate.compaction_done.wait([this, &num_runs_for_compaction, threshold, &t] {
|
||||
return num_runs_for_compaction() <= threshold || !can_perform_regular_compaction(t);
|
||||
});
|
||||
while (can_perform_regular_compaction(t) && co_await num_runs_for_compaction() > threshold) {
|
||||
co_await cstate.compaction_done.wait([this, &t] {
|
||||
return !can_perform_regular_compaction(t);
|
||||
});
|
||||
}
|
||||
} catch (const broken_condition_variable&) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1459,8 +1471,9 @@ private:
|
||||
|
||||
// Filter out sstables that require view building, to avoid a race between off-strategy
|
||||
// and view building. Refs: #11882
|
||||
auto get_reshape_candidates = [&t] () {
|
||||
return *t.maintenance_sstable_set().all()
|
||||
auto get_reshape_candidates = [&t] () -> future<std::vector<sstables::shared_sstable>> {
|
||||
auto maintenance_set = co_await t.maintenance_sstable_set();
|
||||
co_return *maintenance_set->all()
|
||||
| std::views::filter([](const sstables::shared_sstable &sst) {
|
||||
return !sst->requires_view_building();
|
||||
})
|
||||
@@ -1468,14 +1481,14 @@ private:
|
||||
};
|
||||
|
||||
auto get_next_job = [&] () -> future<std::optional<sstables::compaction_descriptor>> {
|
||||
auto candidates = get_reshape_candidates();
|
||||
auto candidates = co_await get_reshape_candidates();
|
||||
if (candidates.empty()) {
|
||||
co_return std::nullopt;
|
||||
}
|
||||
// all sstables added to maintenance set share the same underlying storage.
|
||||
auto& storage = candidates.front()->get_storage();
|
||||
sstables::reshape_config cfg = co_await sstables::make_reshape_config(storage, sstables::reshape_mode::strict);
|
||||
auto desc = t.get_compaction_strategy().get_reshaping_job(get_reshape_candidates(), t.schema(), cfg);
|
||||
auto desc = t.get_compaction_strategy().get_reshaping_job(co_await get_reshape_candidates(), t.schema(), cfg);
|
||||
co_return desc.sstables.size() ? std::make_optional(std::move(desc)) : std::nullopt;
|
||||
};
|
||||
|
||||
@@ -1502,7 +1515,7 @@ private:
|
||||
// user has aborted off-strategy. So we can only integrate them into the main set, such that
|
||||
// they become candidates for regular compaction. We cannot hold them forever in maintenance set,
|
||||
// as that causes read and space amplification issues.
|
||||
if (auto sstables = get_reshape_candidates(); sstables.size()) {
|
||||
if (auto sstables = co_await get_reshape_candidates(); sstables.size()) {
|
||||
auto completion_desc = sstables::compaction_completion_desc{
|
||||
.old_sstables = sstables, // removes from maintenance set.
|
||||
.new_sstables = sstables, // adds into main set.
|
||||
@@ -1514,6 +1527,11 @@ private:
|
||||
co_await coroutine::return_exception_ptr(std::move(err));
|
||||
}
|
||||
}
|
||||
|
||||
future<size_t> maintenance_set_size() const {
|
||||
auto maintenance_set = co_await _compacting_table->maintenance_sstable_set();
|
||||
co_return maintenance_set->size();
|
||||
}
|
||||
protected:
|
||||
virtual future<compaction_manager::compaction_stats_opt> do_run() override {
|
||||
co_await coroutine::switch_to(_cm.maintenance_sg());
|
||||
@@ -1532,7 +1550,7 @@ protected:
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
compaction_group_view& t = *_compacting_table;
|
||||
auto size = t.maintenance_sstable_set().size();
|
||||
auto size = co_await maintenance_set_size();
|
||||
if (!size) {
|
||||
cmlog.debug("Skipping off-strategy compaction for {}, No candidates were found", t);
|
||||
finish_compaction();
|
||||
@@ -1827,11 +1845,13 @@ private:
|
||||
|
||||
}
|
||||
|
||||
static std::vector<sstables::shared_sstable> get_all_sstables(compaction_group_view& t) {
|
||||
auto s = *t.main_sstable_set().all() | std::ranges::to<std::vector>();
|
||||
auto maintenance_set = t.maintenance_sstable_set().all();
|
||||
s.insert(s.end(), maintenance_set->begin(), maintenance_set->end());
|
||||
return s;
|
||||
static future<std::vector<sstables::shared_sstable>> get_all_sstables(compaction_group_view& t) {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
auto maintenance_set = co_await t.maintenance_sstable_set();
|
||||
auto s = *main_set->all() | std::ranges::to<std::vector>();
|
||||
auto maintenance_sstables = maintenance_set->all();
|
||||
s.insert(s.end(), maintenance_sstables->begin(), maintenance_sstables->end());
|
||||
co_return s;
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(compaction_group_view& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables) {
|
||||
@@ -1840,7 +1860,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
|
||||
co_return compaction_stats_opt{};
|
||||
}
|
||||
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
|
||||
auto all_sstables = get_all_sstables(t);
|
||||
auto all_sstables = co_await get_all_sstables(t);
|
||||
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables);
|
||||
}
|
||||
|
||||
@@ -2062,16 +2082,13 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
|
||||
|
||||
auto& cs = get_compaction_state(&t);
|
||||
co_await run_with_compaction_disabled(t, [&] () -> future<> {
|
||||
auto update_sstables_cleanup_state = [&] (const sstables::sstable_set& set) -> future<> {
|
||||
// Hold on to the sstable set since it may be overwritten
|
||||
// while we yield in this loop.
|
||||
auto set_holder = set.shared_from_this();
|
||||
co_await set.for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
|
||||
auto update_sstables_cleanup_state = [&] (lw_shared_ptr<const sstables::sstable_set> set) -> future<> {
|
||||
co_await set->for_each_sstable_gently([&] (const sstables::shared_sstable& sst) {
|
||||
update_sstable_cleanup_state(t, sst, *sorted_owned_ranges);
|
||||
});
|
||||
};
|
||||
co_await update_sstables_cleanup_state(t.main_sstable_set());
|
||||
co_await update_sstables_cleanup_state(t.maintenance_sstable_set());
|
||||
co_await update_sstables_cleanup_state(co_await t.main_sstable_set());
|
||||
co_await update_sstables_cleanup_state(co_await t.maintenance_sstable_set());
|
||||
|
||||
// Some sstables may remain in sstables_requiring_cleanup
|
||||
// for later processing if they can't be cleaned up right now.
|
||||
@@ -2086,7 +2103,8 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
|
||||
co_return;
|
||||
}
|
||||
|
||||
auto found_maintenance_sstables = bool(t.maintenance_sstable_set().for_each_sstable_until([this, &t] (const sstables::shared_sstable& sst) {
|
||||
auto maintenance_set = co_await t.maintenance_sstable_set();
|
||||
auto found_maintenance_sstables = bool(maintenance_set->for_each_sstable_until([this, &t] (const sstables::shared_sstable& sst) {
|
||||
return stop_iteration(requires_cleanup(t, sst));
|
||||
}));
|
||||
if (found_maintenance_sstables) {
|
||||
@@ -2108,12 +2126,12 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
|
||||
|
||||
// Submit a table to be upgraded and wait for its termination.
|
||||
future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_owned_ranges, compaction_group_view& t, bool exclude_current_version, tasks::task_info info) {
|
||||
auto get_sstables = [this, &t, exclude_current_version] {
|
||||
auto get_sstables = [this, &t, exclude_current_version] () -> future<std::vector<sstables::shared_sstable>> {
|
||||
std::vector<sstables::shared_sstable> tables;
|
||||
|
||||
auto last_version = t.get_sstables_manager().get_highest_supported_format();
|
||||
|
||||
for (auto& sst : get_candidates(t)) {
|
||||
for (auto& sst : co_await get_candidates(t)) {
|
||||
// if we are a "normal" upgrade, we only care about
|
||||
// tables with older versions, but potentially
|
||||
// we are to actually rewrite everything. (-a)
|
||||
@@ -2122,7 +2140,7 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
|
||||
}
|
||||
}
|
||||
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(tables);
|
||||
co_return std::move(tables);
|
||||
};
|
||||
|
||||
// doing a "cleanup" is about as compacting as we need
|
||||
@@ -2135,8 +2153,8 @@ future<> compaction_manager::perform_sstable_upgrade(owned_ranges_ptr sorted_own
|
||||
}
|
||||
|
||||
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_split_compaction(compaction_group_view& t, sstables::compaction_type_options::split opt, tasks::task_info info) {
|
||||
auto get_sstables = [this, &t] {
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(get_candidates(t));
|
||||
auto get_sstables = [this, &t] () -> future<std::vector<sstables::shared_sstable>> {
|
||||
return get_candidates(t);
|
||||
};
|
||||
owned_ranges_ptr owned_ranges_ptr = {};
|
||||
auto options = sstables::compaction_type_options::make_split(std::move(opt.classifier));
|
||||
@@ -2176,8 +2194,8 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
|
||||
}
|
||||
owned_ranges_ptr owned_ranges_ptr = {};
|
||||
sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode);
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] {
|
||||
auto all_sstables = get_all_sstables(t);
|
||||
return rewrite_sstables(t, sstables::compaction_type_options::make_scrub(scrub_mode), std::move(owned_ranges_ptr), [&t, opts] -> future<std::vector<sstables::shared_sstable>> {
|
||||
auto all_sstables = co_await get_all_sstables(t);
|
||||
std::vector<sstables::shared_sstable> sstables = all_sstables
|
||||
| std::views::filter([&opts] (const sstables::shared_sstable& sst) {
|
||||
if (sst->requires_view_building()) {
|
||||
@@ -2194,7 +2212,7 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sst
|
||||
on_internal_error(cmlog, "bad scrub quarantine mode");
|
||||
})
|
||||
| std::ranges::to<std::vector>();
|
||||
return make_ready_future<std::vector<sstables::shared_sstable>>(std::move(sstables));
|
||||
co_return std::vector<sstables::shared_sstable>(std::move(sstables));
|
||||
}, info, can_purge_tombstones::no, std::move(option_desc));
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ private:
|
||||
void deregister_weight(int weight);
|
||||
|
||||
// Get candidates for compaction strategy, which are all sstables but the ones being compacted.
|
||||
std::vector<sstables::shared_sstable> get_candidates(compaction::compaction_group_view& t) const;
|
||||
future<std::vector<sstables::shared_sstable>> get_candidates(compaction::compaction_group_view& t) const;
|
||||
|
||||
bool eligible_for_compaction(const sstables::shared_sstable& sstable) const;
|
||||
bool eligible_for_compaction(const sstables::frozen_sstable_run& sstable_run) const;
|
||||
@@ -638,4 +638,4 @@ struct fmt::formatter<compaction::compaction_task_executor> {
|
||||
bool needs_cleanup(const sstables::shared_sstable& sst, const dht::token_range_vector& owned_ranges);
|
||||
|
||||
// Return all sstables but those that are off-strategy like the ones in maintenance set and staging dir.
|
||||
std::vector<sstables::shared_sstable> in_strategy_sstables(compaction::compaction_group_view& table_s);
|
||||
future<std::vector<sstables::shared_sstable>> in_strategy_sstables(compaction::compaction_group_view& table_s);
|
||||
|
||||
@@ -27,7 +27,10 @@ struct compaction_state {
|
||||
// and by any function running under run_with_compaction_disabled().
|
||||
seastar::named_gate gate;
|
||||
|
||||
// Prevents table from running major and minor compaction at the same time.
|
||||
// Used for synchronizing selection of sstable for compaction.
|
||||
// Write lock is held when getting sstable list, feeding them into strategy, and registering compacting sstables.
|
||||
// The lock prevents two concurrent compaction tasks from picking the same sstables. And it also helps major
|
||||
// to synchronize with minor, such that major doesn't miss any sstable.
|
||||
seastar::rwlock lock;
|
||||
|
||||
// Raised by any function running under run_with_compaction_disabled();
|
||||
|
||||
@@ -581,8 +581,8 @@ struct null_backlog_tracker final : public compaction_backlog_tracker::impl {
|
||||
//
|
||||
class null_compaction_strategy : public compaction_strategy_impl {
|
||||
public:
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override {
|
||||
return sstables::compaction_descriptor();
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override {
|
||||
return make_ready_future<sstables::compaction_descriptor>();
|
||||
}
|
||||
|
||||
virtual future<int64_t> estimated_pending_compactions(compaction_group_view& table_s) const override {
|
||||
@@ -700,7 +700,7 @@ compaction_strategy_type compaction_strategy::type() const {
|
||||
return _compaction_strategy_impl->type();
|
||||
}
|
||||
|
||||
compaction_descriptor compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
future<compaction_descriptor> compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
return _compaction_strategy_impl->get_sstables_for_compaction(table_s, control);
|
||||
}
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ public:
|
||||
compaction_strategy& operator=(compaction_strategy&&);
|
||||
|
||||
// Return a list of sstables to be compacted after applying the strategy.
|
||||
compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control);
|
||||
future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control);
|
||||
|
||||
compaction_descriptor get_major_compaction_job(compaction_group_view& table_s, std::vector<shared_sstable> candidates);
|
||||
|
||||
|
||||
@@ -45,7 +45,7 @@ protected:
|
||||
uint64_t max_sstable_bytes = compaction_descriptor::default_max_sstable_bytes);
|
||||
public:
|
||||
virtual ~compaction_strategy_impl() {}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) = 0;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) = 0;
|
||||
virtual compaction_descriptor get_major_compaction_job(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) {
|
||||
return make_major_compaction_job(std::move(candidates));
|
||||
}
|
||||
|
||||
@@ -318,9 +318,9 @@ incremental_compaction_strategy::find_garbage_collection_job(const compaction::c
|
||||
return compaction_descriptor(runs_to_sstables(std::move(input)), 0, _fragment_size);
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
future<compaction_descriptor>
|
||||
incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) {
|
||||
auto candidates = control.candidates_as_runs(t);
|
||||
auto candidates = co_await control.candidates_as_runs(t);
|
||||
|
||||
// make local copies so they can't be changed out from under us mid-method
|
||||
size_t min_threshold = t.min_compaction_threshold();
|
||||
@@ -330,28 +330,28 @@ incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
|
||||
if (is_any_bucket_interesting(buckets, min_threshold)) {
|
||||
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
}
|
||||
// If we are not enforcing min_threshold explicitly, try any pair of sstable runs in the same tier.
|
||||
if (!t.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
|
||||
std::vector<sstables::frozen_sstable_run> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(most_interesting)), 0, _fragment_size);
|
||||
}
|
||||
|
||||
// The cross-tier behavior is only triggered once we're done with all the pending same-tier compaction to
|
||||
// increase overall efficiency.
|
||||
if (control.has_ongoing_compaction(t)) {
|
||||
return sstables::compaction_descriptor();
|
||||
co_return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
auto desc = find_garbage_collection_job(t, buckets);
|
||||
if (!desc.sstables.empty()) {
|
||||
return desc;
|
||||
co_return desc;
|
||||
}
|
||||
|
||||
if (_space_amplification_goal) {
|
||||
if (buckets.size() < 2) {
|
||||
return sstables::compaction_descriptor();
|
||||
co_return sstables::compaction_descriptor();
|
||||
}
|
||||
// Let S0 be the size of largest tier
|
||||
// Let S1 be the size of second-largest tier,
|
||||
@@ -383,12 +383,12 @@ incremental_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
cross_tier_input.reserve(cross_tier_input.size() + s1.size());
|
||||
std::move(s1.begin(), s1.end(), std::back_inserter(cross_tier_input));
|
||||
|
||||
return sstables::compaction_descriptor(runs_to_sstables(std::move(cross_tier_input)),
|
||||
co_return sstables::compaction_descriptor(runs_to_sstables(std::move(cross_tier_input)),
|
||||
0, _fragment_size);
|
||||
}
|
||||
}
|
||||
|
||||
return sstables::compaction_descriptor();
|
||||
co_return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
@@ -404,7 +404,8 @@ future<int64_t> incremental_compaction_strategy::estimated_pending_compactions(c
|
||||
size_t max_threshold = t.schema()->max_compaction_threshold();
|
||||
int64_t n = 0;
|
||||
|
||||
for (auto& bucket : get_buckets(t.main_sstable_set().all_sstable_runs())) {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
for (auto& bucket : get_buckets(main_set->all_sstable_runs())) {
|
||||
if (bucket.size() >= min_threshold) {
|
||||
n += (bucket.size() + max_threshold - 1) / max_threshold;
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ public:
|
||||
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) override;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& t, strategy_control& control) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& t, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
|
||||
@@ -17,9 +17,9 @@ leveled_compaction_strategy_state& leveled_compaction_strategy::get_state(compac
|
||||
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state>();
|
||||
}
|
||||
|
||||
compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
auto& state = get_state(table_s);
|
||||
auto candidates = control.candidates(table_s);
|
||||
auto candidates = co_await control.candidates(table_s);
|
||||
// NOTE: leveled_manifest creation may be slightly expensive, so later on,
|
||||
// we may want to store it in the strategy itself. However, the sstable
|
||||
// lists managed by the manifest may become outdated. For example, one
|
||||
@@ -32,12 +32,13 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
auto candidate = manifest.get_compaction_candidates(*state.last_compacted_keys, state.compaction_counter);
|
||||
|
||||
if (!candidate.sstables.empty()) {
|
||||
leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", candidate.sstables.size(), table_s.main_sstable_set().all()->size());
|
||||
return candidate;
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", candidate.sstables.size(), main_set->size());
|
||||
co_return candidate;
|
||||
}
|
||||
|
||||
if (!table_s.tombstone_gc_enabled()) {
|
||||
return compaction_descriptor();
|
||||
co_return compaction_descriptor();
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio
|
||||
@@ -59,9 +60,9 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
|
||||
auto ratio_j = j->estimate_droppable_tombstone_ratio(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
|
||||
return ratio_i < ratio_j;
|
||||
});
|
||||
return sstables::compaction_descriptor({ sst }, sst->get_sstable_level());
|
||||
co_return sstables::compaction_descriptor({ sst }, sst->get_sstable_level());
|
||||
}
|
||||
return {};
|
||||
co_return compaction_descriptor();
|
||||
}
|
||||
|
||||
compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) {
|
||||
@@ -134,7 +135,8 @@ void leveled_compaction_strategy::generate_last_compacted_keys(leveled_compactio
|
||||
|
||||
future<int64_t> leveled_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
auto all_sstables = table_s.main_sstable_set().all();
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
auto all_sstables = main_set->all();
|
||||
sstables.reserve(all_sstables->size());
|
||||
for (auto& entry : *all_sstables) {
|
||||
sstables.push_back(entry);
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
leveled_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
|
||||
@@ -207,13 +207,13 @@ size_tiered_compaction_strategy::most_interesting_bucket(std::vector<std::vector
|
||||
return std::move(max);
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
future<compaction_descriptor>
|
||||
size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
// make local copies so they can't be changed out from under us mid-method
|
||||
int min_threshold = table_s.min_compaction_threshold();
|
||||
int max_threshold = table_s.schema()->max_compaction_threshold();
|
||||
auto compaction_time = gc_clock::now();
|
||||
auto candidates = control.candidates(table_s);
|
||||
auto candidates = co_await control.candidates(table_s);
|
||||
|
||||
// TODO: Add support to filter cold sstables (for reference: SizeTieredCompactionStrategy::filterColdSSTables).
|
||||
|
||||
@@ -221,17 +221,17 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
|
||||
if (is_any_bucket_interesting(buckets, min_threshold)) {
|
||||
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), min_threshold, max_threshold);
|
||||
return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
co_return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
}
|
||||
|
||||
// If we are not enforcing min_threshold explicitly, try any pair of SStables in the same tier.
|
||||
if (!table_s.compaction_enforce_min_threshold() && is_any_bucket_interesting(buckets, 2)) {
|
||||
std::vector<sstables::shared_sstable> most_interesting = most_interesting_bucket(std::move(buckets), 2, max_threshold);
|
||||
return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
co_return sstables::compaction_descriptor(std::move(most_interesting));
|
||||
}
|
||||
|
||||
if (!table_s.tombstone_gc_enabled()) {
|
||||
return compaction_descriptor();
|
||||
co_return compaction_descriptor();
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
|
||||
@@ -250,9 +250,9 @@ size_tiered_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
auto it = std::min_element(sstables.begin(), sstables.end(), [] (auto& i, auto& j) {
|
||||
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
|
||||
});
|
||||
return sstables::compaction_descriptor({ *it });
|
||||
co_return sstables::compaction_descriptor({ *it });
|
||||
}
|
||||
return sstables::compaction_descriptor();
|
||||
co_return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
int64_t size_tiered_compaction_strategy::estimated_pending_compactions(const std::vector<sstables::shared_sstable>& sstables,
|
||||
@@ -271,7 +271,8 @@ future<int64_t> size_tiered_compaction_strategy::estimated_pending_compactions(c
|
||||
int max_threshold = table_s.schema()->max_compaction_threshold();
|
||||
std::vector<sstables::shared_sstable> sstables;
|
||||
|
||||
auto all_sstables = table_s.main_sstable_set().all();
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
auto all_sstables = main_set->all();
|
||||
sstables.reserve(all_sstables->size());
|
||||
for (auto& entry : *all_sstables) {
|
||||
sstables.push_back(entry);
|
||||
|
||||
@@ -75,7 +75,7 @@ public:
|
||||
explicit size_tiered_compaction_strategy(const size_tiered_compaction_strategy_options& options);
|
||||
static void validate_options(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options);
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
|
||||
@@ -19,8 +19,8 @@ class strategy_control {
|
||||
public:
|
||||
virtual ~strategy_control() {}
|
||||
virtual bool has_ongoing_compaction(compaction_group_view& table_s) const noexcept = 0;
|
||||
virtual std::vector<sstables::shared_sstable> candidates(compaction_group_view&) const = 0;
|
||||
virtual std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view&) const = 0;
|
||||
virtual future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view&) const = 0;
|
||||
virtual future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view&) const = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@@ -332,14 +332,14 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
compaction_descriptor
|
||||
future<compaction_descriptor>
|
||||
time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
auto& state = get_state(table_s);
|
||||
auto compaction_time = gc_clock::now();
|
||||
auto candidates = control.candidates(table_s);
|
||||
auto candidates = co_await control.candidates(table_s);
|
||||
|
||||
if (candidates.empty()) {
|
||||
return compaction_descriptor();
|
||||
co_return compaction_descriptor();
|
||||
}
|
||||
|
||||
auto now = db_clock::now();
|
||||
@@ -350,7 +350,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
auto expired = table_s.fully_expired_sstables(candidates, compaction_time);
|
||||
if (!expired.empty()) {
|
||||
clogger.debug("[{}] Going to compact {} expired sstables", fmt::ptr(this), expired.size());
|
||||
return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()));
|
||||
co_return compaction_descriptor(has_only_fully_expired::yes, std::vector<shared_sstable>(expired.begin(), expired.end()));
|
||||
}
|
||||
// Keep checking for fully_expired_sstables until we don't find
|
||||
// any among the candidates, meaning they are either already compacted
|
||||
@@ -362,7 +362,7 @@ time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_vi
|
||||
|
||||
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time);
|
||||
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
|
||||
return compaction_descriptor(std::move(compaction_candidates));
|
||||
co_return compaction_descriptor(std::move(compaction_candidates));
|
||||
}
|
||||
|
||||
time_window_compaction_strategy::bucket_compaction_mode
|
||||
@@ -519,7 +519,8 @@ future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(c
|
||||
auto& state = get_state(table_s);
|
||||
auto min_threshold = table_s.min_compaction_threshold();
|
||||
auto max_threshold = table_s.schema()->max_compaction_threshold();
|
||||
auto candidate_sstables = *table_s.main_sstable_set().all() | std::ranges::to<std::vector>();
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
auto candidate_sstables = *main_set->all() | std::ranges::to<std::vector>();
|
||||
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
|
||||
|
||||
int64_t n = 0;
|
||||
|
||||
@@ -81,7 +81,7 @@ public:
|
||||
enum class bucket_compaction_mode { none, size_tiered, major };
|
||||
public:
|
||||
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
|
||||
virtual std::vector<compaction_descriptor> get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<shared_sstable> candidates) const override;
|
||||
|
||||
|
||||
@@ -2467,11 +2467,11 @@ public:
|
||||
bool compaction_enforce_min_threshold() const noexcept override {
|
||||
return _t.get_config().compaction_enforce_min_threshold || _t._is_bootstrap_or_replace;
|
||||
}
|
||||
const sstables::sstable_set& main_sstable_set() const override {
|
||||
return *_cg.main_sstables();
|
||||
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
|
||||
co_return _cg.main_sstables();
|
||||
}
|
||||
const sstables::sstable_set& maintenance_sstable_set() const override {
|
||||
return *_cg.maintenance_sstables();
|
||||
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
|
||||
co_return _cg.maintenance_sstables();
|
||||
}
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
||||
return _t.sstable_set_for_tombstone_gc(_cg);
|
||||
|
||||
@@ -101,9 +101,9 @@ public:
|
||||
virtual const schema_ptr& schema() const noexcept override { return _schema; }
|
||||
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
|
||||
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
|
||||
virtual const sstables::sstable_set& main_sstable_set() const override { return _main_set; }
|
||||
virtual const sstables::sstable_set& maintenance_sstable_set() const override { return _maintenance_set; }
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(main_sstable_set()); }
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
|
||||
virtual lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
|
||||
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
|
||||
@@ -164,8 +164,8 @@ SEASTAR_TEST_CASE(basic_compaction_group_splitting_test) {
|
||||
auto ret = cm.perform_split_compaction(*compaction_group, sstables::compaction_type_options::split{classifier}, tasks::task_info{}).get();
|
||||
BOOST_REQUIRE_EQUAL(ret->start_size, expected_compaction_size);
|
||||
|
||||
BOOST_REQUIRE(compaction_group->main_sstable_set().size() == expected_output);
|
||||
compaction_group->main_sstable_set().for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
||||
BOOST_REQUIRE(compaction_group->main_sstable_set().get()->size() == expected_output);
|
||||
compaction_group->main_sstable_set().get()->for_each_sstable([&] (const sstables::shared_sstable& sst) {
|
||||
BOOST_REQUIRE(!sstable_needs_split(sst));
|
||||
validate(sst);
|
||||
});
|
||||
|
||||
@@ -1404,7 +1404,7 @@ SEASTAR_TEST_CASE(populate_from_quarantine_works) {
|
||||
auto& cf = db.find_column_family("ks", "cf");
|
||||
bool found = false;
|
||||
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
|
||||
auto sstables = in_strategy_sstables(ts);
|
||||
auto sstables = co_await in_strategy_sstables(ts);
|
||||
if (sstables.empty()) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1453,7 +1453,7 @@ SEASTAR_TEST_CASE(snapshot_with_quarantine_works) {
|
||||
co_await db.invoke_on((shard + i) % smp::count, [&] (replica::database& db) -> future<> {
|
||||
auto& cf = db.find_column_family("ks", "cf");
|
||||
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
|
||||
auto sstables = in_strategy_sstables(ts);
|
||||
auto sstables = co_await in_strategy_sstables(ts);
|
||||
if (sstables.empty()) {
|
||||
co_return;
|
||||
}
|
||||
@@ -1689,7 +1689,7 @@ SEASTAR_TEST_CASE(test_drop_quarantined_sstables) {
|
||||
auto& cf = _db.find_column_family("ks", "cf");
|
||||
std::atomic<size_t> quarantined_on_shard = 0;
|
||||
co_await cf.parallel_foreach_compaction_group_view([&] (compaction::compaction_group_view& ts) -> future<> {
|
||||
auto sstables = in_strategy_sstables(ts);
|
||||
auto sstables = co_await in_strategy_sstables(ts);
|
||||
if (sstables.empty()) {
|
||||
co_return;
|
||||
}
|
||||
|
||||
@@ -51,11 +51,13 @@ public:
|
||||
bool has_ongoing_compaction(compaction_group_view& table_s) const noexcept override {
|
||||
return _has_ongoing_compaction;
|
||||
}
|
||||
virtual std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
|
||||
return boost::copy_range<std::vector<sstables::shared_sstable>>(*t.main_sstable_set().all());
|
||||
virtual future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return boost::copy_range<std::vector<sstables::shared_sstable>>(*main_set->all());
|
||||
}
|
||||
virtual std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
|
||||
return t.main_sstable_set().all_sstable_runs();
|
||||
virtual future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return main_set->all_sstable_runs();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -135,7 +137,7 @@ SEASTAR_TEST_CASE(incremental_compaction_test) {
|
||||
|
||||
auto do_compaction = [&] (size_t expected_input, size_t expected_output) -> std::vector<shared_sstable> {
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
auto desc = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto desc = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
|
||||
// nothing to compact, move on.
|
||||
if (desc.sstables.empty()) {
|
||||
@@ -252,7 +254,7 @@ SEASTAR_THREAD_TEST_CASE(incremental_compaction_sag_test) {
|
||||
auto& table_s = _cf.as_compaction_group_view();
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
for (;;) {
|
||||
auto desc = _ics.get_sstables_for_compaction(table_s, *control);
|
||||
auto desc = _ics.get_sstables_for_compaction(table_s, *control).get();
|
||||
// no more jobs, bailing out...
|
||||
if (desc.sstables.empty()) {
|
||||
break;
|
||||
@@ -362,7 +364,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
||||
options.emplace("tombstone_compaction_interval", "1");
|
||||
sleep(2s).get();
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
||||
}
|
||||
@@ -372,7 +374,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_threshold", "0.5f");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which was recently created won't be included due to min interval
|
||||
@@ -381,7 +383,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
||||
options.emplace("tombstone_compaction_interval", "3600");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now());
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
|
||||
@@ -393,7 +395,7 @@ SEASTAR_TEST_CASE(basic_garbage_collection_test) {
|
||||
options.emplace("unchecked_tombstone_compaction", "true");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
}
|
||||
});
|
||||
@@ -520,7 +522,7 @@ SEASTAR_TEST_CASE(gc_tombstone_with_grace_seconds_test) {
|
||||
forward_jump_clocks(std::chrono::seconds{1});
|
||||
auto control = make_strategy_control_for_test(false);
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::incremental, options);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control);
|
||||
auto descriptor = cs.get_sstables_for_compaction(cf.as_compaction_group_view(), *control).get();
|
||||
BOOST_REQUIRE_EQUAL(descriptor.sstables.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(descriptor.sstables.front(), sst);
|
||||
});
|
||||
|
||||
@@ -125,12 +125,14 @@ public:
|
||||
return _has_ongoing_compaction;
|
||||
}
|
||||
|
||||
std::vector<sstables::shared_sstable> candidates(compaction_group_view& t) const override {
|
||||
return _candidates_opt.value_or(*t.main_sstable_set().all() | std::ranges::to<std::vector>());
|
||||
future<std::vector<sstables::shared_sstable>> candidates(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return _candidates_opt.value_or(*main_set->all() | std::ranges::to<std::vector>());
|
||||
}
|
||||
|
||||
std::vector<sstables::frozen_sstable_run> candidates_as_runs(compaction_group_view& t) const override {
|
||||
return t.main_sstable_set().all_sstable_runs();
|
||||
future<std::vector<sstables::frozen_sstable_run>> candidates_as_runs(compaction_group_view& t) const override {
|
||||
auto main_set = co_await t.main_sstable_set();
|
||||
co_return main_set->all_sstable_runs();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -140,11 +142,11 @@ static std::unique_ptr<strategy_control> make_strategy_control_for_test(bool has
|
||||
|
||||
template <typename CompactionStrategy>
|
||||
requires requires(CompactionStrategy cs, compaction_group_view& t, strategy_control& c) {
|
||||
{ cs.get_sstables_for_compaction(t, c) } -> std::same_as<sstables::compaction_descriptor>;
|
||||
{ cs.get_sstables_for_compaction(t, c) } -> std::same_as<future<sstables::compaction_descriptor>>;
|
||||
}
|
||||
static compaction_descriptor get_sstables_for_compaction(CompactionStrategy& cs, compaction_group_view& t, std::vector<shared_sstable> candidates) {
|
||||
static future<compaction_descriptor> get_sstables_for_compaction(CompactionStrategy& cs, compaction_group_view& t, std::vector<shared_sstable> candidates) {
|
||||
auto control = make_strategy_control_for_test(false, std::move(candidates));
|
||||
return cs.get_sstables_for_compaction(t, *control);
|
||||
co_return co_await cs.get_sstables_for_compaction(t, *control);
|
||||
}
|
||||
|
||||
static void assert_table_sstable_count(table_for_tests& t, size_t expected_count) {
|
||||
@@ -1958,7 +1960,7 @@ SEASTAR_TEST_CASE(size_tiered_beyond_max_threshold_test) {
|
||||
sstables::test(sst).set_data_file_size(1);
|
||||
candidates.push_back(std::move(sst));
|
||||
}
|
||||
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates));
|
||||
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(candidates)).get();
|
||||
BOOST_REQUIRE(desc.sstables.size() == size_t(max_threshold));
|
||||
});
|
||||
}
|
||||
@@ -2029,7 +2031,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
|
||||
// that's needed because sstable with expired data should be old enough.
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::time_point::min());
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
||||
|
||||
@@ -2039,7 +2041,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
auto close_lcs_table = deferred_stop(lcs_table);
|
||||
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, options);
|
||||
sst->set_sstable_level(1);
|
||||
descriptor = get_sstables_for_compaction(cs, lcs_table.as_compaction_group_view(), { sst });
|
||||
descriptor = get_sstables_for_compaction(cs, lcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
||||
// make sure sstable picked for tombstone compaction removal won't be promoted or demoted.
|
||||
@@ -2049,10 +2051,10 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
auto twcs_table = env.make_table_for_tests(make_schema("twcs", sstables::compaction_strategy_type::time_window));
|
||||
auto close_twcs_table = deferred_stop(twcs_table);
|
||||
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, {});
|
||||
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst });
|
||||
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::time_window, options);
|
||||
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst });
|
||||
descriptor = get_sstables_for_compaction(cs, twcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
BOOST_REQUIRE(descriptor.sstables.front() == sst);
|
||||
|
||||
@@ -2061,7 +2063,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
std::map<sstring, sstring> options;
|
||||
options.emplace("tombstone_threshold", "0.5f");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which was recently created won't be included due to min interval
|
||||
@@ -2070,7 +2072,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
options.emplace("tombstone_compaction_interval", "3600");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now());
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 0);
|
||||
}
|
||||
// sstable which should not be included because of droppable ratio of 0.3, will actually be included
|
||||
@@ -2082,7 +2084,7 @@ SEASTAR_TEST_CASE(sstable_expired_data_ratio) {
|
||||
options.emplace("unchecked_tombstone_compaction", "true");
|
||||
auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::size_tiered, options);
|
||||
sstables::test(sst).set_data_file_write_time(db_clock::now() - std::chrono::seconds(7200));
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst });
|
||||
auto descriptor = get_sstables_for_compaction(cs, stcs_table.as_compaction_group_view(), { sst }).get();
|
||||
BOOST_REQUIRE(descriptor.sstables.size() == 1);
|
||||
}
|
||||
});
|
||||
@@ -2378,7 +2380,7 @@ public:
|
||||
|
||||
bool found_sstable = false;
|
||||
foreach_compaction_group_view_with_thread(table, [&] (compaction::compaction_group_view& ts) {
|
||||
auto sstables = in_strategy_sstables(ts);
|
||||
auto sstables = in_strategy_sstables(ts).get();
|
||||
if (sstables.empty()) {
|
||||
return;
|
||||
}
|
||||
@@ -2429,7 +2431,7 @@ public:
|
||||
|
||||
bool found_sstable = false;
|
||||
foreach_compaction_group_view_with_thread(table, [&] (compaction::compaction_group_view& ts) {
|
||||
auto sstables = in_strategy_sstables(ts);
|
||||
auto sstables = in_strategy_sstables(ts).get();
|
||||
if (sstables.empty()) {
|
||||
return;
|
||||
}
|
||||
@@ -2467,7 +2469,7 @@ void scrub_validate_corrupted_content(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2503,7 +2505,7 @@ void scrub_validate_corrupted_file(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2541,7 +2543,7 @@ void scrub_validate_corrupted_digest(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2568,8 +2570,8 @@ void scrub_validate_no_digest(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
|
||||
BOOST_REQUIRE(!sst->get_checksum());
|
||||
|
||||
// Corrupt the data to cause an invalid checksum.
|
||||
@@ -2586,7 +2588,7 @@ void scrub_validate_no_digest(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2609,8 +2611,8 @@ void scrub_validate_valid(compress_sstable compress) {
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2695,8 +2697,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_multiple_instances_unc
|
||||
when_all_succeed(std::move(scrub1), std::move(scrub2)).get();
|
||||
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
|
||||
// Checksum component released after scrub instances terminate.
|
||||
BOOST_REQUIRE(sst->get_checksum() == nullptr);
|
||||
|
||||
@@ -2776,8 +2778,8 @@ void scrub_validate_cassandra_compat(const compression_parameters& cp, sstring s
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
}
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).get().front(), sst);
|
||||
BOOST_REQUIRE(!sst->get_checksum());
|
||||
});
|
||||
}
|
||||
@@ -2988,8 +2990,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
|
||||
opts.operation_mode = sstables::compaction_type_options::scrub::mode::abort;
|
||||
BOOST_REQUIRE_THROW(table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get(), sstables::compaction_aborted_exception);
|
||||
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).size() == 1);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).front() == sst);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() == 1);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().front() == sst);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3034,10 +3036,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
|
||||
opts.operation_mode = sstables::compaction_type_options::scrub::mode::skip;
|
||||
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).size() == 1);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).front() != sst);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() == 1);
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().front() != sst);
|
||||
|
||||
verify_fragments(in_strategy_sstables(ts), test.env().make_reader_permit(), scrubbed_fragments);
|
||||
verify_fragments(in_strategy_sstables(ts).get(), test.env().make_reader_permit(), scrubbed_fragments);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3075,9 +3077,9 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
|
||||
opts.operation_mode = sstables::compaction_type_options::scrub::mode::segregate;
|
||||
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).size());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).size() > 1);
|
||||
verify_fragments(in_strategy_sstables(ts), test.env().make_reader_permit(), explode(test.env().make_reader_permit(), muts));
|
||||
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).get().size());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() > 1);
|
||||
verify_fragments(in_strategy_sstables(ts).get(), test.env().make_reader_permit(), explode(test.env().make_reader_permit(), muts));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3117,7 +3119,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
|
||||
opts.operation_mode = sstables::compaction_type_options::scrub::mode::validate;
|
||||
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
verify_fragments({sst}, permit, corrupt_fragments);
|
||||
|
||||
@@ -3132,13 +3134,13 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
|
||||
case sstables::compaction_type_options::scrub::quarantine_mode::include:
|
||||
case sstables::compaction_type_options::scrub::quarantine_mode::only:
|
||||
// The sstable should be found and scrubbed when scrub::quarantine_mode is scrub::quarantine_mode::{include,only}
|
||||
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).size());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).size() > 1);
|
||||
verify_fragments(in_strategy_sstables(ts), permit, scrubbed_fragments);
|
||||
testlog.info("Scrub resulted in {} sstables", in_strategy_sstables(ts).get().size());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().size() > 1);
|
||||
verify_fragments(in_strategy_sstables(ts).get(), permit, scrubbed_fragments);
|
||||
break;
|
||||
case sstables::compaction_type_options::scrub::quarantine_mode::exclude:
|
||||
// The sstable should not be found when scrub::quarantine_mode is scrub::quarantine_mode::exclude
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).get().empty());
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
verify_fragments({sst}, permit, corrupt_fragments);
|
||||
break;
|
||||
@@ -3404,24 +3406,24 @@ SEASTAR_TEST_CASE(scrubbed_sstable_removal_test) {
|
||||
// add the sstable to cf's maintenance set
|
||||
cf->add_sstable_and_update_cache(sst, sstables::offstrategy::yes).get();
|
||||
auto& cf_ts = cf.as_compaction_group_view();
|
||||
auto maintenance_sst_set = cf_ts.maintenance_sstable_set();
|
||||
BOOST_REQUIRE_EQUAL(maintenance_sst_set.size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(*maintenance_sst_set.all()->begin(), sst);
|
||||
auto maintenance_sst_set = cf_ts.maintenance_sstable_set().get();
|
||||
BOOST_REQUIRE_EQUAL(maintenance_sst_set->size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(*maintenance_sst_set->all()->begin(), sst);
|
||||
// confirm main sstable_set is empty
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 0);
|
||||
|
||||
// Perform scrub on the table
|
||||
cf->get_compaction_manager().perform_sstable_scrub(cf_ts, {}, {}).get();
|
||||
|
||||
// main set should have the resultant sst and the maintenance set should be empty now
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().get()->size(), 0);
|
||||
|
||||
// Now that there is an sstable in main set, perform scrub on the table
|
||||
// again to verify that the result ends up again in main sstable_set
|
||||
cf->get_compaction_manager().perform_sstable_scrub(cf_ts, {}, {}).get();
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().size(), 0);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.main_sstable_set().get()->size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(cf_ts.maintenance_sstable_set().get()->size(), 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3488,7 +3490,7 @@ SEASTAR_TEST_CASE(sstable_run_based_compaction_test) {
|
||||
|
||||
auto do_compaction = [&] (size_t expected_input, size_t expected_output) mutable -> std::vector<shared_sstable> {
|
||||
auto input_ssts = std::vector<shared_sstable>(sstables.begin(), sstables.end());
|
||||
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(input_ssts));
|
||||
auto desc = get_sstables_for_compaction(cs, cf.as_compaction_group_view(), std::move(input_ssts)).get();
|
||||
|
||||
// nothing to compact, move on.
|
||||
if (desc.sstables.empty()) {
|
||||
|
||||
@@ -14,9 +14,11 @@ namespace sstables {
|
||||
|
||||
sstable_run_based_compaction_strategy_for_tests::sstable_run_based_compaction_strategy_for_tests() = default;
|
||||
|
||||
compaction_descriptor sstable_run_based_compaction_strategy_for_tests::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
future<compaction_descriptor>
|
||||
sstable_run_based_compaction_strategy_for_tests::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
||||
// Get unique runs from all uncompacting sstables
|
||||
std::vector<frozen_sstable_run> runs = table_s.main_sstable_set().all_sstable_runs();
|
||||
auto main_set = co_await table_s.main_sstable_set();
|
||||
std::vector<frozen_sstable_run> runs = main_set->all_sstable_runs();
|
||||
|
||||
// Group similar sized runs into a bucket.
|
||||
std::map<uint64_t, std::vector<frozen_sstable_run>> similar_sized_runs;
|
||||
@@ -43,9 +45,9 @@ compaction_descriptor sstable_run_based_compaction_strategy_for_tests::get_sstab
|
||||
| std::views::transform([] (auto& run) -> auto& { return run->all(); })
|
||||
| std::views::join
|
||||
| std::ranges::to<std::vector>();
|
||||
return sstables::compaction_descriptor(std::move(all), 0, static_fragment_size_for_run);
|
||||
co_return sstables::compaction_descriptor(std::move(all), 0, static_fragment_size_for_run);
|
||||
}
|
||||
return sstables::compaction_descriptor();
|
||||
co_return sstables::compaction_descriptor();
|
||||
}
|
||||
|
||||
future<int64_t> sstable_run_based_compaction_strategy_for_tests::estimated_pending_compactions(compaction_group_view& table_s) const {
|
||||
|
||||
@@ -24,7 +24,7 @@ class sstable_run_based_compaction_strategy_for_tests : public compaction_strate
|
||||
public:
|
||||
sstable_run_based_compaction_strategy_for_tests();
|
||||
|
||||
virtual compaction_descriptor get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
virtual future<compaction_descriptor> get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) override;
|
||||
|
||||
virtual future<int64_t> estimated_pending_compactions(compaction_group_view& table_s) const override;
|
||||
|
||||
|
||||
@@ -113,7 +113,7 @@ future<compaction_result> compact_sstables(test_env& env, sstables::compaction_d
|
||||
};
|
||||
descriptor.replacer = std::move(replacer);
|
||||
if (can_purge) {
|
||||
descriptor.enable_garbage_collection(table_s.main_sstable_set());
|
||||
descriptor.enable_garbage_collection(*co_await table_s.main_sstable_set());
|
||||
}
|
||||
sstables::compaction_result ret;
|
||||
co_await run_compaction_task(env, descriptor.run_identifier, table_s, [&] (sstables::compaction_data& cdata) {
|
||||
|
||||
@@ -63,14 +63,14 @@ public:
|
||||
bool compaction_enforce_min_threshold() const noexcept override {
|
||||
return true;
|
||||
}
|
||||
const sstables::sstable_set& main_sstable_set() const override {
|
||||
return table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
|
||||
future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override {
|
||||
co_return co_await table().try_get_compaction_group_view_with_static_sharding().main_sstable_set();
|
||||
}
|
||||
const sstables::sstable_set& maintenance_sstable_set() const override {
|
||||
return table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
|
||||
future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override {
|
||||
co_return co_await table().try_get_compaction_group_view_with_static_sharding().maintenance_sstable_set();
|
||||
}
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override {
|
||||
return make_lw_shared<const sstables::sstable_set>(main_sstable_set());
|
||||
return table().try_get_compaction_group_with_static_sharding()->main_sstables();
|
||||
}
|
||||
std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point query_time) const override {
|
||||
return sstables::get_fully_expired_sstables(*this, sstables, query_time);
|
||||
|
||||
@@ -953,9 +953,9 @@ public:
|
||||
virtual const schema_ptr& schema() const noexcept override { return _schema; }
|
||||
virtual unsigned min_compaction_threshold() const noexcept override { return _schema->min_compaction_threshold(); }
|
||||
virtual bool compaction_enforce_min_threshold() const noexcept override { return false; }
|
||||
virtual const sstables::sstable_set& main_sstable_set() const override { return _main_set; }
|
||||
virtual const sstables::sstable_set& maintenance_sstable_set() const override { return _maintenance_set; }
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(main_sstable_set()); }
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> main_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual future<lw_shared_ptr<const sstables::sstable_set>> maintenance_sstable_set() const override { co_return make_lw_shared<const sstables::sstable_set>(_maintenance_set); }
|
||||
lw_shared_ptr<const sstables::sstable_set> sstable_set_for_tombstone_gc() const override { return make_lw_shared<const sstables::sstable_set>(_main_set); }
|
||||
virtual std::unordered_set<sstables::shared_sstable> fully_expired_sstables(const std::vector<sstables::shared_sstable>& sstables, gc_clock::time_point compaction_time) const override { return {}; }
|
||||
virtual const std::vector<sstables::shared_sstable>& compacted_undeleted_sstables() const noexcept override { return _compacted_undeleted_sstables; }
|
||||
virtual sstables::compaction_strategy& get_compaction_strategy() const noexcept override { return _compaction_strategy; }
|
||||
|
||||
Reference in New Issue
Block a user