Files
scylladb/compaction/leveled_compaction_strategy.cc
Raphael S. Carvalho c25d8f6770 compaction: Move decision of garbage collection from strategy to task type
For compaction to be able to purge expired data, like tombstones, an
sstable set snapshot is set in the compaction descriptor.

That's a decision that belongs to the task type. For example, all regular
compactions enable GC, whereas scrub doesn't, for safety
reasons.

The problem is that the decision is being made by every instantiation
of compaction_descriptor in the strategies, which is both unnecessary
and also adds lots of boilerplate to the code, making it hard to
understand and work with.

As sstable set snapshot is an implementation detail, a new method
is being added to compaction_descriptor to make the intention
clearer, making the interface easier to understand.

can_purge_tombstones, used previously by rewrite task only, is being
reused for communicating GC intention into task::compact_sstables().

The boilerplate was a pain when adding a new strategy method for
the ongoing work on cleanup, described by issue #10097.
Another benefit is that we'll now only create a set snapshot when
compaction will really run. Before, it could happen that the snapshot
would be discarded if the compaction attempt had to be postponed,
which is a waste of cpu cycles.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
2022-03-21 12:14:04 -03:00

220 lines
11 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "leveled_compaction_strategy.hh"
#include "leveled_manifest.hh"
#include <algorithm>
#include <boost/range/algorithm/remove_if.hpp>
namespace sstables {
// Selects the next compaction job for the table.
//
// A fresh leveled_manifest is created from `candidates` on every call. Creating
// it may be slightly expensive, so it could be stored in the strategy later on,
// but the sstable lists it manages may become outdated (for example, an sstable
// in it may be marked for deletion after being compacted).
//
// Returns the manifest's own candidate when it contains sstables. Otherwise
// falls back to a single-sstable job picked by droppable tombstone ratio, or an
// empty descriptor when no sstable qualifies.
compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(table_state& table_s, strategy_control& control, std::vector<sstables::shared_sstable> candidates) {
    leveled_manifest manifest = leveled_manifest::create(table_s, candidates, _max_sstable_size_in_mb, _stcs_options);
    if (!_last_compacted_keys) {
        generate_last_compacted_keys(manifest);
    }

    auto regular_job = manifest.get_compaction_candidates(*_last_compacted_keys, _compaction_counter);
    if (!regular_job.sstables.empty()) {
        leveled_manifest::logger.debug("leveled: Compacting {} out of {} sstables", regular_job.sstables.size(), table_s.get_sstable_set().all()->size());
        return regular_job;
    }

    // Nothing to compact in the standard way, so try a compaction based on the
    // droppable tombstone ratio. Unlike STCS, LCS can look for the sstable with
    // the highest droppable tombstone ratio, starting from the highest levels
    // (which theoretically contain the oldest non-overlapping data), to avoid
    // picking an sstable whose droppable data shadows data in an older sstable.
    const auto now = gc_clock::now();

    // True for sstables whose droppable tombstone ratio does not justify a compaction.
    const auto not_worth_compacting = [this, now] (const sstables::shared_sstable& sst) -> bool {
        return !worth_dropping_tombstones(sst, now);
    };
    // Estimated droppable tombstone ratio of a single sstable at `now`.
    const auto droppable_ratio = [now] (const auto& sst) {
        auto gc_before = sst->get_gc_before_for_drop_estimation(now);
        return sst->estimate_droppable_tombstone_ratio(gc_before);
    };

    for (int level = int(manifest.get_level_count()); level >= 0; --level) {
        auto& level_sstables = manifest.get_level(level);
        // Drop, from the manifest's list for this level, every sstable that is not worth compacting.
        level_sstables.erase(boost::range::remove_if(level_sstables, not_worth_compacting), level_sstables.end());
        if (!level_sstables.empty()) {
            auto& best = *std::max_element(level_sstables.begin(), level_sstables.end(), [&] (auto& lhs, auto& rhs) {
                return droppable_ratio(lhs) < droppable_ratio(rhs);
            });
            return sstables::compaction_descriptor({ best }, service::get_local_compaction_priority(), best->get_sstable_level());
        }
    }
    return compaction_descriptor();
}
// Builds a major compaction job out of all `candidates`.
//
// The job's level is the highest sstable level found among the candidates and
// its maximum sstable size is derived from _max_sstable_size_in_mb. An empty
// candidate list yields a default-constructed descriptor.
compaction_descriptor leveled_compaction_strategy::get_major_compaction_job(table_state& table_s, std::vector<sstables::shared_sstable> candidates) {
    if (candidates.empty()) {
        return compaction_descriptor();
    }

    // Work out the level before `candidates` is handed over, so that building
    // the descriptor never has to look into a vector that is being moved.
    auto highest_level = candidates.front()->get_sstable_level();
    for (const auto& sst : candidates) {
        highest_level = std::max(highest_level, sst->get_sstable_level());
    }

    return compaction_descriptor(std::move(candidates), service::get_local_compaction_priority(),
            highest_level, _max_sstable_size_in_mb*1024*1024);
}
// Updates the strategy's bookkeeping after a compaction replaced `removed`
// with `added`. Does nothing if either list is empty.
//
// - The last compacted key of the lowest level found among `removed` is set to
//   the last decorated key of the sstable in `added` with the greatest first key.
// - The compaction counter of every level above 0 is incremented, and the
//   counter of the highest level found among `added` is then reset to 0.
//
// Throws if _last_compacted_keys has not been generated yet (optional::value())
// or if the lowest removed level is out of range (vector::at()).
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
    if (removed.empty() || added.empty()) {
        return;
    }

