From 02541e15c1effe5aa300110a13ec130d2e19bb84 Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 7 Dec 2016 16:21:30 -0200 Subject: [PATCH] sstable_set: introduce incremental selector Incrementally select sstables from sstable set using tokens in ascending order. For the leveled strategy, it returns all sstables that belong to the current interval. For other strategies, it just returns all sstables from the set. Useful for compaction, which needs all sstables that overlap with the key currently being compacted to calculate the maximum purgeable timestamp. Signed-off-by: Raphael S. Carvalho --- sstables/compaction_strategy.cc | 98 +++++++++++++++++++++++++++++++-- sstables/sstable_set.hh | 16 ++++++ 2 files changed, 110 insertions(+), 4 deletions(-) diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 1b6ad515d6..8d80dd4669 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -59,6 +59,12 @@ namespace sstables { extern logging::logger logger; +class incremental_selector_impl { +public: + virtual ~incremental_selector_impl() {} + virtual std::pair, std::vector> select(const dht::token& token) = 0; +}; + class sstable_set_impl { public: virtual ~sstable_set_impl() {} @@ -66,6 +72,7 @@ public: virtual std::vector select(const query::partition_range& range) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; + virtual std::unique_ptr make_incremental_selector() const = 0; }; sstable_set::sstable_set(std::unique_ptr impl, lw_shared_ptr all) @@ -116,6 +123,29 @@ sstable_set::erase(shared_sstable sst) { sstable_set::~sstable_set() = default; +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl) + : _impl(std::move(impl)) { +} + +sstable_set::incremental_selector::~incremental_selector() = default; + +sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; + +const std::vector& 
+sstable_set::incremental_selector::select(const dht::token& t) const { + if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) { + auto&& x = _impl->select(t); + _current_token_range = std::move(std::get<0>(x)); + _current_sstables = std::move(std::get<1>(x)); + } + return _current_sstables; +} + +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector()); +} + // default sstable_set, not specialized for anything class bag_sstable_set : public sstable_set_impl { // erasing is slow, but select() is fast @@ -124,7 +154,7 @@ public: virtual std::unique_ptr clone() const override { return std::make_unique(*this); } - virtual std::vector select(const query::partition_range& range) const override { + virtual std::vector select(const query::partition_range& range = query::full_partition_range) const override { return _sstables; } virtual void insert(shared_sstable sst) override { @@ -133,8 +163,25 @@ public: virtual void erase(shared_sstable sst) override { _sstables.erase(boost::find(_sstables, sst)); } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; }; +class bag_sstable_set::incremental_selector : public incremental_selector_impl { + const std::vector& _sstables; +public: + incremental_selector(const std::vector& sstables) + : _sstables(sstables) { + } + virtual std::pair, std::vector> select(const dht::token& token) override { + return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), _sstables); + } +}; + +std::unique_ptr bag_sstable_set::make_incremental_selector() const { + return std::make_unique(_sstables); +} + // specialized when sstables are partitioned in the token range space // e.g. 
leveled compaction strategy class partitioned_sstable_set : public sstable_set_impl { @@ -146,10 +193,13 @@ private: schema_ptr _schema; interval_map_type _sstables; private: - interval_type make_interval(const query::partition_range& range) const { + static interval_type make_interval(const schema& s, const query::partition_range& range) { return interval_type::closed( - compatible_ring_position(*_schema, range.start()->value()), - compatible_ring_position(*_schema, range.end()->value())); + compatible_ring_position(s, range.start()->value()), + compatible_ring_position(s, range.end()->value())); + } + interval_type make_interval(const query::partition_range& range) const { + return make_interval(*_schema, range); } interval_type singular(const dht::ring_position& rp) const { auto crp = compatible_ring_position(*_schema, rp); @@ -208,8 +258,48 @@ public: bound(dht::ring_position::ending_at(last)))), value_set({sst})}); } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; }; +class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { + schema_ptr _schema; + map_iterator _it; + const map_iterator _end; +private: + static nonwrapping_range to_token_range(const interval_type& i) { + return nonwrapping_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())}, + {i.upper().token(), boost::icl::is_right_closed(i.bounds())}); + } +public: + incremental_selector(schema_ptr schema, const interval_map_type& sstables) + : _schema(std::move(schema)) + , _it(sstables.begin()) + , _end(sstables.end()) { + } + virtual std::pair, std::vector> select(const dht::token& token) override { + auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token)); + auto interval = make_interval(*_schema, std::move(pr)); + + while (_it != _end) { + if (boost::icl::contains(_it->first, interval)) { + return std::make_pair(to_token_range(_it->first), 
std::vector(_it->second.begin(), _it->second.end())); + } + // we don't want to skip current interval if token lies before it. + if (boost::icl::lower_less(interval, _it->first)) { + return std::make_pair(nonwrapping_range::make({token, true}, {_it->first.lower().token(), false}), + std::vector()); + } + _it++; + } + return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), std::vector()); + } +}; + +std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { + return std::make_unique(_schema, _sstables); +} + class compaction_strategy_impl { protected: bool _use_clustering_key_filter = false; diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index d537acb9dd..c0f39aacce 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -29,6 +29,7 @@ namespace sstables { class sstable_set_impl; +class incremental_selector_impl; class sstable_set { std::unique_ptr _impl; @@ -46,6 +47,21 @@ public: lw_shared_ptr all() const { return _all; } void insert(shared_sstable sst); void erase(shared_sstable sst); + + // Used to incrementally select sstables from sstable set using tokens. + // sstable set must be alive and cannot be modified while incremental + // selector is used. + class incremental_selector { + std::unique_ptr _impl; + mutable stdx::optional> _current_token_range; + mutable std::vector _current_sstables; + public: + ~incremental_selector(); + incremental_selector(std::unique_ptr impl); + incremental_selector(incremental_selector&&) noexcept; + const std::vector& select(const dht::token& t) const; + }; + incremental_selector make_incremental_selector() const; }; }