position_in_partition: Make after_key() work with non-full keys

This fixes a long standing bug related to handling of non-full
clustering keys, issue #1446.

after_key() was creating a position which is after all keys prefixed
by a non-full key, rather than a position which is right after that
key.

This issue will be caught by cql_query_test::test_compact_storage
in debug mode when mutation_partition_v2 merging starts inserting
sentinels at position after_key() on preemption.

It probably already causes problems for such keys.
This commit is contained in:
Tomasz Grabiec
2022-09-22 15:42:54 +02:00
parent 4e7ddb6309
commit 23e4c83155
24 changed files with 240 additions and 107 deletions

View File

@@ -572,7 +572,7 @@ void cache_flat_mutation_reader::maybe_add_to_cache(const clustering_row& cr) {
_read_context.cache().on_mispopulate();
return;
}
auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(cr.key()));
auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(*_schema, cr.key()));
clogger.trace("csm {}: populate({})", fmt::ptr(this), clustering_row::printer(*_schema, cr));
_lsa_manager.run_in_update_section_with_allocator([this, &cr, &rt_opt] {
mutation_partition& mp = _snp->version()->partition();
@@ -634,8 +634,8 @@ inline
void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
clogger.trace("csm {}: copy_from_cache, next={}, next_row_in_range={}", fmt::ptr(this), _next_row.position(), _next_row_in_range);
_next_row.touch();
position_in_partition_view next_lower_bound = _next_row.dummy() ? _next_row.position() : position_in_partition_view::after_key(_next_row.key());
auto upper_bound = _next_row_in_range ? next_lower_bound : _upper_bound;
auto next_lower_bound = position_in_partition_view::after_key(table_schema(), _next_row.position());
auto upper_bound = _next_row_in_range ? next_lower_bound.view : _upper_bound;
if (_snp->range_tombstones(_lower_bound, upper_bound, [&] (range_tombstone rts) {
add_range_tombstone_to_buffer(std::move(rts));
return stop_iteration(_lower_bound_changed && is_buffer_full());
@@ -774,14 +774,14 @@ void cache_flat_mutation_reader::move_to_next_entry() {
}
}
void cache_flat_mutation_reader::flush_tombstones(position_in_partition_view pos, bool end_of_range) {
void cache_flat_mutation_reader::flush_tombstones(position_in_partition_view pos_, bool end_of_range) {
// Ensure position is appropriate for range tombstone bound
pos = position_in_partition_view::after_key(pos);
clogger.trace("csm {}: flush_tombstones({}) end_of_range: {}", fmt::ptr(this), pos, end_of_range);
_rt_gen.flush(pos, [this] (range_tombstone_change&& rtc) {
auto pos = position_in_partition_view::after_key(*_schema, pos_);
clogger.trace("csm {}: flush_tombstones({}) end_of_range: {}", fmt::ptr(this), pos.view, end_of_range);
_rt_gen.flush(pos.view, [this] (range_tombstone_change&& rtc) {
add_to_buffer(std::move(rtc), source::cache);
}, end_of_range);
if (auto rtc_opt = _rt_merger.flush(pos, end_of_range)) {
if (auto rtc_opt = _rt_merger.flush(pos.view, end_of_range)) {
do_add_to_buffer(std::move(*rtc_opt));
}
}
@@ -832,7 +832,7 @@ inline
void cache_flat_mutation_reader::add_clustering_row_to_buffer(mutation_fragment_v2&& mf) {
clogger.trace("csm {}: add_clustering_row_to_buffer({})", fmt::ptr(this), mutation_fragment_v2::printer(*_schema, mf));
auto& row = mf.as_clustering_row();
auto new_lower_bound = position_in_partition::after_key(row.key());
auto new_lower_bound = position_in_partition::after_key(*_schema, row.key());
push_mutation_fragment(std::move(mf));
_lower_bound = std::move(new_lower_bound);
_lower_bound_changed = true;

View File

@@ -907,7 +907,7 @@ void compacted_fragments_writer::split_large_partition() {
// will result in current fragment storing an inclusive end bound for last pos, and the
// next fragment storing an exclusive start bound for last pos. This is very important
// for not losing information on the range tombstone.
auto after_last_pos = position_in_partition::after_key(_current_partition.last_pos.key());
auto after_last_pos = position_in_partition::after_key(*_c.schema(), _current_partition.last_pos.key());
if (_current_partition.current_emitted_tombstone) {
auto rtc = range_tombstone_change(after_last_pos, tombstone{});
_c.log_debug("Closing active tombstone {} with {} for partition {}", _current_partition.current_emitted_tombstone, rtc, *_current_partition.dk);

View File

@@ -446,7 +446,7 @@ public:
requires CompactedFragmentsConsumerV2<Consumer> && CompactedFragmentsConsumerV2<GCConsumer>
stop_iteration consume_end_of_partition(Consumer& consumer, GCConsumer& gc_consumer) {
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition::after_key(_last_pos), tombstone{});
auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), tombstone{});
// do_consume() overwrites _effective_tombstone with {}, so save and restore it.
auto prev_tombstone = _effective_tombstone;
do_consume(std::move(rtc), consumer, gc_consumer);
@@ -539,7 +539,7 @@ public:
consume(*std::exchange(_last_static_row, {}), consumer, nc);
}
if (_effective_tombstone) {
auto rtc = range_tombstone_change(position_in_partition_view::after_key(_last_pos), _effective_tombstone);
auto rtc = range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), _effective_tombstone);
do_consume(std::move(rtc), consumer, nc);
}
}
@@ -574,7 +574,7 @@ public:
partition_start ps(std::move(_last_dk), _partition_tombstone);
if (_effective_tombstone) {
return detached_compaction_state{std::move(ps), std::move(_last_static_row),
range_tombstone_change(position_in_partition_view::after_key(_last_pos), _effective_tombstone)};
range_tombstone_change(position_in_partition::after_key(_schema, _last_pos), _effective_tombstone)};
} else {
return detached_compaction_state{std::move(ps), std::move(_last_static_row), std::optional<range_tombstone_change>{}};
}

View File

@@ -258,12 +258,12 @@ position_in_partition_view mutation_fragment::position() const
return visit([] (auto& mf) -> position_in_partition_view { return mf.position(); });
}
position_range mutation_fragment::range() const {
position_range mutation_fragment::range(const schema& s) const {
switch (_kind) {
case kind::static_row:
return position_range::for_static_row();
case kind::clustering_row:
return position_range(position_in_partition(position()), position_in_partition::after_key(key()));
return position_range(position_in_partition(position()), position_in_partition::after_key(s, key()));
case kind::partition_start:
return position_range(position_in_partition(position()), position_in_partition::for_static_row());
case kind::partition_end:

View File

@@ -361,7 +361,7 @@ public:
position_in_partition_view position() const;
// Returns the range of positions for which this fragment holds relevant information.
position_range range() const;
position_range range(const schema& s) const;
// Checks if this fragment may be relevant for any range starting at given position.
bool relevant_for_range(const schema& s, position_in_partition_view pos) const;

View File

@@ -2145,7 +2145,7 @@ stop_iteration reconcilable_result_builder::consume(static_row&& sr, tombstone,
stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tombstone, bool is_alive) {
if (_rt_assembler.needs_flush()) {
if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(cr.key()))) {
if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(_schema, cr.key()))) {
consume(std::move(*rt_opt));
}
}
@@ -2368,10 +2368,10 @@ clustering_interval_set mutation_partition::get_continuity(const schema& s, is_c
}
if (i->position().is_clustering_row() && bool(i->dummy()) == !bool(cont)) {
result.add(s, position_range(position_in_partition(i->position()),
position_in_partition::after_key(i->position().key())));
position_in_partition::after_key(s, i->position().key())));
}
prev_pos = i->position().is_clustering_row()
? position_in_partition::after_key(i->position().key())
? position_in_partition::after_key(s, i->position().key())
: position_in_partition(i->position());
++i;
}

View File

@@ -36,7 +36,7 @@ private:
_mut_builder->consume_new_partition(key);
_mut_builder->consume(tomb);
if (_current_tombstone) {
_mut_builder->consume(range_tombstone_change(position_in_partition_view::after_key(_last_pos), _current_tombstone));
_mut_builder->consume(range_tombstone_change(position_in_partition::after_key(*_schema, _last_pos), _current_tombstone));
}
}
@@ -49,7 +49,7 @@ private:
if (_memtable->occupancy().total_space() + _current_mut_size > _max_memory) {
if (_mut_builder) {
if (_current_tombstone) {
_mut_builder->consume(range_tombstone_change(position_in_partition_view::after_key(_last_pos), {}));
_mut_builder->consume(range_tombstone_change(position_in_partition::after_key(*_schema, _last_pos), {}));
}
auto mut = _mut_builder->consume_end_of_stream();
init_current_mut(mut->decorated_key(), mut->partition().partition_tombstone());

View File

@@ -80,6 +80,8 @@ enum class partition_region : uint8_t {
partition_end,
};
struct view_and_holder;
std::ostream& operator<<(std::ostream&, partition_region);
std::string_view to_string(partition_region);
partition_region parse_partition_region(std::string_view);
@@ -168,12 +170,18 @@ public:
return {clustering_row_tag_t(), ck};
}
static position_in_partition_view after_key(const clustering_key& ck) {
// Returns a view, as the first element of the returned pair, to before_key(pos._ck)
// if pos.is_clustering_row() else returns pos as-is.
// The second element of the pair needs to be kept alive as long as the first element is used.
// The returned view is valid as long as the view passed to this method is valid.
static view_and_holder after_key(const schema&, position_in_partition_view);
static position_in_partition_view after_all_prefixed(const clustering_key& ck) {
return {partition_region::clustered, bound_weight::after_all_prefixed, &ck};
}
// Returns a view to after_key(pos._ck) if pos.is_clustering_row() else returns pos as-is.
static position_in_partition_view after_key(position_in_partition_view pos) {
// Returns a view to after_all_prefixed(pos._ck) if pos.is_clustering_row() else returns pos as-is.
static position_in_partition_view after_all_prefixed(position_in_partition_view pos) {
return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? bound_weight::after_all_prefixed : pos._bound_weight, pos._ck};
}
@@ -266,13 +274,27 @@ public:
explicit position_in_partition(static_row_tag_t) : _type(partition_region::static_row) { }
position_in_partition(clustering_row_tag_t, clustering_key_prefix ck)
: _type(partition_region::clustered), _ck(std::move(ck)) { }
position_in_partition(after_clustering_row_tag_t, clustering_key_prefix ck)
// FIXME: Use lexicographical_relation::before_strictly_prefixed here. Refs #1446
: _type(partition_region::clustered), _bound_weight(bound_weight::after_all_prefixed), _ck(std::move(ck)) { }
position_in_partition(after_clustering_row_tag_t, position_in_partition_view pos)
position_in_partition(after_clustering_row_tag_t, const schema& s, clustering_key_prefix ck)
: _type(partition_region::clustered)
, _bound_weight(bound_weight::after_all_prefixed)
, _ck(std::move(ck))
{
if (clustering_key::make_full(s, *_ck)) { // Refs #1446
_bound_weight = bound_weight::before_all_prefixed;
}
}
position_in_partition(after_clustering_row_tag_t, const schema& s, position_in_partition_view pos)
: position_in_partition(after_clustering_row_tag_t(), s, position_in_partition(pos))
{ }
position_in_partition(after_clustering_row_tag_t, const schema& s, position_in_partition&& pos)
: _type(partition_region::clustered)
, _bound_weight(pos._bound_weight != bound_weight::equal ? pos._bound_weight : bound_weight::after_all_prefixed)
, _ck(*pos._ck) { }
, _ck(std::move(pos._ck))
{
if (pos._bound_weight == bound_weight::equal && _ck && clustering_key::make_full(s, *_ck)) { // Refs #1446
_bound_weight = bound_weight::before_all_prefixed;
}
}
position_in_partition(before_clustering_row_tag_t, clustering_key_prefix ck)
: _type(partition_region::clustered), _bound_weight(bound_weight::before_all_prefixed), _ck(std::move(ck)) { }
position_in_partition(before_clustering_row_tag_t, position_in_partition_view pos)
@@ -323,15 +345,19 @@ public:
return {before_clustering_row_tag_t(), std::move(ck)};
}
static position_in_partition after_key(clustering_key ck) {
return {after_clustering_row_tag_t(), std::move(ck)};
static position_in_partition after_key(const schema& s, clustering_key ck) {
return {after_clustering_row_tag_t(), s, std::move(ck)};
}
// If given position is a clustering row position, returns a position
// right after it. Otherwise returns it unchanged.
// The position "pos" must be a clustering position.
static position_in_partition after_key(position_in_partition_view pos) {
return {after_clustering_row_tag_t(), pos};
static position_in_partition after_key(const schema& s, position_in_partition_view pos) {
return {after_clustering_row_tag_t(), s, pos};
}
static position_in_partition after_key(const schema& s, position_in_partition&& pos) noexcept {
return {after_clustering_row_tag_t(), s, std::move(pos)};
}
static position_in_partition for_key(clustering_key ck) {
@@ -572,6 +598,25 @@ public:
}
};
struct view_and_holder {
position_in_partition_view view;
std::optional<position_in_partition> holder;
};
inline
view_and_holder position_in_partition_view::after_key(const schema& s, position_in_partition_view pos) {
if (!pos.is_clustering_row()) {
return {pos, std::nullopt};
}
if (pos.key().is_full(s)) {
return {position_in_partition_view::after_all_prefixed(pos.key()), std::nullopt};
}
// FIXME: This wouldn't be needed if we had a bound weight to represent this.
auto res = position_in_partition::after_key(s, clustering_key(pos.key()));
position_in_partition_view res_view = res;
return {std::move(res_view), std::move(res)};
}
inline
position_in_partition position_in_partition::for_range_start(const query::clustering_range& r) {
return {position_in_partition::range_tag_t(), bound_view::from_range_start(r)};

View File

@@ -27,6 +27,7 @@
#include "bytes.hh"
class position_in_partition_view;
class position_in_partition;
class partition_slice_builder;
using query_id = utils::tagged_uuid<struct query_id_tag>;
@@ -80,7 +81,7 @@ typedef std::vector<clustering_range> clustering_row_ranges;
/// partially overlap are trimmed.
/// Result: each range will overlap fully with [pos, +inf), or (-int, pos] if
/// reversed is true.
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed = false);
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition pos, bool reversed = false);
/// Trim the clustering ranges.
///

View File

@@ -107,7 +107,7 @@ std::ostream& operator<<(std::ostream& out, const specific_ranges& s) {
return out << "{" << s._pk << " : " << join(", ", s._ranges) << "}";
}
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition_view pos, bool reversed) {
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, position_in_partition pos, bool reversed) {
auto cmp = [reversed, cmp = position_in_partition::composite_tri_compare(s)] (const auto& a, const auto& b) {
return reversed ? cmp(b, a) : cmp(a, b);
};
@@ -135,13 +135,8 @@ void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& range
}
void trim_clustering_row_ranges_to(const schema& s, clustering_row_ranges& ranges, const clustering_key& key, bool reversed) {
if (key.is_full(s) || reversed) {
return trim_clustering_row_ranges_to(s, ranges,
reversed ? position_in_partition_view::before_key(key) : position_in_partition_view::after_key(key), reversed);
}
auto full_key = key;
clustering_key::make_full(s, full_key);
return trim_clustering_row_ranges_to(s, ranges, position_in_partition_view::before_key(full_key), reversed);
return trim_clustering_row_ranges_to(s, ranges,
reversed ? position_in_partition::before_key(key) : position_in_partition::after_key(s, key), reversed);
}

