diff --git a/readers/multishard.cc b/readers/multishard.cc index 4c65b71e01..3abfecc631 100644 --- a/readers/multishard.cc +++ b/readers/multishard.cc @@ -607,7 +607,7 @@ future<> evictable_reader_v2::fill_buffer() { // First make sure we've made progress w.r.t. _next_position_in_partition. // This loop becomes inifinite when next pos is a partition start. // In that case progress is guranteed anyway, so skip this loop entirely. - while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(_next_position_in_partition, buffer().back().position()) <= 0) { + while (!_next_position_in_partition.is_partition_start() && next_mf && _tri_cmp(buffer().back().position(), _next_position_in_partition) <= 0) { push_mutation_fragment(_reader->pop_mutation_fragment()); next_mf = co_await _reader->peek(); } diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 1773392d81..be9b7cb736 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -3648,9 +3648,13 @@ SEASTAR_THREAD_TEST_CASE(test_evictable_reader_next_pos_is_partition_start) { auto stop_rd = deferred_close(rd); rd.set_max_buffer_size(max_buf_size); + // #13491 - the reader must not consume the entire partition but a small batch of fragments based on the buffer size. + rd.fill_buffer().get(); rd.fill_buffer().get(); auto buf1 = rd.detach_buffer(); - BOOST_REQUIRE_EQUAL(buf1.size(), 3); + // There should be 6-7 fragments, but to avoid computing the exact number of fragments that should fit in `max_buf_size`, + // just ensure that there are <= 10 (consuming the whole partition would give ~1000 fragments). + BOOST_REQUIRE_LE(buf1.size(), 10); } struct mutation_bounds { diff --git a/test/boost/sstable_conforms_to_mutation_source_test.cc b/test/boost/sstable_conforms_to_mutation_source_test.cc index 6b6adcd6cf..bf24f0fc0b 100644 --- a/test/boost/sstable_conforms_to_mutation_source_test.cc +++ b/test/boost/sstable_conforms_to_mutation_source_test.cc @@ -269,7 +269,7 @@ SEASTAR_THREAD_TEST_CASE(test_sstable_reversing_reader_random_schema) { streamed_mutation::forwarding::no, mutation_reader::forwarding::no); close_r1.cancel(); - compare_readers(*query_schema, std::move(r1), std::move(r2)); + compare_readers(*query_schema, std::move(r1), std::move(r2), true); } auto r1 = source.make_reader_v2(query_schema, semaphore.make_permit(), prange, diff --git a/test/lib/flat_mutation_reader_assertions.hh b/test/lib/flat_mutation_reader_assertions.hh index fc3202c794..80aa63dba6 100644 --- a/test/lib/flat_mutation_reader_assertions.hh +++ b/test/lib/flat_mutation_reader_assertions.hh @@ -59,10 +59,27 @@ private: continue; } // silently ignore rtcs that don't change anything - if (next->is_range_tombstone_change() && next->as_range_tombstone_change().tombstone() == _rt) { - testlog.trace("Received spurious closing rtc: {}", mutation_fragment_v2::printer(*_reader.schema(), *next)); - _reader().get(); - continue; + if (next->is_range_tombstone_change()) { + auto rtc_mf = std::move(*_reader().get()); + auto tomb = rtc_mf.as_range_tombstone_change().tombstone(); + auto cmp = position_in_partition::tri_compare(*_reader.schema()); + // squash rtcs with the same pos + while (auto next_maybe_rtc = _reader.peek().get0()) { + if (next_maybe_rtc->is_range_tombstone_change() && cmp(next_maybe_rtc->position(), rtc_mf.position()) == 0) { + testlog.trace("Squashing {} with {}", next_maybe_rtc->as_range_tombstone_change().tombstone(), tomb); + tomb = next_maybe_rtc->as_range_tombstone_change().tombstone(); + _reader().get0(); + } else { + break; + } + } + rtc_mf.mutate_as_range_tombstone_change(*_reader.schema(), [tomb] (range_tombstone_change& rtc) { rtc.set_tombstone(tomb); }); + if (tomb == _rt) { + testlog.trace("Received spurious rtcs, equivalent to: {}", mutation_fragment_v2::printer(*_reader.schema(), rtc_mf)); + continue; + } + _reader.unpop_mutation_fragment(std::move(rtc_mf)); + next = _reader.peek().get0(); } return next; } diff --git a/test/lib/mutation_source_test.cc b/test/lib/mutation_source_test.cc index 1d558671d3..ff2a20b75b 100644 --- a/test/lib/mutation_source_test.cc +++ b/test/lib/mutation_source_test.cc @@ -2669,9 +2669,9 @@ static bool compare_readers(const schema& s, flat_mutation_reader_v2& authority, return !empty; } -void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested) { +void compare_readers(const schema& s, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact) { auto close_authority = deferred_close(authority); - auto assertions = assert_that(std::move(tested)); + auto assertions = assert_that(std::move(tested)).exact(exact); compare_readers(s, authority, assertions); } diff --git a/test/lib/mutation_source_test.hh b/test/lib/mutation_source_test.hh index b4cc35ed3c..41003c70ae 100644 --- a/test/lib/mutation_source_test.hh +++ b/test/lib/mutation_source_test.hh @@ -74,7 +74,7 @@ bytes make_blob(size_t blob_size); void for_each_schema_change(std::function&, schema_ptr, const std::vector&)>); -void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested); +void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, bool exact = false); void compare_readers(const schema&, flat_mutation_reader_v2 authority, flat_mutation_reader_v2 tested, const std::vector& fwd_ranges); // Forward `r` to each range in `fwd_ranges` and consume all fragments produced by `r` in these ranges.