From 2057db54ab320b010bfcdf5f5bc8a03a3c76e516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Thu, 3 Feb 2022 09:42:57 +0200 Subject: [PATCH] test/boost/mutation_test: test_compactor_range_tombstone_spanning_many_pages extend to check v2 output too --- test/boost/mutation_test.cc | 128 ++++++++++++++++++++++++++++++++++++ 1 file changed, 128 insertions(+) diff --git a/test/boost/mutation_test.cc b/test/boost/mutation_test.cc index 936f224e0e..15fcc084d8 100644 --- a/test/boost/mutation_test.cc +++ b/test/boost/mutation_test.cc @@ -2720,6 +2720,48 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { } }; + struct consumer_v2 { + reader_permit permit; + mutation& mut; + const uint64_t row_limit; + uint64_t rows = 0; + mutation_rebuilder_v2 builder; + + consumer_v2(reader_permit permit, mutation& mut, uint64_t row_limit, uint64_t rows = 0) + : permit(std::move(permit)), mut(mut), row_limit(row_limit), rows(rows), builder(mut.schema()) + { } + + void consume_new_partition(const dht::decorated_key& dk) { + BOOST_REQUIRE(mut.decorated_key().equal(*mut.schema(), dk)); + builder.consume_new_partition(dk); + } + void consume(const tombstone& t) { + BOOST_REQUIRE_EQUAL(t, mut.partition().partition_tombstone()); + builder.consume(t); + } + stop_iteration consume(static_row&& sr, tombstone, bool) { + builder.consume(std::move(sr)); + return stop_iteration(++rows >= row_limit); + } + stop_iteration consume(clustering_row&& cr, row_tombstone t, bool is_alive) { + builder.consume(std::move(cr)); + return stop_iteration(++rows >= row_limit); + } + stop_iteration consume(range_tombstone_change&& rtc) { + builder.consume(std::move(rtc)); + return stop_iteration(++rows >= row_limit); + } + stop_iteration consume_end_of_partition() { + builder.consume_end_of_partition(); + return stop_iteration::yes; + } + void consume_end_of_stream() { + if (auto mut_opt = builder.consume_end_of_stream()) { + mut += *mut_opt; + } + } + }; + testlog.info("non-paged"); { mutation res_mut(s, pk); @@ -2732,6 +2774,18 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } + testlog.info("non-paged v2"); + { + mutation res_mut(s, pk); + auto c = compact_for_query_v2(*s, query_time, s->full_slice(), max_rows, max_partitions, consumer_v2{permit, res_mut, max_rows}); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + + reader.consume(std::move(c)).get(); + + BOOST_REQUIRE_EQUAL(res_mut, ref_mut); + } + testlog.info("limited pages"); { mutation res_mut(s, pk); @@ -2748,6 +2802,22 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } + testlog.info("limited pages v2"); + { + mutation res_mut(s, pk); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + + while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { + auto c = consumer_v2{permit, res_mut, max_rows}; + compaction_state->start_new_page(1, max_partitions, query_time, reader.peek().get()->position().region(), c); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + } + + BOOST_REQUIRE_EQUAL(res_mut, ref_mut); + } + testlog.info("short pages"); { mutation res_mut(s, pk); @@ -2764,6 +2834,22 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } + testlog.info("short pages v2"); + { + mutation res_mut(s, pk); + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + + while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { + auto c = consumer_v2{permit, res_mut, 2}; + compaction_state->start_new_page(max_rows, max_partitions, query_time, reader.peek().get()->position().region(), c); + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + } + + BOOST_REQUIRE_EQUAL(res_mut, ref_mut); + } + testlog.info("limited pages - detach state"); { mutation res_mut(s, pk); @@ -2785,6 +2871,27 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } + testlog.info("limited pages - detach state v2"); + { + mutation res_mut(s, pk); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + + std::optional detached_state; + + while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { + if (detached_state) { + restore_state(reader, std::move(*detached_state)); + } + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), 1, max_partitions); + auto c = consumer_v2{permit, res_mut, max_rows}; + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + detached_state = std::move(*compaction_state).detach_state(); + } + + BOOST_REQUIRE_EQUAL(res_mut, ref_mut); + } + testlog.info("short pages - detach state"); { mutation res_mut(s, pk); @@ -2805,4 +2912,25 @@ SEASTAR_THREAD_TEST_CASE(test_compactor_range_tombstone_spanning_many_pages) { BOOST_REQUIRE_EQUAL(res_mut, ref_mut); } + + testlog.info("short pages - detach state v2"); + { + mutation res_mut(s, pk); + auto reader = make_flat_mutation_reader_from_fragments(s, permit, make_frags()); + auto close_reader = deferred_close(reader); + + std::optional detached_state; + + while (!reader.is_buffer_empty() || !reader.is_end_of_stream()) { + if (detached_state) { + restore_state(reader, std::move(*detached_state)); + } + auto compaction_state = make_lw_shared>(*s, query_time, s->full_slice(), max_rows, max_partitions); + auto c = consumer_v2{permit, res_mut, 2}; + reader.consume(compact_for_query_v2(compaction_state, std::move(c))).get(); + detached_state = std::move(*compaction_state).detach_state(); + } + + BOOST_REQUIRE_EQUAL(res_mut, ref_mut); + } }