View File

@@ -325,7 +325,7 @@ void evictable_reader_v2::update_next_position() {
push_mutation_fragment(_reader->pop_mutation_fragment());
_next_position_in_partition = position_in_partition::for_partition_start();
} else {
_next_position_in_partition = position_in_partition::after_key(last_pos);
_next_position_in_partition = position_in_partition::after_key(*_schema, last_pos);
}
break;
case partition_region::partition_end:

View File

@@ -20,7 +20,7 @@ class mutation_fragment_v1_stream final {
}
mutation_fragment_opt consume(clustering_row mf) {
if (_rt_assembler.needs_flush()) [[unlikely]] {
if (auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(mf.position()))) [[unlikely]] {
if (auto rt_opt = _rt_assembler.flush(*_schema, position_in_partition::after_key(*_schema, mf.position()))) [[unlikely]] {
_row = std::move(mf);
return wrap(std::move(*rt_opt));
}

View File

@@ -150,9 +150,10 @@ future<result<service::storage_proxy::coordinator_query_result>> query_pager::do
if (has_ck) {
query::clustering_row_ranges row_ranges = _cmd->slice.default_row_ranges();
position_in_partition_view next_pos = _last_pos;
position_in_partition next_pos = _last_pos;
if (_last_pos.has_key()) {
next_pos = reversed ? position_in_partition_view::before_key(_last_pos) : position_in_partition_view::after_key(_last_pos);
next_pos = reversed ? position_in_partition::before_key(_last_pos)
: position_in_partition::after_key(*_schema, _last_pos);
}
query::trim_clustering_row_ranges_to(*_schema, row_ranges, next_pos, reversed);

View File

@@ -38,7 +38,7 @@ void metadata_collector::update_min_max_components(position_in_partition_view po
// This is how callers expect prefixes to be interpreted.
const auto is_prefix_row = pos.is_clustering_row() && !pos.key().is_full(_schema);
const auto min_pos = is_prefix_row ? position_in_partition_view::before_key(pos) : pos;
const auto max_pos = is_prefix_row ? position_in_partition_view::after_key(pos) : pos;
const auto max_pos = is_prefix_row ? position_in_partition_view::after_all_prefixed(pos) : pos;
if (!_min_clustering_pos || cmp(min_pos, *_min_clustering_pos) < 0) {
mdclogger.trace("{}: setting min_clustering_key={}", _name, position_in_partition_view::printer(_schema, min_pos));

View File

@@ -503,7 +503,7 @@ struct rt_marker {
if (kind == bound_kind_m::incl_start || kind == bound_kind_m::excl_end_incl_start) {
return position_in_partition_view::before_key(clustering);
}
return position_in_partition_view::after_key(clustering);
return position_in_partition_view::after_all_prefixed(clustering);
}
// We need this one to uniformly write rows and RT markers inside write_clustered().

View File

@@ -262,11 +262,11 @@ void test_single_row(int ck,
}
static expected_row after_cont(int ck) {
return expected_row(position_in_partition::after_key(make_ck(ck)), is_continuous::yes, is_dummy::yes);
return expected_row(position_in_partition::after_key(*SCHEMA, make_ck(ck)), is_continuous::yes, is_dummy::yes);
}
static expected_row after_notc(int ck) {
return expected_row(position_in_partition::after_key(make_ck(ck)), is_continuous::no, is_dummy::yes);
return expected_row(position_in_partition::after_key(*SCHEMA, make_ck(ck)), is_continuous::no, is_dummy::yes);
}
static expected_row before_cont(int ck) {
@@ -1289,7 +1289,7 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range1) {
test_slice_single_version(underlying, cache, slice, {
expected_fragment(position_in_partition::before_key(make_ck(1)), rt.tomb),
expected_fragment(1),
expected_fragment(position_in_partition::after_key(make_ck(1)), {}),
expected_fragment(position_in_partition_view::after_all_prefixed(make_ck(1)), {}),
}, {
expected_row(1, is_continuous::no),
expected_row(expected_row::dummy_tag_t{}, is_continuous::no)
@@ -1336,7 +1336,7 @@ SEASTAR_TEST_CASE(test_single_row_and_tombstone_not_cached_single_row_range3) {
test_slice_single_version(underlying, cache, slice, {
expected_fragment(position_in_partition_view::before_key(make_ck(0)), rt.tomb),
expected_fragment(position_in_partition_view::after_key(make_ck(2)), {}),
expected_fragment(position_in_partition_view::after_all_prefixed(make_ck(2)), {}),
expected_fragment(4)
}, {
before_notc(0),

View File

@@ -102,7 +102,7 @@ SEASTAR_TEST_CASE(test_basic_operation_on_various_data_sets) {
query::clustering_range::make({keys[6], true}, {keys[6], true}),
});
auto end_pos = position_in_partition::after_key(keys[6]);
auto end_pos = position_in_partition::after_key(*s.schema(), keys[6]);
auto steps = std::vector<step>({
{position_in_partition_view::for_static_row(), true},
@@ -125,7 +125,7 @@ SEASTAR_TEST_CASE(test_basic_operation_on_various_data_sets) {
query::clustering_range::make({keys[1], false}, {keys[2], true}),
});
auto end_pos = position_in_partition::after_key(keys[2]);
auto end_pos = position_in_partition::after_key(*s.schema(), keys[2]);
auto steps = std::vector<step>({
{position_in_partition_view::for_static_row(), true},
@@ -143,7 +143,7 @@ SEASTAR_TEST_CASE(test_basic_operation_on_various_data_sets) {
query::clustering_range::make({keys[1], false}, {keys[2], true}),
});
auto end_pos = position_in_partition::after_key(keys[2]);
auto end_pos = position_in_partition::after_key(*s.schema(), keys[2]);
auto steps = std::vector<step>({
{position_in_partition_view::for_static_row(), false},
@@ -214,14 +214,14 @@ SEASTAR_TEST_CASE(test_range_overlap) {
BOOST_REQUIRE(!walker.advance_to(
position_in_partition::for_key(keys[0]),
position_in_partition::after_key(keys[0])));
position_in_partition::after_key(*s.schema(), keys[0])));
++lbcc;
BOOST_REQUIRE(walker.lower_bound_change_counter() == lbcc);
BOOST_REQUIRE(eq(walker.lower_bound(), position_in_partition::for_range_start(range1)));
BOOST_REQUIRE(walker.advance_to(
position_in_partition::after_key(keys[0]),
position_in_partition::after_key(*s.schema(), keys[0]),
position_in_partition::for_key(keys[1])));
BOOST_REQUIRE(walker.lower_bound_change_counter() == lbcc);
@@ -240,7 +240,7 @@ SEASTAR_TEST_CASE(test_range_overlap) {
BOOST_REQUIRE(eq(walker.lower_bound(), position_in_partition::for_range_start(range2)));
BOOST_REQUIRE(walker.advance_to(
position_in_partition::after_key(keys[2]),
position_in_partition::after_key(*s.schema(), keys[2]),
position_in_partition::for_key(keys[4])));
BOOST_REQUIRE(walker.advance_to(

View File

@@ -530,11 +530,11 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
ss.make_static_row_v2(permit, "v"),
ss.make_row_v2(permit, ck0, "ck0"),
ss.make_row_v2(permit, ck1, "ck1"),
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::before_key(ck2), {ss.new_tombstone()}),
ss.make_row_v2(permit, ck2, "ck2"),
range_tombstone_change(position_in_partition::after_key(ck2), {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{},
partition_start(dk1, {}),
partition_end{});
@@ -554,14 +554,14 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
expect_valid(
"2 range tombstone changes",
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(ck2), {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{});
expect_valid(
"null range tombstone change alone",
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(ck2), {}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck2), {}),
partition_end{});
expect_invalid_at_eos(
@@ -572,13 +572,13 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator) {
"active range tombstone end at partition end",
2,
partition_start(dk0, {}),
range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}),
range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}),
partition_end{});
const auto ps = mutation_fragment_v2(*ss.schema(), permit, partition_start(dk1, {}));
const auto sr = ss.make_static_row_v2(permit, "v");
const auto cr = ss.make_row_v2(permit, ck2, "ck2");
const auto rtc = mutation_fragment_v2(*ss.schema(), permit, range_tombstone_change(position_in_partition::after_key(ck1), {ss.new_tombstone()}));
const auto rtc = mutation_fragment_v2(*ss.schema(), permit, range_tombstone_change(position_in_partition::after_key(*ss.schema(), ck1), {ss.new_tombstone()}));
const auto pe = mutation_fragment_v2(*ss.schema(), permit, partition_end{});
auto check_invalid_after = [&] (auto&& mf_raw, std::initializer_list<const mutation_fragment_v2*> invalid_mfs) {
@@ -637,8 +637,8 @@ SEASTAR_THREAD_TEST_CASE(test_mutation_fragment_stream_validator_mixed_api_usage
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, position_in_partition_view::for_key(ck1), {}));
BOOST_REQUIRE(validator(mf_kind::clustering_row, {}));
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition_view::after_key(ck1), {}));
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck1), {}));
BOOST_REQUIRE(validator(mf_kind::range_tombstone_change, position_in_partition::after_key(*ss.schema(), ck1), {}));
BOOST_REQUIRE(validator(mf_kind::partition_end, {}));
BOOST_REQUIRE(validator(dk0));
BOOST_REQUIRE(!validator(dk0));

View File

@@ -597,10 +597,10 @@ SEASTAR_TEST_CASE(test_sm_fast_forwarding_combining_reader) {
.next_partition()
.produces_partition_start(pkeys[1])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[2]), position_in_partition::after_key(ckeys[2])))
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[2]), position_in_partition::after_key(*s.schema(), ckeys[2])))
.produces_row_with_key(ckeys[2])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::after_key(ckeys[2]), position_in_partition::after_all_clustered_rows()))
.fast_forward_to(position_range(position_in_partition::after_key(*s.schema(), ckeys[2]), position_in_partition::after_all_clustered_rows()))
.produces_row_with_key(ckeys[3])
.produces_end_of_stream()
.next_partition()
@@ -648,13 +648,13 @@ SEASTAR_THREAD_TEST_CASE(test_sm_fast_forwarding_combining_reader_with_galloping
.next_partition()
.produces_partition_start(pkeys[1])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[0]), position_in_partition::after_key(ckeys[3])))
.fast_forward_to(position_range(position_in_partition::before_key(ckeys[0]), position_in_partition::after_key(*s.schema(), ckeys[3])))
.produces_row_with_key(ckeys[0])
.produces_row_with_key(ckeys[1])
.produces_row_with_key(ckeys[2])
.produces_row_with_key(ckeys[3])
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::after_key(ckeys[6]), position_in_partition::after_all_clustered_rows()))
.fast_forward_to(position_range(position_in_partition::after_key(*s.schema(), ckeys[6]), position_in_partition::after_all_clustered_rows()))
.produces_row_with_key(ckeys[7])
.produces_row_with_key(ckeys[8])
.produces_row_with_key(ckeys[9])
@@ -1127,7 +1127,7 @@ SEASTAR_TEST_CASE(test_combined_reader_slicing_with_overlapping_range_tombstones
mutation result(m1.schema(), m1.decorated_key());
rd.consume_pausable([&] (mutation_fragment&& mf) {
if (mf.position().has_clustering_key() && !mf.range().overlaps(*s, prange.start(), prange.end())) {
if (mf.position().has_clustering_key() && !mf.range(*s).overlaps(*s, prange.start(), prange.end())) {
BOOST_FAIL(format("Received fragment which is not relevant for the slice: {}, slice: {}", mutation_fragment::printer(*s, mf), range));
}
result.partition().apply(*s, std::move(mf));
@@ -3518,12 +3518,12 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_non_monotonic_positions) {
push_mf(partition_start(pkey, {}));
for (int i = 0; i < 10; ++i) {
const auto ckey = s.make_ckey(i);
const auto pos = i % 2 ? position_in_partition::after_key(ckey) : position_in_partition::before_key(ckey);
const auto pos = i % 2 ? position_in_partition::after_key(*s.schema(), ckey) : position_in_partition::before_key(ckey);
for (int j = 0; j < 10; ++j) {
push_mf(range_tombstone_change(pos, tombstone(s.new_timestamp(), {})));
}
}
push_mf(range_tombstone_change(position_in_partition::after_key(s.make_ckey(11)), tombstone{}));
push_mf(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), tombstone{}));
push_mf(partition_end{});
auto mut_opt = mut_builder.consume_end_of_stream();
@@ -3570,7 +3570,7 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_clear_tombstone_in_discontinued_p
const auto& pkey2 = pkeys[1];
const auto first_rtc = range_tombstone_change(position_in_partition::before_key(s.make_ckey(0)), tombstone{s.new_timestamp(), {}});
const auto last_rtc = range_tombstone_change(position_in_partition::after_key(s.make_ckey(11)), tombstone{});
const auto last_rtc = range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), tombstone{});
size_t buffer_size = 0;
@@ -3749,7 +3749,7 @@ struct clustering_order_merger_test_generator {
auto ck = mk_ck(k);
positions.push_back(position_in_partition::before_key(ck));
positions.push_back(position_in_partition::for_key(ck));
positions.push_back(position_in_partition::after_key(ck));
positions.push_back(position_in_partition::after_key(*_s, ck));
}
positions.push_back(position_in_partition::after_all_clustered_rows());

