From b95359314d5d45294d94795488dfd94fc390b77b Mon Sep 17 00:00:00 2001
From: "Raphael S. Carvalho"
Date: Wed, 12 Aug 2020 16:14:38 -0300
Subject: [PATCH] compaction/twcs: Move TWCS implementation into source file

Signed-off-by: Raphael S. Carvalho
---
Reviewer notes (text between "---" and the diffstat is ignored by git-am):

 * The reviewed copy of this patch had its line breaks collapsed and every
   angle-bracketed span dropped. Line structure was rebuilt from the +/-
   markers and cross-checked against the hunk headers and the diffstat
   (159 added lines in the .cc; 8 added / 139 removed in the .hh).
 * Template arguments (std::vector<shared_sstable>,
   std::unordered_set<shared_sstable>,
   std::map<timestamp_type, std::vector<shared_sstable>>,
   std::map<sstring, sstring>, duration_cast<seconds>/<microseconds>) are
   inferred from usage, not recovered. Blank context lines and indentation
   are inferred as well; verify against the tree, or apply with
   --ignore-whitespace, before relying on this copy.
 * The author/sign-off e-mail addresses were dropped the same way and are
   not reconstructed here.
 * This is a pure code move: every function body added to the .cc is the
   body removed from the .hh, unchanged. Possible follow-ups, deliberately
   NOT folded into this patch:
     - update_estimated_compaction_by_tasks() iterates with
       "for (auto task : tasks)", which copies each map entry including its
       sstable vector; "const auto&" would avoid that.
     - the get_buckets() comment says "rounded to the nearest window
       bucket", but get_window_lower_bound() subtracts the remainder
       (timestamp - timestamp % window) rather than rounding to nearest.

 sstables/time_window_compaction_strategy.cc | 159 ++++++++++++++++++++
 sstables/time_window_compaction_strategy.hh | 147 +-----------------
 2 files changed, 167 insertions(+), 139 deletions(-)

diff --git a/sstables/time_window_compaction_strategy.cc b/sstables/time_window_compaction_strategy.cc
index d7b8b4ee56..5b53aa30e1 100644
--- a/sstables/time_window_compaction_strategy.cc
+++ b/sstables/time_window_compaction_strategy.cc
@@ -125,4 +125,163 @@ time_window_compaction_strategy::get_reshaping_job(std::vector<shared_sstable> i
     return compaction_descriptor();
 }
 
+compaction_descriptor
+time_window_compaction_strategy::get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) {
+    auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
+
+    if (candidates.empty()) {
+        return compaction_descriptor();
+    }
+
+    // Find fully expired SSTables. Those will be included no matter what.
+    std::unordered_set<shared_sstable> expired;
+
+    if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
+        clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
+        expired = get_fully_expired_sstables(cf, candidates, gc_before);
+        _last_expired_check = db_clock::now();
+    } else {
+        clogger.debug("TWCS skipping check for fully expired SSTables");
+    }
+
+    if (!expired.empty()) {
+        auto is_expired = [&] (const shared_sstable& s) { return expired.contains(s); };
+        candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
+    }
+
+    auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
+    if (!expired.empty()) {
+        compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
+    }
+    return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
+}
+
+
+std::vector<shared_sstable>
+time_window_compaction_strategy::get_next_non_expired_sstables(column_family& cf,
+        std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
+    auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
+
+    if (!most_interesting.empty()) {
+        return most_interesting;
+    }
+
+    // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+    // ratio is greater than threshold.
+    auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
+        return !worth_dropping_tombstones(sst, gc_before);
+    });
+    non_expiring_sstables.erase(e, non_expiring_sstables.end());
+    if (non_expiring_sstables.empty()) {
+        return {};
+    }
+    auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
+        return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
+    });
+    return { *it };
+}
+
+std::vector<shared_sstable>
+time_window_compaction_strategy::get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
+    auto p = get_buckets(std::move(candidate_sstables), _options);
+    // Update the highest window seen, if necessary
+    _highest_window_seen = std::max(_highest_window_seen, p.second);
+
+    update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
+
+    return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
+        _options.sstable_window_size, _highest_window_seen, _stcs_options);
+}
+
+timestamp_type
+time_window_compaction_strategy::get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
+    using namespace std::chrono;
+    auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
+
+    // mask out window size from timestamp to get lower bound of its window
+    auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
+
+    return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
+}
+
+std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
+time_window_compaction_strategy::get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
+    std::map<timestamp_type, std::vector<shared_sstable>> buckets;
+
+    timestamp_type max_timestamp = 0;
+    // Create map to represent buckets
+    // For each sstable, add sstable to the time bucket
+    // Where the bucket is the file's max timestamp rounded to the nearest window bucket
+    for (auto&& f : files) {
+        timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
+        timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
+        buckets[lower_bound].push_back(std::move(f));
+        max_timestamp = std::max(max_timestamp, lower_bound);
+    }
+
+    return std::make_pair(std::move(buckets), max_timestamp);
+}
+
+std::vector<shared_sstable>
+time_window_compaction_strategy::newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets,
+        int min_threshold, int max_threshold, std::chrono::seconds sstable_window_size, timestamp_type now,
+        size_tiered_compaction_strategy_options& stcs_options) {
+    // If the current bucket has at least minThreshold SSTables, choose that one.
+    // For any other bucket, at least 2 SSTables is enough.
+    // In any case, limit to maxThreshold SSTables.
+
+    for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
+        auto key = key_bucket.first;
+        auto& bucket = key_bucket.second;
+
+        clogger.trace("Key {}, now {}", key, now);
+
+        if (bucket.size() >= size_t(min_threshold) && key >= now) {
+            // If we're in the newest bucket, we'll use STCS to prioritize sstables
+            auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
+
+            // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
+            if (!stcs_interesting_bucket.empty()) {
+                return stcs_interesting_bucket;
+            }
+        } else if (bucket.size() >= 2 && key < now) {
+            clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
+            return trim_to_threshold(std::move(bucket), max_threshold);
+        } else {
+            clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
+        }
+    }
+    return {};
+}
+
+std::vector<shared_sstable>
+time_window_compaction_strategy::trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
+    auto n = std::min(bucket.size(), size_t(max_threshold));
+    // Trim the largest sstables off the end to meet the maxThreshold
+    boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
+        return i->ondisk_data_size() < j->ondisk_data_size();
+    });
+    bucket.resize(n);
+    return bucket;
+}
+
+void time_window_compaction_strategy::update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks,
+        int min_threshold) {
+    int64_t n = 0;
+    timestamp_type now = _highest_window_seen;
+
+    for (auto task : tasks) {
+        auto key = task.first;
+
+        // For current window, make sure it's compactable
+        auto count = task.second.size();
+        if (key >= now && count >= size_t(min_threshold)) {
+            n++;
+        } else if (key < now && count >= 2) {
+            n++;
+        }
+    }
+    _estimated_remaining_tasks = n;
+}
+
 }
