move partition_snapshot_reader code to header file

This is so we can template it without worrying about declaring the
specializations in the .cc file.

Signed-off-by: Glauber Costa <glauber@scylladb.com>
This commit is contained in:
Glauber Costa
2016-09-13 14:22:38 -04:00
committed by Glauber Costa
parent 86aa0b830d
commit 452eb95943
2 changed files with 161 additions and 189 deletions

View File

@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
return snp;
}
}
partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
, _ck_range_end(_ck_ranges.end())
, _cmp(*s)
, _eq(*s)
, _snapshot(snp)
, _range_tombstones(*s)
, _lsa_region(region)
, _read_section(read_section)
{
for (auto&& v : _snapshot->versions()) {
_range_tombstones.apply(v.partition().row_tombstones());
}
do_fill_buffer();
}
partition_snapshot_reader::~partition_snapshot_reader()
{
if (!_snapshot.owned()) {
return;
}
// If no one else is using this particular snapshot try to merge partition
// versions.
with_allocator(_lsa_region.allocator(), [this] {
return with_linearized_managed_bytes([this] {
try {
_read_section(_lsa_region, [this] {
_snapshot->merge_partition_versions();
_snapshot = {};
});
} catch (...) { }
});
});
}
tombstone partition_snapshot_reader::tomb(partition_snapshot& snp)
{
tombstone t;
for (auto& v : snp.versions()) {
t.apply(v.partition().partition_tombstone());
}
return t;
}
mutation_fragment_opt partition_snapshot_reader::read_static_row()
{
_last_entry = position_in_partition(position_in_partition::static_row_tag_t());
mutation_fragment_opt sr;
for (auto&& v : _snapshot->versions()) {
if (!v.partition().static_row().empty()) {
if (!sr) {
sr = mutation_fragment(static_row(v.partition().static_row()));
} else {
sr->as_static_row().apply(*_schema, v.partition().static_row());
}
}
}
return sr;
}
void partition_snapshot_reader::refresh_iterators()
{
_clustering_rows.clear();
if (!_in_ck_range && _current_ck_range == _ck_range_end) {
return;
}
for (auto&& v : _snapshot->versions()) {
auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
if (_in_ck_range) {
return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
} else {
return v.partition().lower_bound(*_schema, *_current_ck_range);
}
}();
if (cr != cr_end) {
_clustering_rows.emplace_back(rows_position { cr, cr_end });
}
}
_in_ck_range = true;
boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
}
void partition_snapshot_reader::pop_clustering_row()
{
auto& current = _clustering_rows.back();
current._position = std::next(current._position);
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
}
}
mutation_fragment_opt partition_snapshot_reader::read_next()
{
if (!_clustering_rows.empty()) {
auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
if (mf) {
return mf;
}
boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
clustering_row result = *_clustering_rows.back()._position;
pop_clustering_row();
while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
auto& current = _clustering_rows.back();
result.apply(*_schema, *current._position);
pop_clustering_row();
}
_last_entry = result.position();
return mutation_fragment(std::move(result));
}
return _range_tombstones.get_next();
}
void partition_snapshot_reader::do_fill_buffer()
{
if (!_last_entry) {
auto mfopt = read_static_row();
if (mfopt) {
_buffer.emplace_back(std::move(*mfopt));
}
}
if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
refresh_iterators();
_reclaim_counter = _lsa_region.reclaim_counter();
_version_count = _snapshot->version_count();
}
while (!is_end_of_stream() && !is_buffer_full()) {
if (_in_ck_range && _clustering_rows.empty()) {
_in_ck_range = false;
_current_ck_range = std::next(_current_ck_range);
refresh_iterators();
continue;
}
auto mfopt = read_next();
if (mfopt) {
_buffer.emplace_back(std::move(*mfopt));
} else {
_end_of_stream = true;
}
}
}
future<> partition_snapshot_reader::fill_buffer()
{
return _read_section(_lsa_region, [&] {
return with_linearized_managed_bytes([&] {
do_fill_buffer();
return make_ready_future<>();
});
});
}
streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
snp, std::move(crr), region, read_section, std::move(pointer_to_container));
}

View File

