From e5cc0cc6c47d96fa197e34fff0cf4c80bb8b3f80 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Thu, 16 Jun 2016 23:32:32 -0300 Subject: [PATCH] compaction: implement date tiered compaction strategy This commit is basically about converting Java to C++. Date tiered compaction strategy isn't wired yet. Signed-off-by: Raphael S. Carvalho --- sstables/DateTieredCompactionStrategy.java | 418 -------------------- sstables/date_tiered_compaction_strategy.hh | 359 +++++++++++++++++ 2 files changed, 359 insertions(+), 418 deletions(-) delete mode 100644 sstables/DateTieredCompactionStrategy.java create mode 100644 sstables/date_tiered_compaction_strategy.hh diff --git a/sstables/DateTieredCompactionStrategy.java b/sstables/DateTieredCompactionStrategy.java deleted file mode 100644 index 15287bd1c6..0000000000 --- a/sstables/DateTieredCompactionStrategy.java +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.compaction; - -import java.util.*; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.cql3.statements.CFPropDefs; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.sstable.SSTableReader; -import org.apache.cassandra.utils.Pair; - -public class DateTieredCompactionStrategy extends AbstractCompactionStrategy -{ - private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class); - - private final DateTieredCompactionStrategyOptions options; - protected volatile int estimatedRemainingTasks; - private final Set sstables = new HashSet<>(); - - public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map options) - { - super(cfs, options); - this.estimatedRemainingTasks = 0; - this.options = new DateTieredCompactionStrategyOptions(options); - if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION)) - { - disableTombstoneCompactions = true; - logger.debug("Disabling tombstone compactions for DTCS"); - } - else - logger.debug("Enabling tombstone compactions for DTCS"); - - } - - @Override - public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) - { - if (!isEnabled()) - return null; - - while (true) - { - List latestBucket = getNextBackgroundSSTables(gcBefore); - - if (latestBucket.isEmpty()) - return null; - - if (cfs.getDataTracker().markCompacting(latestBucket)) - return new CompactionTask(cfs, latestBucket, gcBefore, false); - } - } - - /** - * - * @param gcBefore - * @return - */ - private List getNextBackgroundSSTables(final int gcBefore) - { - if (!isEnabled() || cfs.getSSTables().isEmpty()) - 
return Collections.emptyList(); - - Set uncompacting = Sets.intersection(sstables, cfs.getUncompactingSSTables()); - - // Find fully expired SSTables. Those will be included no matter what. - Set expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(uncompacting), gcBefore); - Set candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting)); - - List compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore)); - if (!expired.isEmpty()) - { - logger.debug("Including expired sstables: {}", expired); - compactionCandidates.addAll(expired); - } - return compactionCandidates; - } - - private List getNextNonExpiredSSTables(Iterable nonExpiringSSTables, final int gcBefore) - { - int base = cfs.getMinimumCompactionThreshold(); - long now = getNow(); - List mostInteresting = getCompactionCandidates(nonExpiringSSTables, now, base); - if (mostInteresting != null) - { - return mostInteresting; - } - - // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone - // ratio is greater than threshold. 
- List sstablesWithTombstones = Lists.newArrayList(); - for (SSTableReader sstable : nonExpiringSSTables) - { - if (worthDroppingTombstones(sstable, gcBefore)) - sstablesWithTombstones.add(sstable); - } - if (sstablesWithTombstones.isEmpty()) - return Collections.emptyList(); - - return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator())); - } - - private List getCompactionCandidates(Iterable candidateSSTables, long now, int base) - { - Iterable candidates = filterOldSSTables(Lists.newArrayList(candidateSSTables), options.maxSSTableAge, now); - - List> buckets = getBuckets(createSSTableAndMinTimestampPairs(candidates), options.baseTime, base, now); - logger.debug("Compaction buckets are {}", buckets); - updateEstimatedCompactionsByTasks(buckets); - List mostInteresting = newestBucket(buckets, - cfs.getMinimumCompactionThreshold(), - cfs.getMaximumCompactionThreshold(), - now, - options.baseTime); - if (!mostInteresting.isEmpty()) - return mostInteresting; - return null; - } - - /** - * Gets the timestamp that DateTieredCompactionStrategy considers to be the "current time". - * @return the maximum timestamp across all SSTables. - * @throws java.util.NoSuchElementException if there are no SSTables. - */ - private long getNow() - { - return Collections.max(cfs.getSSTables(), new Comparator() - { - public int compare(SSTableReader o1, SSTableReader o2) - { - return Long.compare(o1.getMaxTimestamp(), o2.getMaxTimestamp()); - } - }).getMaxTimestamp(); - } - - /** - * Removes all sstables with max timestamp older than maxSSTableAge. - * @param sstables all sstables to consider - * @param maxSSTableAge the age in milliseconds when an SSTable stops participating in compactions - * @param now current time. SSTables with max timestamp less than (now - maxSSTableAge) are filtered. 
- * @return a list of sstables with the oldest sstables excluded - */ - @VisibleForTesting - static Iterable filterOldSSTables(List sstables, long maxSSTableAge, long now) - { - if (maxSSTableAge == 0) - return sstables; - final long cutoff = now - maxSSTableAge; - return Iterables.filter(sstables, new Predicate() - { - @Override - public boolean apply(SSTableReader sstable) - { - return sstable.getMaxTimestamp() >= cutoff; - } - }); - } - - /** - * - * @param sstables - * @return - */ - public static List> createSSTableAndMinTimestampPairs(Iterable sstables) - { - List> sstableMinTimestampPairs = Lists.newArrayListWithCapacity(Iterables.size(sstables)); - for (SSTableReader sstable : sstables) - sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp())); - return sstableMinTimestampPairs; - } - @Override - public void addSSTable(SSTableReader sstable) - { - sstables.add(sstable); - } - - @Override - public void removeSSTable(SSTableReader sstable) - { - sstables.remove(sstable); - } - /** - * A target time span used for bucketing SSTables based on timestamps. - */ - private static class Target - { - // How big a range of timestamps fit inside the target. - public final long size; - // A timestamp t hits the target iff t / size == divPosition. - public final long divPosition; - - public Target(long size, long divPosition) - { - this.size = size; - this.divPosition = divPosition; - } - - /** - * Compares the target to a timestamp. - * @param timestamp the timestamp to compare. - * @return a negative integer, zero, or a positive integer as the target lies before, covering, or after than the timestamp. - */ - public int compareToTimestamp(long timestamp) - { - return Long.compare(divPosition, timestamp / size); - } - - /** - * Tells if the timestamp hits the target. - * @param timestamp the timestamp to test. - * @return true iff timestamp / size == divPosition. 
- */ - public boolean onTarget(long timestamp) - { - return compareToTimestamp(timestamp) == 0; - } - - /** - * Gets the next target, which represents an earlier time span. - * @param base The number of contiguous targets that will have the same size. Targets following those will be base times as big. - * @return - */ - public Target nextTarget(int base) - { - if (divPosition % base > 0) - return new Target(size, divPosition - 1); - else - return new Target(size * base, divPosition / base - 1); - } - } - - - /** - * Group files with similar min timestamp into buckets. Files with recent min timestamps are grouped together into - * buckets designated to short timespans while files with older timestamps are grouped into buckets representing - * longer timespans. - * @param files pairs consisting of a file and its min timestamp - * @param timeUnit - * @param base - * @param now - * @return a list of buckets of files. The list is ordered such that the files with newest timestamps come first. - * Each bucket is also a list of files ordered from newest to oldest. - */ - @VisibleForTesting - static List> getBuckets(Collection> files, long timeUnit, int base, long now) - { - // Sort files by age. Newest first. - final List> sortedFiles = Lists.newArrayList(files); - Collections.sort(sortedFiles, Collections.reverseOrder(new Comparator>() - { - public int compare(Pair p1, Pair p2) - { - return p1.right.compareTo(p2.right); - } - })); - - List> buckets = Lists.newArrayList(); - Target target = getInitialTarget(now, timeUnit); - PeekingIterator> it = Iterators.peekingIterator(sortedFiles.iterator()); - - outerLoop: - while (it.hasNext()) - { - while (!target.onTarget(it.peek().right)) - { - // If the file is too new for the target, skip it. - if (target.compareToTimestamp(it.peek().right) < 0) - { - it.next(); - - if (!it.hasNext()) - break outerLoop; - } - else // If the file is too old for the target, switch targets. 
- target = target.nextTarget(base); - } - - List bucket = Lists.newArrayList(); - while (target.onTarget(it.peek().right)) - { - bucket.add(it.next().left); - - if (!it.hasNext()) - break; - } - buckets.add(bucket); - } - - return buckets; - } - - @VisibleForTesting - static Target getInitialTarget(long now, long timeUnit) - { - return new Target(timeUnit, now / timeUnit); - } - - - private void updateEstimatedCompactionsByTasks(List> tasks) - { - int n = 0; - for (List bucket : tasks) - { - if (bucket.size() >= cfs.getMinimumCompactionThreshold()) - n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold()); - } - estimatedRemainingTasks = n; - } - - - /** - * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds. - * @param minThreshold minimum number of sstables in a bucket to qualify. - * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this). - * @return a bucket (list) of sstables to compact. - */ - @VisibleForTesting - static List newestBucket(List> buckets, int minThreshold, int maxThreshold, long now, long baseTime) - { - // If the "incoming window" has at least minThreshold SSTables, choose that one. - // For any other bucket, at least 2 SSTables is enough. - // In any case, limit to maxThreshold SSTables. - Target incomingWindow = getInitialTarget(now, baseTime); - for (List bucket : buckets) - { - if (bucket.size() >= minThreshold || - (bucket.size() >= 2 && !incomingWindow.onTarget(bucket.get(0).getMinTimestamp()))) - return trimToThreshold(bucket, maxThreshold); - } - return Collections.emptyList(); - } - - /** - * @param bucket list of sstables, ordered from newest to oldest by getMinTimestamp(). - * @param maxThreshold maximum number of sstables in a single compaction task. - * @return A bucket trimmed to the maxThreshold newest sstables. 
- */ - @VisibleForTesting - static List trimToThreshold(List bucket, int maxThreshold) - { - // Trim the oldest sstables off the end to meet the maxThreshold - return bucket.subList(0, Math.min(bucket.size(), maxThreshold)); - } - - @Override - public synchronized Collection getMaximalTask(int gcBefore) - { - Iterable sstables = cfs.markAllCompacting(); - if (sstables == null) - return null; - - return Arrays.asList(new CompactionTask(cfs, sstables, gcBefore, false)); - } - - @Override - public synchronized AbstractCompactionTask getUserDefinedTask(Collection sstables, int gcBefore) - { - assert !sstables.isEmpty(); // checked for by CM.submitUserDefined - - if (!cfs.getDataTracker().markCompacting(sstables)) - { - logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables); - return null; - } - - return new CompactionTask(cfs, sstables, gcBefore, false).setUserDefined(true); - } - - public int getEstimatedRemainingTasks() - { - return estimatedRemainingTasks; - } - - public long getMaxSSTableBytes() - { - return Long.MAX_VALUE; - } - - - public static Map validateOptions(Map options) throws ConfigurationException - { - Map uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); - uncheckedOptions = DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); - - uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD); - uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD); - - return uncheckedOptions; - } - - public String toString() - { - return String.format("DateTieredCompactionStrategy[%s/%s]", - cfs.getMinimumCompactionThreshold(), - cfs.getMaximumCompactionThreshold()); - } -} diff --git a/sstables/date_tiered_compaction_strategy.hh b/sstables/date_tiered_compaction_strategy.hh new file mode 100644 index 0000000000..d5ada6f3e8 --- /dev/null +++ b/sstables/date_tiered_compaction_strategy.hh 
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Copyright (C) 2016 ScyllaDB
+ *
+ * Modified by ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <iterator>
+#include <map>
+#include <utility>
+#include <vector>
+#include "sstables.hh"
+#include "compaction.hh"
+
+/**
+ * Picks the sstables to compact under the date tiered strategy: sstables are
+ * bucketed by minimum timestamp into time windows that grow with age, and the
+ * newest bucket holding enough sstables is chosen.
+ *
+ * NOTE(review): every <...> sequence (include names, template arguments) was
+ * missing from the copy of this patch under review. They are restored here from
+ * how each value is used; confirm them against the original commit.
+ */
+class date_tiered_manifest {
+    static logging::logger logger;
+
+    static constexpr double DEFAULT_MAX_SSTABLE_AGE_DAYS = 365;
+    static constexpr int64_t DEFAULT_BASE_TIME_SECONDS = 60;
+
+    // Unit used when comparing the durations below against sstable timestamps.
+    // NOTE(review): presumably microseconds, like write timestamps; confirm.
+    using timestamp_resolution = std::chrono::microseconds;
+
+    // TODO: implement date_tiered_compaction_strategy_options.
+    db_clock::duration _max_sstable_age;
+    db_clock::duration _base_time;
+public:
+    date_tiered_manifest() = delete;
+
+    /**
+     * @param options strategy options. Not read yet: the default max sstable age
+     *        and base time above are always used.
+     */
+    date_tiered_manifest(const std::map<sstring, sstring>& options) {
+        auto max_sstable_age_in_hours = int64_t(DEFAULT_MAX_SSTABLE_AGE_DAYS * 24);
+        _max_sstable_age = std::chrono::duration_cast<db_clock::duration>(std::chrono::hours(max_sstable_age_in_hours));
+        _base_time = std::chrono::duration_cast<db_clock::duration>(std::chrono::seconds(DEFAULT_BASE_TIME_SECONDS));
+
+        // FIXME: implement option to disable tombstone compaction.
+#if 0
+        if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
+        {
+            disableTombstoneCompactions = true;
+            logger.debug("Disabling tombstone compactions for DTCS");
+        }
+        else
+            logger.debug("Enabling tombstone compactions for DTCS");
+#endif
+    }
+
+    /**
+     * Chooses the sstables for the next compaction.
+     * @param cf column family being compacted.
+     * @param uncompacting sstables not being compacted right now; sorted in place by generation.
+     * @param gc_before used to find the fully expired sstables.
+     * @return the chosen bucket followed by all fully expired sstables; empty if cf has no sstables.
+     */
+    std::vector<sstables::shared_sstable>
+    get_next_sstables(column_family& cf, std::vector<sstables::shared_sstable>& uncompacting, gc_clock::time_point gc_before) {
+        if (cf.get_sstables()->empty()) {
+            return {};
+        }
+
+        // Find fully expired SSTables. Those will be included no matter what.
+        auto expired = get_fully_expired_sstables(cf, uncompacting, gc_before.time_since_epoch().count());
+
+        // std::set_difference requires both ranges to be sorted by the comparator it is given.
+        auto by_generation = [] (const auto& x, const auto& y) {
+            return x->generation() < y->generation();
+        };
+        std::sort(uncompacting.begin(), uncompacting.end(), by_generation);
+        std::sort(expired.begin(), expired.end(), by_generation);
+
+        // Set non_expired_set with the elements that are in uncompacting, but not in the expired.
+        std::vector<sstables::shared_sstable> non_expired_set;
+        std::set_difference(uncompacting.begin(), uncompacting.end(), expired.begin(), expired.end(),
+            std::back_inserter(non_expired_set), by_generation);
+
+        auto compaction_candidates = get_next_non_expired_sstables(cf, non_expired_set, gc_before);
+        compaction_candidates.insert(compaction_candidates.end(), expired.begin(), expired.end());
+        return compaction_candidates;
+    }
+
+    /**
+     * Estimates the number of pending compactions.
+     * @return the sum, over buckets holding at least min_compaction_threshold sstables,
+     *         of bucket size / max_compaction_threshold rounded up.
+     */
+    int64_t get_estimated_tasks(column_family& cf) const {
+        int base = cf.schema()->min_compaction_threshold();
+        int64_t now = get_now(cf);
+        std::vector<sstables::shared_sstable> sstables;
+        int64_t n = 0;
+
+        sstables.reserve(cf.sstables_count());
+        for (auto& entry : *cf.get_sstables()) {
+            sstables.push_back(entry);
+        }
+        auto candidates = filter_old_sstables(std::move(sstables), _max_sstable_age, now);
+        auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
+
+        for (auto& bucket : buckets) {
+            if (bucket.size() >= size_t(base)) {
+                n += std::ceil(double(bucket.size()) / cf.schema()->max_compaction_threshold());
+            }
+        }
+        return n;
+    }
+private:
+    /**
+     * Chooses what to compact among the sstables that are not fully expired.
+     * @param gc_before not used until the tombstone fallback below is implemented.
+     */
+    std::vector<sstables::shared_sstable>
+    get_next_non_expired_sstables(column_family& cf, std::vector<sstables::shared_sstable>& non_expiring_sstables, gc_clock::time_point gc_before) {
+        int base = cf.schema()->min_compaction_threshold();
+        int64_t now = get_now(cf);
+        return get_compaction_candidates(cf, non_expiring_sstables, now, base);
+
+        // FIXME: implement functionality below that will look for a single sstable with worth dropping tombstone,
+        // iff strategy didn't find anything to compact. So it's not essential.
+#if 0
+        // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
+        // ratio is greater than threshold.
+
+        List<SSTableReader> sstablesWithTombstones = Lists.newArrayList();
+        for (SSTableReader sstable : nonExpiringSSTables)
+        {
+            if (worthDroppingTombstones(sstable, gcBefore))
+                sstablesWithTombstones.add(sstable);
+        }
+        if (sstablesWithTombstones.isEmpty())
+            return Collections.emptyList();
+
+        return Collections.singletonList(Collections.min(sstablesWithTombstones, new SSTableReader.SizeComparator()));
+#endif
+    }
+
+    /**
+     * Drops the candidates that are too old, buckets the rest and returns the newest bucket worth compacting.
+     */
+    std::vector<sstables::shared_sstable>
+    get_compaction_candidates(column_family& cf, std::vector<sstables::shared_sstable> candidate_sstables, int64_t now, int base) {
+        int min_threshold = cf.schema()->min_compaction_threshold();
+        int max_threshold = cf.schema()->max_compaction_threshold();
+        auto candidates = filter_old_sstables(std::move(candidate_sstables), _max_sstable_age, now);
+        auto buckets = get_buckets(create_sst_and_min_timestamp_pairs(candidates), _base_time, base, now);
+
+        return newest_bucket(buckets, min_threshold, max_threshold, now, _base_time);
+    }
+
+    /**
+     * Gets the timestamp that DateTieredCompactionStrategy considers to be the "current time".
+     * @return the maximum timestamp across all SSTables; 0 if there are none or none is positive.
+     */
+    static int64_t get_now(column_family& cf) {
+        int64_t max_timestamp = 0;
+        for (auto& sst : *cf.get_sstables()) {
+            max_timestamp = std::max<int64_t>(max_timestamp, sst->get_stats_metadata().max_timestamp);
+        }
+        return max_timestamp;
+    }
+
+    /**
+     * Removes all sstables with max timestamp older than max_sstable_age.
+     * @param sstables all sstables to consider
+     * @param max_sstable_age the age at which an SSTable stops participating in compactions; zero keeps everything
+     * @param now current time. SSTables with max timestamp less than (now - max_sstable_age) are filtered.
+     * @return a list of sstables with the oldest sstables excluded
+     */
+    static std::vector<sstables::shared_sstable>
+    filter_old_sstables(std::vector<sstables::shared_sstable> sstables, db_clock::duration max_sstable_age, int64_t now) {
+        int64_t max_sstable_age_count = std::chrono::duration_cast<timestamp_resolution>(max_sstable_age).count();
+        if (max_sstable_age_count == 0) {
+            return sstables;
+        }
+        int64_t cutoff = now - max_sstable_age_count;
+
+        sstables.erase(std::remove_if(sstables.begin(), sstables.end(), [cutoff] (const auto& sst) {
+            return sst->get_stats_metadata().max_timestamp < cutoff;
+        }), sstables.end());
+
+        return sstables;
+    }
+
+    /**
+     * Pairs every sstable with the min timestamp from its stats metadata.
+     * @param sstables sstables to pair
+     * @return one (sstable, min timestamp) pair per input sstable, in input order
+     */
+    static std::vector<std::pair<sstables::shared_sstable, int64_t>>
+    create_sst_and_min_timestamp_pairs(const std::vector<sstables::shared_sstable>& sstables) {
+        std::vector<std::pair<sstables::shared_sstable, int64_t>> sstable_min_timestamp_pairs;
+        sstable_min_timestamp_pairs.reserve(sstables.size());
+        for (auto& sst : sstables) {
+            sstable_min_timestamp_pairs.emplace_back(sst, sst->get_stats_metadata().min_timestamp);
+        }
+        return sstable_min_timestamp_pairs;
+    }
+
+    /**
+     * A target time span used for bucketing SSTables based on timestamps.
+     */
+    struct target {
+        // How big a range of timestamps fit inside the target.
+        int64_t size;
+        // A timestamp t hits the target iff t / size == div_position.
+        int64_t div_position;
+
+        target() = delete;
+        target(int64_t size, int64_t div_position) : size(size), div_position(div_position) {}
+
+        /**
+         * Compares the target to a timestamp.
+         * @param timestamp the timestamp to compare.
+         * @return a negative integer, zero, or a positive integer as the target lies before, covers, or lies after the timestamp.
+         */
+        int compare_to_timestamp(int64_t timestamp) const {
+            auto ts1 = div_position;
+            auto ts2 = timestamp / size;
+            return (ts1 > ts2 ? 1 : (ts1 == ts2 ? 0 : -1));
+        }
+
+        /**
+         * Tells if the timestamp hits the target.
+         * @param timestamp the timestamp to test.
+         * @return true iff timestamp / size == div_position.
+         */
+        bool on_target(int64_t timestamp) const {
+            return compare_to_timestamp(timestamp) == 0;
+        }
+
+        /**
+         * Gets the next target, which represents an earlier time span.
+         * @param base The number of contiguous targets that will have the same size. Targets following those will be base times as big.
+         * @return the target immediately before this one.
+         */
+        target next_target(int base) const {
+            if (div_position % base > 0) {
+                return target(size, div_position - 1);
+            }
+            return target(size * base, div_position / base - 1);
+        }
+    };
+
+    /**
+     * Group files with similar min timestamp into buckets. Files with recent min timestamps are grouped together into
+     * buckets designated to short timespans while files with older timestamps are grouped into buckets representing
+     * longer timespans.
+     * @param files pairs consisting of a file and its min timestamp
+     * @param time_unit span of the newest (smallest) window
+     * @param base number of same-sized windows before the window size is multiplied by base
+     * @param now timestamp treated as the current time
+     * @return a list of buckets of files. The list is ordered such that the files with newest timestamps come first.
+     *         Each bucket is also a list of files ordered from newest to oldest.
+     */
+    static std::vector<std::vector<sstables::shared_sstable>>
+    get_buckets(std::vector<std::pair<sstables::shared_sstable, int64_t>>&& files, db_clock::duration time_unit, int base, int64_t now) {
+        // Sort files by age. Newest first.
+        std::sort(files.begin(), files.end(), [] (const auto& i, const auto& j) {
+            return i.second > j.second;
+        });
+
+        std::vector<std::vector<sstables::shared_sstable>> buckets;
+        auto target = get_initial_target(now, std::chrono::duration_cast<timestamp_resolution>(time_unit).count());
+        auto it = files.begin();
+
+        while (it != files.end()) {
+            // Move on until the target covers the current file.
+            while (it != files.end() && !target.on_target(it->second)) {
+                if (target.compare_to_timestamp(it->second) < 0) {
+                    // If the file is too new for the target, skip it.
+                    ++it;
+                } else {
+                    // If the file is too old for the target, switch targets.
+                    target = target.next_target(base);
+                }
+            }
+            if (it == files.end()) {
+                break;
+            }
+
+            std::vector<sstables::shared_sstable> bucket;
+            while (it != files.end() && target.on_target(it->second)) {
+                bucket.push_back(std::move(it->first));
+                ++it;
+            }
+            buckets.push_back(std::move(bucket));
+        }
+
+        return buckets;
+    }
+
+    /**
+     * @return the newest window: the one of size time_unit that contains now.
+     */
+    static target get_initial_target(int64_t now, int64_t time_unit) {
+        return target(time_unit, now / time_unit);
+    }
+
+    /**
+     * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
+     *        The returned bucket is trimmed in place.
+     * @param min_threshold minimum number of sstables in a bucket to qualify.
+     * @param max_threshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
+     * @return a bucket (list) of sstables to compact.
+     */
+    static std::vector<sstables::shared_sstable>
+    newest_bucket(std::vector<std::vector<sstables::shared_sstable>>& buckets, int min_threshold, int max_threshold,
+            int64_t now, db_clock::duration base_time) {
+        // If the "incoming window" has at least min_threshold SSTables, choose that one.
+        // For any other bucket, at least 2 SSTables is enough.
+        // In any case, limit to max_threshold SSTables.
+        target incoming_window = get_initial_target(now, std::chrono::duration_cast<timestamp_resolution>(base_time).count());
+        for (auto& bucket : buckets) {
+            if (bucket.empty()) {
+                continue; // front() below needs at least one sstable
+            }
+            auto min_timestamp = bucket.front()->get_stats_metadata().min_timestamp;
+            if (bucket.size() >= size_t(min_threshold) ||
+                    (bucket.size() >= 2 && !incoming_window.on_target(min_timestamp))) {
+                trim_to_threshold(bucket, max_threshold);
+                return bucket;
+            }
+        }
+        return {};
+    }
+
+    /**
+     * @param bucket list of sstables, ordered from newest to oldest by min timestamp.
+     * @param max_threshold maximum number of sstables in a single compaction task.
+     *        The bucket is trimmed in place to the max_threshold newest sstables.
+     */
+    static void trim_to_threshold(std::vector<sstables::shared_sstable>& bucket, int max_threshold) {
+        // Trim the oldest sstables off the end to meet the max_threshold
+        bucket.resize(std::min(bucket.size(), size_t(max_threshold)));
+    }
+};