convert LeveledManifest to C++

Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
This commit is contained in:
Raphael S. Carvalho
2015-10-07 16:47:28 -03:00
parent 683e29e0bd
commit fbec0d0254
2 changed files with 377 additions and 249 deletions

View File

@@ -26,6 +26,26 @@
#include <functional>
namespace sstables {
struct compaction_descriptor {
// List of sstables to be compacted.
std::vector<sstables::shared_sstable> sstables;
// Level of sstable(s) created by compaction procedure.
int level = 0;
// Threshold size for sstable(s) to be created.
uint64_t max_sstable_bytes = std::numeric_limits<uint64_t>::max();
compaction_descriptor() = default;
compaction_descriptor(std::vector<sstables::shared_sstable> sstables, int level, long max_sstable_bytes)
: sstables(std::move(sstables))
, level(level)
, max_sstable_bytes(max_sstable_bytes) {}
compaction_descriptor(std::vector<sstables::shared_sstable> sstables)
: sstables(std::move(sstables)) {}
};
future<> compact_sstables(std::vector<shared_sstable> sstables,
column_family& cf, std::function<shared_sstable()> creator);

View File

@@ -15,69 +15,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.compaction;
import java.io.IOException;
import java.util.*;
/*
* Copyright 2015 Cloudius Systems
*
* Modified by Cloudius Systems
*/
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.Pair;
#pragma once
public class LeveledManifest
{
private static final Logger logger = LoggerFactory.getLogger(LeveledManifest.class);
#include "sstables.hh"
#include "compaction.hh"
#include "range.hh"
#include "log.hh"
class leveled_manifest {
logging::logger logger;
/**
* limit the number of L0 sstables we do at once, because compaction bloom filter creation
* uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory
* or even OOMing when compacting highly overlapping sstables
*/
private static final int MAX_COMPACTING_L0 = 32;
static constexpr int MAX_COMPACTING_L0 = 32;
/**
* If we go this many rounds without compacting
* in the highest level, we start bringing in sstables from
* that level into lower level compactions
*/
private static final int NO_COMPACTION_LIMIT = 25;
static constexpr int NO_COMPACTION_LIMIT = 25;
private final ColumnFamilyStore cfs;
@VisibleForTesting
protected final List<SSTableReader>[] generations;
schema_ptr _schema;
std::vector<std::list<sstables::shared_sstable>> _generations;
#if 0
private final RowPosition[] lastCompactedKeys;
private final int maxSSTableSizeInBytes;
#endif
uint64_t _max_sstable_size_in_bytes;
#if 0
private final SizeTieredCompactionStrategyOptions options;
private final int [] compactionCounter;
#endif
LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
public:
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
: logger("LeveledManifest")
, _schema(cfs.schema())
, _max_sstable_size_in_bytes(max_sstable_size_in_MB * 1024 * 1024)
{
this.cfs = cfs;
this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024 * 1024;
this.options = options;
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
// dependent on maxSSTableSize.)
int n = (int) Math.log10(1000 * 1000 * 1000);
generations = new List[n];
uint64_t n = 9; // log10(1000^3)
_generations.resize(n);
#if 0
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
@@ -85,43 +92,49 @@ public class LeveledManifest
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
compactionCounter = new int[n];
#endif
}
#if 0
public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
{
return create(cfs, maxSSTableSize, sstables, new SizeTieredCompactionStrategyOptions());
}
#endif
public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, Iterable<SSTableReader> sstables, SizeTieredCompactionStrategyOptions options)
{
LeveledManifest manifest = new LeveledManifest(cfs, maxSSTableSize, options);
static leveled_manifest create(column_family& cfs, int max_sstable_size_in_mb) {
leveled_manifest manifest = leveled_manifest(cfs, max_sstable_size_in_mb);
// ensure all SSTables are in the manifest
for (SSTableReader ssTableReader : sstables)
{
manifest.add(ssTableReader);
auto sstables = cfs.get_sstables();
for (auto& map_entry : *sstables) {
auto& sstable = map_entry.second;
manifest.add(sstable);
}
for (int i = 1; i < manifest.getAllLevelSize().length; i++)
{
manifest.repairOverlappingSSTables(i);
for (auto i = 1U; i < manifest._generations.size(); i++) {
manifest.repair_overlapping_sstables(i);
}
return manifest;
}
public synchronized void add(SSTableReader reader)
{
int level = reader.getSSTableLevel();
void add(sstables::shared_sstable& sstable) {
uint32_t level = sstable->get_sstable_level();
assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
logDistribution();
if (canAddSSTable(reader))
{
// adding the sstable does not cause overlap in the level
logger.debug("Adding {} to L{}", reader, level);
generations[level].add(reader);
if (level >= _generations.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (_generations.size() - 1)));
}
else
{
#if 0
logDistribution();
#endif
if (can_add_sstable(sstable)) {
// adding the sstable does not cause overlap in the level
logger.debug("Adding {} to L{}", sstable->get_filename(), level);
_generations[level].push_back(sstable);
} else {
// this can happen if:
// * a compaction has promoted an overlapping sstable to the given level, or
// was also supposed to add an sstable at the given level.
@@ -129,6 +142,7 @@ public class LeveledManifest
// would cause overlap
//
// The add(..):ed sstable will be sent to level 0
#if 0
try
{
reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
@@ -138,12 +152,12 @@ public class LeveledManifest
{
logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
}
generations[0].add(reader);
#endif
_generations[0].push_back(sstable);
}
}
#if 0
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
@@ -172,31 +186,37 @@ public class LeveledManifest
add(ssTableReader);
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
#endif
public synchronized void repairOverlappingSSTables(int level)
{
SSTableReader previous = null;
Collections.sort(generations[level], SSTableReader.sstableComparator);
List<SSTableReader> outOfOrderSSTables = new ArrayList<>();
for (SSTableReader current : generations[level])
{
if (previous != null && current.first.compareTo(previous.last) <= 0)
{
void repair_overlapping_sstables(int level) {
const sstables::sstable *previous = nullptr;
const schema& s = *_schema;
_generations[level].sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(s, *j) < 0;
});
std::vector<sstables::shared_sstable> out_of_order_sstables;
for (auto& current : _generations[level]) {
auto current_first = current->get_first_decorated_key(s);
if (previous != nullptr && current_first.tri_compare(s, previous->get_last_decorated_key(s)) <= 0) {
#if 0
logger.warn(String.format("At level %d, %s [%s, %s] overlaps %s [%s, %s]. This could be caused by a bug in Cassandra 1.1.0 .. 1.1.3 or due to the fact that you have dropped sstables from another node into the data directory. " +
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable",
level, previous, previous.first, previous.last, current, current.first, current.last));
outOfOrderSSTables.add(current);
}
else
{
previous = current;
"Sending back to L0. If you didn't drop in sstables, and have not yet run scrub, you should do so since you may also have rows out-of-order within an sstable",
level, previous, previous.first, previous.last, current, current.first, current.last));
#endif
out_of_order_sstables.push_back(current);
} else {
previous = &*current;
}
}
if (!outOfOrderSSTables.isEmpty())
{
for (SSTableReader sstable : outOfOrderSSTables)
sendBackToL0(sstable);
if (!out_of_order_sstables.empty()) {
for (auto& sstable : out_of_order_sstables) {
send_back_to_L0(sstable);
}
}
}
@@ -205,29 +225,39 @@ public class LeveledManifest
* @param sstable the sstable to add
* @return true if it is safe to add the sstable in the level.
*/
private boolean canAddSSTable(SSTableReader sstable)
{
int level = sstable.getSSTableLevel();
if (level == 0)
bool can_add_sstable(sstables::shared_sstable& sstable) {
uint32_t level = sstable->get_sstable_level();
const schema& s = *_schema;
if (level == 0) {
return true;
List<SSTableReader> copyLevel = new ArrayList<>(generations[level]);
copyLevel.add(sstable);
Collections.sort(copyLevel, SSTableReader.sstableComparator);
SSTableReader previous = null;
for (SSTableReader current : copyLevel)
{
if (previous != null && current.first.compareTo(previous.last) <= 0)
return false;
previous = current;
}
auto copy_level = _generations[level];
copy_level.push_back(sstable);
copy_level.sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(s, *j) < 0;
});
const sstables::sstable *previous = nullptr;
for (auto& current : copy_level) {
if (previous != nullptr) {
auto current_first = current->get_first_decorated_key(s);
auto previous_last = previous->get_last_decorated_key(s);
if (current_first.tri_compare(s, previous_last) <= 0) {
return false;
}
}
previous = &*current;
}
return true;
}
private synchronized void sendBackToL0(SSTableReader sstable)
{
void send_back_to_L0(sstables::shared_sstable& sstable) {
remove(sstable);
#if 0
try
{
sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 0);
@@ -238,8 +268,12 @@ public class LeveledManifest
{
throw new RuntimeException("Could not reload sstable meta data", e);
}
#else
_generations[0].push_back(sstable);
#endif
}
#if 0
private String toString(Collection<SSTableReader> sstables)
{
StringBuilder builder = new StringBuilder();
@@ -254,24 +288,31 @@ public class LeveledManifest
}
return builder.toString();
}
#endif
@VisibleForTesting
long maxBytesForLevel(int level)
{
if (level == 0)
return 4L * maxSSTableSizeInBytes;
double bytes = Math.pow(10, level) * maxSSTableSizeInBytes;
if (bytes > Long.MAX_VALUE)
throw new RuntimeException("At most " + Long.MAX_VALUE + " bytes may be in a compaction level; your maxSSTableSize must be absurdly high to compute " + bytes);
return (long) bytes;
static uint64_t max_bytes_for_level(int level, uint64_t max_sstable_size_in_bytes) {
if (level == 0) {
return 4L * max_sstable_size_in_bytes;
}
double bytes = pow(10, level) * max_sstable_size_in_bytes;
if (bytes > std::numeric_limits<int64_t>::max()) {
throw std::runtime_error(sprint("At most %ld bytes may be in a compaction level; your maxSSTableSize must be absurdly high to compute %f",
std::numeric_limits<int64_t>::max(), bytes));
}
uint64_t bytes_u64 = bytes;
return bytes_u64;
}
uint64_t max_bytes_for_level(int level) {
return max_bytes_for_level(level, _max_sstable_size_in_bytes);
}
/**
* @return highest-priority sstables to compact, and level to compact them to
* If no compactions are necessary, will return null
*/
public synchronized CompactionCandidate getCompactionCandidates()
{
sstables::compaction_descriptor get_compaction_candidates() {
#if 0
// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
if (StorageService.instance.isBootstrapMode())
@@ -284,6 +325,7 @@ public class LeveledManifest
}
return null;
}
#endif
// LevelDB gives each level a score of how much data it contains vs its ideal amount, and
// compacts the level with the highest score. But this falls apart spectacularly once you
// get behind. Consider this set of levels:
@@ -311,19 +353,22 @@ public class LeveledManifest
// This isn't a magic wand -- if you are consistently writing too fast for LCS to keep
// up, you're still screwed. But if instead you have intermittent bursts of activity,
// it can help a lot.
for (int i = generations.length - 1; i > 0; i--)
{
List<SSTableReader> sstables = getLevel(i);
if (sstables.isEmpty())
for (auto i = _generations.size() - 1; i > 0; i--) {
auto& sstables = get_level(i);
if (sstables.empty()) {
continue; // mostly this just avoids polluting the debug log with zero scores
}
#if 0
// we want to calculate score excluding compacting ones
Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getDataTracker().getCompacting());
double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
#endif
double score = (double) get_total_bytes(sstables) / (double) max_bytes_for_level(i);
logger.debug("Compaction score for level {} is {}", i, score);
if (score > 1.001)
{
if (score > 1.001) {
#if 0
// before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
if (!DatabaseDescriptor.getDisableSTCSInL0() && getLevel(0).size() > MAX_COMPACTING_L0)
{
@@ -334,33 +379,37 @@ public class LeveledManifest
return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
}
}
#endif
// L0 is fine, proceed with this level
Collection<SSTableReader> candidates = getCandidatesFor(i);
if (!candidates.isEmpty())
{
int nextLevel = getNextLevel(candidates);
auto candidates = get_candidates_for(i);
if (!candidates.empty()) {
int next_level = get_next_level(candidates);
#if 0
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
#endif
return sstables::compaction_descriptor(std::move(candidates), next_level, _max_sstable_size_in_bytes);
}
else
{
else {
logger.debug("No compaction candidates for L{}", i);
}
}
}
// Higher levels are happy, time for a standard, non-STCS L0 compaction
if (getLevel(0).isEmpty())
return null;
Collection<SSTableReader> candidates = getCandidatesFor(0);
if (candidates.isEmpty())
return null;
return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
if (get_level(0).empty()) {
return sstables::compaction_descriptor();
}
auto candidates = get_candidates_for(0);
if (candidates.empty()) {
return sstables::compaction_descriptor();
}
auto next_level = get_next_level(candidates);
return sstables::compaction_descriptor(std::move(candidates), next_level, _max_sstable_size_in_bytes);
}
#if 0
private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
{
Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
@@ -371,6 +420,7 @@ public class LeveledManifest
options.minSSTableSize);
return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
}
#endif
/**
* If we do something that makes many levels contain too little data (cleanup, change sstable size) we will "never"
@@ -383,6 +433,7 @@ public class LeveledManifest
* @param candidates the original sstables to compact
* @return
*/
#if 0
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
{
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
@@ -434,14 +485,17 @@ public class LeveledManifest
return candidates;
}
#endif
public synchronized int getLevelSize(int i)
{
size_t get_level_size(uint32_t level) {
#if 0
if (i >= generations.length)
throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1));
return getLevel(i).size();
#endif
return get_level(level).size();
}
#if 0
public synchronized int[] getAllLevelSize()
{
int[] counts = new int[generations.length];
@@ -464,19 +518,21 @@ public class LeveledManifest
}
}
}
#endif
@VisibleForTesting
public int remove(SSTableReader reader)
{
int level = reader.getSSTableLevel();
assert level >= 0 : reader + " not present in manifest: "+level;
generations[level].remove(reader);
uint32_t remove(sstables::shared_sstable& sstable) {
uint32_t level = sstable->get_sstable_level();
if (level >= _generations.size()) {
throw std::runtime_error("Invalid level");
}
_generations[level].remove(sstable);
return level;
}
private static Set<SSTableReader> overlapping(Collection<SSTableReader> candidates, Iterable<SSTableReader> others)
{
assert !candidates.isEmpty();
std::vector<sstables::shared_sstable>
overlapping(std::vector<sstables::shared_sstable>& candidates, std::list<sstables::shared_sstable>& others) {
const schema& s = *_schema;
assert(!candidates.empty());
/*
* Picking each sstable from others that overlap one of the sstable of candidates is not enough
* because you could have the following situation:
@@ -488,42 +544,51 @@ public class LeveledManifest
* Thus, the correct approach is to pick sstables overlapping anything between the first key in all
* the candidate sstables, and the last.
*/
Iterator<SSTableReader> iter = candidates.iterator();
SSTableReader sstable = iter.next();
Token first = sstable.first.getToken();
Token last = sstable.last.getToken();
while (iter.hasNext())
{
sstable = iter.next();
first = first.compareTo(sstable.first.getToken()) <= 0 ? first : sstable.first.getToken();
last = last.compareTo(sstable.last.getToken()) >= 0 ? last : sstable.last.getToken();
auto it = candidates.begin();
auto& first_sstable = *it;
it++;
dht::token first = first_sstable->get_first_decorated_key(s)._token;
dht::token last = first_sstable->get_last_decorated_key(s)._token;
while (it != candidates.end()) {
auto& candidate_sstable = *it;
it++;
dht::token first_candidate = candidate_sstable->get_first_decorated_key(s)._token;
dht::token last_candidate = candidate_sstable->get_last_decorated_key(s)._token;
first = first <= first_candidate? first : first_candidate;
last = last >= last_candidate ? last : last_candidate;
}
return overlapping(first, last, others);
}
@VisibleForTesting
static Set<SSTableReader> overlapping(SSTableReader sstable, Iterable<SSTableReader> others)
{
return overlapping(sstable.first.getToken(), sstable.last.getToken(), others);
std::vector<sstables::shared_sstable>
overlapping(sstables::shared_sstable& sstable, std::list<sstables::shared_sstable>& others) {
const schema& s = *_schema;
return overlapping(sstable->get_first_decorated_key(s)._token, sstable->get_last_decorated_key(s)._token, others);
}
/**
* @return sstables from @param sstables that contain keys between @param start and @param end, inclusive.
*/
private static Set<SSTableReader> overlapping(Token start, Token end, Iterable<SSTableReader> sstables)
{
assert start.compareTo(end) <= 0;
Set<SSTableReader> overlapped = new HashSet<>();
Bounds<Token> promotedBounds = new Bounds<Token>(start, end);
for (SSTableReader candidate : sstables)
{
Bounds<Token> candidateBounds = new Bounds<Token>(candidate.first.getToken(), candidate.last.getToken());
if (candidateBounds.intersects(promotedBounds))
overlapped.add(candidate);
std::vector<sstables::shared_sstable>
overlapping(dht::token start, dht::token end, std::list<sstables::shared_sstable>& sstables) {
const schema& s = *_schema;
assert(start <= end);
std::vector<sstables::shared_sstable> overlapped;
auto range = ::range<dht::token>::make(start, end);
for (auto& candidate : sstables) {
auto candidate_range = ::range<dht::token>::make(candidate->get_first_decorated_key(s)._token, candidate->get_last_decorated_key(s)._token);
if (range.overlap(candidate_range, dht::token_comparator())) {
overlapped.push_back(candidate);
}
}
return overlapped;
}
#if 0
private static final Predicate<SSTableReader> suspectP = new Predicate<SSTableReader>()
{
public boolean apply(SSTableReader candidate)
@@ -531,21 +596,22 @@ public class LeveledManifest
return candidate.isMarkedSuspect();
}
};
#endif
/**
* @return highest-priority sstables to compact for the given level.
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
* for prior failure), will return an empty list. Never returns null.
*/
private Collection<SSTableReader> getCandidatesFor(int level)
{
assert !getLevel(level).isEmpty();
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
const schema& s = *_schema;
assert(!get_level(level).empty());
#if 0
logger.debug("Choosing candidates for L{}", level);
final Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
if (level == 0)
{
#endif
if (level == 0) {
#if 0
Set<SSTableReader> compactingL0 = ImmutableSet.copyOf(Iterables.filter(getLevel(0), Predicates.in(compacting)));
RowPosition lastCompactingKey = null;
@@ -557,6 +623,7 @@ public class LeveledManifest
if (lastCompactingKey == null || candidate.last.compareTo(lastCompactingKey) > 0)
lastCompactingKey = candidate.last;
}
#endif
// L0 is the dumping ground for new sstables which thus may overlap each other.
//
@@ -571,53 +638,75 @@ public class LeveledManifest
// Note that we ignore suspect-ness of L1 sstables here, since if an L1 sstable is suspect we're
// basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable.
// So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best.
Set<SSTableReader> candidates = new HashSet<>();
Set<SSTableReader> remaining = new HashSet<>();
std::vector<sstables::shared_sstable> candidates;
std::list<sstables::shared_sstable> remaining = get_level(0);
#if 0
Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP)));
for (SSTableReader sstable : ageSortedSSTables(remaining))
{
if (candidates.contains(sstable))
#endif
for (auto& sstable : age_sorted_sstables(remaining)) {
auto it = std::find(candidates.begin(), candidates.end(), sstable);
if (it != candidates.end()) {
continue;
Sets.SetView<SSTableReader> overlappedL0 = Sets.union(Collections.singleton(sstable), overlapping(sstable, remaining));
if (!Sets.intersection(overlappedL0, compactingL0).isEmpty())
continue;
for (SSTableReader newCandidate : overlappedL0)
{
if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0)
candidates.add(newCandidate);
remaining.remove(newCandidate);
}
if (candidates.size() > MAX_COMPACTING_L0)
{
auto overlappedL0 = overlapping(sstable, remaining);
it = std::find(overlappedL0.begin(), overlappedL0.end(), sstable);
if (it == overlappedL0.end()) {
overlappedL0.push_back(sstable);
}
#if 0
if (!Sets.intersection(overlappedL0, compactingL0).isEmpty())
continue;
#endif
for (auto& new_candidate : overlappedL0) {
#if 0
if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0)
candidates.add(newCandidate);
#else
candidates.push_back(new_candidate);
#endif
remaining.remove(new_candidate);
}
if (candidates.size() > MAX_COMPACTING_L0) {
// limit to only the MAX_COMPACTING_L0 oldest candidates
candidates = new HashSet<>(ageSortedSSTables(candidates).subList(0, MAX_COMPACTING_L0));
auto age_sorted_candidates = age_sorted_sstables(candidates);
// create a sub list of age_sorted_candidates by resizing it.
age_sorted_candidates.resize(MAX_COMPACTING_L0);
candidates = std::move(age_sorted_candidates);
break;
}
}
// leave everything in L0 if we didn't end up with a full sstable's worth of data
if (SSTableReader.getTotalBytes(candidates) > maxSSTableSizeInBytes)
{
if (get_total_bytes(candidates) > _max_sstable_size_in_bytes) {
// add sstables from L1 that overlap candidates
// if the overlapping ones are already busy in a compaction, leave it out.
// TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables
Set<SSTableReader> l1overlapping = overlapping(candidates, getLevel(1));
if (Sets.intersection(l1overlapping, compacting).size() > 0)
return Collections.emptyList();
candidates = Sets.union(candidates, l1overlapping);
auto l1overlapping = overlapping(candidates, get_level(1));
for (auto candidate : l1overlapping) {
auto it = std::find(candidates.begin(), candidates.end(), candidate);
if (it != candidates.end()) {
continue;
}
candidates.push_back(candidate);
}
}
if (candidates.size() < 2)
return Collections.emptyList();
else
if (candidates.size() < 2) {
return {};
} else {
return candidates;
}
}
// for non-L0 compactions, pick up where we left off last time
Collections.sort(getLevel(level), SSTableReader.sstableComparator);
get_level(level).sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(s, *j) < 0;
});
int start = 0; // handles case where the prior compaction touched the very last range
#if 0
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get(i);
@@ -627,30 +716,55 @@ public class LeveledManifest
break;
}
}
#endif
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get((start + i) % getLevel(level).size());
Set<SSTableReader> candidates = Sets.union(Collections.singleton(sstable), overlapping(sstable, getLevel(level + 1)));
for (auto i = 0U; i < get_level(level).size(); i++) {
// get an iterator to the element of position pos from the list get_level(level).
auto pos = (start + i) % get_level(level).size();
auto it = get_level(level).begin();
std::advance(it, pos);
auto sstable = *it;
auto candidates = overlapping(sstable, get_level(level + 1));
candidates.push_back(sstable);
#if 0
if (Iterables.any(candidates, suspectP))
continue;
if (Sets.intersection(candidates, compacting).isEmpty())
return candidates;
#endif
if (candidates.size() < 2) {
return {};
} else {
return candidates;
}
}
// all the sstables were suspect or overlapped with something suspect
return Collections.emptyList();
return {};
}
private List<SSTableReader> ageSortedSSTables(Collection<SSTableReader> candidates)
{
List<SSTableReader> ageSortedCandidates = new ArrayList<>(candidates);
Collections.sort(ageSortedCandidates, SSTableReader.maxTimestampComparator);
return ageSortedCandidates;
std::list<sstables::shared_sstable> age_sorted_sstables(std::list<sstables::shared_sstable>& candidates) {
auto age_sorted_candidates = candidates;
age_sorted_candidates.sort([] (auto& i, auto& j) {
return i->compare_by_max_timestamp(*j) > 0;
});
return age_sorted_candidates;
}
std::vector<sstables::shared_sstable> age_sorted_sstables(std::vector<sstables::shared_sstable>& candidates) {
auto age_sorted_candidates = candidates;
std::sort(age_sorted_candidates.begin(), age_sorted_candidates.end(), [] (auto& i, auto& j) {
return i->compare_by_max_timestamp(*j) > 0;
});
return age_sorted_candidates;
}
#if 0
@Override
public String toString()
{
@@ -671,12 +785,14 @@ public class LeveledManifest
{
return ImmutableSortedSet.copyOf(comparator, getLevel(level));
}
public List<SSTableReader> getLevel(int i)
{
return generations[i];
#endif
std::list<sstables::shared_sstable>& get_level(uint32_t level) {
if (level >= _generations.size()) {
throw std::runtime_error("Invalid level");
}
return _generations[level];
}
#if 0
public synchronized int getEstimatedTasks()
{
long tasks = 0;
@@ -693,42 +809,34 @@ public class LeveledManifest
Arrays.toString(estimated), cfs.keyspace.getName(), cfs.name);
return Ints.checkedCast(tasks);
}
#endif
int get_next_level(const std::vector<sstables::shared_sstable>& sstables) {
int maximum_level = std::numeric_limits<int>::min();
int minimum_level = std::numeric_limits<int>::max();
auto total_bytes = get_total_bytes(sstables);
public int getNextLevel(Collection<SSTableReader> sstables)
{
int maximumLevel = Integer.MIN_VALUE;
int minimumLevel = Integer.MAX_VALUE;
for (SSTableReader sstable : sstables)
{
maximumLevel = Math.max(sstable.getSSTableLevel(), maximumLevel);
minimumLevel = Math.min(sstable.getSSTableLevel(), minimumLevel);
for (auto& sstable : sstables) {
int sstable_level = sstable->get_sstable_level();
maximum_level = std::max(sstable_level, maximum_level);
minimum_level = std::min(sstable_level, minimum_level);
}
int newLevel;
if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTableReader.getTotalBytes(sstables) < maxSSTableSizeInBytes)
{
newLevel = 0;
int new_level;
if (minimum_level == 0 && minimum_level == maximum_level && total_bytes < _max_sstable_size_in_bytes) {
new_level = 0;
} else {
new_level = minimum_level == maximum_level ? maximum_level + 1 : maximum_level;
assert(new_level > 0);
}
else
{
newLevel = minimumLevel == maximumLevel ? maximumLevel + 1 : maximumLevel;
assert newLevel > 0;
}
return newLevel;
return new_level;
}
public static class CompactionCandidate
{
public final Collection<SSTableReader> sstables;
public final int level;
public final long maxSSTableBytes;
public CompactionCandidate(Collection<SSTableReader> sstables, int level, long maxSSTableBytes)
{
this.sstables = sstables;
this.level = level;
this.maxSSTableBytes = maxSSTableBytes;
template <typename T>
static uint64_t get_total_bytes(const T& sstables) {
uint64_t sum = 0;
for (auto& sstable : sstables) {
sum += sstable->data_size();
}
return sum;
}
}
};