diff --git a/sstables/compaction.hh b/sstables/compaction.hh index 454794ac0d..b4c3ded1cf 100644 --- a/sstables/compaction.hh +++ b/sstables/compaction.hh @@ -26,6 +26,26 @@ #include namespace sstables { + + struct compaction_descriptor { + // List of sstables to be compacted. + std::vector sstables; + // Level of sstable(s) created by compaction procedure. + int level = 0; + // Threshold size for sstable(s) to be created. + uint64_t max_sstable_bytes = std::numeric_limits::max(); + + compaction_descriptor() = default; + + compaction_descriptor(std::vector sstables, int level, long max_sstable_bytes) + : sstables(std::move(sstables)) + , level(level) + , max_sstable_bytes(max_sstable_bytes) {} + + compaction_descriptor(std::vector sstables) + : sstables(std::move(sstables)) {} + }; + future<> compact_sstables(std::vector sstables, column_family& cf, std::function creator); diff --git a/sstables/LeveledManifest.java b/sstables/leveled_manifest.hh similarity index 60% rename from sstables/LeveledManifest.java rename to sstables/leveled_manifest.hh index f3c3356ca9..6dd851b326 100644 --- a/sstables/LeveledManifest.java +++ b/sstables/leveled_manifest.hh @@ -15,69 +15,76 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.cassandra.db.compaction; -import java.io.IOException; -import java.util.*; +/* + * Copyright 2015 Cloudius Systems + * + * Modified by Cloudius Systems + */ -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.base.Predicates; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.ImmutableSortedSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.google.common.primitives.Ints; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/* + * This file is part of Scylla. + * + * Scylla is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Scylla is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Scylla. If not, see . + */ -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.dht.Bounds; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.*; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.Pair; +#pragma once -public class LeveledManifest -{ - private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class); +#include "sstables.hh" +#include "compaction.hh" +#include "range.hh" +#include "log.hh" + +class leveled_manifest { + logging::logger logger; /** * limit the number of L0 sstables we do at once, because compaction bloom filter creation * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory * or even OOMing when compacting highly overlapping sstables */ - private static final int MAX_COMPACTING_L0 = 32; + static constexpr int MAX_COMPACTING_L0 = 32; /** * If we go this many rounds without compacting * in the highest level, we start bringing in sstables from * that level into lower level compactions */ - private static final int NO_COMPACTION_LIMIT = 25; + static constexpr int NO_COMPACTION_LIMIT = 25; - private final ColumnFamilyStore cfs; - @VisibleForTesting - protected final List[] generations; + schema_ptr _schema; + std::vector> _generations; +#if 0 private final RowPosition[] lastCompactedKeys; - private final int maxSSTableSizeInBytes; +#endif + uint64_t _max_sstable_size_in_bytes; +#if 0 private final SizeTieredCompactionStrategyOptions options; private final int [] compactionCounter; +#endif - LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options) +public: + leveled_manifest(column_family& cfs, int max_sstable_size_in_MB) + : logger("LeveledManifest") + , _schema(cfs.schema()) + , _max_sstable_size_in_bytes(max_sstable_size_in_MB * 1024 * 1024) { - this.cfs = cfs; - this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024 * 1024; - this.options = options; - // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is // updated, we will still have sstables of the older, potentially smaller size. So don't make this // dependent on maxSSTableSize.) - int n = (int) Math.log10(1000 * 1000 * 1000); - generations = new List[n]; + uint64_t n = 9; // log10(1000^3) + _generations.resize(n); +#if 0 lastCompactedKeys = new RowPosition[n]; for (int i = 0; i < generations.length; i++) { @@ -85,43 +92,49 @@ public class LeveledManifest lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound(); } compactionCounter = new int[n]; +#endif } +#if 0 public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List sstables) { return create(cfs, maxSSTableSize, sstables, new SizeTieredCompactionStrategyOptions()); } +#endif - public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable sstables, SizeTieredCompactionStrategyOptions options) - { - LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, options); + static leveled_manifest create(column_family& cfs, int max_sstable_size_in_mb) { + leveled_manifest manifest = leveled_manifest(cfs, max_sstable_size_in_mb); // ensure all SSTables are in the manifest - for (SSTableReader ssTableReader : sstables) - { - manifest.add(ssTableReader); + auto sstables = cfs.get_sstables(); + for (auto& map_entry : *sstables) { + auto& sstable = map_entry.second; + manifest.add(sstable); } - for (int i = 1; i < manifest.getAllLevelSize().length; i++) - { - manifest.repairOverlappingSSTables(i); + + for (auto i = 1U; i < manifest._generations.size(); i++) { + manifest.repair_overlapping_sstables(i); } + return manifest; } - public synchronized void add(SSTableReader reader) - { - int level = reader.getSSTableLevel(); + void add(sstables::shared_sstable& sstable) { + uint32_t level = sstable->get_sstable_level(); - assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1); - logDistribution(); - if (canAddSSTable(reader)) - { - // adding the sstable does not cause overlap in the level - logger.debug("Adding {} to L{}", reader, level); - generations[level].add(reader); + if (level >= _generations.size()) { + throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1))); } - else - { +#if 0 + logDistribution(); +#endif + if (can_add_sstable(sstable)) { + // adding the sstable does not cause overlap in the level + + logger.debug("Adding {} to L{}", sstable->get_filename(), level); + + _generations[level].push_back(sstable); + } else { // this can happen if: // * a compaction has promoted an overlapping sstable to the given level, or // was also supposed to add an sstable at the given level. @@ -129,6 +142,7 @@ public class LeveledManifest // would cause overlap // // The add(..):ed sstable will be sent to level 0 +#if 0 try { reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0); @@ -138,12 +152,12 @@ public class LeveledManifest { logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e); } - generations[0].add(reader); +#endif + _generations[0].push_back(sstable); } } - - +#if 0 public synchronized void replace(Collection removed, Collection added) { assert !removed.isEmpty(); // use add() instead of promote when adding new sstables @@ -172,31 +186,37 @@ public class LeveledManifest add(ssTableReader); lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last; } +#endif - public synchronized void repairOverlappingSSTables(int level) - { - SSTableReader previous = null; - Collections.sort(generations[level], SSTableReader.sstableComparator); - List outOfOrderSSTables = new ArrayList<>(); - for (SSTableReader current : generations[level]) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - { + void repair_overlapping_sstables(int level) { + const sstables::sstable *previous = nullptr; + const schema& s = *_schema; + + _generations[level].sort([&s] (auto& i, auto& j) { + return i->compare_by_first_key(s, *j) < 0; + }); + + std::vector out_of_order_sstables; + + for (auto& current : _generations[level]) { + auto current_first = current->get_first_decorated_key(s); + + if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) { +#if 0 logger.warn(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " + - "Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable", - level, previous, previous.first, previous.last, current, current.first, current.last)); - outOfOrderSSTables.add(current); - } - else - { - previous = current; + "Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable", + level, previous, previous.first, previous.last, current, current.first, current.last)); +#endif + out_of_order_sstables.push_back(current); + } else { + previous = &*current; } } - if (!outOfOrderSSTables.isEmpty()) - { - for (SSTableReader sstable : outOfOrderSSTables) - sendBackToL0(sstable); + if (!out_of_order_sstables.empty()) { + for (auto& sstable : out_of_order_sstables) { + send_back_to_L0(sstable); + } } } @@ -205,29 +225,39 @@ public class LeveledManifest * @param sstable the sstable to add * @return true if it is safe to add the sstable in the level. */ - private boolean canAddSSTable(SSTableReader sstable) - { - int level = sstable.getSSTableLevel(); - if (level == 0) + bool can_add_sstable(sstables::shared_sstable& sstable) { + uint32_t level = sstable->get_sstable_level(); + const schema& s = *_schema; + + if (level == 0) { return true; - - List copyLevel = new ArrayList<>(generations[level]); - copyLevel.add(sstable); - Collections.sort(copyLevel, SSTableReader.sstableComparator); - - SSTableReader previous = null; - for (SSTableReader current : copyLevel) - { - if (previous != null && current.first.compareTo(previous.last) <= 0) - return false; - previous = current; } + + auto copy_level = _generations[level]; + copy_level.push_back(sstable); + copy_level.sort([&s] (auto& i, auto& j) { + return i->compare_by_first_key(s, *j) < 0; + }); + + const sstables::sstable *previous = nullptr; + for (auto& current : copy_level) { + if (previous != nullptr) { + auto current_first = current->get_first_decorated_key(s); + auto previous_last = previous->get_last_decorated_key(s); + + if (current_first.tri_compare(s, previous_last) <= 0) { + return false; + } + } + previous = &*current; + } + return true; } - private synchronized void sendBackToL0(SSTableReader sstable) - { + void send_back_to_L0(sstables::shared_sstable& sstable) { remove(sstable); +#if 0 try { sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0); @@ -238,8 +268,12 @@ public class LeveledManifest { throw new RuntimeException("Could not reload sstable meta data", e); } +#else + _generations[0].push_back(sstable); +#endif } +#if 0 private String toString(Collection sstables) { StringBuilder builder = new StringBuilder(); @@ -254,24 +288,31 @@ public class LeveledManifest } return builder.toString(); } +#endif - @VisibleForTesting - long maxBytesForLevel(int level) - { - if (level == 0) - return 4L * maxSSTableSizeInBytes; - double bytes = Math.pow(10, level) * maxSSTableSizeInBytes; - if (bytes > Long.MAX_VALUE) - throw new RuntimeException("At most " + Long.MAX_VALUE + " bytes may be in a compaction level; your maxSSTableSize must be absurdly high to compute " + bytes); - return (long) bytes; + static uint64_t max_bytes_for_level(int level, uint64_t max_sstable_size_in_bytes) { + if (level == 0) { + return 4L * max_sstable_size_in_bytes; + } + double bytes = pow(10, level) * max_sstable_size_in_bytes; + if (bytes > std::numeric_limits::max()) { + throw std::runtime_error(sprint("At most %ld bytes may be in a compaction level; your maxSSTableSize must be absurdly high to compute %f", + std::numeric_limits::max(), bytes)); + } + uint64_t bytes_u64 = bytes; + return bytes_u64; + } + + uint64_t max_bytes_for_level(int level) { + return max_bytes_for_level(level, _max_sstable_size_in_bytes); } /** * @return highest-priority sstables to compact, and level to compact them to * If no compactions are necessary, will return null */ - public synchronized CompactionCandidate getCompactionCandidates() - { + sstables::compaction_descriptor get_compaction_candidates() { +#if 0 // during bootstrap we only do size tiering in L0 to make sure // the streamed files can be placed in their original levels if (StorageService.instance.isBootstrapMode()) @@ -284,6 +325,7 @@ public class LeveledManifest } return null; } +#endif // LevelDB gives each level a score of how much data it contains vs its ideal amount, and // compacts the level with the highest score. But this falls apart spectacularly once you // get behind. Consider this set of levels: @@ -311,19 +353,22 @@ public class LeveledManifest // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep // up, you're still screwed. But if instead you have intermittent bursts of activity, // it can help a lot. - for (int i = generations.length - 1; i > 0; i--) - { - List sstables = getLevel(i); - if (sstables.isEmpty()) + for (auto i = _generations.size() - 1; i > 0; i--) { + auto& sstables = get_level(i); + if (sstables.empty()) { continue; // mostly this just avoids polluting the debug log with zero scores + } +#if 0 // we want to calculate score excluding compacting ones Set sstablesInLevel = Sets.newHashSet(sstables); Set remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting()); - double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i); +#endif + double score = (double) get_total_bytes(sstables) / (double) max_bytes_for_level(i); + logger.debug("Compaction score for level {} is {}", i, score); - if (score > 1.001) - { + if (score > 1.001) { +#if 0 // before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0) { @@ -334,33 +379,37 @@ public class LeveledManifest return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE); } } - +#endif // L0 is fine, proceed with this level - Collection candidates = getCandidatesFor(i); - if (!candidates.isEmpty()) - { - int nextLevel = getNextLevel(candidates); + auto candidates = get_candidates_for(i); + if (!candidates.empty()) { + int next_level = get_next_level(candidates); +#if 0 candidates = getOverlappingStarvedSSTables(nextLevel, candidates); if (logger.isDebugEnabled()) logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); - return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes()); +#endif + return sstables::compaction_descriptor(std::move(candidates), next_level, _max_sstable_size_in_bytes); } - else - { + else { logger.debug("No compaction candidates for L{}", i); } } } // Higher levels are happy, time for a standard, non-STCS L0 compaction - if (getLevel(0).isEmpty()) - return null; - Collection candidates = getCandidatesFor(0); - if (candidates.isEmpty()) - return null; - return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes()); + if (get_level(0).empty()) { + return sstables::compaction_descriptor(); + } + auto candidates = get_candidates_for(0); + if (candidates.empty()) { + return sstables::compaction_descriptor(); + } + auto next_level = get_next_level(candidates); + return sstables::compaction_descriptor(std::move(candidates), next_level, _max_sstable_size_in_bytes); } +#if 0 private List getSSTablesForSTCS(Collection sstables) { Iterable candidates = cfs.getDataTracker().getUncompactingSSTables(sstables); @@ -371,6 +420,7 @@ public class LeveledManifest options.minSSTableSize); return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32); } +#endif /** * If we do something that makes many levels contain too little data (cleanup, change sstable size) we will "never" @@ -383,6 +433,7 @@ public class LeveledManifest * @param candidates the original sstables to compact * @return */ +#if 0 private Collection getOverlappingStarvedSSTables(int targetLevel, Collection candidates) { Set withStarvedCandidate = new HashSet<>(candidates); @@ -434,14 +485,17 @@ public class LeveledManifest return candidates; } +#endif - public synchronized int getLevelSize(int i) - { + size_t get_level_size(uint32_t level) { +#if 0 if (i >= generations.length) throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1)); - return getLevel(i).size(); +#endif + return get_level(level).size(); } +#if 0 public synchronized int[] getAllLevelSize() { int[] counts = new int[generations.length]; @@ -464,19 +518,21 @@ public class LeveledManifest } } } +#endif - @VisibleForTesting - public int remove(SSTableReader reader) - { - int level = reader.getSSTableLevel(); - assert level >= 0 : reader + " not present in manifest: "+level; - generations[level].remove(reader); + uint32_t remove(sstables::shared_sstable& sstable) { + uint32_t level = sstable->get_sstable_level(); + if (level >= _generations.size()) { + throw std::runtime_error("Invalid level"); + } + _generations[level].remove(sstable); return level; } - private static Set overlapping(Collection candidates, Iterable others) - { - assert !candidates.isEmpty(); + std::vector + overlapping(std::vector& candidates, std::list& others) { + const schema& s = *_schema; + assert(!candidates.empty()); /* * Picking each sstable from others that overlap one of the sstable of candidates is not enough * because you could have the following situation: @@ -488,42 +544,51 @@ public class LeveledManifest * Thus, the correct approach is to pick sstables overlapping anything between the first key in all * the candidate sstables, and the last. */ - Iterator iter = candidates.iterator(); - SSTableReader sstable = iter.next(); - Token first = sstable.first.getToken(); - Token last = sstable.last.getToken(); - while (iter.hasNext()) - { - sstable = iter.next(); - first = first.compareTo(sstable.first.getToken()) <= 0 ? first : sstable.first.getToken(); - last = last.compareTo(sstable.last.getToken()) >= 0 ? last : sstable.last.getToken(); + auto it = candidates.begin(); + auto& first_sstable = *it; + it++; + dht::token first = first_sstable->get_first_decorated_key(s)._token; + dht::token last = first_sstable->get_last_decorated_key(s)._token; + while (it != candidates.end()) { + auto& candidate_sstable = *it; + it++; + dht::token first_candidate = candidate_sstable->get_first_decorated_key(s)._token; + dht::token last_candidate = candidate_sstable->get_last_decorated_key(s)._token; + + first = first <= first_candidate? first : first_candidate; + last = last >= last_candidate ? last : last_candidate; } return overlapping(first, last, others); } - @VisibleForTesting - static Set overlapping(SSTableReader sstable, Iterable others) - { - return overlapping(sstable.first.getToken(), sstable.last.getToken(), others); + std::vector + overlapping(sstables::shared_sstable& sstable, std::list& others) { + const schema& s = *_schema; + return overlapping(sstable->get_first_decorated_key(s)._token, sstable->get_last_decorated_key(s)._token, others); } /** * @return sstables from @param sstables that contain keys between @param start and @param end, inclusive. */ - private static Set overlapping(Token start, Token end, Iterable sstables) - { - assert start.compareTo(end) <= 0; - Set overlapped = new HashSet<>(); - Bounds promotedBounds = new Bounds(start, end); - for (SSTableReader candidate : sstables) - { - Bounds candidateBounds = new Bounds(candidate.first.getToken(), candidate.last.getToken()); - if (candidateBounds.intersects(promotedBounds)) - overlapped.add(candidate); + std::vector + overlapping(dht::token start, dht::token end, std::list& sstables) { + const schema& s = *_schema; + assert(start <= end); + + std::vector overlapped; + auto range = ::range::make(start, end); + + for (auto& candidate : sstables) { + auto candidate_range = ::range::make(candidate->get_first_decorated_key(s)._token, candidate->get_last_decorated_key(s)._token); + + if (range.overlap(candidate_range, dht::token_comparator())) { + overlapped.push_back(candidate); + } } return overlapped; } +#if 0 private static final Predicate suspectP = new Predicate() { public boolean apply(SSTableReader candidate) @@ -531,21 +596,22 @@ public class LeveledManifest return candidate.isMarkedSuspect(); } }; - +#endif /** * @return highest-priority sstables to compact for the given level. * If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted * for prior failure), will return an empty list. Never returns null. */ - private Collection getCandidatesFor(int level) - { - assert !getLevel(level).isEmpty(); + std::vector get_candidates_for(int level) { + const schema& s = *_schema; + assert(!get_level(level).empty()); +#if 0 logger.debug("Choosing candidates for L{}", level); final Set compacting = cfs.getDataTracker().getCompacting(); - - if (level == 0) - { +#endif + if (level == 0) { +#if 0 Set compactingL0 = ImmutableSet.copyOf(Iterables.filter(getLevel(0), Predicates.in(compacting))); RowPosition lastCompactingKey = null; @@ -557,6 +623,7 @@ public class LeveledManifest if (lastCompactingKey == null || candidate.last.compareTo(lastCompactingKey) > 0) lastCompactingKey = candidate.last; } +#endif // L0 is the dumping ground for new sstables which thus may overlap each other. // @@ -571,53 +638,75 @@ public class LeveledManifest // Note that we ignore suspect-ness of L1 sstables here, since if an L1 sstable is suspect we're // basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable. // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best. - Set candidates = new HashSet<>(); - Set remaining = new HashSet<>(); + std::vector candidates; + std::list remaining = get_level(0); +#if 0 Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP))); - for (SSTableReader sstable : ageSortedSSTables(remaining)) - { - if (candidates.contains(sstable)) +#endif + for (auto& sstable : age_sorted_sstables(remaining)) { + auto it = std::find(candidates.begin(), candidates.end(), sstable); + if (it != candidates.end()) { continue; - - Sets.SetView overlappedL0 = Sets.union(Collections.singleton(sstable), overlapping(sstable, remaining)); - if (!Sets.intersection(overlappedL0, compactingL0).isEmpty()) - continue; - - for (SSTableReader newCandidate : overlappedL0) - { - if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0) - candidates.add(newCandidate); - remaining.remove(newCandidate); } - if (candidates.size() > MAX_COMPACTING_L0) - { + auto overlappedL0 = overlapping(sstable, remaining); + it = std::find(overlappedL0.begin(), overlappedL0.end(), sstable); + if (it == overlappedL0.end()) { + overlappedL0.push_back(sstable); + } + +#if 0 + if (!Sets.intersection(overlappedL0, compactingL0).isEmpty()) + continue; +#endif + + for (auto& new_candidate : overlappedL0) { +#if 0 + if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0) + candidates.add(newCandidate); +#else + candidates.push_back(new_candidate); +#endif + remaining.remove(new_candidate); + } + + if (candidates.size() > MAX_COMPACTING_L0) { // limit to only the MAX_COMPACTING_L0 oldest candidates - candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0)); + auto age_sorted_candidates = age_sorted_sstables(candidates); + // create a sub list of age_sorted_candidates by resizing it. + age_sorted_candidates.resize(MAX_COMPACTING_L0); + candidates = std::move(age_sorted_candidates); break; } } // leave everything in L0 if we didn't end up with a full sstable's worth of data - if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes) - { + if (get_total_bytes(candidates) > _max_sstable_size_in_bytes) { // add sstables from L1 that overlap candidates // if the overlapping ones are already busy in a compaction, leave it out. // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables - Set l1overlapping = overlapping(candidates, getLevel(1)); - if (Sets.intersection(l1overlapping, compacting).size() > 0) - return Collections.emptyList(); - candidates = Sets.union(candidates, l1overlapping); + auto l1overlapping = overlapping(candidates, get_level(1)); + for (auto candidate : l1overlapping) { + auto it = std::find(candidates.begin(), candidates.end(), candidate); + if (it != candidates.end()) { + continue; + } + candidates.push_back(candidate); + } } - if (candidates.size() < 2) - return Collections.emptyList(); - else + if (candidates.size() < 2) { + return {}; + } else { return candidates; + } } // for non-L0 compactions, pick up where we left off last time - Collections.sort(getLevel(level), SSTableReader.sstableComparator); + get_level(level).sort([&s] (auto& i, auto& j) { + return i->compare_by_first_key(s, *j) < 0; + }); int start = 0; // handles case where the prior compaction touched the very last range +#if 0 for (int i = 0; i < getLevel(level).size(); i++) { SSTableReader sstable = getLevel(level).get(i); @@ -627,30 +716,55 @@ public class LeveledManifest break; } } - +#endif // look for a non-suspect keyspace to compact with, starting with where we left off last time, // and wrapping back to the beginning of the generation if necessary - for (int i = 0; i < getLevel(level).size(); i++) - { - SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size()); - Set candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, getLevel(level + 1))); + for (auto i = 0U; i < get_level(level).size(); i++) { + // get an iterator to the element of position pos from the list get_level(level). + auto pos = (start + i) % get_level(level).size(); + auto it = get_level(level).begin(); + std::advance(it, pos); + + auto sstable = *it; + auto candidates = overlapping(sstable, get_level(level + 1)); + candidates.push_back(sstable); +#if 0 if (Iterables.any(candidates, suspectP)) continue; if (Sets.intersection(candidates, compacting).isEmpty()) return candidates; +#endif + if (candidates.size() < 2) { + return {}; + } else { + return candidates; + } } // all the sstables were suspect or overlapped with something suspect - return Collections.emptyList(); + return {}; } - private List ageSortedSSTables(Collection candidates) - { - List ageSortedCandidates = new ArrayList<>(candidates); - Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator); - return ageSortedCandidates; + std::list age_sorted_sstables(std::list& candidates) { + auto age_sorted_candidates = candidates; + + age_sorted_candidates.sort([] (auto& i, auto& j) { + return i->compare_by_max_timestamp(*j) > 0; + }); + + return age_sorted_candidates; } + std::vector age_sorted_sstables(std::vector& candidates) { + auto age_sorted_candidates = candidates; + + std::sort(age_sorted_candidates.begin(), age_sorted_candidates.end(), [] (auto& i, auto& j) { + return i->compare_by_max_timestamp(*j) > 0; + }); + + return age_sorted_candidates; + } +#if 0 @Override public String toString() { @@ -671,12 +785,14 @@ public class LeveledManifest { return ImmutableSortedSet.copyOf(comparator, getLevel(level)); } - - public List getLevel(int i) - { - return generations[i]; +#endif + std::list& get_level(uint32_t level) { + if (level >= _generations.size()) { + throw std::runtime_error("Invalid level"); + } + return _generations[level]; } - +#if 0 public synchronized int getEstimatedTasks() { long tasks = 0; @@ -693,42 +809,34 @@ public class LeveledManifest Arrays.toString(estimated), cfs.keyspace.getName(), cfs.name); return Ints.checkedCast(tasks); } +#endif + int get_next_level(const std::vector& sstables) { + int maximum_level = std::numeric_limits::min(); + int minimum_level = std::numeric_limits::max(); + auto total_bytes = get_total_bytes(sstables); - public int getNextLevel(Collection sstables) - { - int maximumLevel = Integer.MIN_VALUE; - int minimumLevel = Integer.MAX_VALUE; - for (SSTableReader sstable : sstables) - { - maximumLevel = Math.max(sstable.getSSTableLevel(), maximumLevel); - minimumLevel = Math.min(sstable.getSSTableLevel(), minimumLevel); + for (auto& sstable : sstables) { + int sstable_level = sstable->get_sstable_level(); + maximum_level = std::max(sstable_level, maximum_level); + minimum_level = std::min(sstable_level, minimum_level); } - int newLevel; - if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTableReader.getTotalBytes(sstables) < maxSSTableSizeInBytes) - { - newLevel = 0; + int new_level; + if (minimum_level == 0 && minimum_level == maximum_level && total_bytes < _max_sstable_size_in_bytes) { + new_level = 0; + } else { + new_level = minimum_level == maximum_level ? maximum_level + 1 : maximum_level; + assert(new_level > 0); } - else - { - newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel; - assert newLevel > 0; - } - return newLevel; - + return new_level; } - public static class CompactionCandidate - { - public final Collection sstables; - public final int level; - public final long maxSSTableBytes; - - public CompactionCandidate(Collection sstables, int level, long maxSSTableBytes) - { - this.sstables = sstables; - this.level = level; - this.maxSSTableBytes = maxSSTableBytes; + template + static uint64_t get_total_bytes(const T& sstables) { + uint64_t sum = 0; + for (auto& sstable : sstables) { + sum += sstable->data_size(); } + return sum; } -} +};