Merge "Fixes for leveled compaction strategy" from Raphael

* 'lcs_fixes' of github.com:raphaelsc/scylla:
  lcs: fix starvation at higher levels
  lcs: fix broken token range distribution at higher levels
This commit is contained in:
Avi Kivity
2016-10-01 21:34:21 +03:00
5 changed files with 122 additions and 103 deletions

View File

@@ -54,6 +54,10 @@ public:
// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;

View File

@@ -1283,6 +1283,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
};
return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level,
cleanup).then([this, sstables_to_compact] (auto new_sstables) {
_compaction_strategy.notify_completion(*sstables_to_compact, new_sstables);
return this->rebuild_sstable_list(new_sstables, *sstables_to_compact);
});
});

View File

@@ -216,6 +216,7 @@ protected:
public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
@@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl {
const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB;
std::vector<stdx::optional<dht::decorated_key>> _last_compacted_keys;
std::vector<int> _compaction_counter;
public:
leveled_compaction_strategy(const std::map<sstring, sstring>& options) {
using namespace cql3::statements;
@@ -596,10 +599,14 @@ public:
logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB",
_max_sstable_size_in_mb);
}
_last_compacted_keys.resize(leveled_manifest::MAX_LEVELS);
_compaction_counter.resize(leveled_manifest::MAX_LEVELS);
}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) override;
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
virtual int64_t estimated_pending_compactions(column_family& cf) const override;
virtual bool parallel_compaction() const override {
@@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
// sstable in it may be marked for deletion after compacted.
// Currently, we create a new manifest whenever it's time for compaction.
leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb);
auto candidate = manifest.get_compaction_candidates();
auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter);
if (candidate.sstables.empty()) {
return sstables::compaction_descriptor();
@@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c
return std::move(candidate);
}
// Called after a compaction finishes. Remembers, for the lowest level among
// the removed (input) sstables, the last decorated key of the added (output)
// sstable that compares greatest by first key. get_candidates_for() reads this
// per-level key to decide where in the level the next compaction resumes.
//
// @param removed  sstables that were compacted away; nothing is recorded if empty.
// @param added    sstables produced by the compaction; nothing is recorded if empty.
void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
    if (removed.empty() || added.empty()) {
        return;
    }
    auto min_level = std::numeric_limits<uint32_t>::max();
    for (auto& sstable : removed) {
        min_level = std::min(min_level, sstable->get_sstable_level());
    }
    // The level comes from sstable metadata. _last_compacted_keys is only a
    // resume hint, so for a level outside the tracked range we skip the update
    // instead of writing past the end of the vector (and instead of throwing,
    // which would abort the completion path of the caller).
    if (min_level >= _last_compacted_keys.size()) {
        return;
    }
    // Pick the output sstable that orders last according to compare_by_first_key().
    const sstables::sstable *last = nullptr;
    for (auto& candidate : added) {
        if (!last || last->compare_by_first_key(*candidate) < 0) {
            last = &*candidate;
        }
    }
    _last_compacted_keys[min_level] = last->get_last_decorated_key();
}
int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const {
std::vector<sstables::shared_sstable> sstables;
sstables.reserve(cf.sstables_count());
@@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa
return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates));
}
// Hands the lists of removed and added sstables to the underlying strategy
// implementation; this wrapper performs no work of its own.
void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
    auto& impl = *_compaction_strategy_impl;
    impl.notify_completion(removed, added);
}
// Reports whether the underlying strategy implementation allows parallel
// compaction; the answer is taken verbatim from the implementation.
bool compaction_strategy::parallel_compaction() const {
    const auto& impl = *_compaction_strategy_impl;
    return impl.parallel_compaction();
}

View File

