mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-04 14:03:06 +00:00
compaction/twcs: Move TWCS implementation into source file
Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
@@ -125,4 +125,163 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
// Builds the next compaction job for `cf` out of `candidates`.
//
// Returns an empty descriptor when there are no candidates. Otherwise, if the
// periodic expiry check is due, fully expired sstables are looked up, removed
// from the regular candidate list and appended to whatever the regular
// (non-expired) selection yields, so they are included no matter what.
compaction_descriptor
time_window_compaction_strategy::get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) {
    auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();

    if (candidates.empty()) {
        return compaction_descriptor();
    }

    // Stays empty when the expiry check is skipped on this round.
    std::unordered_set<shared_sstable> expired;

    const bool check_is_due = db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency;
    if (!check_is_due) {
        clogger.debug("TWCS skipping check for fully expired SSTables");
    } else {
        clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
        expired = get_fully_expired_sstables(cf, candidates, gc_before);
        _last_expired_check = db_clock::now();
    }

    const bool have_expired = !expired.empty();

    if (have_expired) {
        // Expired sstables must not take part in the regular selection below.
        auto first_expired = boost::remove_if(candidates, [&expired] (const shared_sstable& sst) {
            return expired.contains(sst);
        });
        candidates.erase(first_expired, candidates.end());
    }

    auto selected = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
    if (have_expired) {
        selected.insert(selected.end(), expired.begin(), expired.end());
    }

    return compaction_descriptor(std::move(selected), cf.get_sstable_set(), service::get_local_compaction_priority());
}
|
||||
|
||||
|
||||
std::vector<shared_sstable>
|
||||
time_window_compaction_strategy::get_next_non_expired_sstables(column_family& cf,
|
||||
std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
|
||||
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
|
||||
|
||||
if (!most_interesting.empty()) {
|
||||
return most_interesting;
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
|
||||
// ratio is greater than threshold.
|
||||
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
|
||||
return !worth_dropping_tombstones(sst, gc_before);
|
||||
});
|
||||
non_expiring_sstables.erase(e, non_expiring_sstables.end());
|
||||
if (non_expiring_sstables.empty()) {
|
||||
return {};
|
||||
}
|
||||
auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
|
||||
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
|
||||
});
|
||||
return { *it };
|
||||
}
|
||||
|
||||
std::vector<shared_sstable>
|
||||
time_window_compaction_strategy::get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
|
||||
auto p = get_buckets(std::move(candidate_sstables), _options);
|
||||
// Update the highest window seen, if necessary
|
||||
_highest_window_seen = std::max(_highest_window_seen, p.second);
|
||||
|
||||
update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
|
||||
|
||||
return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
|
||||
_options.sstable_window_size, _highest_window_seen, _stcs_options);
|
||||
}
|
||||
|
||||
// Returns the lower bound of the window of size `sstable_window_size` that
// contains `timestamp`.
//
// `timestamp` is interpreted as microseconds; it is first truncated to whole
// seconds, then its offset within the window is subtracted. The result is
// expressed in microseconds again.
timestamp_type
time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
    namespace chr = std::chrono;

    const auto ts_in_sec = chr::duration_cast<chr::seconds>(chr::microseconds(timestamp));

    // Offset of the timestamp inside its window; dropping it yields the window start.
    const auto offset_in_window = ts_in_sec % sstable_window_size;
    const chr::seconds window_start = ts_in_sec - offset_in_window;

    return timestamp_type(chr::duration_cast<chr::microseconds>(window_start).count());
}
|
||||
|
||||
std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
|
||||
time_window_compaction_strategy::get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
|
||||
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
|
||||
|
||||
timestamp_type max_timestamp = 0;
|
||||
// Create map to represent buckets
|
||||
// For each sstable, add sstable to the time bucket
|
||||
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
|
||||
for (auto&& f : files) {
|
||||
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
|
||||
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
|
||||
buckets[lower_bound].push_back(std::move(f));
|
||||
max_timestamp = std::max(max_timestamp, lower_bound);
|
||||
}
|
||||
|
||||
return std::make_pair(std::move(buckets), max_timestamp);
|
||||
}
|
||||
|
||||
std::vector<shared_sstable>
|
||||
time_window_compaction_strategy::newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets,
|
||||
int min_threshold, int max_threshold, std::chrono::seconds sstable_window_size, timestamp_type now,
|
||||
size_tiered_compaction_strategy_options& stcs_options) {
|
||||
// If the current bucket has at least minThreshold SSTables, choose that one.
|
||||
// For any other bucket, at least 2 SSTables is enough.
|
||||
// In any case, limit to maxThreshold SSTables.
|
||||
|
||||
for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
|
||||
auto key = key_bucket.first;
|
||||
auto& bucket = key_bucket.second;
|
||||
|
||||
clogger.trace("Key {}, now {}", key, now);
|
||||
|
||||
if (bucket.size() >= size_t(min_threshold) && key >= now) {
|
||||
// If we're in the newest bucket, we'll use STCS to prioritize sstables
|
||||
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
|
||||
|
||||
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
|
||||
if (!stcs_interesting_bucket.empty()) {
|
||||
return stcs_interesting_bucket;
|
||||
}
|
||||
} else if (bucket.size() >= 2 && key < now) {
|
||||
clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
|
||||
return trim_to_threshold(std::move(bucket), max_threshold);
|
||||
} else {
|
||||
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<shared_sstable>
|
||||
time_window_compaction_strategy::trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
|
||||
auto n = std::min(bucket.size(), size_t(max_threshold));
|
||||
// Trim the largest sstables off the end to meet the maxThreshold
|
||||
boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
|
||||
return i->ondisk_data_size() < j->ondisk_data_size();
|
||||
});
|
||||
bucket.resize(n);
|
||||
return bucket;
|
||||
}
|
||||
|
||||
void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks,
|
||||
int min_threshold) {
|
||||
int64_t n = 0;
|
||||
timestamp_type now = _highest_window_seen;
|
||||
|
||||
for (auto task : tasks) {
|
||||
auto key = task.first;
|
||||
|
||||
// For current window, make sure it's compactable
|
||||
auto count = task.second.size();
|
||||
if (key >= now && count >= size_t(min_threshold)) {
|
||||
n++;
|
||||
} else if (key < now && count >= 2) {
|
||||
n++;
|
||||
}
|
||||
}
|
||||
_estimated_remaining_tasks = n;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -151,35 +151,7 @@ public:
|
||||
|
||||
public:
|
||||
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
|
||||
auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
|
||||
|
||||
if (candidates.empty()) {
|
||||
return compaction_descriptor();
|
||||
}
|
||||
|
||||
// Find fully expired SSTables. Those will be included no matter what.
|
||||
std::unordered_set<shared_sstable> expired;
|
||||
|
||||
if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
|
||||
clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
|
||||
expired = get_fully_expired_sstables(cf, candidates, gc_before);
|
||||
_last_expired_check = db_clock::now();
|
||||
} else {
|
||||
clogger.debug("TWCS skipping check for fully expired SSTables");
|
||||
}
|
||||
|
||||
if (!expired.empty()) {
|
||||
auto is_expired = [&] (const shared_sstable& s) { return expired.contains(s); };
|
||||
candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
|
||||
}
|
||||
|
||||
auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
|
||||
if (!expired.empty()) {
|
||||
compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
|
||||
}
|
||||
return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
|
||||
}
|
||||
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override;
|
||||
private:
|
||||
static timestamp_type
|
||||
to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
|
||||
@@ -194,113 +166,26 @@ private:
|
||||
}
|
||||
|
||||
std::vector<shared_sstable>
|
||||
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
|
||||
auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
|
||||
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);
|
||||
|
||||
if (!most_interesting.empty()) {
|
||||
return most_interesting;
|
||||
}
|
||||
|
||||
// if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
|
||||
// ratio is greater than threshold.
|
||||
auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
|
||||
return !worth_dropping_tombstones(sst, gc_before);
|
||||
});
|
||||
non_expiring_sstables.erase(e, non_expiring_sstables.end());
|
||||
if (non_expiring_sstables.empty()) {
|
||||
return {};
|
||||
}
|
||||
auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
|
||||
return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
|
||||
});
|
||||
return { *it };
|
||||
}
|
||||
|
||||
std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
|
||||
auto p = get_buckets(std::move(candidate_sstables), _options);
|
||||
// Update the highest window seen, if necessary
|
||||
_highest_window_seen = std::max(_highest_window_seen, p.second);
|
||||
|
||||
update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
|
||||
|
||||
return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
|
||||
_options.sstable_window_size, _highest_window_seen, _stcs_options);
|
||||
}
|
||||
std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables);
|
||||
public:
|
||||
// Find the lowest timestamp for window of given size
|
||||
static timestamp_type
|
||||
get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
|
||||
using namespace std::chrono;
|
||||
auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
|
||||
|
||||
// mask out window size from timestamp to get lower bound of its window
|
||||
auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
|
||||
|
||||
return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
|
||||
}
|
||||
get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp);
|
||||
|
||||
// Group files with similar max timestamp into buckets.
|
||||
// @return A pair, where the left element is the bucket representation (map of window lower-bound timestamp to the sstables in that window),
|
||||
// and the right is the highest window lower bound seen
|
||||
static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
|
||||
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
|
||||
std::map<timestamp_type, std::vector<shared_sstable>> buckets;
|
||||
|
||||
timestamp_type max_timestamp = 0;
|
||||
// Create map to represent buckets
|
||||
// For each sstable, add sstable to the time bucket
|
||||
// Where the bucket is the file's max timestamp rounded to the nearest window bucket
|
||||
for (auto&& f : files) {
|
||||
timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
|
||||
timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
|
||||
buckets[lower_bound].push_back(std::move(f));
|
||||
max_timestamp = std::max(max_timestamp, lower_bound);
|
||||
}
|
||||
|
||||
return std::make_pair(std::move(buckets), max_timestamp);
|
||||
}
|
||||
get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options);
|
||||
|
||||
static std::vector<shared_sstable>
|
||||
newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
|
||||
std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options) {
|
||||
// If the current bucket has at least minThreshold SSTables, choose that one.
|
||||
// For any other bucket, at least 2 SSTables is enough.
|
||||
// In any case, limit to maxThreshold SSTables.
|
||||
|
||||
for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
|
||||
auto key = key_bucket.first;
|
||||
auto& bucket = key_bucket.second;
|
||||
|
||||
clogger.trace("Key {}, now {}", key, now);
|
||||
|
||||
if (bucket.size() >= size_t(min_threshold) && key >= now) {
|
||||
// If we're in the newest bucket, we'll use STCS to prioritize sstables
|
||||
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
|
||||
|
||||
// If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
|
||||
if (!stcs_interesting_bucket.empty()) {
|
||||
return stcs_interesting_bucket;
|
||||
}
|
||||
} else if (bucket.size() >= 2 && key < now) {
|
||||
clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
|
||||
return trim_to_threshold(std::move(bucket), max_threshold);
|
||||
} else {
|
||||
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options);
|
||||
|
||||
static std::vector<shared_sstable>
|
||||
trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
|
||||
auto n = std::min(bucket.size(), size_t(max_threshold));
|
||||
// Trim the largest sstables off the end to meet the maxThreshold
|
||||
boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
|
||||
return i->ondisk_data_size() < j->ondisk_data_size();
|
||||
});
|
||||
bucket.resize(n);
|
||||
return bucket;
|
||||
}
|
||||
trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold);
|
||||
|
||||
static int64_t
|
||||
get_window_for(const time_window_compaction_strategy_options& options, api::timestamp_type ts) {
|
||||
@@ -312,23 +197,7 @@ public:
|
||||
return timestamp_type(std::chrono::duration_cast<std::chrono::microseconds>(options.get_sstable_window_size()).count());
|
||||
}
|
||||
private:
|
||||
void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
|
||||
int64_t n = 0;
|
||||
timestamp_type now = _highest_window_seen;
|
||||
|
||||
for (auto task : tasks) {
|
||||
auto key = task.first;
|
||||
|
||||
// For current window, make sure it's compactable
|
||||
auto count = task.second.size();
|
||||
if (key >= now && count >= size_t(min_threshold)) {
|
||||
n++;
|
||||
} else if (key < now && count >= 2) {
|
||||
n++;
|
||||
}
|
||||
}
|
||||
_estimated_remaining_tasks = n;
|
||||
}
|
||||
void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold);
|
||||
|
||||
friend class time_window_backlog_tracker;
|
||||
public:
|
||||
|
||||
Reference in New Issue
Block a user