From ab59e7c7254b1f6da15abd2fd54c0e3c9776a439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 16 Sep 2020 15:43:29 +0300 Subject: [PATCH 1/7] flat_mutation_reader: add buffer() accessor To allow outsiders to inspect the contents of the reader's buffer. --- flat_mutation_reader.hh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flat_mutation_reader.hh b/flat_mutation_reader.hh index a7b0a4ca23..4190fee4af 100644 --- a/flat_mutation_reader.hh +++ b/flat_mutation_reader.hh @@ -470,6 +470,9 @@ public: size_t buffer_size() const { return _impl->buffer_size(); } + const circular_buffer& buffer() const { + return _impl->buffer(); + } // Detach the internal buffer of the reader. // Roughly equivalent to depleting it by calling pop_mutation_fragment() // until is_buffer_empty() returns true. From d7d93aef4987dcf061be6ba2909140aca688b495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Fri, 25 Sep 2020 11:34:21 +0300 Subject: [PATCH 2/7] position_in_partition_view: add position_in_partition_view before_key() overload --- position_in_partition.hh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/position_in_partition.hh b/position_in_partition.hh index 453398fc90..7dbbd987cc 100644 --- a/position_in_partition.hh +++ b/position_in_partition.hh @@ -163,6 +163,11 @@ public: return {partition_region::clustered, bound_weight::before_all_prefixed, &ck}; } + // Returns a view to before_key(pos._ck) if pos.is_clustering_row() else returns pos as-is. + static position_in_partition_view before_key(position_in_partition_view pos) { + return {partition_region::clustered, pos._bound_weight == bound_weight::equal ? 
bound_weight::before_all_prefixed : pos._bound_weight, pos._ck}; + } + partition_region region() const { return _type; } bound_weight get_bound_weight() const { return _bound_weight; } bool is_partition_start() const { return _type == partition_region::partition_start; } From 4f2e7a18e2325443384b6fbc5bd5b68e874bb908 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 21 Sep 2020 15:06:05 +0300 Subject: [PATCH 3/7] evictable_reader: trim range tombstones to the read clustering range Currently mutation sources are allowed to emit range tombstones that are outside the clustering read range if they are relevant to it. For example a read of a clustering range [ck100, +inf), might start with: range_tombstone{start={ck1, -1}, end={ck200, 1}}, clustering_row{ck100} The range tombstone is relevant to the range and the first row of the range so it is emitted first, but its position (start) is outside the read range. This is normally fine, but it poses a problem for evictable reader. When the underlying reader is evicted and has to be recreated from a certain clustering position, this results in out-of-order mutation fragments being inserted into the middle of the stream. This is not fine anymore as the monotonicity guarantee of the stream is violated. The real solution would be to require all mutation sources to trim range tombstones to their read range, but this is a lot of work. Until that is done, as a workaround we do this trimming in the evictable reader itself. 
--- mutation_reader.cc | 41 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 591754e070..130d3f6de1 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1029,6 +1029,9 @@ private: bool _reader_created = false; bool _drop_partition_start = false; bool _drop_static_row = false; + // Trim range tombstones on the start of the buffer to the start of the read + // range (_next_position_in_partition). Set after reader recreation. + bool _trim_range_tombstones = false; position_in_partition::tri_compare _tri_cmp; std::optional _last_pkey; @@ -1051,6 +1054,7 @@ private: flat_mutation_reader recreate_reader(); flat_mutation_reader resume_or_create_reader(); bool should_drop_fragment(const mutation_fragment& mf); + bool maybe_trim_range_tombstone(mutation_fragment& mf) const; future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); future<> fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); @@ -1194,6 +1198,8 @@ flat_mutation_reader evictable_reader::recreate_reader() { range = &*_range_override; } + _trim_range_tombstones = true; + return _ms.make_reader( _schema, _permit, @@ -1232,6 +1238,28 @@ bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) { return false; } +bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const { + // We either didn't read a partition yet (evicted after fast-forwarding) or + // didn't stop in a clustering region. We don't need to trim range + // tombstones in either case. 
+ if (!_last_pkey || _next_position_in_partition.region() != partition_region::clustered) { + return false; + } + if (!mf.is_range_tombstone()) { + return false; + } + + if (_tri_cmp(mf.position(), _next_position_in_partition) >= 0) { + return false; // rt in range, no need to trim + } + + auto& rt = mf.as_mutable_range_tombstone(); + + rt.set_start(*_schema, position_in_partition_view::before_key(_next_position_in_partition)); + + return true; +} + future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) { if (!_drop_partition_start && !_drop_static_row) { return reader.fill_buffer(timeout); @@ -1251,6 +1279,11 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout if (reader.is_buffer_empty()) { return make_ready_future<>(); } + while (_trim_range_tombstones && !reader.is_buffer_empty()) { + auto mf = reader.pop_mutation_fragment(); + _trim_range_tombstones = maybe_trim_range_tombstone(mf); + push_mutation_fragment(std::move(mf)); + } reader.move_buffer_content_to(*this); auto stop = [this, &reader] { // The only problematic fragment kind is the range tombstone. 
@@ -1291,7 +1324,13 @@ future<> evictable_reader::fill_buffer(flat_mutation_reader& reader, db::timeout if (reader.is_buffer_empty()) { return do_fill_buffer(reader, timeout); } - push_mutation_fragment(reader.pop_mutation_fragment()); + if (_trim_range_tombstones) { + auto mf = reader.pop_mutation_fragment(); + _trim_range_tombstones = maybe_trim_range_tombstone(mf); + push_mutation_fragment(std::move(mf)); + } else { + push_mutation_fragment(reader.pop_mutation_fragment()); + } return make_ready_future<>(); }); }).then([this, &reader] { From d1b0573e1ca612cd1ed4c2f69625daab358750c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 23 Sep 2020 17:48:42 +0300 Subject: [PATCH 4/7] mutation_reader_test: add unit test for evictable reader range tombstone trimming --- test/boost/mutation_reader_test.cc | 145 +++++++++++++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 0cf65dafcc..1378fe0713 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2984,3 +2984,148 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) run_mutation_source_tests(make_populate); } + +namespace { + +std::deque copy_fragments(const schema& s, const std::deque& o) { + std::deque buf; + for (const auto& mf : o) { + buf.emplace_back(s, mf); + } + return buf; +} + +flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& prange, + const query::partition_slice& slice, + std::deque first_buffer, + position_in_partition_view last_fragment_position, + std::deque second_buffer, + size_t max_buffer_size) { + class factory { + schema_ptr _schema; + std::optional> _first_buffer; + std::optional> _second_buffer; + size_t _max_buffer_size; + + private: + std::optional> copy_buffer(const std::optional>& o) { + if (!o) { + return {}; + 
} + return copy_fragments(*_schema, *o); + } + + public: + factory(schema_ptr schema, std::deque first_buffer, std::deque second_buffer, size_t max_buffer_size) + : _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) { + } + + factory(const factory& o) + : _schema(o._schema) + , _first_buffer(copy_buffer(o._first_buffer)) + , _second_buffer(copy_buffer(o._second_buffer)) { + } + factory(factory&& o) = default; + + flat_mutation_reader operator()( + schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding fwd_mr) { + BOOST_REQUIRE(s == _schema); + if (_first_buffer) { + auto buf = *std::exchange(_first_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + if (_second_buffer) { + auto buf = *std::exchange(_second_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + return make_empty_flat_reader(_schema); + } + }; + auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size)); + + auto [rd, handle] = make_manually_paused_evictable_reader( + std::move(ms), + schema, + permit, + prange, + slice, + seastar::default_priority_class(), + nullptr, + mutation_reader::forwarding::yes); + + rd.set_max_buffer_size(max_buffer_size); + + rd.fill_buffer(db::no_timeout).get0(); + + const auto eq_cmp = position_in_partition::equal_compare(*schema); + BOOST_REQUIRE(rd.is_buffer_full()); + BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position)); + BOOST_REQUIRE(!rd.is_end_of_stream()); + + rd.detach_buffer(); + + handle.pause(); + + 
while(permit.semaphore().try_evict_one_inactive_read()); + + return std::move(rd); +} + +} + +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) { + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + + const auto pkey = s.make_pkey(); + size_t max_buffer_size = 512; + const int first_ck = 100; + const int second_buffer_ck = first_ck + 100; + + size_t mem_usage = 0; + + std::deque first_buffer; + first_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + const auto last_fragment_position = position_in_partition(first_buffer.back().position()); + max_buffer_size = mem_usage; + first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v")); + + std::deque second_buffer; + second_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = second_buffer.back().memory_usage(*s.schema()); + second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10)))); + int ckey = second_buffer_ck; + while (mem_usage <= max_buffer_size) { + second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v")); + mem_usage += second_buffer.back().memory_usage(*s.schema()); + } + second_buffer.emplace_back(partition_end{}); + + auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore.make_permit(), query::full_partition_range, + s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size); + + rd.fill_buffer(db::no_timeout).get(); + + const auto tri_cmp = position_in_partition::tri_compare(*s.schema()); + + BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0); +} From 
91020eef735f9ec20e9616bccd56e55f81403f7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Tue, 22 Sep 2020 13:39:01 +0300 Subject: [PATCH 5/7] evictable_reader: update_next_position(): only use peek'd position on partition boundary `evictable_reader::update_next_position()` is used to record the position the reader will continue from, in the next buffer fill. This position is used to create the partition slice when the underlying reader is evicted and has to be recreated. There is an optimization in this method -- if the underlying's buffer is not empty we peek at the first fragment in it and use it as the next position. This is however problematic for buffer validation on reader recreation (introduced in the next patch), because using the next row's position as the next pos will allow for range tombstones to be emitted with before_key(next_pos.key()), which will trigger the validation. Instead of working around this, just drop this optimization for mid-partition positions, it is inconsequential anyway. We keep it for where it is important, when we detect that we are at a partition boundary. In this case we can avoid reading the current partition altogether when recreating the reader. 
--- mutation_reader.cc | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 130d3f6de1..1bca9acc79 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -1128,16 +1128,11 @@ void evictable_reader::update_next_position(flat_mutation_reader& reader) { _next_position_in_partition = position_in_partition::before_all_clustered_rows(); break; case partition_region::clustered: - if (reader.is_buffer_empty()) { - _next_position_in_partition = position_in_partition::after_key(last_pos); - } else { - const auto& next_frag = reader.peek_buffer(); - if (next_frag.is_end_of_partition()) { + if (!reader.is_buffer_empty() && reader.peek_buffer().is_end_of_partition()) { push_mutation_fragment(reader.pop_mutation_fragment()); _next_position_in_partition = position_in_partition::for_partition_start(); - } else { - _next_position_in_partition = position_in_partition(next_frag.position()); - } + } else { + _next_position_in_partition = position_in_partition::after_key(last_pos); } break; case partition_region::partition_end: From 0b0ae18a147a2053556167f28d5216d5ae086edf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 16 Sep 2020 15:44:05 +0300 Subject: [PATCH 6/7] evictable_reader: validate buffer after recreating the underlying reader The reader recreation mechanism is a very delicate and error-prone one, as proven by the countless bugs it had. Most of these bugs were related to the recreated reader not continuing the read from the expected position, inserting out-of-order fragments into the stream. This patch adds a defense mechanism against such bugs by validating the start position of the recreated reader. Several things are checked: * The partition is the expected one -- the one we were in the middle of or the next if we stopped at partition boundaries. * The partition is in the read range. 
* The first fragment in the partition is the expected one -- has an equal or larger position than the next expected fragment. * The fragment is in the clustering range as defined by the slice. As these validations are only done on the slow-path of recreating an evicted reader, no performance impact is expected. --- mutation_reader.cc | 101 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 100 insertions(+), 1 deletion(-) diff --git a/mutation_reader.cc b/mutation_reader.cc index 1bca9acc79..70e0f8aaef 100644 --- a/mutation_reader.cc +++ b/mutation_reader.cc @@ -30,6 +30,7 @@ #include "schema_registry.hh" #include "mutation_compactor.hh" +logging::logger mrlog("mutation_reader"); static constexpr size_t merger_small_vector_size = 4; @@ -1031,7 +1032,11 @@ private: bool _drop_static_row = false; // Trim range tombstones on the start of the buffer to the start of the read // range (_next_position_in_partition). Set after reader recreation. + // Also validate the first not-trimmed mutation fragment's position. bool _trim_range_tombstones = false; + // Validate the partition key of the first emitted partition, set after the + // reader was recreated. 
+ bool _validate_partition_key = false; position_in_partition::tri_compare _tri_cmp; std::optional _last_pkey; @@ -1053,6 +1058,8 @@ private: void adjust_partition_slice(); flat_mutation_reader recreate_reader(); flat_mutation_reader resume_or_create_reader(); + void maybe_validate_partition_start(const circular_buffer& buffer); + void validate_position_in_partition(position_in_partition_view pos) const; bool should_drop_fragment(const mutation_fragment& mf); bool maybe_trim_range_tombstone(mutation_fragment& mf) const; future<> do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout); @@ -1157,6 +1164,9 @@ flat_mutation_reader evictable_reader::recreate_reader() { const dht::partition_range* range = _pr; const query::partition_slice* slice = &_ps; + _range_override.reset(); + _slice_override.reset(); + if (_last_pkey) { bool partition_range_is_inclusive = true; @@ -1194,6 +1204,7 @@ flat_mutation_reader evictable_reader::recreate_reader() { } _trim_range_tombstones = true; + _validate_partition_key = true; return _ms.make_reader( _schema, @@ -1221,6 +1232,78 @@ flat_mutation_reader evictable_reader::resume_or_create_reader() { return recreate_reader(); } +template +static void require(bool condition, const char* msg, const Arg&... arg) { + if (!condition) { + on_internal_error(mrlog, format(msg, arg...)); + } +} + +void evictable_reader::maybe_validate_partition_start(const circular_buffer& buffer) { + if (!_validate_partition_key || buffer.empty()) { + return; + } + + // If this is set we can assume the first fragment is a partition-start. + const auto& ps = buffer.front().as_partition_start(); + const auto tri_cmp = dht::ring_position_comparator(*_schema); + // If we recreated the reader after fast-forwarding it we won't have + // _last_pkey set. In this case it is enough to check if the partition + // is in range. 
+ if (_last_pkey) { + const auto cmp_res = tri_cmp(*_last_pkey, ps.key()); + if (_drop_partition_start) { // should be the same partition + require( + cmp_res == 0, + "{}(): validation failed, expected partition with key equal to _last_pkey {} due to _drop_partition_start being set, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + } else { // should be a larger partition + require( + cmp_res < 0, + "{}(): validation failed, expected partition with key larger than _last_pkey {} due to _drop_partition_start being unset, but got {}", + __FUNCTION__, + *_last_pkey, + ps.key()); + } + } + const auto& prange = _range_override ? *_range_override : *_pr; + require( + // TODO: somehow avoid this copy + prange.contains(ps.key(), tri_cmp), + "{}(): validation failed, expected partition with key that falls into current range {}, but got {}", + __FUNCTION__, + prange, + ps.key()); + + _validate_partition_key = false; +} + +void evictable_reader::validate_position_in_partition(position_in_partition_view pos) const { + require( + _tri_cmp(_next_position_in_partition, pos) <= 0, + "{}(): validation failed, expected position in partition that is larger-than-equal than _next_position_in_partition {}, but got {}", + __FUNCTION__, + _next_position_in_partition, + pos); + + if (_slice_override && pos.region() == partition_region::clustered) { + const auto ranges = _slice_override->row_ranges(*_schema, _last_pkey->key()); + const bool any_contains = std::any_of(ranges.begin(), ranges.end(), [this, &pos] (const query::clustering_range& cr) { + // TODO: somehow avoid this copy + auto range = position_range(cr); + return range.contains(*_schema, pos); + }); + require( + any_contains, + "{}(): validation failed, expected clustering fragment that is included in the slice {}, but got {}", + __FUNCTION__, + *_slice_override, + pos); + } +} + bool evictable_reader::should_drop_fragment(const mutation_fragment& mf) { if (_drop_partition_start && mf.is_partition_start()) { 
_drop_partition_start = false; @@ -1241,15 +1324,24 @@ bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const { return false; } if (!mf.is_range_tombstone()) { + validate_position_in_partition(mf.position()); return false; } if (_tri_cmp(mf.position(), _next_position_in_partition) >= 0) { + validate_position_in_partition(mf.position()); return false; // rt in range, no need to trim } auto& rt = mf.as_mutable_range_tombstone(); + require( + _tri_cmp(_next_position_in_partition, rt.end_position()) <= 0, + "{}(): validation failed, expected range tombstone with end pos larger than _next_position_in_partition {}, but got {}", + __FUNCTION__, + _next_position_in_partition, + rt.end_position()); + rt.set_start(*_schema, position_in_partition_view::before_key(_next_position_in_partition)); return true; @@ -1257,10 +1349,17 @@ bool evictable_reader::maybe_trim_range_tombstone(mutation_fragment& mf) const { future<> evictable_reader::do_fill_buffer(flat_mutation_reader& reader, db::timeout_clock::time_point timeout) { if (!_drop_partition_start && !_drop_static_row) { - return reader.fill_buffer(timeout); + auto fill_buf_fut = reader.fill_buffer(timeout); + if (_validate_partition_key) { + fill_buf_fut = fill_buf_fut.then([this, &reader] { + maybe_validate_partition_start(reader.buffer()); + }); + } + return fill_buf_fut; } return repeat([this, &reader, timeout] { return reader.fill_buffer(timeout).then([this, &reader] { + maybe_validate_partition_start(reader.buffer()); while (!reader.is_buffer_empty() && should_drop_fragment(reader.peek_buffer())) { reader.pop_mutation_fragment(); } From 076c27318b2961b611dd521272d23687de190efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Wed, 23 Sep 2020 17:48:53 +0300 Subject: [PATCH 7/7] mutation_reader_test: add unit test for evictable reader self-validation Add both positive (where the validation should succeed) and negative (where the validation should fail) tests, covering all 
validation cases. --- test/boost/mutation_reader_test.cc | 340 +++++++++++++++++++++++++++++ 1 file changed, 340 insertions(+) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 1378fe0713..ead559083b 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3129,3 +3129,343 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) { BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0); } + +namespace { + +void check_evictable_reader_validation_is_triggered( + std::string_view test_name, + std::string_view error_prefix, // empty str if no exception is expected + schema_ptr schema, + reader_permit permit, + const dht::partition_range& prange, + const query::partition_slice& slice, + std::deque first_buffer, + position_in_partition_view last_fragment_position, + std::deque second_buffer, + size_t max_buffer_size) { + + testlog.info("check_evictable_reader_validation_is_triggered(): checking {} test case: {}", error_prefix.empty() ? 
"positive" : "negative", test_name); + + auto rd = create_evictable_reader_and_evict_after_first_buffer(std::move(schema), std::move(permit), prange, slice, std::move(first_buffer), + last_fragment_position, std::move(second_buffer), max_buffer_size); + + const bool fail = !error_prefix.empty(); + + try { + rd.fill_buffer(db::no_timeout).get0(); + } catch (std::runtime_error& e) { + if (fail) { + if (error_prefix == std::string_view(e.what(), error_prefix.size())) { + testlog.trace("Expected exception caught: {}", std::current_exception()); + return; + } else { + BOOST_FAIL(fmt::format("Exception with unexpected message caught: {}", std::current_exception())); + } + } else { + BOOST_FAIL(fmt::format("Unexpected exception caught: {}", std::current_exception())); + } + } + if (fail) { + BOOST_FAIL(fmt::format("Expected exception not thrown")); + } +} + +} + +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_self_validation) { + set_abort_on_internal_error(false); + auto reset_on_internal_abort = defer([] { + set_abort_on_internal_error(true); + }); + + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + + auto pkeys = s.make_pkeys(4); + std::ranges::sort(pkeys, dht::decorated_key::less_comparator(s.schema())); + + size_t max_buffer_size = 512; + const int first_ck = 100; + const int second_buffer_ck = first_ck + 100; + const int last_ck = second_buffer_ck + 100; + + static const char partition_error_prefix[] = "maybe_validate_partition_start(): validation failed"; + static const char position_in_partition_error_prefix[] = "validate_position_in_partition(): validation failed"; + static const char trim_range_tombstones_error_prefix[] = "maybe_trim_range_tombstone(): validation failed"; + + const auto prange = dht::partition_range::make( + dht::partition_range::bound(pkeys[1], true), + dht::partition_range::bound(pkeys[2], true)); + + const auto ckrange = query::clustering_range::make( + 
query::clustering_range::bound(s.make_ckey(first_ck), true), + query::clustering_range::bound(s.make_ckey(last_ck), true)); + + const auto slice = partition_slice_builder(*s.schema()).with_range(ckrange).build(); + + std::deque first_buffer; + first_buffer.emplace_back(partition_start{pkeys[1], {}}); + size_t mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + max_buffer_size = mem_usage; + auto last_fragment_position = position_in_partition(first_buffer.back().position()); + first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v")); + + auto make_second_buffer = [&s, &max_buffer_size, second_buffer_ck] (dht::decorated_key pkey, std::optional first_ckey = {}, + bool inject_range_tombstone = false) mutable { + auto ckey = first_ckey ? *first_ckey : second_buffer_ck; + std::deque second_buffer; + second_buffer.emplace_back(partition_start{std::move(pkey), {}}); + size_t mem_usage = second_buffer.back().memory_usage(*s.schema()); + if (inject_range_tombstone) { + second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(last_ck)))); + } + while (mem_usage <= max_buffer_size) { + second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v")); + mem_usage += second_buffer.back().memory_usage(*s.schema()); + } + second_buffer.emplace_back(partition_end{}); + return second_buffer; + }; + + // + // Continuing the same partition + // + + check_evictable_reader_validation_is_triggered( + "pkey < _last_pkey; pkey ∉ prange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[0]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey", + "", + s.schema(), + 
semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (<)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], first_ck - 10), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (<); start with trimmable range-tombstone", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], first_ck - 10, true), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck - 2), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange; position_in_partition < _next_position_in_partition; start with trimmable range-tombstone", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck - 2, true), + max_buffer_size); + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck - 10))); + 
check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; end(range_tombstone) < _next_position_in_partition", + trim_range_tombstones_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10))); + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; end(range_tombstone) > _next_position_in_partition", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + { + auto second_buffer = make_second_buffer(pkeys[1], second_buffer_ck); + second_buffer[1] = s.make_range_tombstone(query::clustering_range::make_starting_with(s.make_ckey(last_ck + 10))); + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; start(range_tombstone) ∉ ckrange (>)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + std::move(second_buffer), + max_buffer_size); + } + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∈ ckrange", + "", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1], second_buffer_ck), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey; position_in_partition ∉ ckrange (>)", + position_in_partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + 
make_second_buffer(pkeys[1], last_ck + 10), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∈ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[2]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[3]), + max_buffer_size); + + // + // Continuing from next partition + // + + first_buffer.clear(); + + first_buffer.emplace_back(partition_start{pkeys[1], {}}); + mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + first_buffer.emplace_back(partition_end{}); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + last_fragment_position = position_in_partition(first_buffer.back().position()); + max_buffer_size = mem_usage; + first_buffer.emplace_back(partition_start{pkeys[2], {}}); + + check_evictable_reader_validation_is_triggered( + "pkey < _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[0]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey == _last_pkey", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[1]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∈ pkrange", + 
"", + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[2]), + max_buffer_size); + + check_evictable_reader_validation_is_triggered( + "pkey > _last_pkey; pkey ∉ pkrange", + partition_error_prefix, + s.schema(), + semaphore.make_permit(), + prange, + slice, + copy_fragments(*s.schema(), first_buffer), + last_fragment_position, + make_second_buffer(pkeys[3]), + max_buffer_size); +}