sstables: index_reader: Change lookup to work on ring_position_view

In preparation for changing the interface to work not only with ranges.
This commit is contained in:
Tomasz Grabiec
2017-02-24 13:34:47 +01:00
parent cd295e9926
commit 02ace99798

View File

@@ -146,33 +146,23 @@ class index_comparator {
public:
index_comparator(const schema& s) : _s(s) {}
int tri_cmp(key_view k2, const dht::ring_position& pos) const {
auto k2_token = dht::global_partitioner().get_token(k2);
if (k2_token == pos.token()) {
if (pos.has_key()) {
return k2.tri_compare(_s, *pos.key());
} else {
return -pos.relation_to_keys();
}
} else {
return k2_token < pos.token() ? -1 : 1;
}
int tri_cmp(key_view k2, dht::ring_position_view pos) const {
return -pos.tri_compare(_s, k2);
}
bool operator()(const summary_entry& e, const dht::ring_position& rp) const {
bool operator()(const summary_entry& e, dht::ring_position_view rp) const {
return tri_cmp(e.get_key(), rp) < 0;
}
bool operator()(const index_entry& e, const dht::ring_position& rp) const {
bool operator()(const index_entry& e, dht::ring_position_view rp) const {
return tri_cmp(e.get_key(), rp) < 0;
}
bool operator()(const dht::ring_position& rp, const summary_entry& e) const {
bool operator()(dht::ring_position_view rp, const summary_entry& e) const {
return tri_cmp(e.get_key(), rp) > 0;
}
bool operator()(const dht::ring_position& rp, const index_entry& e) const {
bool operator()(dht::ring_position_view rp, const index_entry& e) const {
return tri_cmp(e.get_key(), rp) > 0;
}
};
@@ -257,18 +247,16 @@ private:
}
future<uint64_t> start_position(const schema& s, const dht::partition_range& range) {
return range.start() ? (range.start()->is_inclusive()
? lower_bound(s, range.start()->value())
: upper_bound(s, range.start()->value()))
return range.start() ? lower_bound(s, dht::ring_position_view(range.start()->value(),
dht::ring_position_view::after_key(!range.start()->is_inclusive())))
: make_ready_future<uint64_t>(0);
}
future<uint64_t> end_position(const schema& s, const dht::partition_range& range) {
return range.end() ? (range.end()->is_inclusive()
? upper_bound(s, range.end()->value())
: lower_bound(s, range.end()->value()))
return range.end() ? lower_bound(s, dht::ring_position_view(range.end()->value(),
dht::ring_position_view::after_key(range.end()->is_inclusive())))
: make_ready_future<uint64_t>(_sstable->data_size());
};
}
public:
index_reader(shared_sstable sst, const io_priority_class& pc)
: _sstable(std::move(sst))
@@ -281,21 +269,10 @@ public:
});
}
private:
enum class bound_kind { lower, upper };
template<bound_kind bound>
future<uint64_t> find_bound(const schema& s, const dht::ring_position& pos) {
auto do_find_bound = [] (auto begin, auto end, const dht::ring_position& pos, const index_comparator& cmp) {
if (bound == bound_kind::lower) {
return std::lower_bound(begin, end, pos, cmp);
} else {
return std::upper_bound(begin, end, pos, cmp);
}
};
future<uint64_t> lower_bound(const schema& s, dht::ring_position_view pos) {
auto& summary = _sstable->get_summary();
uint64_t summary_idx = std::distance(std::begin(summary.entries),
do_find_bound(summary.entries.begin(), summary.entries.end(), pos, index_comparator(s)));
std::lower_bound(summary.entries.begin(), summary.entries.end(), pos, index_comparator(s)));
if (summary_idx == 0) {
return make_ready_future<uint64_t>(0);
@@ -323,12 +300,12 @@ private:
return make_ready_future<uint64_t>(_reader->_consumer.indexes.front().position());
}
return read_index_entries(summary_idx).then([this, &s, pos, summary_idx, do_find_bound = std::move(do_find_bound)] {
return read_index_entries(summary_idx).then([this, &s, pos, summary_idx] {
if (!_reader) {
return data_end_position(summary_idx);
}
auto& il = _reader->_consumer.indexes;
auto i = do_find_bound(il.begin(), il.end(), pos, index_comparator(s));
auto i = std::lower_bound(il.begin(), il.end(), pos, index_comparator(s));
if (i == il.end()) {
return data_end_position(summary_idx);
}
@@ -336,13 +313,6 @@ private:
});
}
future<uint64_t> lower_bound(const schema& s, const dht::ring_position& pos) {
return find_bound<bound_kind::lower>(s, pos);
}
future<uint64_t> upper_bound(const schema& s, const dht::ring_position& pos) {
return find_bound<bound_kind::upper>(s, pos);
}
future<> close_reader() {
if (_reader) {
return _reader->_context.close();