@@ -64,16 +64,14 @@ class leveled_manifest {
schema_ptr _schema;
std::vector<std::list<sstables::shared_sstable>> _generations;
#if 0
private final RowPosition[] lastCompactedKeys;
#endif
uint64_t _max_sstable_size_in_bytes;
#if 0
private final SizeTieredCompactionStrategyOptions options;
private final int [] compactionCounter;
#endif
public:
static constexpr int MAX_LEVELS = 9; // log10(1000^3);
leveled_manifest(column_family& cfs, int max_sstable_size_in_MB)
: logger("LeveledManifest")
, _schema(cfs.schema())
@@ -82,15 +80,8 @@ public:
// allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
// updated, we will still have sstables of the older, potentially smaller size. So don't make this
// dependent on maxSSTableSize.)
uint64_t n = 9; // log10(1000^3)
_generations.resize(n);
_generations.resize(MAX_LEVELS);
#if 0
lastCompactedKeys = new RowPosition[n];
for (int i = 0; i < generations.length; i++)
{
generations[i] = new ArrayList<>();
lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
}
compactionCounter = new int[n];
#endif
}
@@ -129,37 +120,6 @@ public:
_generations[level].push_back(sstable);
}
#if 0
public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
{
assert !removed.isEmpty(); // use add() instead of promote when adding new sstables
logDistribution();
if (logger.isDebugEnabled())
logger.debug("Replacing [{}]", toString(removed));
// the level for the added sstables is the max of the removed ones,
// plus one if the removed were all on the same level
int minLevel = Integer.MAX_VALUE;
for (SSTableReader sstable : removed)
{
int thisLevel = remove(sstable);
minLevel = Math.min(minLevel, thisLevel);
}
// it's valid to do a remove w/o an add (e.g. on truncate)
if (added.isEmpty())
return;
if (logger.isDebugEnabled())
logger.debug("Adding [{}]", toString(added));
for (SSTableReader ssTableReader : added)
add(ssTableReader);
lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last;
}
#endif
void repair_overlapping_sstables(int level) {
const sstables::sstable *previous = nullptr;
const schema& s = *_schema;
@@ -272,7 +232,8 @@ public:
* @return highest-priority sstables to compact, and level to compact them to
* If no compactions are necessary, will return null
*/
sstables::compaction_descriptor get_compaction_candidates() {
sstables::compaction_descriptor get_compaction_candidates(const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys,
std::vector<int>& compaction_counter) {
#if 0
// during bootstrap we only do size tiering in L0 to make sure
// the streamed files can be placed in their original levels
@@ -339,11 +300,12 @@ public:
}
}
// L0 is fine, proceed with this level
auto candidates = get_candidates_for(i);
auto candidates = get_candidates_for(i, last_compacted_keys);
if (!candidates.empty()) {
int next_level = get_next_level(candidates);
candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter);
#if 0
candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
#endif
@@ -359,7 +321,7 @@ public:
if (get_level(0).empty()) {
return sstables::compaction_descriptor();
}
auto candidates = get_candidates_for(0);
auto candidates = get_candidates_for(0, last_compacted_keys);
if (candidates.empty()) {
return sstables::compaction_descriptor();
}
@@ -391,49 +353,57 @@ public:
* @param candidates the original sstables to compact
* @return
*/
#if 0
private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
{
Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
std::vector<sstables::shared_sstable>
get_overlapping_starved_sstables(int target_level, std::vector<sstables::shared_sstable>&& candidates, std::vector<int>& compaction_counter) {
for (int i = _generations.size() - 1; i > 0; i--) {
compaction_counter[i]++;
}
compaction_counter[target_level] = 0;
for (int i = generations.length - 1; i > 0; i--)
compactionCounter[i]++;
compactionCounter[targetLevel] = 0;
if (logger.isDebugEnabled())
{
for (int j = 0; j < compactionCounter.length; j++)
logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
if (logger.level() == logging::log_level::debug) {
for (auto j = 0U; j < compaction_counter.size(); j++) {
logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]);
}
}
for (int i = generations.length - 1; i > 0; i--)
{
if (getLevelSize(i) > 0)
{
if (compactionCounter[i] > NO_COMPACTION_LIMIT)
{
for (int i = _generations.size() - 1; i > 0; i--) {
if (get_level_size(i) > 0) {
if (compaction_counter[i] > NO_COMPACTION_LIMIT) {
// we try to find an sstable that is fully contained within the boundaries we are compacting;
// say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
// this means that we will not create overlap in L2 if we add an sstable
// contained within 0 -> 33 to the compaction
RowPosition max = null;
RowPosition min = null;
for (SSTableReader candidate : candidates)
{
if (min == null || candidate.first.compareTo(min) < 0)
min = candidate.first;
if (max == null || candidate.last.compareTo(max) > 0)
max = candidate.last;
// Compute the key boundaries spanned by the sstables being compacted:
// min is the smallest first key, max is the largest last key. A starved
// higher-level sstable is only added if it fits entirely inside [min, max].
stdx::optional<dht::decorated_key> max;
stdx::optional<dht::decorated_key> min;
for (auto& candidate : candidates) {
    auto& candidate_first = candidate->get_first_decorated_key();
    if (!min || candidate_first.tri_compare(*_schema, *min) < 0) {
        min = candidate_first;
    }
    // The upper boundary must be taken from the sstable's LAST key. Reading
    // the first key here would shrink the range to [min first, max first]
    // and wrongly reject sstables that are in fact fully contained.
    auto& candidate_last = candidate->get_last_decorated_key();
    if (!max || candidate_last.tri_compare(*_schema, *max) > 0) {
        max = candidate_last;
    }
}
#if 0
// NOTE: We don't need to filter out compacting sstables by now because strategy only deals with
// uncompacting sstables and parallel compaction is also disabled for lcs.
Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
Range<RowPosition> boundaries = new Range<>(min, max);
for (SSTableReader sstable : getLevel(i))
{
Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
if (boundaries.contains(r) && !compacting.contains(sstable))
{
logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
withStarvedCandidate.add(sstable);
return withStarvedCandidate;
#endif
auto boundaries = ::range<dht::decorated_key>::make(*min, *max);
for (auto& sstable : get_level(i)) {
auto r = ::range<dht::decorated_key>::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key());
if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) {
logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename());
auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) {
return sstable->generation() == candidate->generation();
});
if (result != std::end(candidates)) {
continue;
}
candidates.push_back(sstable);
return candidates;
}
}
}
@@ -443,7 +413,6 @@ public:
return candidates;
}
#endif
size_t get_level_size(uint32_t level) {
#if 0
@@ -557,7 +526,7 @@ public:
* If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted
* for prior failure), will return an empty list. Never returns null.
*/
std::vector<sstables::shared_sstable> get_candidates_for(int level) {
std::vector<sstables::shared_sstable> get_candidates_for(int level, const std::vector<stdx::optional<dht::decorated_key>>& last_compacted_keys) {
const schema& s = *_schema;
assert(!get_level(level).empty());
@@ -657,31 +626,35 @@ public:
}
// for non-L0 compactions, pick up where we left off last time
get_level(level).sort([&s] (auto& i, auto& j) {
std::list<sstables::shared_sstable>& sstables = get_level(level);
sstables.sort([&s] (auto& i, auto& j) {
return i->compare_by_first_key(*j) < 0;
});
int start = 0; // handles case where the prior compaction touched the very last range
#if 0
for (int i = 0; i < getLevel(level).size(); i++)
{
SSTableReader sstable = getLevel(level).get(i);
if (sstable.first.compareTo(lastCompactedKeys[level]) > 0)
{
start = i;
int idx = 0;
for (auto& sstable : sstables) {
if (uint32_t(level) >= last_compacted_keys.size()) {
throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1)));
}
auto& sstable_first = sstable->get_first_decorated_key();
if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) {
start = idx;
break;
}
idx++;
}
#endif
// look for a non-suspect keyspace to compact with, starting with where we left off last time,
// and wrapping back to the beginning of the generation if necessary
for (auto i = 0U; i < get_level(level).size(); i++) {
for (auto i = 0U; i < sstables.size(); i++) {
// get an iterator to the element of position pos from the list get_level(level).
auto pos = (start + i) % get_level(level).size();
auto it = get_level(level).begin();
auto pos = (start + i) % sstables.size();
auto it = sstables.begin();
std::advance(it, pos);
auto sstable = *it;
auto& sstable = *it;
auto candidates = overlapping(*_schema, sstable, get_level(level + 1));
candidates.push_back(sstable);
#if 0
if (Iterables.any(candidates, suspectP))

View File

@@ -1252,7 +1252,9 @@ static future<std::vector<unsigned long>> compact_sstables(std::vector<unsigned
}
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == sstables->size());
BOOST_REQUIRE(candidate.level == 1);
BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024);
@@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 0);
@@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) {
auto candidates = get_candidates_for_leveled_strategy(*cf);
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 3);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 0);
@@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) {
leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb);
BOOST_REQUIRE(manifest.get_level_size(0) == 2);
BOOST_REQUIRE(manifest.get_level_size(1) == 2);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 3);
BOOST_REQUIRE(candidate.level == 1);
@@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) {
auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2);
BOOST_REQUIRE(level2_score < 1.001);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.sstables.size() == 2);
BOOST_REQUIRE(candidate.level == 2);
@@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) {
BOOST_REQUIRE(manifest.get_level_size(1) == 1);
BOOST_REQUIRE(manifest.get_level_size(2) == 0);
auto candidate = manifest.get_compaction_candidates();
std::vector<stdx::optional<dht::decorated_key>> last_compacted_keys(leveled_manifest::MAX_LEVELS);
std::vector<int> compaction_counter(leveled_manifest::MAX_LEVELS);
auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter);
BOOST_REQUIRE(candidate.level == 2);
BOOST_REQUIRE(candidate.sstables.size() == 1);
auto& sst = (candidate.sstables)[0];