partition_snapshot_reader: Encapsulate row walking to simplify read_next()

This commit is contained in:
Tomasz Grabiec
2017-05-23 17:27:49 +02:00
parent a77734952d
commit d1a1fdfd57

View File

@@ -117,14 +117,27 @@ private:
boost::range::make_heap(_clustering_rows, _heap_cmp);
}
void pop_clustering_row() {
// Valid if has_more_rows()
const rows_entry& pop_clustering_row() {
boost::range::pop_heap(_clustering_rows, _heap_cmp);
auto& current = _clustering_rows.back();
const rows_entry& e = *current._position;
current._position = std::next(current._position);
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, _heap_cmp);
}
return e;
}
// Valid if has_more_rows()
const rows_entry& peek_row() const {
return *_clustering_rows.front()._position;
}
bool has_more_rows() const {
return !_clustering_rows.empty();
}
mutation_fragment_opt read_static_row() {
@@ -143,20 +156,15 @@ private:
}
mutation_fragment_opt read_next() {
if (!_clustering_rows.empty()) {
auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
if (has_more_rows()) {
auto mf = _range_tombstones.get_next(peek_row());
if (mf) {
return mf;
}
boost::range::pop_heap(_clustering_rows, _heap_cmp);
clustering_row result = *_clustering_rows.back()._position;
pop_clustering_row();
while (!_clustering_rows.empty() && _eq(_clustering_rows.front()._position->key(), result.key())) {
boost::range::pop_heap(_clustering_rows, _heap_cmp);
auto& current = _clustering_rows.back();
result.apply(*_schema, *current._position);
pop_clustering_row();
clustering_row result = pop_clustering_row();
while (has_more_rows() && _eq(peek_row().key(), result.key())) {
result.apply(*_schema, pop_clustering_row());
}
_last_entry = position_in_partition(result.position());
return mutation_fragment(std::move(result));