diff --git a/sstables/compaction_strategy.cc b/sstables/compaction_strategy.cc index 1b6ad515d6..8d80dd4669 100644 --- a/sstables/compaction_strategy.cc +++ b/sstables/compaction_strategy.cc @@ -59,6 +59,12 @@ namespace sstables { extern logging::logger logger; +class incremental_selector_impl { +public: + virtual ~incremental_selector_impl() {} + virtual std::pair, std::vector> select(const dht::token& token) = 0; +}; + class sstable_set_impl { public: virtual ~sstable_set_impl() {} @@ -66,6 +72,7 @@ public: virtual std::vector select(const query::partition_range& range) const = 0; virtual void insert(shared_sstable sst) = 0; virtual void erase(shared_sstable sst) = 0; + virtual std::unique_ptr make_incremental_selector() const = 0; }; sstable_set::sstable_set(std::unique_ptr impl, lw_shared_ptr all) @@ -116,6 +123,29 @@ sstable_set::erase(shared_sstable sst) { sstable_set::~sstable_set() = default; +sstable_set::incremental_selector::incremental_selector(std::unique_ptr impl) + : _impl(std::move(impl)) { +} + +sstable_set::incremental_selector::~incremental_selector() = default; + +sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default; + +const std::vector& +sstable_set::incremental_selector::select(const dht::token& t) const { + if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) { + auto&& x = _impl->select(t); + _current_token_range = std::move(std::get<0>(x)); + _current_sstables = std::move(std::get<1>(x)); + } + return _current_sstables; +} + +sstable_set::incremental_selector +sstable_set::make_incremental_selector() const { + return incremental_selector(_impl->make_incremental_selector()); +} + // default sstable_set, not specialized for anything class bag_sstable_set : public sstable_set_impl { // erasing is slow, but select() is fast @@ -124,7 +154,7 @@ public: virtual std::unique_ptr clone() const override { return std::make_unique(*this); } - virtual std::vector select(const query::partition_range& range) const override { + virtual std::vector select(const query::partition_range& range = query::full_partition_range) const override { return _sstables; } virtual void insert(shared_sstable sst) override { @@ -133,8 +163,25 @@ public: virtual void erase(shared_sstable sst) override { _sstables.erase(boost::find(_sstables, sst)); } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; }; +class bag_sstable_set::incremental_selector : public incremental_selector_impl { + const std::vector& _sstables; +public: + incremental_selector(const std::vector& sstables) + : _sstables(sstables) { + } + virtual std::pair, std::vector> select(const dht::token& token) override { + return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), _sstables); + } +}; + +std::unique_ptr bag_sstable_set::make_incremental_selector() const { + return std::make_unique(_sstables); +} + // specialized when sstables are partitioned in the token range space // e.g. leveled compaction strategy class partitioned_sstable_set : public sstable_set_impl { @@ -146,10 +193,13 @@ private: schema_ptr _schema; interval_map_type _sstables; private: - interval_type make_interval(const query::partition_range& range) const { + static interval_type make_interval(const schema& s, const query::partition_range& range) { return interval_type::closed( - compatible_ring_position(*_schema, range.start()->value()), - compatible_ring_position(*_schema, range.end()->value())); + compatible_ring_position(s, range.start()->value()), + compatible_ring_position(s, range.end()->value())); + } + interval_type make_interval(const query::partition_range& range) const { + return make_interval(*_schema, range); } interval_type singular(const dht::ring_position& rp) const { auto crp = compatible_ring_position(*_schema, rp); @@ -208,8 +258,48 @@ public: bound(dht::ring_position::ending_at(last)))), value_set({sst})}); } + virtual std::unique_ptr make_incremental_selector() const override; + class incremental_selector; }; +class partitioned_sstable_set::incremental_selector : public incremental_selector_impl { + schema_ptr _schema; + map_iterator _it; + const map_iterator _end; +private: + static nonwrapping_range to_token_range(const interval_type& i) { + return nonwrapping_range::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())}, + {i.upper().token(), boost::icl::is_right_closed(i.bounds())}); + } +public: + incremental_selector(schema_ptr schema, const interval_map_type& sstables) + : _schema(std::move(schema)) + , _it(sstables.begin()) + , _end(sstables.end()) { + } + virtual std::pair, std::vector> select(const dht::token& token) override { + auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token)); + auto interval = make_interval(*_schema, std::move(pr)); + + while (_it != _end) { + if (boost::icl::contains(_it->first, interval)) { + return std::make_pair(to_token_range(_it->first), std::vector(_it->second.begin(), _it->second.end())); + } + // we don't want to skip current interval if token lies before it. + if (boost::icl::lower_less(interval, _it->first)) { + return std::make_pair(nonwrapping_range::make({token, true}, {_it->first.lower().token(), false}), + std::vector()); + } + _it++; + } + return std::make_pair(nonwrapping_range::make_open_ended_both_sides(), std::vector()); + } +}; + +std::unique_ptr partitioned_sstable_set::make_incremental_selector() const { + return std::make_unique(_schema, _sstables); +} + class compaction_strategy_impl { protected: bool _use_clustering_key_filter = false; diff --git a/sstables/sstable_set.hh b/sstables/sstable_set.hh index d537acb9dd..c0f39aacce 100644 --- a/sstables/sstable_set.hh +++ b/sstables/sstable_set.hh @@ -29,6 +29,7 @@ namespace sstables { class sstable_set_impl; +class incremental_selector_impl; class sstable_set { std::unique_ptr _impl; @@ -46,6 +47,21 @@ public: lw_shared_ptr all() const { return _all; } void insert(shared_sstable sst); void erase(shared_sstable sst); + + // Used to incrementally select sstables from sstable set using tokens. + // sstable set must be alive and cannot be modified while incremental + // selector is used. + class incremental_selector { + std::unique_ptr _impl; + mutable stdx::optional> _current_token_range; + mutable std::vector _current_sstables; + public: + ~incremental_selector(); + incremental_selector(std::unique_ptr impl); + incremental_selector(incremental_selector&&) noexcept; + const std::vector& select(const dht::token& t) const; + }; + incremental_selector make_incremental_selector() const; }; }