mirror of
https://github.com/scylladb/scylladb.git
synced 2026-05-31 20:16:43 +00:00
partition_snapshot_row_cursor: Support reverse iteration
This commit is contained in:
@@ -111,52 +111,94 @@ class partition_snapshot_row_cursor final {
|
||||
friend class partition_snapshot_row_weakref;
|
||||
struct position_in_version {
|
||||
mutation_partition::rows_type::iterator it;
|
||||
utils::immutable_collection<mutation_partition::rows_type> rows;
|
||||
int version_no;
|
||||
bool unique_owner;
|
||||
|
||||
struct less_compare {
|
||||
rows_entry::tri_compare _cmp;
|
||||
public:
|
||||
explicit less_compare(const schema& s) : _cmp(s) { }
|
||||
bool operator()(const position_in_version& a, const position_in_version& b) {
|
||||
auto res = _cmp(*a.it, *b.it);
|
||||
return res > 0 || (res == 0 && a.version_no > b.version_no);
|
||||
}
|
||||
};
|
||||
bool unique_owner = false;
|
||||
is_continuous continuous = is_continuous::no; // Range continuity in the direction of lower keys (in cursor schema domain).
|
||||
};
|
||||
|
||||
const schema& _schema;
|
||||
const schema& _schema; // query domain
|
||||
partition_snapshot& _snp;
|
||||
utils::small_vector<position_in_version, 2> _heap;
|
||||
|
||||
// _heap contains iterators which are ahead of the cursor.
|
||||
// _current_row contains iterators which are directly below the cursor.
|
||||
utils::small_vector<position_in_version, 2> _heap; // query domain order
|
||||
utils::small_vector<position_in_version, 2> _current_row;
|
||||
|
||||
// For !_reversed cursors points to the entry which
|
||||
// is the lower_bound() of the current position in table schema order.
|
||||
// For _reversed cursors it can be either lower_bound() in table order
|
||||
// or lower_bound() in cursor's order, so should not be relied upon.
|
||||
// if current entry is in the latest version then _latest_it points to it,
|
||||
// also in _reversed mode.
|
||||
std::optional<mutation_partition::rows_type::iterator> _latest_it;
|
||||
|
||||
// Continuity corresponding to ranges which are not represented in _heap because the cursor
|
||||
// went pass all the entries in those versions.
|
||||
bool _background_continuity = false;
|
||||
|
||||
bool _continuous{};
|
||||
bool _dummy{};
|
||||
const bool _unique_owner;
|
||||
position_in_partition _position;
|
||||
const bool _reversed;
|
||||
position_in_partition _position; // table domain
|
||||
partition_snapshot::change_mark _change_mark;
|
||||
|
||||
position_in_partition_view to_table_domain(position_in_partition_view pos) const {
|
||||
if (_reversed) [[unlikely]] {
|
||||
return pos.reversed();
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
position_in_partition_view to_query_domain(position_in_partition_view pos) const {
|
||||
if (_reversed) [[unlikely]] {
|
||||
return pos.reversed();
|
||||
}
|
||||
return pos;
|
||||
}
|
||||
|
||||
struct version_heap_less_compare {
|
||||
rows_entry::tri_compare _cmp;
|
||||
partition_snapshot_row_cursor& _cur;
|
||||
public:
|
||||
explicit version_heap_less_compare(partition_snapshot_row_cursor& cur)
|
||||
: _cmp(cur._schema)
|
||||
, _cur(cur)
|
||||
{ }
|
||||
|
||||
bool operator()(const position_in_version& a, const position_in_version& b) {
|
||||
auto res = _cmp(_cur.to_query_domain(a.it->position()), _cur.to_query_domain(b.it->position()));
|
||||
return res > 0 || (res == 0 && a.version_no > b.version_no);
|
||||
}
|
||||
};
|
||||
|
||||
// Removes the next row from _heap and puts it into _current_row
|
||||
bool recreate_current_row() {
|
||||
_current_row.clear();
|
||||
_continuous = _background_continuity;
|
||||
_dummy = true;
|
||||
if (_heap.empty()) {
|
||||
if (_reversed) {
|
||||
_position = position_in_partition::before_all_clustered_rows();
|
||||
} else {
|
||||
_position = position_in_partition::after_all_clustered_rows();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
position_in_partition::equal_compare eq(_schema);
|
||||
_continuous = false;
|
||||
_dummy = true;
|
||||
version_heap_less_compare heap_less(*this);
|
||||
position_in_partition::equal_compare eq(*_snp.schema());
|
||||
do {
|
||||
boost::range::pop_heap(_heap, heap_less);
|
||||
memory::on_alloc_point();
|
||||
rows_entry& e = *_heap.back().it;
|
||||
_dummy &= bool(e.dummy());
|
||||
_continuous |= bool(e.continuous());
|
||||
_continuous |= bool(_heap.back().continuous);
|
||||
_current_row.push_back(_heap.back());
|
||||
_heap.pop_back();
|
||||
} while (!_heap.empty() && eq(_current_row[0].it->position(), _heap[0].it->position()));
|
||||
|
||||
if (boost::algorithm::any_of(_heap, [] (auto&& v) { return v.it->continuous(); })) {
|
||||
if (boost::algorithm::any_of(_heap, [] (auto&& v) { return v.continuous; })) {
|
||||
// FIXME: Optimize by dropping dummy() entries.
|
||||
_continuous = true;
|
||||
}
|
||||
@@ -165,12 +207,15 @@ class partition_snapshot_row_cursor final {
|
||||
return true;
|
||||
}
|
||||
|
||||
// lower_bound is in the query schema domain
|
||||
void prepare_heap(position_in_partition_view lower_bound) {
|
||||
lower_bound = to_table_domain(lower_bound);
|
||||
memory::on_alloc_point();
|
||||
rows_entry::tri_compare cmp(_schema);
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
rows_entry::tri_compare cmp(*_snp.schema());
|
||||
version_heap_less_compare heap_less(*this);
|
||||
_heap.clear();
|
||||
_latest_it.reset();
|
||||
_background_continuity = false;
|
||||
int version_no = 0;
|
||||
bool unique_owner = _unique_owner;
|
||||
bool first = true;
|
||||
@@ -182,7 +227,32 @@ class partition_snapshot_row_cursor final {
|
||||
_latest_it = pos;
|
||||
}
|
||||
if (pos) {
|
||||
_heap.push_back({pos, version_no, unique_owner});
|
||||
is_continuous cont;
|
||||
if (_reversed) [[unlikely]] {
|
||||
if (cmp(pos->position(), lower_bound) != 0) {
|
||||
cont = pos->continuous();
|
||||
if (pos != rows.begin()) {
|
||||
--pos;
|
||||
} else {
|
||||
_background_continuity |= bool(cont);
|
||||
pos = {};
|
||||
}
|
||||
} else {
|
||||
auto next_entry = std::next(pos);
|
||||
if (next_entry == rows.end()) {
|
||||
// Positions past last dummy are complete since mutation sources
|
||||
// can't contain any keys which are larger.
|
||||
cont = is_continuous::yes;
|
||||
} else {
|
||||
cont = next_entry->continuous();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cont = pos->continuous();
|
||||
}
|
||||
if (pos) [[likely]] {
|
||||
_heap.emplace_back(position_in_version{pos, std::move(rows), version_no, unique_owner, cont});
|
||||
}
|
||||
}
|
||||
++version_no;
|
||||
first = false;
|
||||
@@ -198,19 +268,41 @@ class partition_snapshot_row_cursor final {
|
||||
// When throws, the cursor is invalidated and its position is not changed.
|
||||
bool advance(bool keep) {
|
||||
memory::on_alloc_point();
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
version_heap_less_compare heap_less(*this);
|
||||
assert(iterators_valid());
|
||||
for (auto&& curr : _current_row) {
|
||||
if (!keep && curr.unique_owner) {
|
||||
mutation_partition::rows_type::key_grabber kg(curr.it);
|
||||
kg.release(current_deleter<rows_entry>());
|
||||
if (_reversed && curr.it) [[unlikely]] {
|
||||
if (curr.rows.begin() == curr.it) {
|
||||
_background_continuity |= bool(curr.it->continuous());
|
||||
curr.it = {};
|
||||
} else {
|
||||
curr.continuous = curr.it->continuous();
|
||||
--curr.it;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
++curr.it;
|
||||
}
|
||||
if (curr.version_no == 0) {
|
||||
_latest_it = curr.it;
|
||||
if (_reversed) [[unlikely]] {
|
||||
if (curr.rows.begin() == curr.it) {
|
||||
_background_continuity |= bool(curr.it->continuous());
|
||||
curr.it = {};
|
||||
} else {
|
||||
curr.continuous = curr.it->continuous();
|
||||
--curr.it;
|
||||
}
|
||||
} else {
|
||||
++curr.it;
|
||||
if (curr.it) {
|
||||
curr.continuous = curr.it->continuous();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (curr.it) {
|
||||
if (curr.version_no == 0) {
|
||||
_latest_it = curr.it;
|
||||
}
|
||||
_heap.push_back(curr);
|
||||
boost::range::push_heap(_heap, heap_less);
|
||||
}
|
||||
@@ -218,16 +310,22 @@ class partition_snapshot_row_cursor final {
|
||||
return recreate_current_row();
|
||||
}
|
||||
|
||||
bool is_in_latest_version() const noexcept { return _current_row[0].version_no == 0; }
|
||||
bool is_in_latest_version() const noexcept { return at_a_row() && _current_row[0].version_no == 0; }
|
||||
|
||||
public:
|
||||
partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp, bool unique_owner = false)
|
||||
// When reversed is true then the cursor will operate in reversed direction.
|
||||
// When reversed, s must be a reversed schema relative to snp->schema()
|
||||
// Positions and fragments accepted and returned by the cursor are from the domain of s.
|
||||
// Iterators are from the table's schema domain.
|
||||
partition_snapshot_row_cursor(const schema& s, partition_snapshot& snp, bool unique_owner = false, bool reversed = false)
|
||||
: _schema(s)
|
||||
, _snp(snp)
|
||||
, _unique_owner(unique_owner)
|
||||
, _reversed(reversed)
|
||||
, _position(position_in_partition::static_row_tag_t{})
|
||||
{ }
|
||||
|
||||
// If is_in_latest_version() then this returns an iterator to the entry under cursor in the latest version.
|
||||
mutation_partition::rows_type::iterator get_iterator_in_latest_version() const {
|
||||
assert(_latest_it);
|
||||
return *_latest_it;
|
||||
@@ -246,7 +344,12 @@ public:
|
||||
}
|
||||
|
||||
// Advances cursor to the first entry with position >= pos, if such entry exists.
|
||||
// Otherwise returns false and the cursor is left not pointing at a row and invalid.
|
||||
// If no such entry exists, the cursor is positioned at an extreme position in the direction of
|
||||
// the cursor (min for reversed cursor, max for forward cursor) and not pointing at a row
|
||||
// but still valid.
|
||||
//
|
||||
// continuous() is always valid after the call, even if not pointing at a row.
|
||||
// Returns true iff the cursor is pointing at a row after the call.
|
||||
bool maybe_advance_to(position_in_partition_view pos) {
|
||||
prepare_heap(pos);
|
||||
return recreate_current_row();
|
||||
@@ -264,26 +367,47 @@ public:
|
||||
// Changes to attributes of the current row (e.g. continuity) don't have to be reflected.
|
||||
bool maybe_refresh() {
|
||||
if (!iterators_valid()) {
|
||||
return advance_to(_position);
|
||||
auto pos = position_in_partition(position()); // advance_to() modifies position() so copy
|
||||
return advance_to(pos);
|
||||
}
|
||||
// Refresh latest version's iterator in case there was an insertion
|
||||
// before it and after cursor's position. There cannot be any
|
||||
// insertions for non-latest versions, so we don't have to update them.
|
||||
if (!is_in_latest_version()) {
|
||||
rows_entry::tri_compare cmp(_schema);
|
||||
position_in_version::less_compare heap_less(_schema);
|
||||
rows_entry::tri_compare cmp(*_snp.schema());
|
||||
version_heap_less_compare heap_less(*this);
|
||||
auto rows = _snp.version()->partition().clustered_rows();
|
||||
bool match;
|
||||
auto it = rows.lower_bound(_position, match, cmp);
|
||||
_latest_it = it;
|
||||
auto heap_i = boost::find_if(_heap, [](auto&& v) { return v.version_no == 0; });
|
||||
|
||||
is_continuous cont = it->continuous();
|
||||
if (_reversed) [[unlikely]] {
|
||||
if (!match) {
|
||||
// lower_bound() in reverse order points to predecessor of it unless the keys are equal.
|
||||
if (it == rows.begin()) {
|
||||
_background_continuity |= bool(it->continuous());
|
||||
it = {};
|
||||
} else {
|
||||
cont = it->continuous();
|
||||
--it;
|
||||
}
|
||||
} else {
|
||||
// We can put anything in the match case since this continuity will not be used
|
||||
// when advancing the cursor.
|
||||
cont = is_continuous::no;
|
||||
}
|
||||
}
|
||||
|
||||
if (!it) {
|
||||
if (heap_i != _heap.end()) {
|
||||
_heap.erase(heap_i);
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
}
|
||||
} else if (match) {
|
||||
_current_row.insert(_current_row.begin(), position_in_version{it, 0});
|
||||
_current_row.insert(_current_row.begin(), position_in_version{
|
||||
it, std::move(rows), 0, _unique_owner, cont});
|
||||
if (heap_i != _heap.end()) {
|
||||
_heap.erase(heap_i);
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
@@ -291,9 +415,11 @@ public:
|
||||
} else {
|
||||
if (heap_i != _heap.end()) {
|
||||
heap_i->it = it;
|
||||
heap_i->continuous = cont;
|
||||
boost::range::make_heap(_heap, heap_less);
|
||||
} else {
|
||||
_heap.push_back({it, 0});
|
||||
_heap.push_back(position_in_version{
|
||||
it, std::move(rows), 0, _unique_owner, cont});
|
||||
boost::range::push_heap(_heap, heap_less);
|
||||
}
|
||||
}
|
||||
@@ -306,17 +432,17 @@ public:
|
||||
// Assumes that rows are not inserted into the snapshot (static). They can be removed.
|
||||
bool maybe_refresh_static() {
|
||||
if (!iterators_valid()) {
|
||||
return maybe_advance_to(_position);
|
||||
return maybe_advance_to(position());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Moves the cursor to the first entry with position >= pos.
|
||||
//
|
||||
// The caller must ensure that such entry exists.
|
||||
// If no such entry exists, the cursor is still moved, although
|
||||
// it won't be pointing at a row. Still, continuous() will be valid.
|
||||
//
|
||||
// Returns true iff there can't be any clustering row entries
|
||||
// between lower_bound (inclusive) and the entry to which the cursor
|
||||
// between lower_bound (inclusive) and the position to which the cursor
|
||||
// was advanced.
|
||||
//
|
||||
// May be called when cursor is not valid.
|
||||
@@ -324,17 +450,26 @@ public:
|
||||
// Must be called under reclaim lock.
|
||||
// When throws, the cursor is invalidated and its position is not changed.
|
||||
bool advance_to(position_in_partition_view lower_bound) {
|
||||
prepare_heap(lower_bound);
|
||||
bool found = no_clustering_row_between(_schema, lower_bound, _heap[0].it->position());
|
||||
recreate_current_row();
|
||||
return found;
|
||||
maybe_advance_to(lower_bound);
|
||||
return no_clustering_row_between(_schema, lower_bound, position());
|
||||
}
|
||||
|
||||
// Call only when valid.
|
||||
// Returns true iff the cursor is pointing at a row.
|
||||
bool at_a_row() const { return !_current_row.empty(); }
|
||||
|
||||
// Advances to the next row, if any.
|
||||
// If there is no next row, advances to the extreme position in the direction of the cursor
|
||||
// (position_in_partition::before_all_clustering_rows() or position_in_partition::after_all_clustering_rows)
|
||||
// and does not point at a row. continuous() is still valid in this case.
|
||||
// Call only when valid, not necessarily pointing at a row.
|
||||
bool next() { return advance(true); }
|
||||
|
||||
bool erase_and_advance() { return advance(false); }
|
||||
|
||||
// Can be called when cursor is pointing at a row.
|
||||
// Returns true iff the key range adjacent to the cursor's position from the side of smaller keys
|
||||
// is marked as continuous.
|
||||
bool continuous() const { return _continuous; }
|
||||
|
||||
// Can be called when cursor is pointing at a row.
|
||||
@@ -401,9 +536,9 @@ public:
|
||||
// The cursor remains valid after the call and points at the same row as before.
|
||||
ensure_result ensure_entry_in_latest() {
|
||||
auto&& rows = _snp.version()->partition().mutable_clustered_rows();
|
||||
auto latest_i = get_iterator_in_latest_version();
|
||||
rows_entry& latest = *latest_i;
|
||||
if (is_in_latest_version()) {
|
||||
auto latest_i = get_iterator_in_latest_version();
|
||||
rows_entry& latest = *latest_i;
|
||||
if (_snp.at_latest_version()) {
|
||||
_snp.tracker()->touch(latest);
|
||||
}
|
||||
@@ -411,11 +546,33 @@ public:
|
||||
} else {
|
||||
// Copy row from older version because rows in evictable versions must
|
||||
// hold values which are independently complete to be consistent on eviction.
|
||||
auto e = alloc_strategy_unique_ptr<rows_entry>(current_allocator().construct<rows_entry>(_schema, *_current_row[0].it));
|
||||
e->set_continuous(latest_i && latest_i->continuous());
|
||||
_snp.tracker()->insert(*e);
|
||||
auto e_i = rows.insert_before(latest_i, std::move(e));
|
||||
return ensure_result{*e_i, true};
|
||||
auto e = [&] {
|
||||
if (!at_a_row()) {
|
||||
return alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_snp.schema(), _position,
|
||||
is_dummy(!_position.is_clustering_row()), is_continuous::no));
|
||||
} else {
|
||||
return alloc_strategy_unique_ptr<rows_entry>(
|
||||
current_allocator().construct<rows_entry>(*_snp.schema(), *_current_row[0].it));
|
||||
}
|
||||
}();
|
||||
rows_entry& re = *e;
|
||||
if (_reversed) { // latest_i is not reliably a successor
|
||||
// FIXME: set continuity when possible. Not that important since cache sets it anyway when populating.
|
||||
re.set_continuous(false);
|
||||
rows_entry::tri_compare cmp(*_snp.schema());
|
||||
auto res = rows.insert(std::move(e), cmp);
|
||||
if (res.second) {
|
||||
_snp.tracker()->insert(re);
|
||||
}
|
||||
return {*res.first, res.second};
|
||||
} else {
|
||||
auto latest_i = get_iterator_in_latest_version();
|
||||
e->set_continuous(latest_i && latest_i->continuous());
|
||||
rows.insert_before(latest_i, std::move(e));
|
||||
_snp.tracker()->insert(re);
|
||||
return {re, true};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,7 +585,11 @@ public:
|
||||
// Assumes the snapshot is evictable and not populated by means other than ensure_entry_if_complete().
|
||||
// Subsequent calls to ensure_entry_if_complete() must be given strictly monotonically increasing
|
||||
// positions unless iterators are invalidated across the calls.
|
||||
// The cursor must not be a reversed-order cursor.
|
||||
std::optional<ensure_result> ensure_entry_if_complete(position_in_partition_view pos) {
|
||||
if (_reversed) { // latest_i is unreliable
|
||||
throw_with_backtrace<std::logic_error>("ensure_entry_if_complete() called on reverse cursor");
|
||||
}
|
||||
position_in_partition::less_compare less(_schema);
|
||||
if (!iterators_valid() || less(position(), pos)) {
|
||||
auto has_entry = maybe_advance_to(pos);
|
||||
@@ -465,13 +626,23 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
// Can be called when cursor is pointing at a row, even when invalid.
|
||||
const position_in_partition& position() const {
|
||||
// Position of the cursor in the cursor schema domain.
|
||||
// Can be called when cursor is pointing at a row, even when invalid, or when valid.
|
||||
position_in_partition_view position() const {
|
||||
return to_query_domain(_position);
|
||||
}
|
||||
|
||||
// Position of the cursor in the table schema domain.
|
||||
// Can be called when cursor is pointing at a row, even when invalid, or when valid.
|
||||
position_in_partition_view table_position() const {
|
||||
return _position;
|
||||
}
|
||||
|
||||
friend std::ostream& operator<<(std::ostream& out, const partition_snapshot_row_cursor& cur) {
|
||||
out << "{cursor: position=" << cur._position << ", cont=" << cur.continuous() << ", ";
|
||||
if (cur._reversed) {
|
||||
out << "reversed, ";
|
||||
}
|
||||
if (!cur.iterators_valid()) {
|
||||
return out << " iterators invalid}";
|
||||
}
|
||||
@@ -491,7 +662,7 @@ public:
|
||||
out << ",\n ";
|
||||
}
|
||||
first = false;
|
||||
out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.it->continuous() << "}";
|
||||
out << "{v=" << v.version_no << ", pos=" << v.it->position() << ", cont=" << v.continuous << "}";
|
||||
}
|
||||
out << "], latest_iterator=[";
|
||||
if (cur._latest_it) {
|
||||
|
||||
@@ -863,6 +863,410 @@ SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_partition_snapshot_row_cursor_reversed) {
|
||||
return seastar::async([] {
|
||||
cache_tracker tracker;
|
||||
auto& r = tracker.region();
|
||||
with_allocator(r.allocator(), [&] {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
|
||||
int ck_0 = 10;
|
||||
int ck_1 = 9;
|
||||
int ck_2 = 8;
|
||||
int ck_3 = 7;
|
||||
int ck_4 = 6;
|
||||
int ck_5 = 5;
|
||||
int ck_6 = 4;
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
p1.clustered_row(s, table.make_ckey(ck_0), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(ck_1), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(ck_2), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(ck_3), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(ck_6), is_dummy::no, is_continuous::no);
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
|
||||
auto rev_s = s.make_reversed();
|
||||
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
|
||||
position_in_partition::equal_compare eq(s);
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_0)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
}
|
||||
|
||||
r.full_compaction();
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_1)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_2)));
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(ck_4), is_dummy::no, is_continuous::no);
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_6)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(!cur.next());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
|
||||
BOOST_REQUIRE(cur.advance_to(position_in_partition::before_all_clustered_rows()));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
BOOST_REQUIRE(cur.next());
|
||||
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
BOOST_REQUIRE(cur.next());
|
||||
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_1)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(ck_5), is_dummy::no, is_continuous::yes);
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_5)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_6)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(!cur.next());
|
||||
}
|
||||
|
||||
// Test refresh after eviction
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
}
|
||||
|
||||
e.evict(tracker.cleaner());
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(ck_5), is_dummy::no, is_continuous::yes);
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(ck_4)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_4)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
BOOST_REQUIRE(cur.next());
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(ck_5)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_cursor_tracks_continuity_in_reversed_mode) {
|
||||
return seastar::async([] {
|
||||
cache_tracker tracker;
|
||||
auto& r = tracker.region();
|
||||
with_allocator(r.allocator(), [&] {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
p1.clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(4), is_dummy::no, is_continuous::no);
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::yes);
|
||||
p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no);
|
||||
p2.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto rev_s = s.make_reversed();
|
||||
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
|
||||
position_in_partition::equal_compare eq(s);
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(4)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(4)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
}
|
||||
|
||||
r.full_compaction();
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
}
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::yes);
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.maybe_refresh());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(eq(cur.get_iterator_in_latest_version()->position(), table.make_ckey(1)));
|
||||
|
||||
{
|
||||
auto res = cur.ensure_entry_in_latest();
|
||||
BOOST_REQUIRE(res.inserted);
|
||||
BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(0)));
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(position_in_partition::before_all_clustered_rows()));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), position_in_partition::after_all_clustered_rows()));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(5)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(4)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(1)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(cur.next());
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(0)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
BOOST_REQUIRE(!cur.next());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_ensure_entry_in_latest_in_reversed_mode) {
|
||||
return seastar::async([] {
|
||||
cache_tracker tracker;
|
||||
auto& r = tracker.region();
|
||||
with_allocator(r.allocator(), [&] {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
p1.clustered_row(s, table.make_ckey(3), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no);
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::yes);
|
||||
p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no);
|
||||
p2.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto rev_s = s.make_reversed();
|
||||
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
|
||||
position_in_partition::equal_compare eq(s);
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(3)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
{
|
||||
auto res = cur.ensure_entry_in_latest();
|
||||
BOOST_REQUIRE(res.inserted);
|
||||
BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(3)));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(3)));
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
|
||||
{
|
||||
auto res = cur.ensure_entry_in_latest();
|
||||
BOOST_REQUIRE(!res.inserted);
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_ensure_entry_in_latest_does_not_set_continuity_in_reversed_mode) {
|
||||
return seastar::async([] {
|
||||
cache_tracker tracker;
|
||||
auto& r = tracker.region();
|
||||
with_allocator(r.allocator(), [&] {
|
||||
simple_schema table;
|
||||
auto&& s = *table.schema();
|
||||
|
||||
auto e = partition_entry::make_evictable(s, mutation_partition(table.schema()));
|
||||
auto snap1 = e.read(r, tracker.cleaner(), table.schema(), &tracker);
|
||||
|
||||
{
|
||||
auto&& p1 = snap1->version()->partition();
|
||||
p1.clustered_row(s, table.make_ckey(0), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(1), is_dummy::no, is_continuous::no);
|
||||
p1.clustered_row(s, table.make_ckey(2), is_dummy::no, is_continuous::yes);
|
||||
p1.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto snap2 = e.read(r, tracker.cleaner(), table.schema(), &tracker, 1);
|
||||
|
||||
{
|
||||
auto&& p2 = snap2->version()->partition();
|
||||
p2.clustered_row(s, table.make_ckey(5), is_dummy::no, is_continuous::no);
|
||||
p2.ensure_last_dummy(s);
|
||||
}
|
||||
|
||||
auto rev_s = s.make_reversed();
|
||||
partition_snapshot_row_cursor cur(*rev_s, *snap2, false, true);
|
||||
position_in_partition::equal_compare eq(s);
|
||||
|
||||
{
|
||||
logalloc::reclaim_lock rl(r);
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(2)));
|
||||
BOOST_REQUIRE(eq(cur.table_position(), table.make_ckey(2)));
|
||||
BOOST_REQUIRE(cur.continuous());
|
||||
|
||||
{
|
||||
auto res = cur.ensure_entry_in_latest();
|
||||
BOOST_REQUIRE(res.inserted);
|
||||
BOOST_REQUIRE(eq(res.row.position(), table.make_ckey(2)));
|
||||
}
|
||||
|
||||
BOOST_REQUIRE(cur.advance_to(table.make_ckey(0)));
|
||||
// the entry for ckey 2 in latest version should not be marked as continuous.
|
||||
BOOST_REQUIRE(!cur.continuous());
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_apply_is_atomic) {
|
||||
auto do_test = [](auto&& gen) {
|
||||
failure_injecting_allocation_strategy alloc(standard_allocator());
|
||||
|
||||
Reference in New Issue
Block a user