diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 0cf65dafcc..1378fe0713 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -2984,3 +2984,148 @@ SEASTAR_THREAD_TEST_CASE(test_manual_paused_evictable_reader_is_mutation_source) run_mutation_source_tests(make_populate); } + +namespace { + +std::deque copy_fragments(const schema& s, const std::deque& o) { + std::deque buf; + for (const auto& mf : o) { + buf.emplace_back(s, mf); + } + return buf; +} + +flat_mutation_reader create_evictable_reader_and_evict_after_first_buffer( + schema_ptr schema, + reader_permit permit, + const dht::partition_range& prange, + const query::partition_slice& slice, + std::deque first_buffer, + position_in_partition_view last_fragment_position, + std::deque second_buffer, + size_t max_buffer_size) { + class factory { + schema_ptr _schema; + std::optional> _first_buffer; + std::optional> _second_buffer; + size_t _max_buffer_size; + + private: + std::optional> copy_buffer(const std::optional>& o) { + if (!o) { + return {}; + } + return copy_fragments(*_schema, *o); + } + + public: + factory(schema_ptr schema, std::deque first_buffer, std::deque second_buffer, size_t max_buffer_size) + : _schema(std::move(schema)), _first_buffer(std::move(first_buffer)), _second_buffer(std::move(second_buffer)), _max_buffer_size(max_buffer_size) { + } + + factory(const factory& o) + : _schema(o._schema) + , _first_buffer(copy_buffer(o._first_buffer)) + , _second_buffer(copy_buffer(o._second_buffer)) { + } + factory(factory&& o) = default; + + flat_mutation_reader operator()( + schema_ptr s, + reader_permit permit, + const dht::partition_range& range, + const query::partition_slice& slice, + const io_priority_class& pc, + tracing::trace_state_ptr trace_state, + streamed_mutation::forwarding fwd_sm, + mutation_reader::forwarding fwd_mr) { + BOOST_REQUIRE(s == _schema); + if (_first_buffer) { + auto buf = *std::exchange(_first_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + if (_second_buffer) { + auto buf = *std::exchange(_second_buffer, {}); + auto rd = make_flat_mutation_reader_from_fragments(_schema, std::move(buf)); + rd.set_max_buffer_size(_max_buffer_size); + return rd; + } + return make_empty_flat_reader(_schema); + } + }; + auto ms = mutation_source(factory(schema, std::move(first_buffer), std::move(second_buffer), max_buffer_size)); + + auto [rd, handle] = make_manually_paused_evictable_reader( + std::move(ms), + schema, + permit, + prange, + slice, + seastar::default_priority_class(), + nullptr, + mutation_reader::forwarding::yes); + + rd.set_max_buffer_size(max_buffer_size); + + rd.fill_buffer(db::no_timeout).get0(); + + const auto eq_cmp = position_in_partition::equal_compare(*schema); + BOOST_REQUIRE(rd.is_buffer_full()); + BOOST_REQUIRE(eq_cmp(rd.buffer().back().position(), last_fragment_position)); + BOOST_REQUIRE(!rd.is_end_of_stream()); + + rd.detach_buffer(); + + handle.pause(); + + while(permit.semaphore().try_evict_one_inactive_read()); + + return std::move(rd); +} + +} + +SEASTAR_THREAD_TEST_CASE(test_evictable_reader_trim_range_tombstones) { + reader_concurrency_semaphore semaphore(reader_concurrency_semaphore::no_limits{}, get_name()); + simple_schema s; + + const auto pkey = s.make_pkey(); + size_t max_buffer_size = 512; + const int first_ck = 100; + const int second_buffer_ck = first_ck + 100; + + size_t mem_usage = 0; + + std::deque first_buffer; + first_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = first_buffer.back().memory_usage(*s.schema()); + for (int i = 0; i < second_buffer_ck; ++i) { + first_buffer.emplace_back(s.make_row(s.make_ckey(i++), "v")); + mem_usage += first_buffer.back().memory_usage(*s.schema()); + } + const auto last_fragment_position = position_in_partition(first_buffer.back().position()); + max_buffer_size = mem_usage; + first_buffer.emplace_back(s.make_row(s.make_ckey(second_buffer_ck), "v")); + + std::deque second_buffer; + second_buffer.emplace_back(partition_start{pkey, {}}); + mem_usage = second_buffer.back().memory_usage(*s.schema()); + second_buffer.emplace_back(s.make_range_tombstone(query::clustering_range::make_ending_with(s.make_ckey(second_buffer_ck + 10)))); + int ckey = second_buffer_ck; + while (mem_usage <= max_buffer_size) { + second_buffer.emplace_back(s.make_row(s.make_ckey(ckey++), "v")); + mem_usage += second_buffer.back().memory_usage(*s.schema()); + } + second_buffer.emplace_back(partition_end{}); + + auto rd = create_evictable_reader_and_evict_after_first_buffer(s.schema(), semaphore.make_permit(), query::full_partition_range, + s.schema()->full_slice(), std::move(first_buffer), last_fragment_position, std::move(second_buffer), max_buffer_size); + + rd.fill_buffer(db::no_timeout).get(); + + const auto tri_cmp = position_in_partition::tri_compare(*s.schema()); + + BOOST_REQUIRE(tri_cmp(last_fragment_position, rd.peek_buffer().position()) < 0); +}