Move the member function definitions of partition_snapshot_reader out of
partition_version.cc and into the class body in partition_version.hh.

The function bodies are carried over unchanged apart from indentation and
brace placement. make_partition_snapshot_reader() is now defined in the
header as well and is therefore marked inline.

diff --git a/partition_version.cc b/partition_version.cc
index 0a92a14b74..e61acde28f 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -291,181 +291,3 @@ lw_shared_ptr<partition_snapshot> partition_entry::read(schema_ptr entry_schema)
         return snp;
     }
 }
-
-partition_snapshot_reader::partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
-    lw_shared_ptr<partition_snapshot> snp,
-    query::clustering_key_filter_ranges crr, logalloc::region& region,
-    logalloc::allocating_section& read_section, boost::any pointer_to_container)
-    : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
-    , _container_guard(std::move(pointer_to_container))
-    , _ck_ranges(std::move(crr))
-    , _current_ck_range(_ck_ranges.begin())
-    , _ck_range_end(_ck_ranges.end())
-    , _cmp(*s)
-    , _eq(*s)
-    , _snapshot(snp)
-    , _range_tombstones(*s)
-    , _lsa_region(region)
-    , _read_section(read_section)
-{
-    for (auto&& v : _snapshot->versions()) {
-        _range_tombstones.apply(v.partition().row_tombstones());
-    }
-    do_fill_buffer();
-}
-
-partition_snapshot_reader::~partition_snapshot_reader()
-{
-    if (!_snapshot.owned()) {
-        return;
-    }
-    // If no one else is using this particular snapshot try to merge partition
-    // versions.
-    with_allocator(_lsa_region.allocator(), [this] {
-        return with_linearized_managed_bytes([this] {
-            try {
-                _read_section(_lsa_region, [this] {
-                    _snapshot->merge_partition_versions();
-                    _snapshot = {};
-                });
-            } catch (...) { }
-        });
-    });
-}
-
-tombstone partition_snapshot_reader::tomb(partition_snapshot& snp)
-{
-    tombstone t;
-    for (auto& v : snp.versions()) {
-        t.apply(v.partition().partition_tombstone());
-    }
-    return t;
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_static_row()
-{
-    _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
-    mutation_fragment_opt sr;
-    for (auto&& v : _snapshot->versions()) {
-        if (!v.partition().static_row().empty()) {
-            if (!sr) {
-                sr = mutation_fragment(static_row(v.partition().static_row()));
-            } else {
-                sr->as_static_row().apply(*_schema, v.partition().static_row());
-            }
-        }
-    }
-    return sr;
-}
-
-void partition_snapshot_reader::refresh_iterators()
-{
-    _clustering_rows.clear();
-
-    if (!_in_ck_range && _current_ck_range == _ck_range_end) {
-        return;
-    }
-
-    for (auto&& v : _snapshot->versions()) {
-        auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
-        auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
-            if (_in_ck_range) {
-                return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
-            } else {
-                return v.partition().lower_bound(*_schema, *_current_ck_range);
-            }
-        }();
-
-        if (cr != cr_end) {
-            _clustering_rows.emplace_back(rows_position { cr, cr_end });
-        }
-    }
-
-    _in_ck_range = true;
-    boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
-}
-
-void partition_snapshot_reader::pop_clustering_row()
-{
-    auto& current = _clustering_rows.back();
-    current._position = std::next(current._position);
-    if (current._position == current._end) {
-        _clustering_rows.pop_back();
-    } else {
-        boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
-    }
-}
-
-mutation_fragment_opt partition_snapshot_reader::read_next()
-{
-    if (!_clustering_rows.empty()) {
-        auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
-        if (mf) {
-            return mf;
-        }
-
-        boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
-        clustering_row result = *_clustering_rows.back()._position;
-        pop_clustering_row();
-        while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
-            boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
-            auto& current = _clustering_rows.back();
-            result.apply(*_schema, *current._position);
-            pop_clustering_row();
-        }
-        _last_entry = result.position();
-        return mutation_fragment(std::move(result));
-    }
-    return _range_tombstones.get_next();
-}
-
-void partition_snapshot_reader::do_fill_buffer()
-{
-    if (!_last_entry) {
-        auto mfopt = read_static_row();
-        if (mfopt) {
-            _buffer.emplace_back(std::move(*mfopt));
-        }
-    }
-
-    if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
-        refresh_iterators();
-        _reclaim_counter = _lsa_region.reclaim_counter();
-        _version_count = _snapshot->version_count();
-    }
-
-    while (!is_end_of_stream() && !is_buffer_full()) {
-        if (_in_ck_range && _clustering_rows.empty()) {
-            _in_ck_range = false;
-            _current_ck_range = std::next(_current_ck_range);
-            refresh_iterators();
-            continue;
-        }
-
-        auto mfopt = read_next();
-        if (mfopt) {
-            _buffer.emplace_back(std::move(*mfopt));
-        } else {
-            _end_of_stream = true;
-        }
-    }
-}
-
-future<> partition_snapshot_reader::fill_buffer()
-{
-    return _read_section(_lsa_region, [&] {
-        return with_linearized_managed_bytes([&] {
-            do_fill_buffer();
-            return make_ready_future<>();
-        });
-    });
-}
-
-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
-    query::clustering_key_filter_ranges crr,
-    lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
-    logalloc::allocating_section& read_section, boost::any pointer_to_container)
-{
-    return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
-        snp, std::move(crr), region, read_section, std::move(pointer_to_container));
-}
diff --git a/partition_version.hh b/partition_version.hh
index 45a3e137e5..2722df6c3b 100644
--- a/partition_version.hh
+++ b/partition_version.hh
@@ -311,23 +311,173 @@ private:
     uint64_t _reclaim_counter;
     unsigned _version_count = 0;
 private:
