diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index c64eba953d..8c07ff272f 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -54,6 +54,7 @@ #include "date_tiered_compaction_strategy.hh" logging::logger date_tiered_manifest::logger = logging::logger("DateTieredCompactionStrategy"); +logging::logger leveled_manifest::logger("LeveledManifest"); namespace sstables { diff --git a/sstables/leveled_manifest.hh b/sstables/leveled_manifest.hh index 8ae3d0e524..279ec1a1e4 100644 --- a/sstables/leveled_manifest.hh +++ b/sstables/leveled_manifest.hh @@ -47,14 +47,7 @@ #include "log.hh" class leveled_manifest { - logging::logger logger; - - /** - * limit the number of L0 sstables we do at once, because compaction bloom filter creation - * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory - * or even OOMing when compacting highly overlapping sstables - */ - static constexpr int MAX_COMPACTING_L0 = 32; + static logging::logger logger; schema_ptr _schema; std::vector> _generations; @@ -68,6 +61,12 @@ class leveled_manifest { bool can_promote = true; }; public: + /** + * limit the number of L0 sstables we do at once, because compaction bloom filter creation + * uses a pessimistic estimate of how many keys overlap (none), so we risk wasting memory + * or even OOMing when compacting highly overlapping sstables + */ + static constexpr int MAX_COMPACTING_L0 = 32; /** * If we go this many rounds without compacting * in the highest level, we start bringing in sstables from @@ -78,26 +77,15 @@ public: static constexpr int MAX_LEVELS = 9; // log10(1000^3); leveled_manifest(column_family& cfs, int max_sstable_size_in_MB) - : logger("LeveledManifest") - , _schema(cfs.schema()) + : _schema(cfs.schema()) , _max_sstable_size_in_bytes(max_sstable_size_in_MB * 1024 * 1024) { // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is // updated, we will still have sstables of the older, potentially smaller size. So don't make this // dependent on maxSSTableSize.) _generations.resize(MAX_LEVELS); -#if 0 - compactionCounter = new int[n]; -#endif } -#if 0 - public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List sstables) - { - return create(cfs, maxSSTableSize, sstables, new SizeTieredCompactionStrategyOptions()); - } -#endif - static leveled_manifest create(column_family& cfs, std::vector& sstables, int max_sstable_size_in_mb) { leveled_manifest manifest = leveled_manifest(cfs, max_sstable_size_in_mb); @@ -417,10 +405,6 @@ public: } size_t get_level_size(uint32_t level) { -#if 0 - if (i >= generations.length) - throw new ArrayIndexOutOfBoundsException("Maximum valid generation is " + (generations.length - 1)); -#endif return get_level(level).size(); } @@ -538,23 +522,8 @@ public: assert(!get_level(level).empty()); logger.debug("Choosing candidates for L{}", level); -#if 0 - final Set compacting = cfs.getDataTracker().getCompacting(); -#endif - if (level == 0) { -#if 0 - Set compactingL0 = ImmutableSet.copyOf(Iterables.filter(getLevel(0), Predicates.in(compacting))); - RowPosition lastCompactingKey = null; - RowPosition firstCompactingKey = null; - for (SSTableReader candidate : compactingL0) - { - if (firstCompactingKey == null || candidate.first.compareTo(firstCompactingKey) < 0) - firstCompactingKey = candidate.first; - if (lastCompactingKey == null || candidate.last.compareTo(lastCompactingKey) > 0) - lastCompactingKey = candidate.last; - } -#endif + if (level == 0) { // L0 is the dumping ground for new sstables which thus may overlap each other. // @@ -569,46 +538,12 @@ public: // Note that we ignore suspect-ness of L1 sstables here, since if an L1 sstable is suspect we're // basically screwed, since we expect all or most L0 sstables to overlap with each L1 sstable. // So if an L1 sstable is suspect we can't do much besides try anyway and hope for the best. - std::vector candidates; - std::list remaining = get_level(0); -#if 0 - Iterables.addAll(remaining, Iterables.filter(getLevel(0), Predicates.not(suspectP))); -#endif - for (auto& sstable : age_sorted_sstables(remaining)) { - auto it = std::find(candidates.begin(), candidates.end(), sstable); - if (it != candidates.end()) { - continue; - } + auto candidates = boost::copy_range>(get_level(0)); - auto overlappedL0 = overlapping(*_schema, sstable, remaining); - it = std::find(overlappedL0.begin(), overlappedL0.end(), sstable); - if (it == overlappedL0.end()) { - overlappedL0.push_back(sstable); - } - -#if 0 - if (!Sets.intersection(overlappedL0, compactingL0).isEmpty()) - continue; -#endif - - for (auto& new_candidate : overlappedL0) { -#if 0 - if (firstCompactingKey == null || lastCompactingKey == null || overlapping(firstCompactingKey.getToken(), lastCompactingKey.getToken(), Arrays.asList(newCandidate)).size() == 0) - candidates.add(newCandidate); -#else - candidates.push_back(new_candidate); -#endif - remaining.remove(new_candidate); - } - - if (candidates.size() > MAX_COMPACTING_L0) { - // limit to only the MAX_COMPACTING_L0 oldest candidates - auto age_sorted_candidates = age_sorted_sstables(candidates); - // create a sub list of age_sorted_candidates by resizing it. - age_sorted_candidates.resize(MAX_COMPACTING_L0); - candidates = std::move(age_sorted_candidates); - break; - } + if (candidates.size() > MAX_COMPACTING_L0) { + // limit to only the MAX_COMPACTING_L0 oldest candidates + sort_sstables_by_age(candidates); + candidates.resize(MAX_COMPACTING_L0); } // leave everything in L0 if we didn't end up with a full sstable's worth of data @@ -617,13 +552,7 @@ public: // if the overlapping ones are already busy in a compaction, leave it out. // TODO try to find a set of L0 sstables that only overlaps with non-busy L1 sstables auto l1overlapping = overlapping(*_schema, candidates, get_level(1)); - for (auto candidate : l1overlapping) { - auto it = std::find(candidates.begin(), candidates.end(), candidate); - if (it != candidates.end()) { - continue; - } - candidates.push_back(candidate); - } + candidates.insert(candidates.end(), l1overlapping.begin(), l1overlapping.end()); } if (candidates.size() < 2) { return {}; @@ -688,24 +617,10 @@ public: return {}; } - std::list age_sorted_sstables(std::list& candidates) { - auto age_sorted_candidates = candidates; - - age_sorted_candidates.sort([] (auto& i, auto& j) { - return i->compare_by_max_timestamp(*j) > 0; + void sort_sstables_by_age(std::vector& candidates) { + std::sort(candidates.begin(), candidates.end(), [] (auto& i, auto& j) { + return i->compare_by_max_timestamp(*j) < 0; }); - - return age_sorted_candidates; - } - - std::vector age_sorted_sstables(std::vector& candidates) { - auto age_sorted_candidates = candidates; - - std::sort(age_sorted_candidates.begin(), age_sorted_candidates.end(), [] (auto& i, auto& j) { - return i->compare_by_max_timestamp(*j) > 0; - }); - - return age_sorted_candidates; } #if 0 @Override diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 919e64f566..97563f7c71 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -2083,6 +2083,34 @@ SEASTAR_TEST_CASE(leveled_06) { return make_ready_future<>(); } +SEASTAR_TEST_CASE(leveled_07) { + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", utf8_type}}, {}, {}, {}, utf8_type)); + + column_family::config cfg; + cell_locker_stats cl_stats; + compaction_manager cm; + auto cf = make_lw_shared(s, cfg, column_family::no_commitlog(), cm, cl_stats); + cf->mark_ready_for_writes(); + + for (auto i = 0; i < leveled_manifest::MAX_COMPACTING_L0*2; i++) { + add_sstable_for_leveled_test(cf, i, 0, /*level*/0, "a", "a", i /* max timestamp */); + } + auto candidates = get_candidates_for_leveled_strategy(*cf); + leveled_manifest manifest = leveled_manifest::create(*cf, candidates, 1); + std::vector> last_compacted_keys(leveled_manifest::MAX_LEVELS); + std::vector compaction_counter(leveled_manifest::MAX_LEVELS); + auto desc = manifest.get_compaction_candidates(last_compacted_keys, compaction_counter); + BOOST_REQUIRE(desc.level == 0); + BOOST_REQUIRE(desc.sstables.size() == leveled_manifest::MAX_COMPACTING_L0); + // check that strategy returns the oldest sstables + for (auto& sst : desc.sstables) { + BOOST_REQUIRE(sst->get_stats_metadata().max_timestamp < leveled_manifest::MAX_COMPACTING_L0); + } + + return make_ready_future<>(); +} + SEASTAR_TEST_CASE(leveled_invariant_fix) { auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, {{"p1", utf8_type}}, {}, {}, {}, utf8_type));