@@ -311,23 +311,173 @@ private:
uint64_t _reclaim_counter;
unsigned _version_count = 0;
private:
void refresh_iterators();
void pop_clustering_row();
void refresh_iterators() {
_clustering_rows.clear();
mutation_fragment_opt read_static_row();
mutation_fragment_opt read_next();
void do_fill_buffer();
static tombstone tomb(partition_snapshot& snp);
if (!_in_ck_range && _current_ck_range == _ck_range_end) {
return;
}
for (auto&& v : _snapshot->versions()) {
auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
if (_in_ck_range) {
return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
} else {
return v.partition().lower_bound(*_schema, *_current_ck_range);
}
}();
if (cr != cr_end) {
_clustering_rows.emplace_back(rows_position { cr, cr_end });
}
}
_in_ck_range = true;
boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
}
void pop_clustering_row() {
auto& current = _clustering_rows.back();
current._position = std::next(current._position);
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
}
}
mutation_fragment_opt read_static_row() {
_last_entry = position_in_partition(position_in_partition::static_row_tag_t());
mutation_fragment_opt sr;
for (auto&& v : _snapshot->versions()) {
if (!v.partition().static_row().empty()) {
if (!sr) {
sr = mutation_fragment(static_row(v.partition().static_row()));
} else {
sr->as_static_row().apply(*_schema, v.partition().static_row());
}
}
}
return sr;
}
mutation_fragment_opt read_next() {
if (!_clustering_rows.empty()) {
auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
if (mf) {
return mf;
}
boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
clustering_row result = *_clustering_rows.back()._position;
pop_clustering_row();
while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
auto& current = _clustering_rows.back();
result.apply(*_schema, *current._position);
pop_clustering_row();
}
_last_entry = result.position();
return mutation_fragment(std::move(result));
}
return _range_tombstones.get_next();
}
void do_fill_buffer() {
if (!_last_entry) {
auto mfopt = read_static_row();
if (mfopt) {
_buffer.emplace_back(std::move(*mfopt));
}
}
if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
refresh_iterators();
_reclaim_counter = _lsa_region.reclaim_counter();
_version_count = _snapshot->version_count();
}
while (!is_end_of_stream() && !is_buffer_full()) {
if (_in_ck_range && _clustering_rows.empty()) {
_in_ck_range = false;
_current_ck_range = std::next(_current_ck_range);
refresh_iterators();
continue;
}
auto mfopt = read_next();
if (mfopt) {
_buffer.emplace_back(std::move(*mfopt));
} else {
_end_of_stream = true;
}
}
}
static tombstone tomb(partition_snapshot& snp) {
tombstone t;
for (auto& v : snp.versions()) {
t.apply(v.partition().partition_tombstone());
}
return t;
}
public:
partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
query::clustering_key_filter_ranges crr,
logalloc::region& region, logalloc::allocating_section& read_section,
boost::any pointer_to_container);
~partition_snapshot_reader();
virtual future<> fill_buffer() override;
boost::any pointer_to_container)
: streamed_mutation::impl(s, std::move(dk), tomb(*snp))
, _container_guard(std::move(pointer_to_container))
, _ck_ranges(std::move(crr))
, _current_ck_range(_ck_ranges.begin())
, _ck_range_end(_ck_ranges.end())
, _cmp(*s)
, _eq(*s)
, _snapshot(snp)
, _range_tombstones(*s)
, _lsa_region(region)
, _read_section(read_section) {
for (auto&& v : _snapshot->versions()) {
_range_tombstones.apply(v.partition().row_tombstones());
}
do_fill_buffer();
}
~partition_snapshot_reader() {
if (!_snapshot.owned()) {
return;
}
// If no one else is using this particular snapshot try to merge partition
// versions.
with_allocator(_lsa_region.allocator(), [this] {
return with_linearized_managed_bytes([this] {
try {
_read_section(_lsa_region, [this] {
_snapshot->merge_partition_versions();
_snapshot = {};
});
} catch (...) { }
});
});
}
virtual future<> fill_buffer() override {
return _read_section(_lsa_region, [&] {
return with_linearized_managed_bytes([&] {
do_fill_buffer();
return make_ready_future<>();
});
});
}
};
streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
inline streamed_mutation
make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
query::clustering_key_filter_ranges crr,
lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
logalloc::allocating_section& read_section, boost::any pointer_to_container);
logalloc::allocating_section& read_section, boost::any pointer_to_container)
{
return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
snp, std::move(crr), region, read_section, std::move(pointer_to_container));
}