diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 8d80dd4669..acb9416959 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -191,7 +191,8 @@ class partitioned_sstable_set : public sstable_set_impl { using map_iterator = interval_map_type::const_iterator; private: schema_ptr _schema; - interval_map_type _sstables; + std::vector _unleveled_sstables; + interval_map_type _leveled_sstables; private: static interval_type make_interval(const schema& s, const query::partition_range& range) { return interval_type::closed( @@ -207,16 +208,16 @@ private: } std::pair query(const query::partition_range& range) const { if (range.start() && range.end()) { - return _sstables.equal_range(make_interval(range)); + return _leveled_sstables.equal_range(make_interval(range)); } else if (range.start() && !range.end()) { auto start = singular(range.start()->value()); - return { _sstables.lower_bound(start), _sstables.end() }; + return { _leveled_sstables.lower_bound(start), _leveled_sstables.end() }; } else if (!range.start() && range.end()) { auto end = singular(range.end()->value()); - return { _sstables.begin(), _sstables.upper_bound(end) }; + return { _leveled_sstables.begin(), _leveled_sstables.upper_bound(end) }; } else { - return { _sstables.begin(), _sstables.end() }; + return { _leveled_sstables.begin(), _leveled_sstables.end() }; } } public: @@ -234,29 +235,39 @@ public: while (b != e) { boost::copy(b++->second, std::inserter(result, result.end())); } - return std::vector(result.begin(), result.end()); + auto r = _unleveled_sstables; + r.insert(r.end(), result.begin(), result.end()); + return r; } virtual void insert(shared_sstable sst) override { - auto first = sst->get_first_decorated_key().token(); - auto last = sst->get_last_decorated_key().token(); - using bound = query::partition_range::bound; - _sstables.add({ - make_interval( - query::partition_range( - bound(dht::ring_position::starting_at(first)), - bound(dht::ring_position::ending_at(last)))), - value_set({sst})}); + if (sst->get_sstable_level() == 0) { + _unleveled_sstables.push_back(std::move(sst)); + } else { + auto first = sst->get_first_decorated_key().token(); + auto last = sst->get_last_decorated_key().token(); + using bound = query::partition_range::bound; + _leveled_sstables.add({ + make_interval( + query::partition_range( + bound(dht::ring_position::starting_at(first)), + bound(dht::ring_position::ending_at(last)))), + value_set({sst})}); + } } virtual void erase(shared_sstable sst) override { - auto first = sst->get_first_decorated_key().token(); - auto last = sst->get_last_decorated_key().token(); - using bound = query::partition_range::bound; - _sstables.subtract({ - make_interval( - query::partition_range( - bound(dht::ring_position::starting_at(first)), - bound(dht::ring_position::ending_at(last)))), - value_set({sst})}); + if (sst->get_sstable_level() == 0) { + _unleveled_sstables.erase(std::remove(_unleveled_sstables.begin(), _unleveled_sstables.end(), sst), _unleveled_sstables.end()); + } else { + auto first = sst->get_first_decorated_key().token(); + auto last = sst->get_last_decorated_key().token(); + using bound = query::partition_range::bound; + _leveled_sstables.subtract({ + make_interval( + query::partition_range( + bound(dht::ring_position::starting_at(first)), + bound(dht::ring_position::ending_at(last)))), + value_set({sst})}); + } } virtual std::unique_ptr make_incremental_selector() const override; class incremental_selector; @@ -264,6 +275,7 @@ public: class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { schema_ptr _schema; + const std::vector& _unleveled_sstables; map_iterator _it; const map_iterator _end; private: @@ -272,32 +284,35 @@ private: {i.upper().token(), boost::icl::is_right_closed(i.bounds())}); } public: - incremental_selector(schema_ptr schema, const interval_map_type& sstables) + incremental_selector(schema_ptr schema, const std::vector& unleveled_sstables, const interval_map_type& leveled_sstables) : _schema(std::move(schema)) - , _it(sstables.begin()) - , _end(sstables.end()) { + , _unleveled_sstables(unleveled_sstables) + , _it(leveled_sstables.begin()) + , _end(leveled_sstables.end()) { } virtual std::pair, std::vector> select(const dht::token& token) override { auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token)); auto interval = make_interval(*_schema, std::move(pr)); + auto ssts = _unleveled_sstables; while (_it != _end) { if (boost::icl::contains(_it->first, interval)) { - return std::make_pair(to_token_range(_it->first), std::vector(_it->second.begin(), _it->second.end())); + ssts.insert(ssts.end(), _it->second.begin(), _it->second.end()); + return std::make_pair(to_token_range(_it->first), std::move(ssts)); } // we don't want to skip current interval if token lies before it. if (boost::icl::lower_less(interval, _it->first)) { return std::make_pair(nonwrapping_range::make({token, true}, {_it->first.lower().token(), false}), - std::vector()); + std::move(ssts)); } _it++; } - return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), std::vector()); + return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), std::move(ssts)); } }; std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { - return std::make_unique(_schema, _sstables); + return std::make_unique(_schema, _unleveled_sstables, _leveled_sstables); } class compaction_strategy_impl { diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index f69efdc4ac..a3025ee657 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -1702,9 +1702,9 @@ static lw_shared_ptr add_sstable_for_overlapping_test(lw_shared_ptr sstable_for_overlapping_test(const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key) { +static lw_shared_ptr sstable_for_overlapping_test(const schema_ptr& schema, int64_t gen, sstring first_key, sstring last_key, uint32_t level = 0) { auto sst = make_lw_shared(schema, "", gen, la, big); - sstables::test(sst).set_values(std::move(first_key), std::move(last_key), {}); + sstables::test(sst).set_values_for_leveled_strategy(0, level, 0, std::move(first_key), std::move(last_key)); return sst; } @@ -3100,29 +3100,52 @@ SEASTAR_TEST_CASE(sstable_set_incremental_selector) { auto cs = sstables::make_compaction_strategy(sstables::compaction_strategy_type::leveled, s->compaction_strategy_options()); auto key_and_token_pair = token_generation_for_current_shard(8); - sstable_set set = cs.make_sstable_set(s); - set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first)); - set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first)); - set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first)); - set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first)); - set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first)); - - sstable_set::incremental_selector selector = set.make_incremental_selector(); - auto check = [&selector] (const dht::token& token, std::unordered_set expected_gens) { + auto check = [] (sstable_set::incremental_selector& selector, const dht::token& token, std::unordered_set expected_gens) { auto sstables = selector.select(token); BOOST_REQUIRE(sstables.size() == expected_gens.size()); for (auto& sst : sstables) { BOOST_REQUIRE(expected_gens.count(sst->generation()) == 1); } }; - check(key_and_token_pair[0].second, {1, 2}); - check(key_and_token_pair[1].second, {1, 2}); - check(key_and_token_pair[2].second, {}); - check(key_and_token_pair[3].second, {3}); - check(key_and_token_pair[4].second, {3, 4, 5}); - check(key_and_token_pair[5].second, {5}); - check(key_and_token_pair[6].second, {}); - check(key_and_token_pair[7].second, {}); + + { + sstable_set set = cs.make_sstable_set(s); + set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first, 1)); + set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first, 1)); + set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first, 1)); + set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first, 1)); + set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1)); + + sstable_set::incremental_selector sel = set.make_incremental_selector(); + check(sel, key_and_token_pair[0].second, {1, 2}); + check(sel, key_and_token_pair[1].second, {1, 2}); + check(sel, key_and_token_pair[2].second, {}); + check(sel, key_and_token_pair[3].second, {3}); + check(sel, key_and_token_pair[4].second, {3, 4, 5}); + check(sel, key_and_token_pair[5].second, {5}); + check(sel, key_and_token_pair[6].second, {}); + check(sel, key_and_token_pair[7].second, {}); + } + + { + sstable_set set = cs.make_sstable_set(s); + set.insert(sstable_for_overlapping_test(s, 0, key_and_token_pair[0].first, key_and_token_pair[1].first, 0)); + set.insert(sstable_for_overlapping_test(s, 1, key_and_token_pair[0].first, key_and_token_pair[1].first, 1)); + set.insert(sstable_for_overlapping_test(s, 2, key_and_token_pair[0].first, key_and_token_pair[1].first, 1)); + set.insert(sstable_for_overlapping_test(s, 3, key_and_token_pair[3].first, key_and_token_pair[4].first, 1)); + set.insert(sstable_for_overlapping_test(s, 4, key_and_token_pair[4].first, key_and_token_pair[4].first, 1)); + set.insert(sstable_for_overlapping_test(s, 5, key_and_token_pair[4].first, key_and_token_pair[5].first, 1)); + + sstable_set::incremental_selector sel = set.make_incremental_selector(); + check(sel, key_and_token_pair[0].second, {0, 1, 2}); + check(sel, key_and_token_pair[1].second, {0, 1, 2}); + check(sel, key_and_token_pair[2].second, {0}); + check(sel, key_and_token_pair[3].second, {0, 3}); + check(sel, key_and_token_pair[4].second, {0, 3, 4, 5}); + check(sel, key_and_token_pair[5].second, {0, 5}); + check(sel, key_and_token_pair[6].second, {0}); + check(sel, key_and_token_pair[7].second, {0}); + } return make_ready_future<>(); }