diff --git a/sstables/partition.cc b/sstables/partition.cc
index e3c0057ef6..46eef9cf82 100644
--- a/sstables/partition.cc
+++ b/sstables/partition.cc
@@ -629,4 +629,109 @@ sstable::read_range_rows(schema_ptr schema, const query::partition_range& range)
         *this, std::move(schema), std::move(start), std::move(end));
 }
 
+
+class key_reader final : public ::key_reader::impl { // Yields the decorated keys of one sstable that fall inside a partition range, reading its index one bucket at a time.
+    schema_ptr _s; // schema used to explode/decorate keys and to build the comparators
+    shared_sstable _sst; // sstable whose summary and index are read
+    index_list _bucket; // index entries returned by the most recent read_indexes() call
+    int64_t _current_bucket_id; // argument of the most recent read_indexes() call; pre-incremented before each read
+    int64_t _end_bucket_id; // once the current bucket is drained and _current_bucket_id equals this, reading stops
+    int64_t _position_in_bucket = 0; // index into _bucket of the next entry to return
+    int64_t _end_of_bucket = 0; // one past the index of the last entry of _bucket to return
+    query::partition_range _range; // part of the requested range not yet consumed; trimmed after each drained bucket
+private:
+    dht::decorated_key decorate(const index_entry& ie) { // Converts an index entry into a decorated key via the global partitioner.
+        auto pk = partition_key::from_exploded(*_s, ie.get_key().explode(*_s));
+        return dht::global_partitioner().decorate_key(*_s, std::move(pk));
+    }
+public:
+    key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range) // Picks the first and last bucket ids from the sstable summary; performs no index reads itself.
+        : _s(s), _sst(std::move(sst)), _range(range)
+    {
+        auto& summary = _sst->_summary;
+
+        _current_bucket_id = -1; // no start bound: the first read_indexes(++id) call in operator() then asks for bucket 0
+        if (range.start()) {
+            auto pos = std::lower_bound(summary.entries.begin(), summary.entries.end(), // first summary entry not ordered before the range start, per index_comparator
+                range.start()->value(), index_comparator(*s));
+            _current_bucket_id = std::distance(summary.entries.begin(), pos) - 1; // index of the summary entry just before pos (-1 when pos is the first entry)
+            if (_current_bucket_id >= 0) { // operator() pre-increments before reading, so step back once more to start at that entry
+                _current_bucket_id--;
+            }
+        }
+        _end_bucket_id = summary.header.size; // no end bound: default taken from the summary header's size field
+        if (range.end()) {
+            auto pos = std::upper_bound(summary.entries.begin(), summary.entries.end(), // first summary entry ordered after the range end, per index_comparator
+                range.end()->value(), index_comparator(*s));
+            _end_bucket_id = std::distance(summary.entries.begin(), pos);
+            if (_end_bucket_id) { // stop at the entry just before pos, unless pos is already the first entry
+                _end_bucket_id--;
+            }
+        }
+    }
+    virtual future operator()() override; // Returns the next key, or a ready future without a value once reading stops. NOTE(review): `future` has no template argument here - confirm against the upstream patch.
+};
+
+future key_reader::operator()() // Produces the next key from the current bucket, loading the following bucket when the current one is drained.
+{
+    if (_position_in_bucket < _end_of_bucket) { // unread in-range entries remain in the current bucket: hand out the next one
+        auto& ie = _bucket[_position_in_bucket++];
+        return make_ready_future(decorate(ie));
+    }
+    if (_current_bucket_id == _end_bucket_id) { // current bucket drained and it was the last one to read
+        return make_ready_future();
+    }
+    if (!_bucket.empty()) { // trim _range so it starts after the last key of the bucket just drained
+        auto dk = decorate(_bucket.back());
+        auto cmp = dht::ring_position_comparator(*_s);
+        // It is possible that the previous read_indexes() call returned a range that
+        // contains all elements from the current bucket.
+        if (_range.contains(dk, cmp)) {
+            _range = _range.split_after(dk, cmp);
+        }
+    }
+    return _sst->read_indexes(++_current_bucket_id).then([this] (index_list il) mutable { // captures `this`; presumably the caller keeps the reader alive until the future resolves
+        _bucket = std::move(il);
+
+        // FIXME: the following lookups could be done only once if
+        // read_indexes() guaranteed that the returned ranges of keys
+        // never overlap. See #475.
+        if (_range.start()) { // locate the first entry of the new bucket that satisfies the start bound
+            index_list::const_iterator pos;
+            if (_range.start()->is_inclusive()) {
+                pos = std::lower_bound(_bucket.begin(), _bucket.end(), _range.start()->value(), index_comparator(*_s));
+            } else {
+                pos = std::upper_bound(_bucket.begin(), _bucket.end(), _range.start()->value(), index_comparator(*_s));
+            }
+            _position_in_bucket = std::distance(_bucket.cbegin(), pos);
+        } else {
+            _position_in_bucket = 0;
+        }
+
+        if (_range.end()) { // locate one past the last entry of the new bucket that satisfies the end bound
+            index_list::const_iterator pos;
+            if (_range.end()->is_inclusive()) {
+                pos = std::upper_bound(_bucket.begin(), _bucket.end(), _range.end()->value(), index_comparator(*_s));
+            } else {
+                pos = std::lower_bound(_bucket.begin(), _bucket.end(), _range.end()->value(), index_comparator(*_s));
+            }
+            if (pos != _bucket.end()) {
+                // Since read_indexes() may read more than necessary, we may
+                // find the end of the range in an earlier bucket than the one chosen in the constructor.
+                _end_bucket_id = _current_bucket_id;
+            }
+            _end_of_bucket = std::distance(_bucket.cbegin(), pos);
+        } else {
+            _end_of_bucket = _bucket.size();
+        }
+
+        return operator()(); // re-enter: returns the first in-range key of the new bucket, or moves on / stops if it holds none
+    });
+}
+
+::key_reader make_key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range) // Factory declared in sstables.hh; forwards its arguments to the global ::make_key_reader.
+{
+    return ::make_key_reader(std::move(s), std::move(sst), range); // NOTE(review): no implementation type is named in this call - presumably a template argument was lost; confirm
+}
+
 }
diff --git a/sstables/sstables.hh b/sstables/sstables.hh
index 3bd84990c7..c5009a9b08 100644
--- a/sstables/sstables.hh
+++ b/sstables/sstables.hh
@@ -45,6 +45,8 @@
 #include "filter.hh"
 #include "exceptions.hh"
 #include "mutation_reader.hh"
+#include "query-request.hh" // presumably provides query::partition_range used by make_key_reader below - confirm
+#include "key_reader.hh" // presumably declares ::key_reader returned by make_key_reader below - confirm
 
 namespace sstables {
 
@@ -503,11 +505,15 @@ public:
     // a placeholder to avoid cluttering this class too much. The sstable_test class
     // will then re-export as public every method it needs.
     friend class test;
+
+    friend class key_reader; // the key_reader in partition.cc reads _summary and calls read_indexes() on the sstable directly
 };
 
 using shared_sstable = lw_shared_ptr;
 using sstable_list = std::map;
 
+::key_reader make_key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range); // creates a reader over the keys of sst that fall within range; defined in partition.cc
+
 struct entry_descriptor {
     sstring ks;
     sstring cf;
@@ -523,4 +529,5 @@ struct entry_descriptor {
                      sstable::component_type component)
         : ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {}
 };
+
 }