diff --git a/sstables/time_window_compaction_strategy.hh b/sstables/time_window_compaction_strategy.hh
index 4f1049f751..1c14dd3e38 100644
--- a/sstables/time_window_compaction_strategy.hh
+++ b/sstables/time_window_compaction_strategy.hh
@@ -151,35 +151,7 @@ public:
 
 public:
     time_window_compaction_strategy(const std::map<sstring, sstring>& options);
-    virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override {
-        auto gc_before = gc_clock::now() - cf.schema()->gc_grace_seconds();
-
-        if (candidates.empty()) {
-            return compaction_descriptor();
-        }
-
-        // Find fully expired SSTables. Those will be included no matter what.
-        std::unordered_set<shared_sstable> expired;
-
-        if (db_clock::now() - _last_expired_check > _options.expired_sstable_check_frequency) {
-            clogger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
-            expired = get_fully_expired_sstables(cf, candidates, gc_before);
-            _last_expired_check = db_clock::now();
-        } else {
-            clogger.debug("TWCS skipping check for fully expired SSTables");
-        }
-
-        if (!expired.empty()) {
-            auto is_expired = [&] (const shared_sstable& s) { return expired.contains(s); };
-            candidates.erase(boost::remove_if(candidates, is_expired), candidates.end());
-        }
-
-        auto compaction_candidates = get_next_non_expired_sstables(cf, std::move(candidates), gc_before);
-        if (!expired.empty()) {
-            compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
-        }
-        return compaction_descriptor(std::move(compaction_candidates), cf.get_sstable_set(), service::get_local_compaction_priority());
-    }
+    virtual compaction_descriptor get_sstables_for_compaction(column_family& cf, std::vector<shared_sstable> candidates) override;
 private:
     static timestamp_type
     to_timestamp_type(time_window_compaction_strategy_options::timestamp_resolutions resolution, int64_t timestamp_from_sstable) {
@@ -194,113 +166,26 @@ private:
     }
 
     std::vector<shared_sstable>
-    get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before) {
-        auto most_interesting = get_compaction_candidates(cf, non_expiring_sstables);
+    get_next_non_expired_sstables(column_family& cf, std::vector<shared_sstable> non_expiring_sstables, gc_clock::time_point gc_before);
 
-        if (!most_interesting.empty()) {
-            return most_interesting;
-        }
-
-        // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
-        // ratio is greater than threshold.
-        auto e = boost::range::remove_if(non_expiring_sstables, [this, &gc_before] (const shared_sstable& sst) -> bool {
-            return !worth_dropping_tombstones(sst, gc_before);
-        });
-        non_expiring_sstables.erase(e, non_expiring_sstables.end());
-        if (non_expiring_sstables.empty()) {
-            return {};
-        }
-        auto it = boost::min_element(non_expiring_sstables, [] (auto& i, auto& j) {
-            return i->get_stats_metadata().min_timestamp < j->get_stats_metadata().min_timestamp;
-        });
-        return { *it };
-    }
-
-    std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables) {
-        auto p = get_buckets(std::move(candidate_sstables), _options);
-        // Update the highest window seen, if necessary
-        _highest_window_seen = std::max(_highest_window_seen, p.second);
-
-        update_estimated_compaction_by_tasks(p.first, cf.min_compaction_threshold());
-
-        return newest_bucket(std::move(p.first), cf.min_compaction_threshold(), cf.schema()->max_compaction_threshold(),
-            _options.sstable_window_size, _highest_window_seen, _stcs_options);
-    }
+    std::vector<shared_sstable> get_compaction_candidates(column_family& cf, std::vector<shared_sstable> candidate_sstables);
 public:
     // Find the lowest timestamp for window of given size
     static timestamp_type