View File

@@ -2679,15 +2679,15 @@ SEASTAR_THREAD_TEST_CASE(test_position_in_partition_reversal) {
auto rev_s = ss.schema()->make_reversed();
position_in_partition::tri_compare rev_cmp(*rev_s);
BOOST_REQUIRE(fwd_cmp(p_i_p::before_key(ss.make_ckey(1)), p_i_p::after_key(ss.make_ckey(1))) < 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::before_key(ss.make_ckey(1)), p_i_p::after_key(*ss.schema(), ss.make_ckey(1))) < 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::before_key(ss.make_ckey(1)), ss.make_ckey(0)) > 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(ss.make_ckey(1)), p_i_p::before_key(ss.make_ckey(1))) > 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(ss.make_ckey(1)), ss.make_ckey(2)) < 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)), p_i_p::before_key(ss.make_ckey(1))) > 0);
BOOST_REQUIRE(fwd_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)), ss.make_ckey(2)) < 0);
BOOST_REQUIRE(rev_cmp(p_i_p::before_key(ss.make_ckey(1)).reversed(), p_i_p::after_key(ss.make_ckey(1)).reversed()) > 0);
BOOST_REQUIRE(rev_cmp(p_i_p::before_key(ss.make_ckey(1)).reversed(), p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed()) > 0);
BOOST_REQUIRE(rev_cmp(p_i_p::before_key(ss.make_ckey(1)).reversed(), ss.make_ckey(0)) < 0);
BOOST_REQUIRE(rev_cmp(p_i_p::after_key(ss.make_ckey(1)).reversed(), p_i_p::before_key(ss.make_ckey(1)).reversed()) < 0);
BOOST_REQUIRE(rev_cmp(p_i_p::after_key(ss.make_ckey(1)).reversed(), ss.make_ckey(2)) > 0);
BOOST_REQUIRE(rev_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed(), p_i_p::before_key(ss.make_ckey(1)).reversed()) < 0);
BOOST_REQUIRE(rev_cmp(p_i_p::after_key(*ss.schema(), ss.make_ckey(1)).reversed(), ss.make_ckey(2)) > 0);
// Test reversal-invariant positions
@@ -2701,6 +2701,97 @@ SEASTAR_THREAD_TEST_CASE(test_position_in_partition_reversal) {
p_i_p::for_static_row()) == 0);
}
// Regression test for scylladb/scylladb#1446: ordering of position_in_partition
// values built from non-full (prefix) clustering keys.
//
// The fixed contract under test: after_key(s, k) for a prefix key k sorts
// immediately after k itself — i.e. *before* any full key that extends the
// prefix — whereas after_all_prefixed(k) sorts after every key prefixed by k.
// Both forward and reversed schemas are exercised.
SEASTAR_THREAD_TEST_CASE(test_position_in_partition_order_with_prefix_keys) {
    using pip = position_in_partition;
    using pipv = position_in_partition_view;

    // Two clustering columns, so a one-component key ("a") is a non-full prefix.
    schema_ptr s = schema_builder("ks", "cf")
        .with_column("pk", utf8_type, column_kind::partition_key)
        .with_column("ck1", utf8_type, column_kind::clustering_key)
        .with_column("ck2", utf8_type, column_kind::clustering_key)
        .with_column("v", utf8_type)
        .build();

    position_in_partition::tri_compare cmp(*s);

    // Builds a clustering key from one component (prefix) or two (full).
    auto make_ck = [&] (sstring ck1, std::optional<sstring> ck2 = {}) {
        if (ck2) {
            return clustering_key::from_exploded(*s, {serialized(ck1), serialized(*ck2)});
        }
        return clustering_key::from_exploded(*s, {serialized(ck1)});
    };

    auto prefix_a = make_ck("a");       // non-full key ("a")
    auto full_a = prefix_a;
    clustering_key::make_full(*s, full_a); // "a" padded out to a full key
    auto full_a_a = make_ck("a", "a");  // full key ("a", "a")

    // Forward order: before_key < key < after_key, and after_key of a prefix
    // still sorts before full keys extending that prefix.
    BOOST_REQUIRE(cmp(pip::before_key(prefix_a), prefix_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), prefix_a) > 0);
    BOOST_REQUIRE(cmp(prefix_a, full_a_a) < 0);
    // NOTE(review): this repeats the after_key/prefix_a check above — possibly
    // diff-rendering residue; confirm against the upstream test source.
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), prefix_a) > 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), full_a_a) < 0);
    // after_all_prefixed is the old "after everything prefixed by k" semantics.
    BOOST_REQUIRE(cmp(pipv::after_all_prefixed(prefix_a), full_a_a) > 0);
    BOOST_REQUIRE(cmp(prefix_a, full_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), full_a) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::before_key(full_a)) <= 0);
    // before_key()/after_key() applied to a position that is already a dummy
    // (non-row) position leave it unchanged — i.e. they are idempotent.
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::after_key(*s, pip::after_key(*s, prefix_a))) == 0);
    BOOST_REQUIRE(cmp(pip::before_key(prefix_a), pip::before_key(pip::before_key(prefix_a))) == 0);
    // NOTE(review): duplicate of the after_key idempotence check two lines up —
    // possibly diff-rendering residue; confirm against upstream.
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::after_key(*s, pip::after_key(*s, prefix_a))) == 0);
    BOOST_REQUIRE(cmp(pip::for_key(prefix_a), pip::for_key(full_a_a)) < 0);
    BOOST_REQUIRE(cmp(pip::for_key(full_a_a), pip::for_key(prefix_a)) > 0);
    BOOST_REQUIRE(cmp(pip::for_key(prefix_a), pip::for_key(full_a)) < 0);
    BOOST_REQUIRE(cmp(pip::after_key(*s, prefix_a), pip::for_key(prefix_a)) > 0);

    // Check reversed schema: comparisons of reversed() positions under the
    // reversed comparator should mirror the forward-order relations above.
    auto rev_s = s->make_reversed();
    position_in_partition::tri_compare rev_cmp(*rev_s);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(),
                          pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(),
                          pip::for_key(full_a).reversed()) > 0);
    // NOTE(review): the next two checks repeat the two above — possibly
    // diff-rendering residue; confirm against upstream.
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(),
                          pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::before_key(prefix_a).reversed(),
                          pip::for_key(full_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pipv::after_all_prefixed(prefix_a).reversed(),
                          pip::for_key(full_a_a).reversed()) < 0);
    BOOST_REQUIRE(rev_cmp(pipv::after_all_prefixed(prefix_a).reversed(),
                          pip::for_key(full_a).reversed()) < 0);
    // after_key() built against the reversed schema, then reversed back.
    BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(),
                          pip::for_key(full_a_a).reversed()) > 0);
    BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(),
                          pip::for_key(full_a).reversed()) > 0);

    // FIXME: Below don't work due to https://github.com/scylladb/scylladb/issues/12258
    //
    //    BOOST_REQUIRE(rev_cmp(pip::for_key(prefix_a).reversed(),
    //                          pip::for_key(full_a_a).reversed()) > 0);
    //
    //    BOOST_REQUIRE(rev_cmp(pip::for_key(full_a_a).reversed(),
    //                          pip::for_key(prefix_a).reversed()) < 0);
    //
    //    BOOST_REQUIRE(rev_cmp(pip::for_key(prefix_a).reversed(),
    //                          pip::for_key(full_a).reversed()) > 0);
    //
    //    BOOST_REQUIRE(rev_cmp(pip::after_key(*rev_s, prefix_a).reversed(),
    //                          pip::for_key(prefix_a).reversed()) < 0);
}
SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
simple_schema ss;
auto pk = ss.make_pkey();
@@ -2730,7 +2821,7 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) {
frags.emplace_back(mutation_fragment_v2(*s, permit, std::move(row)));
}
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(ss.make_ckey(10)), tombstone{}));
frags.emplace_back(*s, permit, range_tombstone_change(position_in_partition::after_key(*s, ss.make_ckey(10)), tombstone{}));
frags.emplace_back(*s, permit, partition_end{});

