From 5800ce8ddd61f8b93842be57794b8fab891ae0fe Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 27 Jun 2023 14:18:26 +0200 Subject: [PATCH 1/2] test: flat_mutation_reader_assertions: squash `r_t_c`s with the same position test_range_tombstones_v2 is too strict for this reader -- it expects a particular sequence of `range_tombstone_change`s, but multishard_combining_reader, when tested with a small buffer, may generate -- as expected -- additional (redundant) range tombstone change pairs (end+start). Currently we don't observe these redundant fragments due to a bug in `evictable_reader_v2` but they start appearing once we fix the bug and the test must be prepared first. To prepare the test, modify `flat_reader_assertions_v2` so it squashes redundant range tombstone change pairs. This happens only in non-exact mode. Enable exact mode in `test_sstable_reversing_reader_random_schema` for comparing two readers -- the squashing of `r_t_c`s may introduce an artificial difference. --- ...stable_conforms_to_mutation_source_test.cc | 2 +- test/lib/flat_mutation_reader_assertions.hh | 25 ++++++++++++++++--- test/lib/mutation_source_test.cc | 4 +-- test/lib/mutation_source_test.hh | 2 +- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/test/boost/sstable_conforms_to_mutation_source_test.cc b/test/boost/sstable_conforms_to_mutation_source_test.cc index 6b6adcd6cf..bf24f0fc0b 100644 --- a/test/boost/sstable_conforms_to_mutation_source_test.cc +++ b/test/boost/sstable_conforms_to_mutation_source_test.cc @@ -269,7 +269,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_reversing_reader_random_schema) { streamed_mutation::forwarding::no, mutation_reader::forwarding::no); close_r1.cancel(); - compare_readers(*query_schema, std::move(r1), std::move(r2)); + compare_readers(*query_schema, std::move(r1), std::move(r2), true); } auto r1 = source.make_reader_v2(query_schema, semaphore.make_permit(), prange, diff --git a/test/lib/flat_mutation_reader_assertions.hh 
b/test/lib/flat_mutation_reader_assertions.hh index fc3202c794..80aa63dba6 100644 --- a/test/lib/flat_mutation_reader_assertions.hh +++ b/test/lib/flat_mutation_reader_assertions.hh @@ -59,10 +59,27 @@ private: continue; } // silently ignore rtcs that don't change anything - if (next->is_range_tombstone_change() && next->as_range_tombstone_change().tombstone() == _rt) { - testlog.trace("Received spurious closing rtc: {}", mutation_fragment_v2::printer(*_reader.schema(), *next)); - _reader().get(); - continue; + if (next->is_range_tombstone_change()) { + auto rtc_mf = std::move(*_reader().get()); + auto tomb = rtc_mf.as_range_tombstone_change().tombstone(); + auto cmp = position_in_partition::tri_compare(*_reader.schema()); + // squash rtcs with the same pos + while (auto next_maybe_rtc = _reader.peek().get0()) { + if (next_maybe_rtc->is_range_tombstone_change() && cmp(next_maybe_rtc->position(), rtc_mf.position()) == 0) { + testlog.trace("Squashing {} with {}", next_maybe_rtc->as_range_tombstone_change().tombstone(), tomb); + tomb = next_maybe_rtc->as_range_tombstone_change().tombstone(); + _reader().get0(); + } else { + break; + } + } + rtc_mf.mutate_as_range_tombstone_change(*_reader.schema(), [tomb] (range_tombstone_change& rtc) { rtc.set_tombstone(tomb); }); + if (tomb == _rt) { + testlog.trace("Received spurious rtcs, equivalent to: {}", mutation_fragment_v2::printer(*_reader.schema(), rtc_mf)); + continue; + } + _reader.unpop_mutation_fragment(std::move(rtc_mf)); + next = _reader.peek().get0(); } return next; } diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 1d558671d3..ff2a20b75b 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -2669,9 +2669,9 @@ static bool compare_readers(const schema& s, flat_mutation_reader_v2& authority, return !empty; } -void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested) { +void compare_readers(const 
schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact) { auto close_authority = deferred_close(authority); - auto assertions = assert_that(std::move(tested)); + auto assertions = assert_that(std::move(tested)).exact(exact); compare_readers(s, authority, assertions); } diff --git a/test/lib/mutation_source_test.hh b/test/lib/mutation_source_test.hh index b4cc35ed3c..41003c70ae 100644 --- a/test/lib/mutation_source_test.hh +++ b/test/lib/mutation_source_test.hh @@ -74,7 +74,7 @@ bytes make_blob(size_t blob_size); void for_each_schema_change(std::function&, schema_ptr, const std::vector&)>); -void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested); +void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact = false); void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector& fwd_ranges); // Forward `r` to each range in `fwd_ranges` and consume all fragments produced by `r` in these ranges. From 96bc78905d1f9d7da70402355b42d71d84cede09 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Fri, 23 Jun 2023 17:02:09 +0200 Subject: [PATCH 2/2] readers: evictable_reader: don't accidentally consume the entire partition The evictable reader must ensure that each buffer fill makes forward progress, i.e. the last fragment in the buffer has a position larger than the last fragment from the previous buffer-fill. Otherwise, the reader could get stuck in an infinite loop between buffer fills, if the reader is evicted in-between. The code guaranteeing this forward progress had a bug: the comparison between the position after the last buffer-fill and the current last fragment position was done in the wrong direction. So if the condition that we wanted to achieve was already true, we would continue filling the buffer until partition end which may lead to OOMs such as in #13491. 
There was already a fix in this area to handle `partition_start` fragments correctly - #13563 - but it missed that the position comparison was done in the wrong order. Fix the comparison and adjust one of the tests (added in #13563) to detect this case. Fixes #13491 --- readers/multishard.cc | 2 +- test/boost/mutation_reader_test.cc | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/readers/multishard.cc b/readers/multishard.cc index 4c65b71e01..3abfecc631 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -607,7 +607,7 @@ future<> evictable_reader_v2::fill_buffer() { // First make sure we've made progress w.r.t. _next_position_in_partition. // This loop becomes inifinite when next pos is a partition start. // In that case progress is guranteed anyway, so skip this loop entirely. - while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) { + while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) { push_mutation_fragment(_reader->pop_mutation_fragment()); next_mf = co_await _reader->peek(); } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 1773392d81..be9b7cb736 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3648,9 +3648,13 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) { auto stop_rd = deferred_close(rd); rd.set_max_buffer_size(max_buf_size); + // #13491 - the reader must not consume the entire partition but a small batch of fragments based on the buffer size. 
+ rd.fill_buffer().get(); rd.fill_buffer().get(); auto buf1 = rd.detach_buffer(); - BOOST_REQUIRE_EQUAL(buf1.size(), 3); + // There should be 6-7 fragments, but to avoid computing the exact number of fragments that should fit in `max_buf_size`, + // just ensure that there are <= 10 (consuming the whole partition would give ~1000 fragments). + BOOST_REQUIRE_LE(buf1.size(), 10); } struct mutation_bounds {