-    get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp) {
-        using namespace std::chrono;
-        auto timestamp_in_sec = duration_cast<seconds>(microseconds(timestamp)).count();
-
-        // mask out window size from timestamp to get lower bound of its window
-        auto window_lower_bound_in_sec = seconds(timestamp_in_sec - (timestamp_in_sec % sstable_window_size.count()));
-
-        return timestamp_type(duration_cast<microseconds>(window_lower_bound_in_sec).count());
-    }
+    get_window_lower_bound(std::chrono::seconds sstable_window_size, timestamp_type timestamp);
 
     // Group files with similar max timestamp into buckets.
     // @return A pair, where the left element is the bucket representation (map of timestamp to sstablereader),
     //         and the right is the highest timestamp seen
     static std::pair<std::map<timestamp_type, std::vector<shared_sstable>>, timestamp_type>
-    get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options) {
-        std::map<timestamp_type, std::vector<shared_sstable>> buckets;
-
-        timestamp_type max_timestamp = 0;
-        // Create map to represent buckets
-        // For each sstable, add sstable to the time bucket
-        // Where the bucket is the file's max timestamp rounded to the nearest window bucket
-        for (auto&& f : files) {
-            timestamp_type ts = to_timestamp_type(options.timestamp_resolution, f->get_stats_metadata().max_timestamp);
-            timestamp_type lower_bound = get_window_lower_bound(options.sstable_window_size, ts);
-            buckets[lower_bound].push_back(std::move(f));
-            max_timestamp = std::max(max_timestamp, lower_bound);
-        }
-
-        return std::make_pair(std::move(buckets), max_timestamp);
-    }
+    get_buckets(std::vector<shared_sstable> files, time_window_compaction_strategy_options& options);
 
     static std::vector<shared_sstable>
     newest_bucket(std::map<timestamp_type, std::vector<shared_sstable>> buckets, int min_threshold, int max_threshold,
-            std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options) {
-        // If the current bucket has at least minThreshold SSTables, choose that one.
-        // For any other bucket, at least 2 SSTables is enough.
-        // In any case, limit to maxThreshold SSTables.
-
-        for (auto&& key_bucket : buckets | boost::adaptors::reversed) {
-            auto key = key_bucket.first;
-            auto& bucket = key_bucket.second;
-
-            clogger.trace("Key {}, now {}", key, now);
-
-            if (bucket.size() >= size_t(min_threshold) && key >= now) {
-                // If we're in the newest bucket, we'll use STCS to prioritize sstables
-                auto stcs_interesting_bucket = size_tiered_compaction_strategy::most_interesting_bucket(bucket, min_threshold, max_threshold, stcs_options);
-
-                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
-                if (!stcs_interesting_bucket.empty()) {
-                    return stcs_interesting_bucket;
-                }
-            } else if (bucket.size() >= 2 && key < now) {
-                clogger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here", bucket.size());
-                return trim_to_threshold(std::move(bucket), max_threshold);
-            } else {
-                clogger.debug("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
-            }
-        }
-        return {};
-    }
+            std::chrono::seconds sstable_window_size, timestamp_type now, size_tiered_compaction_strategy_options& stcs_options);
 
     static std::vector<shared_sstable>
-    trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold) {
-        auto n = std::min(bucket.size(), size_t(max_threshold));
-        // Trim the largest sstables off the end to meet the maxThreshold
-        boost::partial_sort(bucket, bucket.begin() + n, [] (auto& i, auto& j) {
-            return i->ondisk_data_size() < j->ondisk_data_size();
-        });
-        bucket.resize(n);
-        return bucket;
-    }
+    trim_to_threshold(std::vector<shared_sstable> bucket, int max_threshold);
 
     static int64_t
     get_window_for(const time_window_compaction_strategy_options& options, api::timestamp_type ts) {
@@ -312,23 +197,7 @@ public:
         return timestamp_type(std::chrono::duration_cast<std::chrono::microseconds>(options.get_sstable_window_size()).count());
     }
 private:
-    void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold) {
-        int64_t n = 0;
-        timestamp_type now = _highest_window_seen;
-
-        for (auto task : tasks) {
-            auto key = task.first;
-
-            // For current window, make sure it's compactable
-            auto count = task.second.size();
-            if (key >= now && count >= size_t(min_threshold)) {
-                n++;
-            } else if (key < now && count >= 2) {
-                n++;
-            }
-        }
-        _estimated_remaining_tasks = n;
-    }
+    void update_estimated_compaction_by_tasks(std::map<timestamp_type, std::vector<shared_sstable>>& tasks, int min_threshold);
 
     friend class time_window_backlog_tracker;
 public: