The `compaction_strategy_state` class holds strategy specific state via a `std::variant` containing different state types. When a compaction strategy performs compaction, it retrieves a reference to its state from the `compaction_strategy_state` object. If the table's compaction strategy is ALTERed while a compaction is in progress, the `compaction_strategy_state` object gets replaced, destroying the old state. This leaves the ongoing compaction holding a dangling reference, resulting in a use after free. Fix this by using `seastar::shared_ptr` for the state variant alternatives(`leveled_compaction_strategy_state_ptr` and `time_window_compaction_strategy_state_ptr`). The compaction strategies now hold a copy of the shared_ptr, ensuring the state remains valid for the duration of the compaction even if the strategy is altered. The `compaction_strategy_state` itself is still passed by reference and only the variant alternatives use shared_ptrs. This allows ongoing compactions to retain ownership of the state independently of the wrapper's lifetime. Fixes #25913 Signed-off-by: Lakshmi Narayanan Sreethar <lakshmi.sreethar@scylladb.com>
264 lines
13 KiB
C++
264 lines
13 KiB
C++
/*
|
|
* Copyright (C) 2019-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "leveled_compaction_strategy.hh"
|
|
#include "leveled_manifest.hh"
|
|
#include "compaction_strategy_state.hh"
|
|
#include "sstables/sstable_set_impl.hh"
|
|
#include <algorithm>
|
|
|
|
namespace compaction {
|
|
|
|
leveled_compaction_strategy_state_ptr leveled_compaction_strategy::get_state(compaction_group_view& table_s) const {
|
|
return table_s.get_compaction_strategy_state().get<leveled_compaction_strategy_state_ptr>();
|
|
}
|
|
|
|
future<compaction_descriptor> leveled_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
|
|
auto state = get_state(table_s);
|
|
auto candidates = co_await control.candidates(table_s);
|
|
// NOTE: leveled_manifest creation may be slightly expensive, so later on,
|
|
// we may want to store it in the strategy itself. However, the sstable
|
|
// lists managed by the manifest may become outdated. For example, one
|
|
// sstable in it may be marked for deletion after compacted.
|
|
// Currently, we create a new manifest whenever it's time for compaction.
|
|
leveled_manifest manifest = leveled_manifest::create(table_s, candidates, _max_sstable_size_in_mb, _stcs_options);
|
|
if (!state->last_compacted_keys) {
|
|
generate_last_compacted_keys(*state, manifest);
|
|
}
|
|
auto candidate = manifest.get_compaction_candidates(*state->last_compacted_keys, state->compaction_counter);
|
|
|
|
if (!candidate.sstables.empty()) {
|
|
auto main_set = co_await table_s.main_sstable_set();
|
|
leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", candidate.sstables.size(), main_set->size());
|
|
co_return candidate;
|
|
}
|
|
|
|
if (!table_s.tombstone_gc_enabled()) {
|
|
co_return compaction_descriptor();
|
|
}
|
|
|
|
// if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio
|
|
// unlike stcs, lcs can look for sstable with highest droppable tombstone ratio, so as not to choose
|
|
// a sstable which droppable data shadow data in older sstable, by starting from highest levels which
|
|
// theoretically contain oldest non-overlapping data.
|
|
auto compaction_time = gc_clock::now();
|
|
for (auto level = int(manifest.get_level_count()); level >= 0; level--) {
|
|
auto& sstables = manifest.get_level(level);
|
|
// filter out sstables which droppable tombstone ratio isn't greater than the defined threshold.
|
|
std::erase_if(sstables, [this, compaction_time, &table_s] (const sstables::shared_sstable& sst) -> bool {
|
|
return !worth_dropping_tombstones(sst, compaction_time, table_s);
|
|
});
|
|
if (sstables.empty()) {
|
|
continue;
|
|
}
|
|
auto& sst = *std::max_element(sstables.begin(), sstables.end(), [&] (auto& i, auto& j) {
|
|
auto ratio_i = i->estimate_droppable_tombstone_ratio(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
|
|
auto ratio_j = j->estimate_droppable_tombstone_ratio(compaction_time, table_s.get_tombstone_gc_state(), table_s.schema());
|
|
return ratio_i < ratio_j;
|
|
});
|
|
co_return compaction_descriptor({ sst }, sst->get_sstable_level());
|
|
}
|
|
co_return compaction_descriptor();
|
|
}
|
|
|
|
compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) {
|
|
if (candidates.empty()) {
|
|
return compaction_descriptor();
|
|
}
|
|
|
|
auto max_sstable_size_in_bytes = _max_sstable_size_in_mb*1024*1024;
|
|
auto ideal_level = ideal_level_for_input(candidates, max_sstable_size_in_bytes);
|
|
return make_major_compaction_job(std::move(candidates),
|
|
ideal_level, max_sstable_size_in_bytes);
|
|
}
|
|
|
|
void leveled_compaction_strategy::notify_completion(compaction_group_view& table_s, const std::vector<sstables::shared_sstable>& removed, const std::vector<sstables::shared_sstable>& added) {
|
|
auto state = get_state(table_s);
|
|
// All the update here is only relevant for regular compaction's round-robin picking policy, and if
|
|
// last_compacted_keys wasn't generated by regular, it means regular is disabled since last restart,
|
|
// therefore we can skip the updates here until regular runs for the first time. Once it runs,
|
|
// it will be able to generate last_compacted_keys correctly by looking at metadata of files.
|
|
if (removed.empty() || added.empty() || !state->last_compacted_keys) {
|
|
return;
|
|
}
|
|
auto min_level = std::numeric_limits<uint32_t>::max();
|
|
for (auto& sstable : removed) {
|
|
min_level = std::min(min_level, sstable->get_sstable_level());
|
|
}
|
|
|
|
const sstables::sstable *last = nullptr;
|
|
int target_level = 0;
|
|
for (auto& candidate : added) {
|
|
if (!last || last->compare_by_first_key(*candidate) < 0) {
|
|
last = &*candidate;
|
|
}
|
|
target_level = std::max(target_level, int(candidate->get_sstable_level()));
|
|
}
|
|
state->last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();
|
|
|
|
for (int i = leveled_manifest::MAX_LEVELS - 1; i > 0; i--) {
|
|
state->compaction_counter[i]++;
|
|
}
|
|
state->compaction_counter[target_level] = 0;
|
|
|
|
if (leveled_manifest::logger.level() == logging::log_level::debug) {
|
|
for (auto j = 0U; j < state->compaction_counter.size(); j++) {
|
|
leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, state->compaction_counter[j]);
|
|
}
|
|
}
|
|
}
|
|
|
|
void leveled_compaction_strategy::generate_last_compacted_keys(leveled_compaction_strategy_state& state, leveled_manifest& manifest) {
|
|
std::vector<std::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
|
|
for (auto i = 0; i < leveled_manifest::MAX_LEVELS - 1; i++) {
|
|
if (manifest.get_level(i + 1).empty()) {
|
|
continue;
|
|
}
|
|
|
|
const sstables::sstable* sstable_with_last_compacted_key = nullptr;
|
|
std::optional<db_clock::time_point> max_creation_time;
|
|
for (auto& sst : manifest.get_level(i + 1)) {
|
|
auto wtime = sst->data_file_write_time();
|
|
if (!max_creation_time || wtime >= *max_creation_time) {
|
|
sstable_with_last_compacted_key = &*sst;
|
|
max_creation_time = wtime;
|
|
}
|
|
}
|
|
last_compacted_keys[i] = sstable_with_last_compacted_key->get_last_decorated_key();
|
|
}
|
|
state.last_compacted_keys = std::move(last_compacted_keys);
|
|
}
|
|
|
|
future<int64_t> leveled_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
|
|
std::vector<sstables::shared_sstable> sstables;
|
|
auto main_set = co_await table_s.main_sstable_set();
|
|
auto all_sstables = main_set->all();
|
|
sstables.reserve(all_sstables->size());
|
|
for (auto& entry : *all_sstables) {
|
|
sstables.push_back(entry);
|
|
}
|
|
co_return leveled_manifest::get_estimated_tasks(leveled_manifest::get_levels(sstables), _max_sstable_size_in_mb * 1024 * 1024);
|
|
}
|
|
|
|
compaction_descriptor
|
|
leveled_compaction_strategy::get_reshaping_job(std::vector<sstables::shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
|
|
auto mode = cfg.mode;
|
|
std::array<std::vector<sstables::shared_sstable>, leveled_manifest::MAX_LEVELS> level_info;
|
|
|
|
auto is_disjoint = [schema] (const std::vector<sstables::shared_sstable>& sstables, unsigned tolerance) -> std::tuple<bool, unsigned> {
|
|
auto overlapping_sstables = sstable_set_overlapping_count(schema, sstables);
|
|
return { overlapping_sstables <= tolerance, overlapping_sstables };
|
|
};
|
|
|
|
auto max_sstable_size_in_bytes = _max_sstable_size_in_mb * 1024 * 1024;
|
|
|
|
clogger.debug("get_reshaping_job: mode={} input.size={} max_sstable_size_in_bytes={}", mode == reshape_mode::relaxed ? "relaxed" : "strict", input.size(), max_sstable_size_in_bytes);
|
|
|
|
for (auto& sst : input) {
|
|
auto sst_level = sst->get_sstable_level();
|
|
if (sst_level > leveled_manifest::MAX_LEVELS - 1) {
|
|
leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS - 1);
|
|
|
|
// This is really unexpected, so we'll just compact it all to fix it
|
|
auto ideal_level = ideal_level_for_input(input, max_sstable_size_in_bytes);
|
|
compaction_descriptor desc(std::move(input), ideal_level, max_sstable_size_in_bytes);
|
|
desc.options = compaction_type_options::make_reshape();
|
|
return desc;
|
|
}
|
|
level_info[sst_level].push_back(sst);
|
|
}
|
|
|
|
// Can't use std::ranges::views::drop due to https://bugs.llvm.org/show_bug.cgi?id=47509
|
|
for (auto i = level_info.begin(); i != level_info.end(); ++i) {
|
|
auto& level = *i;
|
|
std::sort(level.begin(), level.end(), [&schema] (const sstables::shared_sstable& a, const sstables::shared_sstable& b) {
|
|
return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
|
|
});
|
|
}
|
|
|
|
size_t offstrategy_threshold = (mode == reshape_mode::strict) ? std::max(schema->min_compaction_threshold(), 4) : std::max(schema->max_compaction_threshold(), 32);
|
|
auto tolerance = [mode] (unsigned level) -> unsigned {
|
|
if (mode == reshape_mode::strict) {
|
|
return 0;
|
|
}
|
|
constexpr unsigned fan_out = leveled_manifest::leveled_fan_out;
|
|
return std::max(double(fan_out), std::ceil(std::pow(fan_out, level) * 0.1));
|
|
};
|
|
|
|
// If there's only disjoint L0 sstables like on bootstrap, let's compact them all into a level L which has capacity to store the output.
|
|
// The best possible level can be calculated with the formula: log (base fan_out) of (L0_total_bytes / max_sstable_size)
|
|
auto [l0_disjoint, _] = is_disjoint(level_info[0], 0);
|
|
if (mode == reshape_mode::strict && level_info[0].size() >= offstrategy_threshold && level_info[0].size() == input.size() && l0_disjoint) {
|
|
unsigned ideal_level = ideal_level_for_input(level_info[0], max_sstable_size_in_bytes);
|
|
|
|
leveled_manifest::logger.info("Reshaping {} disjoint sstables in level 0 into level {}", level_info[0].size(), ideal_level);
|
|
compaction_descriptor desc(std::move(input), ideal_level, max_sstable_size_in_bytes);
|
|
desc.options = compaction_type_options::make_reshape();
|
|
return desc;
|
|
}
|
|
|
|
if (level_info[0].size() > offstrategy_threshold) {
|
|
size_tiered_compaction_strategy stcs(_stcs_options);
|
|
return stcs.get_reshaping_job(std::move(level_info[0]), schema, cfg);
|
|
}
|
|
|
|
for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) {
|
|
if (level_info[level].empty()) {
|
|
continue;
|
|
}
|
|
|
|
auto [disjoint, overlapping_sstables] = is_disjoint(level_info[level], tolerance(level));
|
|
if (!disjoint) {
|
|
leveled_manifest::logger.warn("Turns out that level {} is not disjoint, found {} overlapping SSTables, so the level will be entirely compacted on behalf of {}.{}", level, overlapping_sstables, schema->ks_name(), schema->cf_name());
|
|
compaction_descriptor desc(std::move(level_info[level]), level, max_sstable_size_in_bytes);
|
|
desc.options = compaction_type_options::make_reshape();
|
|
return desc;
|
|
}
|
|
}
|
|
|
|
return compaction_descriptor();
|
|
}
|
|
|
|
std::vector<compaction_descriptor>
|
|
leveled_compaction_strategy::get_cleanup_compaction_jobs(compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) const {
|
|
std::vector<compaction_descriptor> ret;
|
|
|
|
auto levels = leveled_manifest::get_levels(candidates);
|
|
|
|
ret = size_tiered_compaction_strategy(_stcs_options).get_cleanup_compaction_jobs(table_s, std::move(levels[0]));
|
|
for (size_t level = 1; level < levels.size(); level++) {
|
|
if (levels[level].empty()) {
|
|
continue;
|
|
}
|
|
ret.push_back(compaction_descriptor(std::move(levels[level]), level, _max_sstable_size_in_mb * 1024 * 1024));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
unsigned leveled_compaction_strategy::ideal_level_for_input(const std::vector<sstables::shared_sstable>& input, uint64_t max_sstable_size) {
|
|
if (!max_sstable_size) {
|
|
return 1;
|
|
}
|
|
auto log_fanout = [fanout = leveled_manifest::leveled_fan_out] (double x) {
|
|
double inv_log_fanout = 1.0f / std::log(fanout);
|
|
return log(x) * inv_log_fanout;
|
|
};
|
|
uint64_t total_bytes = std::max(leveled_manifest::get_total_bytes(input), max_sstable_size);
|
|
return std::ceil(log_fanout((total_bytes + max_sstable_size - 1) / max_sstable_size));
|
|
}
|
|
|
|
leveled_compaction_strategy_state::leveled_compaction_strategy_state() {
|
|
compaction_counter.resize(leveled_manifest::MAX_LEVELS);
|
|
}
|
|
|
|
std::unique_ptr<sstables::sstable_set_impl> leveled_compaction_strategy::make_sstable_set(const compaction_group_view& ts) const {
|
|
return std::make_unique<sstables::partitioned_sstable_set>(ts.schema(), ts.token_range());
|
|
}
|
|
|
|
}
|