View File

@@ -500,8 +500,8 @@ static std::vector<range_tombstone> make_random() {
tombstone(dist(gen), gc_now));
} else {
rts.emplace_back(
position_in_partition::after_key(key),
position_in_partition::after_key(key),
position_in_partition::after_key(*s, key),
position_in_partition::after_key(*s, key),
tombstone(dist(gen), gc_now));
}
}

View File

@@ -2121,7 +2121,7 @@ SEASTAR_TEST_CASE(test_scan_with_partial_partitions_reversed) {
.produces_range_tombstone_change(start_change(reversed(rt2)))
.produces_range_tombstone_change(range_tombstone_change(reversed(rt2).end_position(), rt1.tomb))
.produces_row_with_key(s.make_ckey(2))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(s.make_ckey(1)), {}))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(1)), {}))
.produces_partition_end()
.produces_end_of_stream();
}

View File

@@ -159,7 +159,7 @@ static std::vector<position_range> random_ranges(const std::vector<clustering_ke
positions.emplace_back(position_in_partition::clustering_row_tag_t{}, std::move(ckp));
break;
default:
positions.emplace_back(position_in_partition::after_clustering_row_tag_t{}, std::move(ckp));
positions.emplace_back(position_in_partition_view::after_all_prefixed(std::move(ckp)));
break;
}
}

