diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index a7b0a4ca23..4190fee4af 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -470,6 +470,9 @@ public: size_t buffer_size() const { return _impl->buffer_size(); } + const circular_buffer& buffer() const { + return _impl->buffer(); + } // Detach the internal buffer of the reader. // Roughly equivalent to depleting it by calling pop_mutation_fragment() // until is_buffer_empty() returns true. diff --git a/mutation_reader.cc b/mutation_reader.cc index 591754e070..70e0f8aaef 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -30,6 +30,7 @@ #include "schema_registry.hh" #include "mutation_compactor.hh" +logging::logger mrlog("mutation_reader"); static constexpr size_t merger_small_vector_size = 4; @@ -1029,6 +1030,13 @@ private: bool _reader_created = false; bool _drop_partition_start = false; bool _drop_static_row = false; + // Trim range tombstones on the start of the buffer to the start of the read + // range (_next_position_in_partition). Set after reader recreation. + // Also validate the first not-trimmed mutation fragment's position. + bool _trim_range_tombstones = false; + // Validate the partition key of the first emitted partition, set after the + // reader was recreated. + bool _validate_partition_key = false; position_in_partition::tri_compare _tri_cmp; std::optional _last_pkey; @@ -1050,7 +1058,10 @@ private: void adjust_partition_slice(); flat_mutation_reader recreate_reader(); flat_mutation_reader resume_or_create_reader(); + void maybe_validate_partition_start(const circular_buffer& buffer); + void validate_position_in_partition(position_in_partition_view pos) const; bool should_drop_fragment(const mutation_fragment& mf); + bool maybe_trim_range_tombstone(mutation_fragment& mf) const; future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); @@ -1124,16 +1135,11 @@ void evictable_reader::update_next_position(flat_mutation_reader& reader) { _next_position_in_partition = position_in_partition::before_all_clustered_rows(); break; case partition_region::clustered: - if (reader.is_buffer_empty()) { - _next_position_in_partition = position_in_partition::after_key(last_pos); - } else { - const auto& next_frag = reader.peek_buffer(); - if (next_frag.is_end_of_partition()) { + if (!reader.is_buffer_empty() && reader.peek_buffer().is_end_of_partition()) { push_mutation_fragment(reader.pop_mutation_fragment()); _next_position_in_partition = position_in_partition::for_partition_start(); - } else { - _next_position_in_partition = position_in_partition(next_frag.position()); - } + } else { + _next_position_in_partition = position_in_partition::after_key(last_pos); } break; case partition_region::partition_end: @@ -1158,6 +1164,9 @@ flat_mutation_reader evictable_reader::recreate_reader() { const dht::partition_range* range = _pr; const query::partition_slice* slice = &_ps; + _range_override.reset(); + _slice_override.reset(); + if (_last_pkey) { bool partition_range_is_inclusive = true; @@ -1194,6 +1203,9 @@ flat_mutation_reader evictable_reader::recreate_reader() { range = &*_range_override; } + _trim_range_tombstones = true; + _validate_partition_key = true; + return _ms.make_reader( _schema, _permit, @@ -1220,6 +1232,78 @@ flat_mutation_reader evictable_reader::resume_or_create_reader() { return recreate_reader(); } +template +static void require(bool condition, const char* msg, const Arg&... arg) { + if (!condition) { + on_internal_error(mrlog, format(msg, arg...)); + } +} + +void evictable_reader::maybe_validate_partition_start(const circular_buffer& buffer) { + if (!_validate_partition_key || buffer.empty()) { + return; + } + + // If this is set we can assume the first fragment is a partition-start. + const auto& ps = buffer.front().as_partition_start(); + const auto tri_cmp = dht::ring_position_comparator(*_schema); + // If we recreated the reader after fast-forwarding it we won't have + // _last_pkey set. In this case it is enough to check if the partition + // is in range. + if (_last_pkey) { + const auto cmp_res = tri_cmp(*_last_pkey, ps.key()); + if (_drop_partition_start) { // should be the same partition + require( + cmp_res == 0, + "{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + } else { // should be a larger partition + require( + cmp_res < 0, + "{}(): validation failed, expected partition with key larger than _last_pkey {} due to _drop_partition_start being unset, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + } + } + const auto& prange = _range_override ? *_range_override : *_pr; + require( + // TODO: somehow avoid this copy + prange.contains(ps.key(), tri_cmp), + "{}(): validation failed, expected partition with key that falls into current range {}, but got {}", + __FUNCTION__, + prange, + ps.key()); + + _validate_partition_key = false; +} + +void evictable_reader::validate_position_in_partition(position_in_partition_view pos) const { + require( + _tri_cmp(_next_position_in_partition, pos) <= 0, + "{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}", + __FUNCTION__, + _next_position_in_partition, + pos); + + if (_slice_override && pos.region() == partition_region::clustered) { + const auto ranges = _slice_override->row_ranges(*_schema, _last_pkey->key()); + const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) { + // TODO: somehow avoid this copy + auto range = position_range(cr); + return range.contains(*_schema, pos); + }); + require( + any_contains, + "{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}", + __FUNCTION__, + *_slice_override, + pos); + } +} + bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) { if (_drop_partition_start && mf.is_partition_start()) { _drop_partition_start = false; @@ -1232,12 +1316,50 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) { return false; } +bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const { + // We either didn't read a partition yet (evicted after fast-forwarding) or + // didn't stop in a clustering region. We don't need to trim range + // tombstones in either case. + if (!_last_pkey || _next_position_in_partition.region() != partition_region::clustered) { + return false; + } + if (!mf.is_range_tombstone()) { + validate_position_in_partition(mf.position()); + return false; + } + + if (_tri_cmp(mf.position(), _next_position_in_partition) >= 0) { + validate_position_in_partition(mf.position()); + return false; // rt in range, no need to trim + } + + auto& rt = mf.as_mutable_range_tombstone(); + + require( + _tri_cmp(_next_position_in_partition, rt.end_position()) <= 0, + "{}(): validation failed, expected range tombstone with end pos larger than _next_position_in_partition {}, but got {}", + __FUNCTION__, + _next_position_in_partition, + rt.end_position()); + + rt.set_start(*_schema, position_in_partition_view::before_key(_next_position_in_partition)); + + return true; +} + future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) { if (!_drop_partition_start && !_drop_static_row) { - return reader.fill_buffer(timeout); + auto fill_buf_fut = reader.fill_buffer(timeout); + if (_validate_partition_key) { + fill_buf_fut = fill_buf_fut.then([this, &reader] { + maybe_validate_partition_start(reader.buffer()); + }); + } + return fill_buf_fut; } return repeat([this, &reader, timeout] { return reader.fill_buffer(timeout).then([this, &reader] { + maybe_validate_partition_start(reader.buffer()); while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) { reader.pop_mutation_fragment(); } @@ -1251,6 +1373,11 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout if (reader.is_buffer_empty()) { return make_ready_future<>(); } + while (_trim_range_tombstones && !reader.is_buffer_empty()) { + auto mf = reader.pop_mutation_fragment(); + _trim_range_tombstones = maybe_trim_range_tombstone(mf); + push_mutation_fragment(std::move(mf)); + } reader.move_buffer_content_to(*this); auto stop = [this, &reader] { // The only problematic fragment kind is the range tombstone. @@ -1291,7 +1418,13 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout if (reader.is_buffer_empty()) { return do_fill_buffer(reader, timeout); } - push_mutation_fragment(reader.pop_mutation_fragment()); + if (_trim_range_tombstones) { + auto mf = reader.pop_mutation_fragment(); + _trim_range_tombstones = maybe_trim_range_tombstone(mf); + push_mutation_fragment(std::move(mf)); + } else { + push_mutation_fragment(reader.pop_mutation_fragment()); + } return make_ready_future<>(); }); }).then([this, &reader] { diff --git a/position_in_partition.hh b/position_in_partition.hh index 453398fc90..7dbbd987cc 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -163,6 +163,11 @@ public: return {partition_region::clustered, bound_weight::before_all_prefixed, &ck}; } + // Returns a view to before_key(pos._ck) if pos.is_clustering_row() else returns pos as-is. + static position_in_partition_view before_key(position_in_partition_view pos) { + return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? bound_weight::before_all_prefixed : pos._bound_weight, pos._ck}; + } + partition_region region() const { return _type; } bound_weight get_bound_weight() const { return _bound_weight; } bool is_partition_start() const { return _type == partition_region::partition_start; } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index bc929e8f46..bc5c2a5734 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2987,3 +2987,488 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) run_mutation_source_tests(make_populate); } + +namespace { + +std::deque copy_fragments(const schema& s, const std::deque& o) { + std::deque buf; + for (const auto& mf : o) { + buf.emplace_back(s, mf); + } + return buf; +} + +flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& prange, + const query::partition_slice& slice, + std::deque first_buffer, + position_in_partition_view last_fragment_position, + std::deque second_buffer, + size_t max_buffer_size) { + class factory { + schema_ptr _schema; + std::optional> _first_buffer; + std::optional> _second_buffer; + size_t _max_buffer_size; + + private: + std::optional> copy_buffer(const std::optional>& o) { + if (!o) { + return {}; + } + return copy_fragments(*_schema, *o); + } + + public: + factory(schema_ptr schema, std::deque first_buffer, std::deque second_buffer, size_t max_buffer_size) + : _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) { + } + + factory(const factory& o) + : _schema(o._schema) + , _first_buffer(copy_buffer(o._first_buffer)) + , _second_buffer(copy_buffer(o._second_buffer)) { + } + factory(factory&& o) = default; + + flat_mutation_reader operator()( + schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding fwd_mr) { + BOOST_REQUIRE(s == _schema); + if (_first_buffer) { + auto buf = *std::exchange(_first_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + if (_second_buffer) { + auto buf = *std::exchange(_second_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + return make_empty_flat_reader(_schema); + } + }; + auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size)); + + auto [rd, handle] = make_manually_paused_evictable_reader( + std::move(ms), + schema, + permit, + prange, + slice, + seastar::default_priority_class(), + nullptr, + mutation_reader::forwarding::yes); + + rd.set_max_buffer_size(max_buffer_size); + + rd.fill_buffer(db::no_timeout).get0(); + + const auto eq_cmp = position_in_partition::equal_compare(*schema); + BOOST_REQUIRE(rd.is_buffer_full()); + BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position)); + BOOST_REQUIRE(!rd.is_end_of_stream()); + + rd.detach_buffer(); + + handle.pause(); + + while(permit.semaphore().try_evict_one_inactive_read()); + + return std::move(rd); +} + +} + +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) { + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + + const auto pkey = s.make_pkey(); + size_t max_buffer_size = 512; + const int first_ck = 100; + const int second_buffer_ck = first_ck + 100; + + size_t mem_usage = 0; + + std::deque first_buffer; + first_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + const auto last_fragment_position = position_in_partition(first_buffer.back().position()); + max_buffer_size = mem_usage; + first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v")); + + std::deque second_buffer; + second_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = second_buffer.back().memory_usage(*s.schema()); + second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10)))); + int ckey = second_buffer_ck; + while (mem_usage <= max_buffer_size) { + second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v")); + mem_usage += second_buffer.back().memory_usage(*s.schema()); + } + second_buffer.emplace_back(partition_end{}); + + auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore.make_permit(), query::full_partition_range, + s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size); + + rd.fill_buffer(db::no_timeout).get(); + + const auto tri_cmp = position_in_partition::tri_compare(*s.schema()); + + BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0); +} + +namespace { + +void check_evictable_reader_validation_is_triggered( + std::string_view test_name, + std::string_view error_prefix, // empty str if no exception is expected + schema_ptr schema, + reader_permit permit, + const dht::partition_range& prange, + const query::partition_slice& slice, + std::deque first_buffer, + position_in_partition_view last_fragment_position, + std::deque second_buffer, + size_t max_buffer_size) { + + testlog.info("check_evictable_reader_validation_is_triggered(): checking {} test case: {}", error_prefix.empty() ? "positive" : "negative", test_name); + + auto rd = create_evictable_reader_and_evict_after_first_buffer(std::move(schema), std::move(permit), prange, slice, std::move(first_buffer), + last_fragment_position, std::move(second_buffer), max_buffer_size); + + const bool fail = !error_prefix.empty(); + + try { + rd.fill_buffer(db::no_timeout).get0(); + } catch (std::runtime_error& e) { + if (fail) { + if (error_prefix == std::string_view(e.what(), error_prefix.size())) { + testlog.trace("Expected exception caught: {}", std::current_exception()); + return; + } else { + BOOST_FAIL(fmt::format("Exception with unexpected message caught: {}", std::current_exception())); + } + } else { + BOOST_FAIL(fmt::format("Unexpected exception caught: {}", std::current_exception())); + } + } + if (fail) { + BOOST_FAIL(fmt::format("Expected exception not thrown")); + } +} + +} + +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { + set_abort_on_internal_error(false); + auto reset_on_internal_abort = defer([] { + set_abort_on_internal_error(true); + }); + + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + + auto pkeys = s.make_pkeys(4); + std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); + + size_t max_buffer_size = 512; + const int first_ck = 100; + const int second_buffer_ck = first_ck + 100; + const int last_ck = second_buffer_ck + 100; + + static const char partition_error_prefix[] = "maybe_validate_partition_start(): validation failed"; + static const char position_in_partition_error_prefix[] = "validate_position_in_partition(): validation failed"; + static const char trim_range_tombstones_error_prefix[] = "maybe_trim_range_tombstone(): validation failed"; + + const auto prange = dht::partition_range::make( + dht::partition_range::bound(pkeys[1], true), + dht::partition_range::bound(pkeys[2], true)); + + const auto ckrange = query::clustering_range::make( + query::clustering_range::bound(s.make_ckey(first_ck), true), + query::clustering_range::bound(s.make_ckey(last_ck), true)); + + const auto slice = partition_slice_builder(*s.schema()).with_range(ckrange).build(); + + std::deque first_buffer; + first_buffer.emplace_back(partition_start{pkeys[1], {}}); + size_t mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + max_buffer_size = mem_usage; + auto last_fragment_position = position_in_partition(first_buffer.back().position()); + first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v")); + + auto make_second_buffer = [&s, &max_buffer_size, second_buffer_ck] (dht::decorated_key pkey, std::optional first_ckey = {}, + bool inject_range_tombstone = false) mutable { + auto ckey = first_ckey ? *first_ckey : second_buffer_ck; + std::deque second_buffer; + second_buffer.emplace_back(partition_start{std::move(pkey), {}}); + size_t mem_usage = second_buffer.back().memory_usage(*s.schema()); + if (inject_range_tombstone) { + second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(last_ck)))); + } + while (mem_usage <= max_buffer_size) { + second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v")); + mem_usage += second_buffer.back().memory_usage(*s.schema()); + } + second_buffer.emplace_back(partition_end{}); + return second_buffer; + }; + + // + // Continuing the same partition + // + + check_evictable_reader_validation_is_triggered( + "pkey < _last_pkey; pkey ∉ prange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[0]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (<)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], first_ck - 10), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (<); start with trimmable range-tombstone", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], first_ck - 10, true), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck - 2), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition; start with trimmable range-tombstone", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck - 2, true), + max_buffer_size); + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck - 10))); + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; end(range_tombstone) < _next_position_in_partition", + trim_range_tombstones_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10))); + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; end(range_tombstone) > _next_position_in_partition", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_starting_with(s.make_ckey(last_ck + 10))); + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; start(range_tombstone) ∉ ckrange (>)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∈ ckrange", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (>)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], last_ck + 10), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∈ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[2]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[3]), + max_buffer_size); + + // + // Continuing from next partition + // + + first_buffer.clear(); + + first_buffer.emplace_back(partition_start{pkeys[1], {}}); + mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + first_buffer.emplace_back(partition_end{}); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + last_fragment_position = position_in_partition(first_buffer.back().position()); + max_buffer_size = mem_usage; + first_buffer.emplace_back(partition_start{pkeys[2], {}}); + + check_evictable_reader_validation_is_triggered( + "pkey < _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[0]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∈ pkrange", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[2]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[3]), + max_buffer_size); +}