From 757fc1275f24d2b3ec20ca8a58ce201047df2701 Mon Sep 17 00:00:00 2001 From: Tomasz Grabiec Date: Thu, 23 Sep 2021 23:44:09 +0200 Subject: [PATCH] partition_snapshot_row_cursor: Support reverse iteration --- partition_snapshot_row_cursor.hh | 279 ++++++++++++++++----- test/boost/mvcc_test.cc | 404 +++++++++++++++++++++++++++++++ 2 files changed, 629 insertions(+), 54 deletions(-) diff --git a/partition_snapshot_row_cursor.hh b/partition_snapshot_row_cursor.hh index 41c52bf3c2..f83e25e2bd 100644 --- a/partition_snapshot_row_cursor.hh +++ b/partition_snapshot_row_cursor.hh @@ -111,52 +111,94 @@ class partition_snapshot_row_cursor final { friend class partition_snapshot_row_weakref; struct position_in_version { mutation_partition::rows_type::iterator it; + utils::immutable_collection rows; int version_no; - bool unique_owner; - - struct less_compare { - rows_entry::tri_compare _cmp; - public: - explicit less_compare(const schema& s) : _cmp(s) { } - bool operator()(const position_in_version& a, const position_in_version& b) { - auto res = _cmp(*a.it, *b.it); - return res > 0 || (res == 0 && a.version_no > b.version_no); - } - }; + bool unique_owner = false; + is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain). }; - const schema& _schema; + const schema& _schema; // query domain partition_snapshot& _snp; - utils::small_vector _heap; + + // _heap contains iterators which are ahead of the cursor. + // _current_row contains iterators which are directly below the cursor. + utils::small_vector _heap; // query domain order utils::small_vector _current_row; + + // For !_reversed cursors points to the entry which + // is the lower_bound() of the current position in table schema order. + // For _reversed cursors it can be either lower_bound() in table order + // or lower_bound() in cursor's order, so should not be relied upon. + // if current entry is in the latest version then _latest_it points to it, + // also in _reversed mode. std::optional _latest_it; + + // Continuity corresponding to ranges which are not represented in _heap because the cursor + // went pass all the entries in those versions. + bool _background_continuity = false; + bool _continuous{}; bool _dummy{}; const bool _unique_owner; - position_in_partition _position; + const bool _reversed; + position_in_partition _position; // table domain partition_snapshot::change_mark _change_mark; + position_in_partition_view to_table_domain(position_in_partition_view pos) const { + if (_reversed) [[unlikely]] { + return pos.reversed(); + } + return pos; + } + + position_in_partition_view to_query_domain(position_in_partition_view pos) const { + if (_reversed) [[unlikely]] { + return pos.reversed(); + } + return pos; + } + + struct version_heap_less_compare { + rows_entry::tri_compare _cmp; + partition_snapshot_row_cursor& _cur; + public: + explicit version_heap_less_compare(partition_snapshot_row_cursor& cur) + : _cmp(cur._schema) + , _cur(cur) + { } + + bool operator()(const position_in_version& a, const position_in_version& b) { + auto res = _cmp(_cur.to_query_domain(a.it->position()), _cur.to_query_domain(b.it->position())); + return res > 0 || (res == 0 && a.version_no > b.version_no); + } + }; + // Removes the next row from _heap and puts it into _current_row bool recreate_current_row() { _current_row.clear(); + _continuous = _background_continuity; + _dummy = true; if (_heap.empty()) { + if (_reversed) { + _position = position_in_partition::before_all_clustered_rows(); + } else { + _position = position_in_partition::after_all_clustered_rows(); + } return false; } - position_in_version::less_compare heap_less(_schema); - position_in_partition::equal_compare eq(_schema); - _continuous = false; - _dummy = true; + version_heap_less_compare heap_less(*this); + position_in_partition::equal_compare eq(*_snp.schema()); do { boost::range::pop_heap(_heap, heap_less); memory::on_alloc_point(); rows_entry& e = *_heap.back().it; _dummy &= bool(e.dummy()); - _continuous |= bool(e.continuous()); + _continuous |= bool(_heap.back().continuous); _current_row.push_back(_heap.back()); _heap.pop_back(); } while (!_heap.empty() && eq(_current_row[0].it->position(), _heap[0].it->position())); - if (boost::algorithm::any_of(_heap, [] (auto&& v) { return v.it->continuous(); })) { + if (boost::algorithm::any_of(_heap, [] (auto&& v) { return v.continuous; })) { // FIXME: Optimize by dropping dummy() entries. _continuous = true; } @@ -165,12 +207,15 @@ class partition_snapshot_row_cursor final { return true; } + // lower_bound is in the query schema domain void prepare_heap(position_in_partition_view lower_bound) { + lower_bound = to_table_domain(lower_bound); memory::on_alloc_point(); - rows_entry::tri_compare cmp(_schema); - position_in_version::less_compare heap_less(_schema); + rows_entry::tri_compare cmp(*_snp.schema()); + version_heap_less_compare heap_less(*this); _heap.clear(); _latest_it.reset(); + _background_continuity = false; int version_no = 0; bool unique_owner = _unique_owner; bool first = true; @@ -182,7 +227,32 @@ class partition_snapshot_row_cursor final { _latest_it = pos; } if (pos) { - _heap.push_back({pos, version_no, unique_owner}); + is_continuous cont; + if (_reversed) [[unlikely]] { + if (cmp(pos->position(), lower_bound) != 0) { + cont = pos->continuous(); + if (pos != rows.begin()) { + --pos; + } else { + _background_continuity |= bool(cont); + pos = {}; + } + } else { + auto next_entry = std::next(pos); + if (next_entry == rows.end()) { + // Positions past last dummy are complete since mutation sources + // can't contain any keys which are larger. + cont = is_continuous::yes; + } else { + cont = next_entry->continuous(); + } + } + } else { + cont = pos->continuous(); + } + if (pos) [[likely]] { + _heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont}); + } } ++version_no; first = false; @@ -198,19 +268,41 @@ class partition_snapshot_row_cursor final { // When throws, the cursor is invalidated and its position is not changed. bool advance(bool keep) { memory::on_alloc_point(); - position_in_version::less_compare heap_less(_schema); + version_heap_less_compare heap_less(*this); assert(iterators_valid()); for (auto&& curr : _current_row) { if (!keep && curr.unique_owner) { mutation_partition::rows_type::key_grabber kg(curr.it); kg.release(current_deleter()); + if (_reversed && curr.it) [[unlikely]] { + if (curr.rows.begin() == curr.it) { + _background_continuity |= bool(curr.it->continuous()); + curr.it = {}; + } else { + curr.continuous = curr.it->continuous(); + --curr.it; + } + } } else { - ++curr.it; - } - if (curr.version_no == 0) { - _latest_it = curr.it; + if (_reversed) [[unlikely]] { + if (curr.rows.begin() == curr.it) { + _background_continuity |= bool(curr.it->continuous()); + curr.it = {}; + } else { + curr.continuous = curr.it->continuous(); + --curr.it; + } + } else { + ++curr.it; + if (curr.it) { + curr.continuous = curr.it->continuous(); + } + } } if (curr.it) { + if (curr.version_no == 0) { + _latest_it = curr.it; + } _heap.push_back(curr); boost::range::push_heap(_heap, heap_less); } @@ -218,16 +310,22 @@ class partition_snapshot_row_cursor final { return recreate_current_row(); } - bool is_in_latest_version() const noexcept { return _current_row[0].version_no == 0; } + bool is_in_latest_version() const noexcept { return at_a_row() && _current_row[0].version_no == 0; } public: - partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp, bool unique_owner = false) + // When reversed is true then the cursor will operate in reversed direction. + // When reversed, s must be a reversed schema relative to snp->schema() + // Positions and fragments accepted and returned by the cursor are from the domain of s. + // Iterators are from the table's schema domain. + partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp, bool unique_owner = false, bool reversed = false) : _schema(s) , _snp(snp) , _unique_owner(unique_owner) + , _reversed(reversed) , _position(position_in_partition::static_row_tag_t{}) { } + // If is_in_latest_version() then this returns an iterator to the entry under cursor in the latest version. mutation_partition::rows_type::iterator get_iterator_in_latest_version() const { assert(_latest_it); return *_latest_it; @@ -246,7 +344,12 @@ public: } // Advances cursor to the first entry with position >= pos, if such entry exists. - // Otherwise returns false and the cursor is left not pointing at a row and invalid. + // If no such entry exists, the cursor is positioned at an extreme position in the direction of + // the cursor (min for reversed cursor, max for forward cursor) and not pointing at a row + // but still valid. + // + // continuous() is always valid after the call, even if not pointing at a row. + // Returns true iff the cursor is pointing at a row after the call. bool maybe_advance_to(position_in_partition_view pos) { prepare_heap(pos); return recreate_current_row(); @@ -264,26 +367,47 @@ public: // Changes to attributes of the current row (e.g. continuity) don't have to be reflected. bool maybe_refresh() { if (!iterators_valid()) { - return advance_to(_position); + auto pos = position_in_partition(position()); // advance_to() modifies position() so copy + return advance_to(pos); } // Refresh latest version's iterator in case there was an insertion // before it and after cursor's position. There cannot be any // insertions for non-latest versions, so we don't have to update them. if (!is_in_latest_version()) { - rows_entry::tri_compare cmp(_schema); - position_in_version::less_compare heap_less(_schema); + rows_entry::tri_compare cmp(*_snp.schema()); + version_heap_less_compare heap_less(*this); auto rows = _snp.version()->partition().clustered_rows(); bool match; auto it = rows.lower_bound(_position, match, cmp); _latest_it = it; auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; }); + + is_continuous cont = it->continuous(); + if (_reversed) [[unlikely]] { + if (!match) { + // lower_bound() in reverse order points to predecessor of it unless the keys are equal. + if (it == rows.begin()) { + _background_continuity |= bool(it->continuous()); + it = {}; + } else { + cont = it->continuous(); + --it; + } + } else { + // We can put anything in the match case since this continuity will not be used + // when advancing the cursor. + cont = is_continuous::no; + } + } + if (!it) { if (heap_i != _heap.end()) { _heap.erase(heap_i); boost::range::make_heap(_heap, heap_less); } } else if (match) { - _current_row.insert(_current_row.begin(), position_in_version{it, 0}); + _current_row.insert(_current_row.begin(), position_in_version{ + it, std::move(rows), 0, _unique_owner, cont}); if (heap_i != _heap.end()) { _heap.erase(heap_i); boost::range::make_heap(_heap, heap_less); @@ -291,9 +415,11 @@ public: } else { if (heap_i != _heap.end()) { heap_i->it = it; + heap_i->continuous = cont; boost::range::make_heap(_heap, heap_less); } else { - _heap.push_back({it, 0}); + _heap.push_back(position_in_version{ + it, std::move(rows), 0, _unique_owner, cont}); boost::range::push_heap(_heap, heap_less); } } @@ -306,17 +432,17 @@ public: // Assumes that rows are not inserted into the snapshot (static). They can be removed. bool maybe_refresh_static() { if (!iterators_valid()) { - return maybe_advance_to(_position); + return maybe_advance_to(position()); } return true; } // Moves the cursor to the first entry with position >= pos. - // - // The caller must ensure that such entry exists. + // If no such entry exists, the cursor is still moved, although + // it won't be pointing at a row. Still, continuous() will be valid. // // Returns true iff there can't be any clustering row entries - // between lower_bound (inclusive) and the entry to which the cursor + // between lower_bound (inclusive) and the position to which the cursor // was advanced. // // May be called when cursor is not valid. @@ -324,17 +450,26 @@ public: // Must be called under reclaim lock. // When throws, the cursor is invalidated and its position is not changed. bool advance_to(position_in_partition_view lower_bound) { - prepare_heap(lower_bound); - bool found = no_clustering_row_between(_schema, lower_bound, _heap[0].it->position()); - recreate_current_row(); - return found; + maybe_advance_to(lower_bound); + return no_clustering_row_between(_schema, lower_bound, position()); } + // Call only when valid. + // Returns true iff the cursor is pointing at a row. + bool at_a_row() const { return !_current_row.empty(); } + + // Advances to the next row, if any. + // If there is no next row, advances to the extreme position in the direction of the cursor + // (position_in_partition::before_all_clustering_rows() or position_in_partition::after_all_clustering_rows) + // and does not point at a row. continuous() is still valid in this case. + // Call only when valid, not necessarily pointing at a row. bool next() { return advance(true); } bool erase_and_advance() { return advance(false); } // Can be called when cursor is pointing at a row. + // Returns true iff the key range adjacent to the cursor's position from the side of smaller keys + // is marked as continuous. bool continuous() const { return _continuous; } // Can be called when cursor is pointing at a row. @@ -401,9 +536,9 @@ public: // The cursor remains valid after the call and points at the same row as before. ensure_result ensure_entry_in_latest() { auto&& rows = _snp.version()->partition().mutable_clustered_rows(); - auto latest_i = get_iterator_in_latest_version(); - rows_entry& latest = *latest_i; if (is_in_latest_version()) { + auto latest_i = get_iterator_in_latest_version(); + rows_entry& latest = *latest_i; if (_snp.at_latest_version()) { _snp.tracker()->touch(latest); } @@ -411,11 +546,33 @@ public: } else { // Copy row from older version because rows in evictable versions must // hold values which are independently complete to be consistent on eviction. - auto e = alloc_strategy_unique_ptr(current_allocator().construct(_schema, *_current_row[0].it)); - e->set_continuous(latest_i && latest_i->continuous()); - _snp.tracker()->insert(*e); - auto e_i = rows.insert_before(latest_i, std::move(e)); - return ensure_result{*e_i, true}; + auto e = [&] { + if (!at_a_row()) { + return alloc_strategy_unique_ptr( + current_allocator().construct(*_snp.schema(), _position, + is_dummy(!_position.is_clustering_row()), is_continuous::no)); + } else { + return alloc_strategy_unique_ptr( + current_allocator().construct(*_snp.schema(), *_current_row[0].it)); + } + }(); + rows_entry& re = *e; + if (_reversed) { // latest_i is not reliably a successor + // FIXME: set continuity when possible. Not that important since cache sets it anyway when populating. + re.set_continuous(false); + rows_entry::tri_compare cmp(*_snp.schema()); + auto res = rows.insert(std::move(e), cmp); + if (res.second) { + _snp.tracker()->insert(re); + } + return {*res.first, res.second}; + } else { + auto latest_i = get_iterator_in_latest_version(); + e->set_continuous(latest_i && latest_i->continuous()); + rows.insert_before(latest_i, std::move(e)); + _snp.tracker()->insert(re); + return {re, true}; + } } } @@ -428,7 +585,11 @@ public: // Assumes the snapshot is evictable and not populated by means other than ensure_entry_if_complete(). // Subsequent calls to ensure_entry_if_complete() must be given strictly monotonically increasing // positions unless iterators are invalidated across the calls. + // The cursor must not be a reversed-order cursor. std::optional ensure_entry_if_complete(position_in_partition_view pos) { + if (_reversed) { // latest_i is unreliable + throw_with_backtrace("ensure_entry_if_complete() called on reverse cursor"); + } position_in_partition::less_compare less(_schema); if (!iterators_valid() || less(position(), pos)) { auto has_entry = maybe_advance_to(pos); @@ -465,13 +626,23 @@ public: } } - // Can be called when cursor is pointing at a row, even when invalid. - const position_in_partition& position() const { + // Position of the cursor in the cursor schema domain. + // Can be called when cursor is pointing at a row, even when invalid, or when valid. + position_in_partition_view position() const { + return to_query_domain(_position); + } + + // Position of the cursor in the table schema domain. + // Can be called when cursor is pointing at a row, even when invalid, or when valid. + position_in_partition_view table_position() const { return _position; } friend std::ostream& operator<<(std::ostream& out, const partition_snapshot_row_cursor& cur) { out << "{cursor: position=" << cur._position << ", cont=" << cur.continuous() << ", "; + if (cur._reversed) { + out << "reversed, "; + } if (!cur.iterators_valid()) { return out << " iterators invalid}"; } @@ -491,7 +662,7 @@ public: out << ",\n "; } first = false; - out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.it->continuous() << "}"; + out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.continuous << "}"; } out << "], latest_iterator=["; if (cur._latest_it) { diff --git a/test/boost/mvcc_test.cc b/test/boost/mvcc_test.cc index 38e6861ed3..e4994bbec1 100644 --- a/test/boost/mvcc_test.cc +++ b/test/boost/mvcc_test.cc @@ -863,6 +863,410 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) { }); } +SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) { + return seastar::async([] { + cache_tracker tracker; + auto& r = tracker.region(); + with_allocator(r.allocator(), [&] { + simple_schema table; + auto&& s = *table.schema(); + + auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); + auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + + int ck_0 = 10; + int ck_1 = 9; + int ck_2 = 8; + int ck_3 = 7; + int ck_4 = 6; + int ck_5 = 5; + int ck_6 = 4; + + { + auto&& p1 = snap1->version()->partition(); + p1.clustered_row(s, table.make_ckey(ck_0), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(ck_1), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(ck_2), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(ck_3), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(ck_6), is_dummy::no, is_continuous::no); + p1.ensure_last_dummy(s); + } + + auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + + auto rev_s = s.make_reversed(); + partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true); + position_in_partition::equal_compare eq(s); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_0))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0))); + BOOST_REQUIRE(cur.continuous()); + } + + r.full_compaction(); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_1))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2))); + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(ck_4), is_dummy::no, is_continuous::no); + } + + { + logalloc::reclaim_lock rl(r); + + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_6))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(!cur.next()); + } + + { + logalloc::reclaim_lock rl(r); + + BOOST_REQUIRE(cur.advance_to(position_in_partition::before_all_clustered_rows())); + BOOST_REQUIRE(cur.continuous()); + BOOST_REQUIRE(cur.next()); + + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0))); + BOOST_REQUIRE(cur.continuous()); + BOOST_REQUIRE(cur.next()); + + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_1))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_3))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(ck_5), is_dummy::no, is_continuous::yes); + } + + { + logalloc::reclaim_lock rl(r); + + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_5))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_6))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(!cur.next()); + } + + // Test refresh after eviction + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_3))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + } + + e.evict(tracker.cleaner()); + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(ck_5), is_dummy::no, is_continuous::yes); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3))); + BOOST_REQUIRE(!cur.continuous()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_4))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4))); + BOOST_REQUIRE(!cur.continuous()); + BOOST_REQUIRE(cur.next()); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_5))); + BOOST_REQUIRE(!cur.continuous()); + } + }); + }); +} + +SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) { + return seastar::async([] { + cache_tracker tracker; + auto& r = tracker.region(); + with_allocator(r.allocator(), [&] { + simple_schema table; + auto&& s = *table.schema(); + + auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); + auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + + { + auto&& p1 = snap1->version()->partition(); + p1.clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::no); + p1.ensure_last_dummy(s); + } + + auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::yes); + p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no); + p2.ensure_last_dummy(s); + } + + auto rev_s = s.make_reversed(); + partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true); + position_in_partition::equal_compare eq(s); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(4))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(4))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0))); + BOOST_REQUIRE(cur.continuous()); + } + + r.full_compaction(); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0))); + BOOST_REQUIRE(cur.continuous()); + } + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::yes); + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.maybe_refresh()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(eq(cur.get_iterator_in_latest_version()->position(), table.make_ckey(1))); + + { + auto res = cur.ensure_entry_in_latest(); + BOOST_REQUIRE(res.inserted); + BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(0))); + } + } + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(position_in_partition::before_all_clustered_rows())); + BOOST_REQUIRE(eq(cur.table_position(), position_in_partition::after_all_clustered_rows())); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(5))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(4))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3))); + BOOST_REQUIRE(!cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(1))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(cur.next()); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0))); + BOOST_REQUIRE(cur.continuous()); + + BOOST_REQUIRE(!cur.next()); + } + }); + }); +} + +SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) { + return seastar::async([] { + cache_tracker tracker; + auto& r = tracker.region(); + with_allocator(r.allocator(), [&] { + simple_schema table; + auto&& s = *table.schema(); + + auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); + auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + + { + auto&& p1 = snap1->version()->partition(); + p1.clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no); + p1.ensure_last_dummy(s); + } + + auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::yes); + p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no); + p2.ensure_last_dummy(s); + } + + auto rev_s = s.make_reversed(); + partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true); + position_in_partition::equal_compare eq(s); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(3))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3))); + BOOST_REQUIRE(!cur.continuous()); + + { + auto res = cur.ensure_entry_in_latest(); + BOOST_REQUIRE(res.inserted); + BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(3))); + } + + BOOST_REQUIRE(cur.advance_to(table.make_ckey(3))); + BOOST_REQUIRE(!cur.continuous()); + + { + auto res = cur.ensure_entry_in_latest(); + BOOST_REQUIRE(!res.inserted); + } + } + }); + }); +} + +SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reversed_mode) { + return seastar::async([] { + cache_tracker tracker; + auto& r = tracker.region(); + with_allocator(r.allocator(), [&] { + simple_schema table; + auto&& s = *table.schema(); + + auto e = partition_entry::make_evictable(s, mutation_partition(table.schema())); + auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker); + + { + auto&& p1 = snap1->version()->partition(); + p1.clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::no); + p1.clustered_row(s, table.make_ckey(2), is_dummy::no, is_continuous::yes); + p1.ensure_last_dummy(s); + } + + auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1); + + { + auto&& p2 = snap2->version()->partition(); + p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no); + p2.ensure_last_dummy(s); + } + + auto rev_s = s.make_reversed(); + partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true); + position_in_partition::equal_compare eq(s); + + { + logalloc::reclaim_lock rl(r); + BOOST_REQUIRE(cur.advance_to(table.make_ckey(2))); + BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(2))); + BOOST_REQUIRE(cur.continuous()); + + { + auto res = cur.ensure_entry_in_latest(); + BOOST_REQUIRE(res.inserted); + BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(2))); + } + + BOOST_REQUIRE(cur.advance_to(table.make_ckey(0))); + // the entry for ckey 2 in latest version should not be marked as continuous. + BOOST_REQUIRE(!cur.continuous()); + } + }); + }); +} + SEASTAR_TEST_CASE(test_apply_is_atomic) { auto do_test = [](auto&& gen) { failure_injecting_allocation_strategy alloc(standard_allocator());