    auto min_level = std::numeric_limits<uint32_t>::max();
    for (auto& sstable : removed) {
        min_level = std::min(min_level, sstable->get_sstable_level());
    }

    const sstables::sstable *last = nullptr;
    int target_level = 0;
    for (auto& candidate : added) {
        // Keep the sstable that compares greatest by first key.
        if (!last || last->compare_by_first_key(*candidate) < 0) {
            last = &*candidate;
        }
        target_level = std::max(target_level, int(candidate->get_sstable_level()));
    }
    _last_compacted_keys.value().at(min_level) = last->get_last_decorated_key();

    for (int i = leveled_manifest::MAX_LEVELS - 1; i > 0; i--) {
        _compaction_counter[i]++;
    }
    _compaction_counter[target_level] = 0;

    // Use is_enabled() rather than comparing the level for equality, so the
    // counters are also dumped when the logger is set to a more verbose level
    // than debug (e.g. trace).
    if (leveled_manifest::logger.is_enabled(logging::log_level::debug)) {
        for (auto j = 0U; j < _compaction_counter.size(); j++) {
            leveled_manifest::logger.debug("CompactionCounter: {}: {}", j, _compaction_counter[j]);
        }
    }
}
// (Re)builds _last_compacted_keys from the manifest.
//
// For each level L below the last one, the entry for L is taken from level
// L + 1: it is the last decorated key of the sstable in L + 1 with the most
// recent data file write time (the later one in iteration order on ties).
// The entry stays empty when level L + 1 has no sstables.
void leveled_compaction_strategy::generate_last_compacted_keys(leveled_manifest& manifest) {
    std::vector<std::optional<dht::decorated_key>> keys_per_level(leveled_manifest::MAX_LEVELS);

    for (int next_level = 1; next_level < leveled_manifest::MAX_LEVELS; ++next_level) {
        auto& next_level_sstables = manifest.get_level(next_level);
        if (next_level_sstables.empty()) {
            continue;
        }

        const sstables::sstable* newest = nullptr;
        std::optional<db_clock::time_point> newest_write_time;
        for (auto& sst : next_level_sstables) {
            auto write_time = sst->data_file_write_time();
            // Skip sstables strictly older than the newest one seen so far.
            if (newest_write_time && write_time < *newest_write_time) {
                continue;
            }
            newest = &*sst;
            newest_write_time = write_time;
        }

        keys_per_level[next_level - 1] = newest->get_last_decorated_key();
    }

    _last_compacted_keys = std::move(keys_per_level);
}
// Estimates the number of pending compaction tasks for the table.
//
// Copies every sstable of the table's sstable set into a vector, groups them
// with leveled_manifest::get_levels() and passes the result, together with the
// maximum sstable size in bytes, to leveled_manifest::get_estimated_tasks().
int64_t leveled_compaction_strategy::estimated_pending_compactions(table_state& table_s) const {
    auto all_sstables = table_s.get_sstable_set().all();
    std::vector<sstables::shared_sstable> sstables(all_sstables->begin(), all_sstables->end());

    auto max_sstable_size_in_bytes = _max_sstable_size_in_mb * 1024 * 1024;
    return leveled_manifest::get_estimated_tasks(leveled_manifest::get_levels(sstables), max_sstable_size_in_bytes);
}
// Decides whether `input` needs to be reshaped and, if so, returns the job.
//
// The input sstables are bucketed by level and each bucket is ordered by first
// decorated key. The outcomes, in order of precedence:
//  1. An sstable claims a level above MAX_LEVELS - 1: everything is compacted
//     into level MAX_LEVELS - 1.
//  2. Strict mode, all input is in level 0, level 0 is disjoint and holds at
//     least `offstrategy_threshold` sstables: everything is compacted into the
//     lowest level with enough capacity for the data.
//  3. Level 0 holds more than `offstrategy_threshold` sstables: level 0 is
//     handed over to the size-tiered strategy's reshaping logic.
//  4. Some level above 0 has more overlapping sstables than tolerated:
//     everything is compacted into the highest non-empty level.
//  5. Otherwise an empty descriptor is returned.
//
// Every descriptor built here (cases 1, 2 and 4) carries reshape options.
compaction_descriptor
leveled_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> input, schema_ptr schema, const ::io_priority_class& iop, reshape_mode mode) {
    std::array<std::vector<shared_sstable>, leveled_manifest::MAX_LEVELS> level_info;

    // Returns { overlap count <= tolerance, overlap count } for `sstables`.
    auto is_disjoint = [this, schema] (const std::vector<shared_sstable>& sstables, unsigned tolerance) -> std::tuple<bool, unsigned> {
        auto overlapping_sstables = sstable_set_overlapping_count(schema, sstables);
        return { overlapping_sstables <= tolerance, overlapping_sstables };
    };

    auto max_sstable_size_in_bytes = _max_sstable_size_in_mb * 1024 * 1024;

    for (auto& sst : input) {
        auto sst_level = sst->get_sstable_level();
        if (sst_level > leveled_manifest::MAX_LEVELS - 1) {
            leveled_manifest::logger.warn("Found SSTable with level {}, higher than the maximum {}. This is unexpected, but will fix", sst_level, leveled_manifest::MAX_LEVELS - 1);

            // This is really unexpected, so we'll just compact it all to fix it.
            // Moving `input` while iterating over it is fine: we return right away.
            compaction_descriptor desc(std::move(input), iop, leveled_manifest::MAX_LEVELS - 1, max_sstable_size_in_bytes);
            desc.options = compaction_type_options::make_reshape();
            return desc;
        }
        level_info[sst_level].push_back(sst);
    }

    // Order the sstables of every level by their first decorated key.
    for (auto& level : level_info) {
        std::sort(level.begin(), level.end(), [&schema] (const shared_sstable& a, const shared_sstable& b) {
            return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
        });
    }

    unsigned max_filled_level = 0;

    size_t offstrategy_threshold = (mode == reshape_mode::strict) ? std::max(schema->min_compaction_threshold(), 4) : std::max(schema->max_compaction_threshold(), 32);

    // Number of overlapping sstables tolerated in `level`: none in strict mode,
    // otherwise the larger of the fan-out and 10% of fan_out^level (rounded up).
    auto tolerance = [mode] (unsigned level) -> unsigned {
        if (mode == reshape_mode::strict) {
            return 0;
        }
        constexpr unsigned fan_out = leveled_manifest::leveled_fan_out;
        return std::max(double(fan_out), std::ceil(std::pow(fan_out, level) * 0.1));
    };

    // If there's only disjoint L0 sstables like on bootstrap, let's compact them all into a level L which has capacity to store the output.
    // The best possible level can be calculated with the formula: log (base fan_out) of (L0_total_bytes / max_sstable_size)
    auto [l0_disjoint, _] = is_disjoint(level_info[0], 0);
    if (mode == reshape_mode::strict && level_info[0].size() >= offstrategy_threshold && level_info[0].size() == input.size() && l0_disjoint) {
        auto log_fanout = [fanout = leveled_manifest::leveled_fan_out] (double x) {
            double inv_log_fanout = 1.0f / std::log(fanout);
            return std::log(x) * inv_log_fanout;
        };

        auto total_bytes = std::max(leveled_manifest::get_total_bytes(level_info[0]), uint64_t(max_sstable_size_in_bytes));
        // Divide in floating point. An integer division would truncate the
        // ratio before the logarithm is taken and could pick a level that is
        // one too low to hold the data (if the fan-out were 10, a ratio of
        // 10.5 would give level 1 instead of 2).
        unsigned ideal_level = std::ceil(log_fanout(double(total_bytes) / max_sstable_size_in_bytes));

        leveled_manifest::logger.info("Reshaping {} disjoint sstables in level 0 into level {}", level_info[0].size(), ideal_level);
        compaction_descriptor desc(std::move(input), iop, ideal_level, max_sstable_size_in_bytes);
        desc.options = compaction_type_options::make_reshape();
        return desc;
    }

    // Too many sstables in L0: let the size-tiered strategy reshape that level.
    if (level_info[0].size() > offstrategy_threshold) {
        size_tiered_compaction_strategy stcs(_stcs_options);
        return stcs.get_reshaping_job(std::move(level_info[0]), schema, iop, mode);
    }

    // Walk from the highest level down to level 1; the first non-empty level
    // visited is therefore the highest filled one.
    for (unsigned level = leveled_manifest::MAX_LEVELS - 1; level > 0; --level) {
        if (level_info[level].empty()) {
            continue;
        }
        max_filled_level = std::max(max_filled_level, level);

        auto [disjoint, overlapping_sstables] = is_disjoint(level_info[level], tolerance(level));
        if (!disjoint) {
            leveled_manifest::logger.warn("Turns out that level {} is not disjoint, found {} overlapping SSTables, so compacting everything on behalf of {}.{}", level, overlapping_sstables, schema->ks_name(), schema->cf_name());
            // Unfortunately there is no good way to cap the input size for this
            // kind of job, so the whole input is compacted.
            compaction_descriptor desc(std::move(input), iop, max_filled_level, max_sstable_size_in_bytes);
            desc.options = compaction_type_options::make_reshape();
            return desc;
        }
    }

    return compaction_descriptor();
}
}