diff --git a/sstables/time_window_compaction_strategy.cc b/sstables/time_window_compaction_strategy.cc index 5b53aa30e1..548bf121ab 100644 --- a/sstables/time_window_compaction_strategy.cc +++ b/sstables/time_window_compaction_strategy.cc @@ -156,6 +156,16 @@ time_window_compaction_strategy::get_sstables_for_compaction(column_family& cf, return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority()); } +time_window_compaction_strategy::bucket_compaction_mode +time_window_compaction_strategy::compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, + timestamp_type now, size_t min_threshold) const { + if (bucket.size() >= size_t(min_threshold) && bucket_key >= now) { + return bucket_compaction_mode::size_tiered; + } else if (bucket.size() >= 2 && bucket_key < now) { + return bucket_compaction_mode::major; + } + return bucket_compaction_mode::none; +} std::vector time_window_compaction_strategy::get_next_non_expired_sstables(column_family& cf, @@ -236,7 +246,8 @@ time_window_compaction_strategy::newest_bucket(std::map= size_t(min_threshold) && key >= now) { + switch (compaction_mode(bucket, key, now, min_threshold)) { + case bucket_compaction_mode::size_tiered: { // If we're in the newest bucket, we'll use STCS to prioritize sstables auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options); @@ -244,11 +255,14 @@ time_window_compaction_strategy::newest_bucket(std::map= 2 && key < now) { + break; + } + case bucket_compaction_mode::major: clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size()); return trim_to_threshold(std::move(bucket), max_threshold); - } else { + default: clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now); + break; } } return {}; @@ -270,15 +284,16 @@ void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std:: int64_t n = 0; timestamp_type now = _highest_window_seen; - for (auto task : tasks) { - auto key = task.first; + for (auto& task : tasks) { + const bucket_t& bucket = task.second; + timestamp_type bucket_key = task.first; - // For current window, make sure it's compactable - auto count = task.second.size(); - if (key >= now && count >= size_t(min_threshold)) { - n++; - } else if (key < now && count >= 2) { + switch (compaction_mode(bucket, bucket_key, now, min_threshold)) { + case bucket_compaction_mode::size_tiered: + case bucket_compaction_mode::major: n++; + default: + break; } } _estimated_remaining_tasks = n; diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh index 5c1133f638..1c01c52e34 100644 --- a/sstables/time_window_compaction_strategy.hh +++ b/sstables/time_window_compaction_strategy.hh @@ -149,6 +149,8 @@ public: // Better co-locate some windows into the same sstables than OOM. static constexpr uint64_t max_data_segregation_window_count = 100; + using bucket_t = std::vector; + enum class bucket_compaction_mode { none, size_tiered, major }; public: time_window_compaction_strategy(const std::map& options); virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector candidates) override; @@ -165,6 +167,10 @@ private: }; } + // Returns which compaction type should be performed on a given window bucket. + bucket_compaction_mode + compaction_mode(const bucket_t& bucket, timestamp_type bucket_key, timestamp_type now, size_t min_threshold) const; + std::vector get_next_non_expired_sstables(column_family& cf, std::vector non_expiring_sstables, gc_clock::time_point gc_before);