diff --git a/compaction_strategy.hh b/compaction_strategy.hh index b536376068..610a699076 100644 --- a/compaction_strategy.hh +++ b/compaction_strategy.hh @@ -54,6 +54,10 @@ public: // Return a list of sstables to be compacted after applying the strategy. compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector> candidates); + // Some strategies may look at the compacted and resulting sstables to + // get some useful information for subsequent compactions. + void notify_completion(const std::vector>& removed, const std::vector>& added); + // Return if parallel compaction is allowed by strategy. bool parallel_compaction() const; diff --git a/database.cc b/database.cc index c9519b23c5..ed11ac3563 100644 --- a/database.cc +++ b/database.cc @@ -1283,6 +1283,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool }; return sstables::compact_sstables(*sstables_to_compact, *this, create_sstable, descriptor.max_sstable_bytes, descriptor.level, cleanup).then([this, sstables_to_compact] (auto new_sstables) { + _compaction_strategy.notify_completion(*sstables_to_compact, new_sstables); return this->rebuild_sstable_list(new_sstables, *sstables_to_compact); }); }); diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 0da8454b19..d3b2f5bc14 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -216,6 +216,7 @@ protected: public: virtual ~compaction_strategy_impl() {} virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector candidates) = 0; + virtual void notify_completion(const std::vector>& removed, const std::vector>& added) { } virtual compaction_strategy_type type() const = 0; virtual bool parallel_compaction() const { return true; @@ -583,6 +584,8 @@ class leveled_compaction_strategy : public compaction_strategy_impl { const sstring SSTABLE_SIZE_OPTION = "sstable_size_in_mb"; int32_t _max_sstable_size_in_mb = DEFAULT_MAX_SSTABLE_SIZE_IN_MB; + std::vector> _last_compacted_keys; + std::vector _compaction_counter; public: leveled_compaction_strategy(const std::map& options) { using namespace cql3::statements; @@ -596,10 +599,14 @@ public: logger.warn("Max sstable size of {}MB is configured. Testing done for CASSANDRA-5727 indicates that performance improves up to 160MB", _max_sstable_size_in_mb); } + _last_compacted_keys.resize(leveled_manifest::MAX_LEVELS); + _compaction_counter.resize(leveled_manifest::MAX_LEVELS); } virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector candidates) override; + virtual void notify_completion(const std::vector>& removed, const std::vector>& added) override; + virtual int64_t estimated_pending_compactions(column_family& cf) const override; virtual bool parallel_compaction() const override { @@ -621,7 +628,7 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c // sstable in it may be marked for deletion after compacted. // Currently, we create a new manifest whenever it's time for compaction. leveled_manifest manifest = leveled_manifest::create(cfs, candidates, _max_sstable_size_in_mb); - auto candidate = manifest.get_compaction_candidates(); + auto candidate = manifest.get_compaction_candidates(_last_compacted_keys, _compaction_counter); if (candidate.sstables.empty()) { return sstables::compaction_descriptor(); @@ -632,6 +639,24 @@ compaction_descriptor leveled_compaction_strategy::get_sstables_for_compaction(c return std::move(candidate); } +void leveled_compaction_strategy::notify_completion(const std::vector>& removed, const std::vector>& added) { + if (removed.empty() || added.empty()) { + return; + } + auto min_level = std::numeric_limits::max(); + for (auto& sstable : removed) { + min_level = std::min(min_level, sstable->get_sstable_level()); + } + + const sstables::sstable *last = nullptr; + for (auto& candidate : added) { + if (!last || last->compare_by_first_key(*candidate) < 0) { + last = &*candidate; + } + } + _last_compacted_keys[min_level] = last->get_last_decorated_key(); +} + int64_t leveled_compaction_strategy::estimated_pending_compactions(column_family& cf) const { std::vector sstables; sstables.reserve(cf.sstables_count()); @@ -686,6 +711,10 @@ compaction_descriptor compaction_strategy::get_sstables_for_compaction(column_fa return _compaction_strategy_impl->get_sstables_for_compaction(cfs, std::move(candidates)); } +void compaction_strategy::notify_completion(const std::vector>& removed, const std::vector>& added) { + _compaction_strategy_impl->notify_completion(removed, added); +} + bool compaction_strategy::parallel_compaction() const { return _compaction_strategy_impl->parallel_compaction(); } diff --git a/sstables/leveled_manifest.hh b/sstables/leveled_manifest.hh index 3f7378bef5..b287fab662 100644 --- a/sstables/leveled_manifest.hh +++ b/sstables/leveled_manifest.hh @@ -64,16 +64,14 @@ class leveled_manifest { schema_ptr _schema; std::vector> _generations; -#if 0 - private final RowPosition[] lastCompactedKeys; -#endif uint64_t _max_sstable_size_in_bytes; #if 0 private final SizeTieredCompactionStrategyOptions options; - private final int [] compactionCounter; #endif public: + static constexpr int MAX_LEVELS = 9; // log10(1000^3); + leveled_manifest(column_family& cfs, int max_sstable_size_in_MB) : logger("LeveledManifest") , _schema(cfs.schema()) @@ -82,15 +80,8 @@ public: // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is // updated, we will still have sstables of the older, potentially smaller size. So don't make this // dependent on maxSSTableSize.) - uint64_t n = 9; // log10(1000^3) - _generations.resize(n); + _generations.resize(MAX_LEVELS); #if 0 - lastCompactedKeys = new RowPosition[n]; - for (int i = 0; i < generations.length; i++) - { - generations[i] = new ArrayList<>(); - lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound(); - } compactionCounter = new int[n]; #endif } @@ -129,37 +120,6 @@ public: _generations[level].push_back(sstable); } -#if 0 - public synchronized void replace(Collection removed, Collection added) - { - assert !removed.isEmpty(); // use add() instead of promote when adding new sstables - logDistribution(); - if (logger.isDebugEnabled()) - logger.debug("Replacing [{}]", toString(removed)); - - // the level for the added sstables is the max of the removed ones, - // plus one if the removed were all on the same level - int minLevel = Integer.MAX_VALUE; - - for (SSTableReader sstable : removed) - { - int thisLevel = remove(sstable); - minLevel = Math.min(minLevel, thisLevel); - } - - // it's valid to do a remove w/o an add (e.g. on truncate) - if (added.isEmpty()) - return; - - if (logger.isDebugEnabled()) - logger.debug("Adding [{}]", toString(added)); - - for (SSTableReader ssTableReader : added) - add(ssTableReader); - lastCompactedKeys[minLevel] = SSTableReader.sstableOrdering.max(added).last; - } -#endif - void repair_overlapping_sstables(int level) { const sstables::sstable *previous = nullptr; const schema& s = *_schema; @@ -272,7 +232,8 @@ public: * @return highest-priority sstables to compact, and level to compact them to * If no compactions are necessary, will return null */ - sstables::compaction_descriptor get_compaction_candidates() { + sstables::compaction_descriptor get_compaction_candidates(const std::vector>& last_compacted_keys, + std::vector& compaction_counter) { #if 0 // during bootstrap we only do size tiering in L0 to make sure // the streamed files can be placed in their original levels @@ -339,11 +300,12 @@ public: } } // L0 is fine, proceed with this level - auto candidates = get_candidates_for(i); + auto candidates = get_candidates_for(i, last_compacted_keys); if (!candidates.empty()) { int next_level = get_next_level(candidates); + + candidates = get_overlapping_starved_sstables(next_level, std::move(candidates), compaction_counter); #if 0 - candidates = getOverlappingStarvedSSTables(nextLevel, candidates); if (logger.isDebugEnabled()) logger.debug("Compaction candidates for L{} are {}", i, toString(candidates)); #endif @@ -359,7 +321,7 @@ public: if (get_level(0).empty()) { return sstables::compaction_descriptor(); } - auto candidates = get_candidates_for(0); + auto candidates = get_candidates_for(0, last_compacted_keys); if (candidates.empty()) { return sstables::compaction_descriptor(); } @@ -391,49 +353,57 @@ public: * @param candidates the original sstables to compact * @return */ -#if 0 - private Collection getOverlappingStarvedSSTables(int targetLevel, Collection candidates) - { - Set withStarvedCandidate = new HashSet<>(candidates); + std::vector + get_overlapping_starved_sstables(int target_level, std::vector&& candidates, std::vector& compaction_counter) { + for (int i = _generations.size() - 1; i > 0; i--) { + compaction_counter[i]++; + } + compaction_counter[target_level] = 0; - for (int i = generations.length - 1; i > 0; i--) - compactionCounter[i]++; - compactionCounter[targetLevel] = 0; - if (logger.isDebugEnabled()) - { - for (int j = 0; j < compactionCounter.length; j++) - logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]); + if (logger.level() == logging::log_level::debug) { + for (auto j = 0U; j < compaction_counter.size(); j++) { + logger.debug("CompactionCounter: {}: {}", j, compaction_counter[j]); + } } - for (int i = generations.length - 1; i > 0; i--) - { - if (getLevelSize(i) > 0) - { - if (compactionCounter[i] > NO_COMPACTION_LIMIT) - { + for (int i = _generations.size() - 1; i > 0; i--) { + if (get_level_size(i) > 0) { + if (compaction_counter[i] > NO_COMPACTION_LIMIT) { // we try to find an sstable that is fully contained within the boundaries we are compacting; // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2 // this means that we will not create overlap in L2 if we add an sstable // contained within 0 -> 33 to the compaction - RowPosition max = null; - RowPosition min = null; - for (SSTableReader candidate : candidates) - { - if (min == null || candidate.first.compareTo(min) < 0) - min = candidate.first; - if (max == null || candidate.last.compareTo(max) > 0) - max = candidate.last; + stdx::optional max; + stdx::optional min; + for (auto& candidate : candidates) { + auto& candidate_first = candidate->get_first_decorated_key(); + if (!min || candidate_first.tri_compare(*_schema, *min) < 0) { + min = candidate_first; + } + auto& candidate_last = candidate->get_first_decorated_key(); + if (!max || candidate_last.tri_compare(*_schema, *max) > 0) { + max = candidate_last; + } } +#if 0 + // NOTE: We don't need to filter out compacting sstables by now because strategy only deals with + // uncompacting sstables and parallel compaction is also disabled for lcs. Set compacting = cfs.getDataTracker().getCompacting(); - Range boundaries = new Range<>(min, max); - for (SSTableReader sstable : getLevel(i)) - { - Range r = new Range(sstable.first, sstable.last); - if (boundaries.contains(r) && !compacting.contains(sstable)) - { - logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable); - withStarvedCandidate.add(sstable); - return withStarvedCandidate; +#endif + auto boundaries = ::range::make(*min, *max); + for (auto& sstable : get_level(i)) { + auto r = ::range::make(sstable->get_first_decorated_key(), sstable->get_last_decorated_key()); + if (boundaries.contains(r, dht::ring_position_comparator(*_schema))) { + logger.info("Adding high-level (L{}) {} to candidates", sstable->get_sstable_level(), sstable->get_filename()); + + auto result = std::find_if(std::begin(candidates), std::end(candidates), [&sstable] (auto& candidate) { + return sstable->generation() == candidate->generation(); + }); + if (result != std::end(candidates)) { + continue; + } + candidates.push_back(sstable); + return candidates; } } } @@ -443,7 +413,6 @@ public: return candidates; } -#endif size_t get_level_size(uint32_t level) { #if 0 @@ -557,7 +526,7 @@ public: * If no compactions are possible (because of concurrent compactions or because some sstables are blacklisted * for prior failure), will return an empty list. Never returns null. */ - std::vector get_candidates_for(int level) { + std::vector get_candidates_for(int level, const std::vector>& last_compacted_keys) { const schema& s = *_schema; assert(!get_level(level).empty()); @@ -657,31 +626,35 @@ public: } // for non-L0 compactions, pick up where we left off last time - get_level(level).sort([&s] (auto& i, auto& j) { + std::list& sstables = get_level(level); + sstables.sort([&s] (auto& i, auto& j) { return i->compare_by_first_key(*j) < 0; }); int start = 0; // handles case where the prior compaction touched the very last range -#if 0 - for (int i = 0; i < getLevel(level).size(); i++) - { - SSTableReader sstable = getLevel(level).get(i); - if (sstable.first.compareTo(lastCompactedKeys[level]) > 0) - { - start = i; + int idx = 0; + for (auto& sstable : sstables) { + if (uint32_t(level) >= last_compacted_keys.size()) { + throw std::runtime_error(sprint("Invalid level %u out of %ld", level, (last_compacted_keys.size() - 1))); + } + auto& sstable_first = sstable->get_first_decorated_key(); + if (!last_compacted_keys[level] || sstable_first.tri_compare(s, *last_compacted_keys[level]) > 0) { + start = idx; break; } + idx++; } -#endif + // look for a non-suspect keyspace to compact with, starting with where we left off last time, // and wrapping back to the beginning of the generation if necessary - for (auto i = 0U; i < get_level(level).size(); i++) { + for (auto i = 0U; i < sstables.size(); i++) { // get an iterator to the element of position pos from the list get_level(level). - auto pos = (start + i) % get_level(level).size(); - auto it = get_level(level).begin(); + auto pos = (start + i) % sstables.size(); + auto it = sstables.begin(); std::advance(it, pos); - auto sstable = *it; + auto& sstable = *it; auto candidates = overlapping(*_schema, sstable, get_level(level + 1)); + candidates.push_back(sstable); #if 0 if (Iterables.any(candidates, suspectP)) diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index b139cfc581..b0ca812e58 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -1252,7 +1252,9 @@ static future> compact_sstables(std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.sstables.size() == sstables->size()); BOOST_REQUIRE(candidate.level == 1); BOOST_REQUIRE(candidate.max_sstable_bytes == 1024*1024); @@ -1731,7 +1733,9 @@ SEASTAR_TEST_CASE(leveled_01) { auto candidates = get_candidates_for_leveled_strategy(*cf); leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb); BOOST_REQUIRE(manifest.get_level_size(0) == 2); - auto candidate = manifest.get_compaction_candidates(); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.sstables.size() == 2); BOOST_REQUIRE(candidate.level == 0); @@ -1786,7 +1790,9 @@ SEASTAR_TEST_CASE(leveled_02) { auto candidates = get_candidates_for_leveled_strategy(*cf); leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb); BOOST_REQUIRE(manifest.get_level_size(0) == 3); - auto candidate = manifest.get_compaction_candidates(); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.sstables.size() == 3); BOOST_REQUIRE(candidate.level == 0); @@ -1844,7 +1850,9 @@ SEASTAR_TEST_CASE(leveled_03) { leveled_manifest manifest = leveled_manifest::create(*cf, candidates, max_sstable_size_in_mb); BOOST_REQUIRE(manifest.get_level_size(0) == 2); BOOST_REQUIRE(manifest.get_level_size(1) == 2); - auto candidate = manifest.get_compaction_candidates(); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.sstables.size() == 3); BOOST_REQUIRE(candidate.level == 1); @@ -1914,7 +1922,9 @@ SEASTAR_TEST_CASE(leveled_04) { auto level2_score = (double) manifest.get_total_bytes(manifest.get_level(2)) / (double) manifest.max_bytes_for_level(2); BOOST_REQUIRE(level2_score < 1.001); - auto candidate = manifest.get_compaction_candidates(); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.sstables.size() == 2); BOOST_REQUIRE(candidate.level == 2); @@ -1976,7 +1986,9 @@ SEASTAR_TEST_CASE(leveled_06) { BOOST_REQUIRE(manifest.get_level_size(1) == 1); BOOST_REQUIRE(manifest.get_level_size(2) == 0); - auto candidate = manifest.get_compaction_candidates(); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto candidate = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); BOOST_REQUIRE(candidate.level == 2); BOOST_REQUIRE(candidate.sstables.size() == 1); auto& sst = (candidate.sstables)[0];