View File

@@ -181,7 +181,7 @@ static void test_slicing_and_fast_forwarding(tests::reader_concurrency_semaphore
for (auto current = start; current < start + range_size; current++) {
auto ck = s.make_ckey(current);
if (expected.partition().find_row(*s.schema(), ck)) {
auto end_position = position_in_partition(position_in_partition::after_clustering_row_tag_t(), ck);
auto end_position = position_in_partition::after_key(*s.schema(), ck);
actual.may_produce_tombstones(position_range(start_position, end_position));
actual.produces_row_with_key(ck, expected.partition().range_tombstone_for_row(*s.schema(), ck));
actual.may_produce_tombstones(position_range(start_position, end_position));
@@ -209,7 +209,7 @@ static void test_slicing_and_fast_forwarding(tests::reader_concurrency_semaphore
auto ck = s.make_ckey(current);
auto pos_range = position_range(
position_in_partition(position_in_partition::before_clustering_row_tag_t(), ck),
position_in_partition(position_in_partition::after_clustering_row_tag_t(), ck));
position_in_partition::after_key(*s.schema(), ck));
actual.fast_forward_to(pos_range);
actual.may_produce_tombstones(pos_range);
if (expected.partition().find_row(*s.schema(), ck)) {
@@ -258,7 +258,7 @@ static void test_slicing_and_fast_forwarding(tests::reader_concurrency_semaphore
for (auto current = start; current < start + range_size; current++) {
auto ck = s.make_ckey(current);
if (expected.partition().find_row(*s.schema(), ck)) {
auto end_position = position_in_partition(position_in_partition::after_clustering_row_tag_t(), ck);
auto end_position = position_in_partition::after_key(*s.schema(), ck);
actual.may_produce_tombstones(position_range(current_position, end_position));
actual.produces_row_with_key(ck, expected.partition().range_tombstone_for_row(*s.schema(), ck));
current_position = std::move(end_position);
@@ -1099,7 +1099,7 @@ static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper&
.build();
auto rd = assert_that(ds.make_reader_v2(s, semaphore.make_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
rd.produces_partition_start(pk)
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(*s, ck2)))
.produces_row_with_key(ck1)
.produces_row_with_key(ck2)
.produces_end_of_stream();
@@ -1111,7 +1111,7 @@ static void test_clustering_slices(tests::reader_concurrency_semaphore_wrapper&
auto rd = assert_that(ds.make_reader_v2(s, semaphore.make_permit(), pr, slice, default_priority_class(), nullptr, streamed_mutation::forwarding::yes));
rd.produces_partition_start(pk)
.produces_end_of_stream()
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(ck2)))
.fast_forward_to(position_range(position_in_partition::for_key(ck1), position_in_partition::after_key(*s, ck2)))
.produces_row_with_key(ck1)
.produces_row_with_key(ck2)
.produces_end_of_stream();
@@ -1405,10 +1405,10 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_row_with_key(s.make_ckey(5))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(7)), t2))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(12)), tombstone()))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(12)), tombstone()))
.produces_row_with_key(s.make_ckey(15))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(17)), t3))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(19)), tombstone()))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(19)), tombstone()))
.produces_partition_end()
.produces_end_of_stream();
@@ -1422,7 +1422,7 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::after_key(s.make_ckey(0)),
position_in_partition::after_key(*s.schema(), s.make_ckey(0)),
position_in_partition::before_key(s.make_ckey(2))))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
@@ -1430,10 +1430,10 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.fast_forward_to(position_range(
position_in_partition::before_key(s.make_ckey(5)),
position_in_partition::after_key(s.make_ckey(5))))
position_in_partition::after_key(*s.schema(), s.make_ckey(5))))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(5)), t1))
.produces_row_with_key(s.make_ckey(5))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(5)), {}))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(5)), {}))
.produces_end_of_stream();
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr,
@@ -1445,7 +1445,7 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_partition_start(pkey)
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::after_key(s.make_ckey(0)),
position_in_partition::after_key(*s.schema(), s.make_ckey(0)),
position_in_partition::for_key(s.make_ckey(2))))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(2)), {}))
@@ -1535,19 +1535,19 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
position_in_partition::before_key(s.make_ckey(10)),
position_in_partition::before_key(s.make_ckey(13))))
.produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(10)), t2})
.produces_range_tombstone_change({position_in_partition_view::after_key(s.make_ckey(12)), {}})
.produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(12)), {}})
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::before_key(s.make_ckey(16)),
position_in_partition::after_key(s.make_ckey(16))))
position_in_partition::after_key(*s.schema(), s.make_ckey(16))))
.produces_end_of_stream()
.fast_forward_to(position_range(
position_in_partition::before_key(s.make_ckey(17)),
position_in_partition::after_key(s.make_ckey(18))))
position_in_partition::after_key(*s.schema(), s.make_ckey(18))))
.produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(17)), t3})
.produces_range_tombstone_change({position_in_partition_view::after_key(s.make_ckey(18)), {}})
.produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(18)), {}})
.produces_end_of_stream();
// Slicing using query restrictions
@@ -1559,7 +1559,7 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
assert_that(ms.make_reader_v2(s.schema(), semaphore.make_permit(), pr, slice))
.produces_partition_start(pkey)
.produces_range_tombstone_change({position_in_partition_view::before_key(s.make_ckey(17)), t3})
.produces_range_tombstone_change({position_in_partition_view::after_key(s.make_ckey(18)), {}})
.produces_range_tombstone_change({position_in_partition::after_key(*s.schema(), s.make_ckey(18)), {}})
.produces_partition_end()
.produces_end_of_stream();
}
@@ -1573,9 +1573,9 @@ void test_range_tombstones_v2(tests::reader_concurrency_semaphore_wrapper& semap
.produces_partition_start(pkey)
.produces_row_with_key(s.make_ckey(0))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(1)), t1))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(3)), {}))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(3)), {}))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::before_key(s.make_ckey(8)), t2))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition_view::after_key(s.make_ckey(11)), {}))
.produces_range_tombstone_change(range_tombstone_change(position_in_partition::after_key(*s.schema(), s.make_ckey(11)), {}))
.produces_partition_end()
.produces_end_of_stream();
}
@@ -2310,7 +2310,7 @@ public:
}
}
} else {
m.partition().clustered_row(*_schema, position_in_partition_view::after_key(ckey), is_dummy::yes, continuous);
m.partition().clustered_row(*_schema, position_in_partition::after_key(*_schema, ckey), is_dummy::yes, continuous);
}
}