mirror of
https://github.com/scylladb/scylladb.git
synced 2026-06-08 07:53:20 +00:00
sstables: provide key_reader
In order to get just partition keys stored in the sstable its summary and index are used. Signed-off-by: Paweł Dziepak <pdziepak@scylladb.com>
This commit is contained in:
@@ -629,4 +629,109 @@ sstable::read_range_rows(schema_ptr schema, const query::partition_range& range)
|
||||
*this, std::move(schema), std::move(start), std::move(end));
|
||||
}
|
||||
|
||||
|
||||
class key_reader final : public ::key_reader::impl {
|
||||
schema_ptr _s;
|
||||
shared_sstable _sst;
|
||||
index_list _bucket;
|
||||
int64_t _current_bucket_id;
|
||||
int64_t _end_bucket_id;
|
||||
int64_t _position_in_bucket = 0;
|
||||
int64_t _end_of_bucket = 0;
|
||||
query::partition_range _range;
|
||||
private:
|
||||
dht::decorated_key decorate(const index_entry& ie) {
|
||||
auto pk = partition_key::from_exploded(*_s, ie.get_key().explode(*_s));
|
||||
return dht::global_partitioner().decorate_key(*_s, std::move(pk));
|
||||
}
|
||||
public:
|
||||
key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range)
|
||||
: _s(s), _sst(std::move(sst)), _range(range)
|
||||
{
|
||||
auto& summary = _sst->_summary;
|
||||
|
||||
_current_bucket_id = -1;
|
||||
if (range.start()) {
|
||||
auto pos = std::lower_bound(summary.entries.begin(), summary.entries.end(),
|
||||
range.start()->value(), index_comparator(*s));
|
||||
_current_bucket_id = std::distance(summary.entries.begin(), pos) - 1;
|
||||
if (_current_bucket_id >= 0) {
|
||||
_current_bucket_id--;
|
||||
}
|
||||
}
|
||||
_end_bucket_id = summary.header.size;
|
||||
if (range.end()) {
|
||||
auto pos = std::upper_bound(summary.entries.begin(), summary.entries.end(),
|
||||
range.end()->value(), index_comparator(*s));
|
||||
_end_bucket_id = std::distance(summary.entries.begin(), pos);
|
||||
if (_end_bucket_id) {
|
||||
_end_bucket_id--;
|
||||
}
|
||||
}
|
||||
}
|
||||
virtual future<dht::decorated_key_opt> operator()() override;
|
||||
};
|
||||
|
||||
future<dht::decorated_key_opt> key_reader::operator()()
|
||||
{
|
||||
if (_position_in_bucket < _end_of_bucket) {
|
||||
auto& ie = _bucket[_position_in_bucket++];
|
||||
return make_ready_future<dht::decorated_key_opt>(decorate(ie));
|
||||
}
|
||||
if (_current_bucket_id == _end_bucket_id) {
|
||||
return make_ready_future<dht::decorated_key_opt>();
|
||||
}
|
||||
if (!_bucket.empty()) {
|
||||
auto dk = decorate(_bucket.back());
|
||||
auto cmp = dht::ring_position_comparator(*_s);
|
||||
// It is possible that previous range_indexes() returned a range that
|
||||
// contains all elements from the current bucket.
|
||||
if (_range.contains(dk, cmp)) {
|
||||
_range = _range.split_after(dk, cmp);
|
||||
}
|
||||
}
|
||||
return _sst->read_indexes(++_current_bucket_id).then([this] (index_list il) mutable {
|
||||
_bucket = std::move(il);
|
||||
|
||||
// FIXME: the following lookups could be done only once if
|
||||
// read_indexes() guaranteed that the returned ranges of keys
|
||||
// never overlap. See #475.
|
||||
if (_range.start()) {
|
||||
index_list::const_iterator pos;
|
||||
if (_range.start()->is_inclusive()) {
|
||||
pos = std::lower_bound(_bucket.begin(), _bucket.end(), _range.start()->value(), index_comparator(*_s));
|
||||
} else {
|
||||
pos = std::upper_bound(_bucket.begin(), _bucket.end(), _range.start()->value(), index_comparator(*_s));
|
||||
}
|
||||
_position_in_bucket = std::distance(_bucket.cbegin(), pos);
|
||||
} else {
|
||||
_position_in_bucket = 0;
|
||||
}
|
||||
|
||||
if (_range.end()) {
|
||||
index_list::const_iterator pos;
|
||||
if (_range.end()->is_inclusive()) {
|
||||
pos = std::upper_bound(_bucket.begin(), _bucket.end(), _range.end()->value(), index_comparator(*_s));
|
||||
} else {
|
||||
pos = std::lower_bound(_bucket.begin(), _bucket.end(), _range.end()->value(), index_comparator(*_s));
|
||||
}
|
||||
if (pos != _bucket.end()) {
|
||||
// Since, read_indexes() may read more than necessary we may
|
||||
// find the end of the range in an earlier bucket.
|
||||
_end_bucket_id = _current_bucket_id;
|
||||
}
|
||||
_end_of_bucket = std::distance(_bucket.cbegin(), pos);
|
||||
} else {
|
||||
_end_of_bucket = _bucket.size();
|
||||
}
|
||||
|
||||
return operator()();
|
||||
});
|
||||
}
|
||||
|
||||
::key_reader make_key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range)
|
||||
{
|
||||
return ::make_key_reader<key_reader>(std::move(s), std::move(sst), range);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -45,6 +45,8 @@
|
||||
#include "filter.hh"
|
||||
#include "exceptions.hh"
|
||||
#include "mutation_reader.hh"
|
||||
#include "query-request.hh"
|
||||
#include "key_reader.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
@@ -503,11 +505,15 @@ public:
|
||||
// a placeholder to avoid cluttering this class too much. The sstable_test class
|
||||
// will then re-export as public every method it needs.
|
||||
friend class test;
|
||||
|
||||
friend class key_reader;
|
||||
};
|
||||
|
||||
using shared_sstable = lw_shared_ptr<sstable>;
|
||||
using sstable_list = std::map<unsigned long, shared_sstable>;
|
||||
|
||||
::key_reader make_key_reader(schema_ptr s, shared_sstable sst, const query::partition_range& range);
|
||||
|
||||
struct entry_descriptor {
|
||||
sstring ks;
|
||||
sstring cf;
|
||||
@@ -523,4 +529,5 @@ struct entry_descriptor {
|
||||
sstable::component_type component)
|
||||
: ks(ks), cf(cf), version(version), generation(generation), format(format), component(component) {}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user