-    void refresh_iterators();
-    void pop_clustering_row();
+    void refresh_iterators() {
+        _clustering_rows.clear();
 
-    mutation_fragment_opt read_static_row();
-    mutation_fragment_opt read_next();
-    void do_fill_buffer();
-    static tombstone tomb(partition_snapshot& snp);
+        if (!_in_ck_range && _current_ck_range == _ck_range_end) {
+            return;
+        }
+
+        for (auto&& v : _snapshot->versions()) {
+            auto cr_end = v.partition().upper_bound(*_schema, *_current_ck_range);
+            auto cr = [&] () -> mutation_partition::rows_type::const_iterator {
+                if (_in_ck_range) {
+                    return v.partition().clustered_rows().upper_bound(*_last_entry, _cmp);
+                } else {
+                    return v.partition().lower_bound(*_schema, *_current_ck_range);
+                }
+            }();
+
+            if (cr != cr_end) {
+                _clustering_rows.emplace_back(rows_position { cr, cr_end });
+            }
+        }
+
+        _in_ck_range = true;
+        boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
+    }
+
+    void pop_clustering_row() {
+        auto& current = _clustering_rows.back();
+        current._position = std::next(current._position);
+        if (current._position == current._end) {
+            _clustering_rows.pop_back();
+        } else {
+            boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
+        }
+    }
+
+    mutation_fragment_opt read_static_row() {
+        _last_entry = position_in_partition(position_in_partition::static_row_tag_t());
+        mutation_fragment_opt sr;
+        for (auto&& v : _snapshot->versions()) {
+            if (!v.partition().static_row().empty()) {
+                if (!sr) {
+                    sr = mutation_fragment(static_row(v.partition().static_row()));
+                } else {
+                    sr->as_static_row().apply(*_schema, v.partition().static_row());
+                }
+            }
+        }
+        return sr;
+    }
+
+    mutation_fragment_opt read_next() {
+        if (!_clustering_rows.empty()) {
+            auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
+            if (mf) {
+                return mf;
+            }
+
+            boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+            clustering_row result = *_clustering_rows.back()._position;
+            pop_clustering_row();
+            while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
+                boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
+                auto& current = _clustering_rows.back();
+                result.apply(*_schema, *current._position);
+                pop_clustering_row();
+            }
+            _last_entry = result.position();
+            return mutation_fragment(std::move(result));
+        }
+        return _range_tombstones.get_next();
+    }
+
+    void do_fill_buffer() {
+        if (!_last_entry) {
+            auto mfopt = read_static_row();
+            if (mfopt) {
+                _buffer.emplace_back(std::move(*mfopt));
+            }
+        }
+
+        if (!_in_ck_range || _lsa_region.reclaim_counter() != _reclaim_counter || _snapshot->version_count() != _version_count) {
+            refresh_iterators();
+            _reclaim_counter = _lsa_region.reclaim_counter();
+            _version_count = _snapshot->version_count();
+        }
+
+        while (!is_end_of_stream() && !is_buffer_full()) {
+            if (_in_ck_range && _clustering_rows.empty()) {
+                _in_ck_range = false;
+                _current_ck_range = std::next(_current_ck_range);
+                refresh_iterators();
+                continue;
+            }
+
+            auto mfopt = read_next();
+            if (mfopt) {
+                _buffer.emplace_back(std::move(*mfopt));
+            } else {
+                _end_of_stream = true;
+            }
+        }
+    }
+
+    static tombstone tomb(partition_snapshot& snp) {
+        tombstone t;
+        for (auto& v : snp.versions()) {
+            t.apply(v.partition().partition_tombstone());
+        }
+        return t;
+    }
 public:
     partition_snapshot_reader(schema_ptr s, dht::decorated_key dk, lw_shared_ptr<partition_snapshot> snp,
         query::clustering_key_filter_ranges crr, logalloc::region& region,
         logalloc::allocating_section& read_section,
-        boost::any pointer_to_container);
-    ~partition_snapshot_reader();
-    virtual future<> fill_buffer() override;
+        boost::any pointer_to_container)
+    : streamed_mutation::impl(s, std::move(dk), tomb(*snp))
+    , _container_guard(std::move(pointer_to_container))
+    , _ck_ranges(std::move(crr))
+    , _current_ck_range(_ck_ranges.begin())
+    , _ck_range_end(_ck_ranges.end())
+    , _cmp(*s)
+    , _eq(*s)
+    , _snapshot(snp)
+    , _range_tombstones(*s)
+    , _lsa_region(region)
+    , _read_section(read_section) {
+        for (auto&& v : _snapshot->versions()) {
+            _range_tombstones.apply(v.partition().row_tombstones());
+        }
+        do_fill_buffer();
+    }
+
+    ~partition_snapshot_reader() {
+        if (!_snapshot.owned()) {
+            return;
+        }
+        // If no one else is using this particular snapshot try to merge partition
+        // versions.
+        with_allocator(_lsa_region.allocator(), [this] {
+            return with_linearized_managed_bytes([this] {
+                try {
+                    _read_section(_lsa_region, [this] {
+                        _snapshot->merge_partition_versions();
+                        _snapshot = {};
+                    });
+                } catch (...) { }
+            });
+        });
+    }
+
+    virtual future<> fill_buffer() override {
+        return _read_section(_lsa_region, [&] {
+            return with_linearized_managed_bytes([&] {
+                do_fill_buffer();
+                return make_ready_future<>();
+            });
+        });
+    }
 };
 
-streamed_mutation make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
+inline streamed_mutation
+make_partition_snapshot_reader(schema_ptr s, dht::decorated_key dk,
     query::clustering_key_filter_ranges crr,
     lw_shared_ptr<partition_snapshot> snp, logalloc::region& region,
-    logalloc::allocating_section& read_section, boost::any pointer_to_container);
+    logalloc::allocating_section& read_section, boost::any pointer_to_container)
+{
+    return make_streamed_mutation<partition_snapshot_reader>(s, std::move(dk),
+        snp, std::move(crr), region, read_section, std::move(pointer_to_container));
+}