sstable_set: introduce incremental selector

Incrementally select sstables from sstable set using token
in ascending order.
For leveled strategy, it returns all sstables that belong
to current interval. For other strategies, it just return
all sstables from the set.
Useful for compaction which needs all sstables that overlap
with key being currently compacted to calculate maximum
purgeable timestamp.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>
This commit is contained in:
Raphael S. Carvalho
2016-12-07 16:21:30 -02:00
parent 453620a316
commit 02541e15c1
2 changed files with 110 additions and 4 deletions

View File

@@ -59,6 +59,12 @@ namespace sstables {
extern logging::logger logger;
class incremental_selector_impl {
public:
virtual ~incremental_selector_impl() {}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) = 0;
};
class sstable_set_impl {
public:
virtual ~sstable_set_impl() {}
@@ -66,6 +72,7 @@ public:
virtual std::vector<shared_sstable> select(const query::partition_range& range) const = 0;
virtual void insert(shared_sstable sst) = 0;
virtual void erase(shared_sstable sst) = 0;
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const = 0;
};
sstable_set::sstable_set(std::unique_ptr<sstable_set_impl> impl, lw_shared_ptr<sstable_list> all)
@@ -116,6 +123,29 @@ sstable_set::erase(shared_sstable sst) {
sstable_set::~sstable_set() = default;
sstable_set::incremental_selector::incremental_selector(std::unique_ptr<incremental_selector_impl> impl)
: _impl(std::move(impl)) {
}
sstable_set::incremental_selector::~incremental_selector() = default;
sstable_set::incremental_selector::incremental_selector(sstable_set::incremental_selector&&) noexcept = default;
const std::vector<shared_sstable>&
sstable_set::incremental_selector::select(const dht::token& t) const {
if (!_current_token_range || !_current_token_range->contains(t, dht::token_comparator())) {
auto&& x = _impl->select(t);
_current_token_range = std::move(std::get<0>(x));
_current_sstables = std::move(std::get<1>(x));
}
return _current_sstables;
}
sstable_set::incremental_selector
sstable_set::make_incremental_selector() const {
return incremental_selector(_impl->make_incremental_selector());
}
// default sstable_set, not specialized for anything
class bag_sstable_set : public sstable_set_impl {
// erasing is slow, but select() is fast
@@ -124,7 +154,7 @@ public:
virtual std::unique_ptr<sstable_set_impl> clone() const override {
return std::make_unique<bag_sstable_set>(*this);
}
virtual std::vector<shared_sstable> select(const query::partition_range& range) const override {
virtual std::vector<shared_sstable> select(const query::partition_range& range = query::full_partition_range) const override {
return _sstables;
}
virtual void insert(shared_sstable sst) override {
@@ -133,8 +163,25 @@ public:
virtual void erase(shared_sstable sst) override {
_sstables.erase(boost::find(_sstables, sst));
}
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
class incremental_selector;
};
class bag_sstable_set::incremental_selector : public incremental_selector_impl {
const std::vector<shared_sstable>& _sstables;
public:
incremental_selector(const std::vector<shared_sstable>& sstables)
: _sstables(sstables) {
}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) override {
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), _sstables);
}
};
std::unique_ptr<incremental_selector_impl> bag_sstable_set::make_incremental_selector() const {
return std::make_unique<incremental_selector>(_sstables);
}
// specialized when sstables are partitioned in the token range space
// e.g. leveled compaction strategy
class partitioned_sstable_set : public sstable_set_impl {
@@ -146,10 +193,13 @@ private:
schema_ptr _schema;
interval_map_type _sstables;
private:
interval_type make_interval(const query::partition_range& range) const {
static interval_type make_interval(const schema& s, const query::partition_range& range) {
return interval_type::closed(
compatible_ring_position(*_schema, range.start()->value()),
compatible_ring_position(*_schema, range.end()->value()));
compatible_ring_position(s, range.start()->value()),
compatible_ring_position(s, range.end()->value()));
}
interval_type make_interval(const query::partition_range& range) const {
return make_interval(*_schema, range);
}
interval_type singular(const dht::ring_position& rp) const {
auto crp = compatible_ring_position(*_schema, rp);
@@ -208,8 +258,48 @@ public:
bound(dht::ring_position::ending_at(last)))),
value_set({sst})});
}
virtual std::unique_ptr<incremental_selector_impl> make_incremental_selector() const override;
class incremental_selector;
};
class partitioned_sstable_set::incremental_selector : public incremental_selector_impl {
schema_ptr _schema;
map_iterator _it;
const map_iterator _end;
private:
static nonwrapping_range<dht::token> to_token_range(const interval_type& i) {
return nonwrapping_range<dht::token>::make({i.lower().token(), boost::icl::is_left_closed(i.bounds())},
{i.upper().token(), boost::icl::is_right_closed(i.bounds())});
}
public:
incremental_selector(schema_ptr schema, const interval_map_type& sstables)
: _schema(std::move(schema))
, _it(sstables.begin())
, _end(sstables.end()) {
}
virtual std::pair<nonwrapping_range<dht::token>, std::vector<shared_sstable>> select(const dht::token& token) override {
auto pr = query::partition_range::make(dht::ring_position::starting_at(token), dht::ring_position::ending_at(token));
auto interval = make_interval(*_schema, std::move(pr));
while (_it != _end) {
if (boost::icl::contains(_it->first, interval)) {
return std::make_pair(to_token_range(_it->first), std::vector<shared_sstable>(_it->second.begin(), _it->second.end()));
}
// we don't want to skip current interval if token lies before it.
if (boost::icl::lower_less(interval, _it->first)) {
return std::make_pair(nonwrapping_range<dht::token>::make({token, true}, {_it->first.lower().token(), false}),
std::vector<shared_sstable>());
}
_it++;
}
return std::make_pair(nonwrapping_range<dht::token>::make_open_ended_both_sides(), std::vector<shared_sstable>());
}
};
std::unique_ptr<incremental_selector_impl> partitioned_sstable_set::make_incremental_selector() const {
return std::make_unique<incremental_selector>(_schema, _sstables);
}
class compaction_strategy_impl {
protected:
bool _use_clustering_key_filter = false;

View File

@@ -29,6 +29,7 @@
namespace sstables {
class sstable_set_impl;
class incremental_selector_impl;
class sstable_set {
std::unique_ptr<sstable_set_impl> _impl;
@@ -46,6 +47,21 @@ public:
lw_shared_ptr<sstable_list> all() const { return _all; }
void insert(shared_sstable sst);
void erase(shared_sstable sst);
// Used to incrementally select sstables from sstable set using tokens.
// sstable set must be alive and cannot be modified while incremental
// selector is used.
class incremental_selector {
std::unique_ptr<incremental_selector_impl> _impl;
mutable stdx::optional<nonwrapping_range<dht::token>> _current_token_range;
mutable std::vector<shared_sstable> _current_sstables;
public:
~incremental_selector();
incremental_selector(std::unique_ptr<incremental_selector_impl> impl);
incremental_selector(incremental_selector&&) noexcept;
const std::vector<shared_sstable>& select(const dht::token& t) const;
};
incremental_selector make_incremental_selector() const;
};
}