compaction/twcs: Make strategy easier to extend by removing duplicated knowledge

TWCS is hard to extend because its knowledge on what to do with a window
bucket is duplicated in two functions. Let's remove this duplication by
placing the knowledge into a single function.

This is important for the coming change that will perform size-tiered
instead of major on windows that are no longer active.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2020-08-12 16:59:04 -03:00
parent f2b588cfc4
commit 820b47e9a3
2 changed files with 31 additions and 10 deletions

View File

@@ -156,6 +156,16 @@ time_window_compaction_strategy::get_sstables_for_compaction(column_family& cf,
return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
}
time_window_compaction_strategy::bucket_compaction_mode
time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timestamp_type bucket_key,
timestamp_type now, size_t min_threshold) const {
if (bucket.size() >= size_t(min_threshold) && bucket_key >= now) {
return bucket_compaction_mode::size_tiered;
} else if (bucket.size() >= 2 && bucket_key < now) {
return bucket_compaction_mode::major;
}
return bucket_compaction_mode::none;
}
std::vector<shared_sstable>
time_window_compaction_strategy::get_next_non_expired_sstables(column_family& cf,
@@ -236,7 +246,8 @@ time_window_compaction_strategy::newest_bucket(std::map<timestamp_type, std::vec
clogger.trace("Key {}, now {}", key, now);
if (bucket.size() >= size_t(min_threshold) && key >= now) {
switch (compaction_mode(bucket, key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered: {
// If we're in the newest bucket, we'll use STCS to prioritize sstables
auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
@@ -244,11 +255,14 @@ time_window_compaction_strategy::newest_bucket(std::map<timestamp_type, std::vec
if (!stcs_interesting_bucket.empty()) {
return stcs_interesting_bucket;
}
} else if (bucket.size() >= 2 && key < now) {
break;
}
case bucket_compaction_mode::major:
clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
return trim_to_threshold(std::move(bucket), max_threshold);
} else {
default:
clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
break;
}
}
return {};
@@ -270,15 +284,16 @@ void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std::
int64_t n = 0;
timestamp_type now = _highest_window_seen;
for (auto task : tasks) {
auto key = task.first;
for (auto& task : tasks) {
const bucket_t& bucket = task.second;
timestamp_type bucket_key = task.first;
// For current window, make sure it's compactable
auto count = task.second.size();
if (key >= now && count >= size_t(min_threshold)) {
n++;
} else if (key < now && count >= 2) {
switch (compaction_mode(bucket, bucket_key, now, min_threshold)) {
case bucket_compaction_mode::size_tiered:
case bucket_compaction_mode::major:
n++;
default:
break;
}
}
_estimated_remaining_tasks = n;

View File

@@ -149,6 +149,8 @@ public:
// Better co-locate some windows into the same sstables than OOM.
static constexpr uint64_t max_data_segregation_window_count = 100;
using bucket_t = std::vector<shared_sstable>;
enum class bucket_compaction_mode { none, size_tiered, major };
public:
time_window_compaction_strategy(const std::map<sstring, sstring>& options);
virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override;
@@ -165,6 +167,10 @@ private:
};
}
// Returns which compaction type should be performed on a given window bucket.
bucket_compaction_mode
compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const;
std::vector<shared_sstable>
get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);