Files
scylladb/compaction/time_window_compaction_strategy.cc
Yaniv Michael Kaul 8635a2aa6f compaction/time_window_compaction_strategy: defer O(n^2) overlap computation in debug log with value_of()
Severity: high

Two sstable_set_overlapping_count() calls (O(n^2) overlap computation)
were passed as direct arguments to a debug log, causing them to be
evaluated eagerly even when debug logging is disabled. Use
seastar::value_of() to defer the computation to log-time.
AI-assisted: OpenCode / Claude Opus 4.6
Signed-off-by: Yaniv Kaul <yaniv.kaul@scylladb.com>
2026-03-24 18:30:38 +02:00

567 lines
29 KiB
C++

/*
* Copyright (C) 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "time_window_compaction_strategy.hh"
#include "mutation_writer/timestamp_based_splitting_writer.hh"
#include "mutation/mutation_source_metadata.hh"
#include "cql3/statements/property_definitions.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_set_impl.hh"
#include "compaction_strategy_state.hh"
#include "utils/error_injection.hh"
#include <seastar/util/lazy.hh>
#include <ranges>
namespace compaction {
extern logging::logger clogger;
using timestamp_type = api::timestamp_type;
time_window_compaction_strategy_state_ptr time_window_compaction_strategy::get_state(compaction_group_view& table_s) const {
return table_s.get_compaction_strategy_state().get<time_window_compaction_strategy_state_ptr>();
}
const std::unordered_map<sstring, std::chrono::seconds> time_window_compaction_strategy_options::valid_window_units = {
{"MINUTES", 60s}, {"HOURS", 3600s}, {"DAYS", 86400s}};
const std::unordered_map<sstring, time_window_compaction_strategy_options::timestamp_resolutions>
time_window_compaction_strategy_options::valid_timestamp_resolutions = {
{"MICROSECONDS", timestamp_resolutions::microsecond},
{"MILLISECONDS", timestamp_resolutions::millisecond},
};
static std::chrono::seconds validate_compaction_window_unit(const std::map<sstring, sstring>& options) {
std::chrono::seconds window_unit = time_window_compaction_strategy_options::DEFAULT_COMPACTION_WINDOW_UNIT;
auto tmp_value = compaction_strategy_impl::get_value(options, time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY);
if (tmp_value) {
auto valid_window_units_it = time_window_compaction_strategy_options::valid_window_units.find(tmp_value.value());
if (valid_window_units_it == time_window_compaction_strategy_options::valid_window_units.end()) {
throw exceptions::configuration_exception(
fmt::format("Invalid window unit {} for {}", tmp_value.value(), time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY));
}
window_unit = valid_window_units_it->second;
}
return window_unit;
}
static std::chrono::seconds validate_compaction_window_unit(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
auto window_unit = validate_compaction_window_unit(options);
unchecked_options.erase(time_window_compaction_strategy_options::COMPACTION_WINDOW_UNIT_KEY);
return window_unit;
}
static int validate_compaction_window_size(const std::map<sstring, sstring>& options) {
auto tmp_value = compaction_strategy_impl::get_value(options, time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY);
int window_size = cql3::statements::property_definitions::to_long(time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, tmp_value,
time_window_compaction_strategy_options::DEFAULT_COMPACTION_WINDOW_SIZE);
if (window_size <= 0) {
throw exceptions::configuration_exception(
fmt::format("{} value ({}) must be greater than 1", time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY, window_size));
}
return window_size;
}
static int validate_compaction_window_size(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
int window_size = validate_compaction_window_size(options);
unchecked_options.erase(time_window_compaction_strategy_options::COMPACTION_WINDOW_SIZE_KEY);
return window_size;
}
static db_clock::duration validate_expired_sstable_check_frequency_seconds(const std::map<sstring, sstring>& options) {
db_clock::duration expired_sstable_check_frequency = time_window_compaction_strategy_options::DEFAULT_EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS();
auto tmp_value = compaction_strategy_impl::get_value(options, time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
if (tmp_value) {
try {
expired_sstable_check_frequency = std::chrono::seconds(std::stol(tmp_value.value()));
} catch (const std::exception& e) {
throw exceptions::syntax_exception(fmt::format(
"Invalid long value {} for {}", tmp_value.value(), time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY));
}
}
return expired_sstable_check_frequency;
}
static db_clock::duration validate_expired_sstable_check_frequency_seconds(
const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
db_clock::duration expired_sstable_check_frequency = validate_expired_sstable_check_frequency_seconds(options);
unchecked_options.erase(time_window_compaction_strategy_options::EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY);
return expired_sstable_check_frequency;
}
static time_window_compaction_strategy_options::timestamp_resolutions validate_timestamp_resolution(const std::map<sstring, sstring>& options) {
time_window_compaction_strategy_options::timestamp_resolutions timestamp_resolution =
time_window_compaction_strategy_options::timestamp_resolutions::microsecond;
auto tmp_value = compaction_strategy_impl::get_value(options, time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY);
if (tmp_value) {
if (!time_window_compaction_strategy_options::valid_timestamp_resolutions.contains(tmp_value.value())) {
throw exceptions::configuration_exception(fmt::format(
"Invalid timestamp resolution {} for {}", tmp_value.value(), time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY));
} else {
timestamp_resolution = time_window_compaction_strategy_options::valid_timestamp_resolutions.at(tmp_value.value());
}
}
return timestamp_resolution;
}
static time_window_compaction_strategy_options::timestamp_resolutions validate_timestamp_resolution(
const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
time_window_compaction_strategy_options::timestamp_resolutions timestamp_resolution = validate_timestamp_resolution(options);
unchecked_options.erase(time_window_compaction_strategy_options::TIMESTAMP_RESOLUTION_KEY);
return timestamp_resolution;
}
time_window_compaction_strategy_options::time_window_compaction_strategy_options(const std::map<sstring, sstring>& options) {
auto window_unit = validate_compaction_window_unit(options);
int window_size = validate_compaction_window_size(options);
sstable_window_size = window_size * window_unit;
expired_sstable_check_frequency = validate_expired_sstable_check_frequency_seconds(options);
timestamp_resolution = validate_timestamp_resolution(options);
auto it = options.find("enable_optimized_twcs_queries");
if (it != options.end() && it->second == "false") {
enable_optimized_twcs_queries = false;
}
}
time_window_compaction_strategy_options::time_window_compaction_strategy_options(time_window_compaction_strategy_options&&) = default;
time_window_compaction_strategy_options::time_window_compaction_strategy_options(const time_window_compaction_strategy_options&) = default;
// options is a map of compaction strategy options and their values.
// unchecked_options is an analogical map from which already checked options are deleted.
// This helps making sure that only allowed options are being set.
void time_window_compaction_strategy_options::validate(const std::map<sstring, sstring>& options, std::map<sstring, sstring>& unchecked_options) {
validate_compaction_window_unit(options, unchecked_options);
validate_compaction_window_size(options, unchecked_options);
validate_expired_sstable_check_frequency_seconds(options, unchecked_options);
validate_timestamp_resolution(options, unchecked_options);
compaction_strategy_impl::validate_min_max_threshold(options, unchecked_options);
auto it = options.find("enable_optimized_twcs_queries");
if (it != options.end() && it->second != "true" && it->second != "false") {
throw exceptions::configuration_exception(fmt::format("enable_optimized_twcs_queries value ({}) must be \"true\" or \"false\"", it->second));
}
unchecked_options.erase("enable_optimized_twcs_queries");
it = unchecked_options.find("unsafe_aggressive_sstable_expiration");
if (it != unchecked_options.end()) {
clogger.warn("unsafe_aggressive_sstable_expiration option is not supported for time window compaction strategy");
unchecked_options.erase(it);
}
}
class classify_by_timestamp {
time_window_compaction_strategy_options _options;
std::vector<int64_t> _known_windows;
public:
explicit classify_by_timestamp(time_window_compaction_strategy_options options)
: _options(std::move(options)) {
}
int64_t operator()(api::timestamp_type ts) {
const auto window = time_window_compaction_strategy::get_window_for(_options, ts);
if (const auto it = std::ranges::find(_known_windows, window); it != _known_windows.end()) {
std::swap(*it, _known_windows.front());
return window;
}
if (_known_windows.size() < time_window_compaction_strategy::max_data_segregation_window_count) {
_known_windows.push_back(window);
return window;
}
int64_t closest_window;
int64_t min_diff = std::numeric_limits<int64_t>::max();
for (const auto known_window : _known_windows) {
if (const auto diff = std::abs(known_window - window); diff < min_diff) {
min_diff = diff;
closest_window = known_window;
}
}
return closest_window;
};
};
uint64_t time_window_compaction_strategy::adjust_partition_estimate(const mutation_source_metadata& ms_meta, uint64_t partition_estimate, schema_ptr s) const {
// If not enough information, we assume the worst
auto estimated_window_count = max_data_segregation_window_count;
auto default_ttl = std::chrono::duration_cast<std::chrono::microseconds>(s->default_time_to_live());
bool min_and_max_ts_available = ms_meta.min_timestamp && ms_meta.max_timestamp;
auto estimate_window_count = [this](timestamp_type min_window, timestamp_type max_window) {
const auto window_size = get_window_size(_options);
return (max_window + (window_size - 1) - min_window) / window_size;
};
if (!min_and_max_ts_available && default_ttl.count()) {
auto min_window = get_window_for(_options, timestamp_type(0));
auto max_window = get_window_for(_options, timestamp_type(default_ttl.count()));
estimated_window_count = estimate_window_count(min_window, max_window);
} else if (min_and_max_ts_available) {
auto min_window = get_window_for(_options, *ms_meta.min_timestamp);
auto max_window = get_window_for(_options, *ms_meta.max_timestamp);
estimated_window_count = estimate_window_count(min_window, max_window);
}
return partition_estimate / std::max(1UL, uint64_t(estimated_window_count));
}
mutation_reader_consumer time_window_compaction_strategy::make_interposer_consumer(
const mutation_source_metadata& ms_meta, mutation_reader_consumer end_consumer) const {
if (ms_meta.min_timestamp && ms_meta.max_timestamp &&
get_window_for(_options, *ms_meta.min_timestamp) == get_window_for(_options, *ms_meta.max_timestamp)) {
return end_consumer;
}
return [options = _options, end_consumer = std::move(end_consumer)](mutation_reader rd) mutable -> future<> {
return mutation_writer::segregate_by_timestamp(std::move(rd), classify_by_timestamp(std::move(options)), end_consumer);
};
}
compaction_descriptor time_window_compaction_strategy::get_reshaping_job(
std::vector<sstables::shared_sstable> input, schema_ptr schema, reshape_config cfg) const {
auto mode = cfg.mode;
std::vector<sstables::shared_sstable> single_window;
std::vector<sstables::shared_sstable> multi_window;
size_t offstrategy_threshold = std::max(schema->min_compaction_threshold(), 4);
size_t max_sstables = std::max(schema->max_compaction_threshold(), int(offstrategy_threshold));
const uint64_t target_job_size = cfg.free_storage_space * reshape_target_space_overhead;
if (mode == reshape_mode::relaxed) {
offstrategy_threshold = max_sstables;
}
// Sort input sstables by first_key order
// to allow efficient reshaping of disjoint sstables.
std::sort(input.begin(), input.end(), [&schema](const sstables::shared_sstable& a, const sstables::shared_sstable& b) {
return dht::ring_position(a->get_first_decorated_key()).less_compare(*schema, dht::ring_position(b->get_first_decorated_key()));
});
for (auto& sst : input) {
auto min = sst->get_stats_metadata().min_timestamp;
auto max = sst->get_stats_metadata().max_timestamp;
if (get_window_for(_options, min) != get_window_for(_options, max)) {
multi_window.push_back(sst);
} else {
single_window.push_back(sst);
}
}
auto is_disjoint = [&schema, mode, max_sstables](const std::vector<sstables::shared_sstable>& ssts) {
size_t tolerance = (mode == reshape_mode::relaxed) ? max_sstables : 0;
return sstable_set_overlapping_count(schema, ssts) <= tolerance;
};
clogger.debug("time_window_compaction_strategy::get_reshaping_job: offstrategy_threshold={} max_sstables={} multi_window={} disjoint={} "
"single_window={} disjoint={}",
offstrategy_threshold, max_sstables, multi_window.size(), seastar::value_of([&] {
return !multi_window.empty() && sstable_set_overlapping_count(schema, multi_window) == 0;
}),
single_window.size(), seastar::value_of([&] {
return !single_window.empty() && sstable_set_overlapping_count(schema, single_window) == 0;
}));
auto get_job_size = [](const std::vector<sstables::shared_sstable>& ssts) {
return std::ranges::fold_left(ssts | std::views::transform(std::mem_fn(&sstables::sstable::bytes_on_disk)), uint64_t(0), std::plus{});
};
// Targets a space overhead of 10%. All disjoint sstables can be compacted together as long as they won't
// cause an overhead above target. Otherwise, the job targets a maximum of #max_threshold sstables.
auto need_trimming = [&](const std::vector<sstables::shared_sstable>& ssts, const uint64_t job_size, bool is_disjoint) {
const size_t min_sstables = 2;
auto is_above_target_size = job_size > target_job_size;
return (ssts.size() > max_sstables && !is_disjoint) || (ssts.size() > min_sstables && is_above_target_size);
};
auto maybe_trim_job = [&need_trimming](std::vector<sstables::shared_sstable>& ssts, uint64_t job_size, bool is_disjoint) {
while (need_trimming(ssts, job_size, is_disjoint)) {
auto sst = ssts.back();
ssts.pop_back();
job_size -= sst->bytes_on_disk();
}
};
if (!multi_window.empty()) {
auto disjoint = is_disjoint(multi_window);
auto job_size = get_job_size(multi_window);
// Everything that spans multiple windows will need reshaping
if (need_trimming(multi_window, job_size, disjoint)) {
// When trimming, let's keep sstables with overlapping time window, so as to reduce write amplification.
// For example, if there are N sstables spanning window W, where N <= 32, then we can produce all data for W
// in a single compaction round, removing the need to later compact W to reduce its number of files.
auto sort_size = std::min(max_sstables, multi_window.size());
std::ranges::partial_sort(multi_window, multi_window.begin() + sort_size, std::ranges::less(), [](const sstables::shared_sstable& a) {
return a->get_stats_metadata().max_timestamp;
});
maybe_trim_job(multi_window, job_size, disjoint);
}
compaction_descriptor desc(std::move(multi_window));
desc.options = compaction_type_options::make_reshape();
return desc;
}
// For things that don't span multiple windows, we compact windows that are individually too big
auto all_disjoint = !single_window.empty() && is_disjoint(single_window);
auto all_buckets = get_buckets(single_window, _options);
single_window.clear();
for (auto& [bucket, ssts] : all_buckets.first) {
if (ssts.size() >= offstrategy_threshold) {
clogger.debug("time_window_compaction_strategy::get_reshaping_job: bucket={} bucket_size={}", bucket, ssts.size());
if (all_disjoint) {
std::copy(ssts.begin(), ssts.end(), std::back_inserter(single_window));
continue;
}
// reuse STCS reshape logic which will only compact similar-sized files, to increase overall efficiency
// when reshaping time buckets containing a huge amount of files
auto desc = size_tiered_compaction_strategy(_stcs_options).get_reshaping_job(std::move(ssts), schema, cfg);
if (!desc.sstables.empty()) {
return desc;
}
}
}
if (!single_window.empty()) {
maybe_trim_job(single_window, get_job_size(single_window), all_disjoint);
compaction_descriptor desc(std::move(single_window));
desc.options = compaction_type_options::make_reshape();
return desc;
}
return compaction_descriptor();
}
future<compaction_descriptor> time_window_compaction_strategy::get_sstables_for_compaction(compaction_group_view& table_s, strategy_control& control) {
auto state = get_state(table_s);
auto compaction_time = gc_clock::now();
auto candidates = co_await control.candidates(table_s);
if (candidates.empty()) {
co_return compaction_descriptor();
}
auto now = db_clock::now();
if (now - state->last_expired_check > _options.expired_sstable_check_frequency) {
clogger.debug("[{}] TWCS expired check sufficiently far in the past, checking for fully expired SSTables", fmt::ptr(this));
// Find fully expired SSTables. Those will be included no matter what.
auto expired = table_s.fully_expired_sstables(candidates, compaction_time);
if (!expired.empty()) {
clogger.debug("[{}] Going to compact {} expired sstables", fmt::ptr(this), expired.size());
co_return compaction_descriptor(has_only_fully_expired::yes, std::vector<sstables::shared_sstable>(expired.begin(), expired.end()));
}
// Keep checking for fully_expired_sstables until we don't find
// any among the candidates, meaning they are either already compacted
// or registered for compaction.
state->last_expired_check = now;
} else {
clogger.debug("[{}] TWCS skipping check for fully expired SSTables", fmt::ptr(this));
}
co_await utils::get_local_injector().inject("twcs_get_sstables_for_compaction", utils::wait_for_message(30s));
auto compaction_candidates = get_next_non_expired_sstables(table_s, control, std::move(candidates), compaction_time, *state);
clogger.debug("[{}] Going to compact {} non-expired sstables", fmt::ptr(this), compaction_candidates.size());
co_return compaction_descriptor(std::move(compaction_candidates));
}
time_window_compaction_strategy::bucket_compaction_mode time_window_compaction_strategy::compaction_mode(
const time_window_compaction_strategy_state& state, const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const {
// STCS will also be performed on older window buckets, to avoid a bad write and
// space amplification when something like read repair cause small updates to
// those past windows.
if (bucket.size() >= 2 && !is_last_active_bucket(bucket_key, now) && state.recent_active_windows.contains(bucket_key)) {
return bucket_compaction_mode::major;
} else if (bucket.size() >= size_t(min_threshold)) {
return bucket_compaction_mode::size_tiered;
}
return bucket_compaction_mode::none;
}
std::vector<sstables::shared_sstable> time_window_compaction_strategy::get_next_non_expired_sstables(compaction_group_view& table_s, strategy_control& control,
std::vector<sstables::shared_sstable> non_expiring_sstables, gc_clock::time_point compaction_time, time_window_compaction_strategy_state& state) {
auto most_interesting = get_compaction_candidates(table_s, control, non_expiring_sstables, state);
if (!most_interesting.empty()) {
return most_interesting;
}
if (!table_s.tombstone_gc_enabled()) {
return {};
}
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
// ratio is greater than threshold.
std::erase_if(non_expiring_sstables, [this, compaction_time, &table_s](const sstables::shared_sstable& sst) -> bool {
return !worth_dropping_tombstones(sst, compaction_time, table_s);
});
if (non_expiring_sstables.empty()) {
return {};
}
auto it = std::ranges::min_element(non_expiring_sstables, [](auto& i, auto& j) {
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
});
return {*it};
}
std::vector<sstables::shared_sstable> time_window_compaction_strategy::get_compaction_candidates(compaction_group_view& table_s, strategy_control& control,
std::vector<sstables::shared_sstable> candidate_sstables, time_window_compaction_strategy_state& state) {
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
// Update the highest window seen, if necessary
state.highest_window_seen = std::max(state.highest_window_seen, max_timestamp);
return newest_bucket(table_s, control, std::move(buckets), table_s.min_compaction_threshold(), table_s.schema()->max_compaction_threshold(),
state.highest_window_seen, state);
}
timestamp_type time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
using namespace std::chrono;
// mask out window size from timestamp to get lower bound of its window
auto num_windows = microseconds(timestamp) / sstable_window_size;
return duration_cast<microseconds>(num_windows * sstable_window_size).count();
}
std::pair<std::map<timestamp_type, std::vector<sstables::shared_sstable>>, timestamp_type> time_window_compaction_strategy::get_buckets(
std::vector<sstables::shared_sstable> files, const time_window_compaction_strategy_options& options) {
std::map<timestamp_type, std::vector<sstables::shared_sstable>> buckets;
timestamp_type max_timestamp = 0;
// Create map to represent buckets
// For each sstable, add sstable to the time bucket
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
for (auto&& f : files) {
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
buckets[lower_bound].push_back(std::move(f));
max_timestamp = std::max(max_timestamp, lower_bound);
}
return std::make_pair(std::move(buckets), max_timestamp);
}
} // namespace compaction
template <>
struct fmt::formatter<std::map<compaction::timestamp_type, std::vector<sstables::shared_sstable>>> {
constexpr auto parse(format_parse_context& ctx) {
return ctx.begin();
}
auto format(const std::map<compaction::timestamp_type, std::vector<sstables::shared_sstable>>& buckets, fmt::format_context& ctx) const {
auto out = fmt::format_to(ctx.out(), " buckets = {{\n");
for (auto& [timestamp, sstables] : buckets | std::views::reverse) {
out = fmt::format_to(out, " key={}, size={}\n", timestamp, sstables.size());
}
return fmt::format_to(out, " }}\n");
}
};
namespace compaction {
std::vector<sstables::shared_sstable> time_window_compaction_strategy::newest_bucket(compaction_group_view& table_s, strategy_control& control,
std::map<timestamp_type, std::vector<sstables::shared_sstable>> buckets, int min_threshold, int max_threshold, timestamp_type now,
time_window_compaction_strategy_state& state) {
clogger.debug("time_window_compaction_strategy::newest_bucket:\n now {}\n{}", now, buckets);
for (auto&& [key, bucket] : buckets | std::views::reverse) {
bool last_active_bucket = is_last_active_bucket(key, now);
if (last_active_bucket) {
state.recent_active_windows.insert(key);
}
switch (compaction_mode(state, bucket, key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered: {
// If we're in the newest bucket, we'll use STCS to prioritize sstables.
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, _stcs_options);
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
if (!stcs_interesting_bucket.empty()) {
clogger.debug("bucket size {} >= 2, key {}, performing STCS on what's here", bucket.size(), key);
return stcs_interesting_bucket;
}
break;
}
case bucket_compaction_mode::major:
// serializes per-window major on a past window, to avoid missing its files being currently compacted.
if (control.has_ongoing_compaction(table_s)) {
break;
}
clogger.debug("bucket size {} >= 2 and not in current bucket, key {}, compacting what's here", bucket.size(), key);
return trim_to_threshold(std::move(bucket), max_threshold);
default:
// windows needing major will remain with major state until they're compacted into one file.
// after that, they will fall into default mode where we'll stop considering them as a recent window
// which needs major. That's to avoid terrible writeamp as streaming may push data into older windows.
if (!last_active_bucket) {
state.recent_active_windows.erase(key);
}
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
break;
}
}
return {};
}
std::vector<sstables::shared_sstable> time_window_compaction_strategy::trim_to_threshold(std::vector<sstables::shared_sstable> bucket, int max_threshold) {
auto n = std::min(bucket.size(), size_t(max_threshold));
// Trim the largest sstables off the end to meet the maxThreshold
std::ranges::partial_sort(bucket, bucket.begin() + n, std::ranges::less(), std::mem_fn(&sstables::sstable::ondisk_data_size));
bucket.resize(n);
return bucket;
}
future<int64_t> time_window_compaction_strategy::estimated_pending_compactions(compaction_group_view& table_s) const {
auto state = get_state(table_s);
auto min_threshold = table_s.min_compaction_threshold();
auto max_threshold = table_s.schema()->max_compaction_threshold();
auto main_set = co_await table_s.main_sstable_set();
auto candidate_sstables = *main_set->all() | std::ranges::to<std::vector>();
auto [buckets, max_timestamp] = get_buckets(std::move(candidate_sstables), _options);
int64_t n = 0;
for (auto& [bucket_key, bucket] : buckets) {
switch (compaction_mode(*state, bucket, bucket_key, max_timestamp, min_threshold)) {
case bucket_compaction_mode::size_tiered:
n += size_tiered_compaction_strategy::estimated_pending_compactions(bucket, min_threshold, max_threshold, _stcs_options);
break;
case bucket_compaction_mode::major:
n++;
break;
default:
break;
}
}
co_return n;
}
std::vector<compaction_descriptor> time_window_compaction_strategy::get_cleanup_compaction_jobs(
compaction_group_view& table_s, std::vector<sstables::shared_sstable> candidates) const {
std::vector<compaction_descriptor> ret;
for (auto&& [_, sstables] : get_buckets(std::move(candidates), _options).first) {
auto per_window_jobs = size_tiered_compaction_strategy(_stcs_options).get_cleanup_compaction_jobs(table_s, std::move(sstables));
std::move(per_window_jobs.begin(), per_window_jobs.end(), std::back_inserter(ret));
}
return ret;
}
std::unique_ptr<sstables::sstable_set_impl> time_window_compaction_strategy::make_sstable_set(const compaction_group_view& ts) const {
return std::make_unique<sstables::time_series_sstable_set>(ts.schema(), _options.enable_optimized_twcs_queries);
}
} // namespace compaction