diff --git a/partition_snapshot_row_cursor.hh b/partition_snapshot_row_cursor.hh index 42ecee9c3f..aaad114136 100644 --- a/partition_snapshot_row_cursor.hh +++ b/partition_snapshot_row_cursor.hh @@ -34,6 +34,8 @@ // When the cursor is invalidated, it still maintains its previous position. It can be brought // back to validity by calling maybe_refresh(), or advance_to(). // +// Insertion of row entries after cursor's position invalidates the cursor. +// class partition_snapshot_row_cursor final { struct position_in_version { mutation_partition::rows_type::iterator it; @@ -101,6 +103,37 @@ public: if (!iterators_valid()) { return advance_to(_position); } + // Refresh latest version's iterator in case there was an insertion + // before it and after cursor's position. There cannot be any + // insertions for non-latest versions, so we don't have to update them. + if (_current_row[0].version_no != 0) { + rows_entry::compare less(_schema); + position_in_partition::equal_compare eq(_schema); + position_in_version::less_compare heap_less(_schema); + auto& rows = _snp.version()->partition().clustered_rows(); + auto it = _iterators[0] = rows.lower_bound(_position, less); + auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; }); + if (it == rows.end()) { + if (heap_i != _heap.end()) { + _heap.erase(heap_i); + boost::range::make_heap(_heap, heap_less); + } + } else if (eq(_position, it->position())) { + _current_row.insert(_current_row.begin(), position_in_version{it, rows.end(), 0}); + if (heap_i != _heap.end()) { + _heap.erase(heap_i); + boost::range::make_heap(_heap, heap_less); + } + } else { + if (heap_i != _heap.end()) { + heap_i->it = it; + boost::range::make_heap(_heap, heap_less); + } else { + _heap.push_back({it, rows.end(), 0}); + boost::range::push_heap(_heap, heap_less); + } + } + } return true; }