From 7f331cee01a3fe6bbdf79018bd4a4791cb7149ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Botond=20D=C3=A9nes?= Date: Mon, 13 Dec 2021 13:15:14 +0200 Subject: [PATCH] test/boost/mutation_reader_test: add test_combined_reader_range_tombstone_change_merging Stressing the range tombstone change merging logic. --- test/boost/mutation_reader_test.cc | 89 ++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index 992956b7f9..6a6853d525 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -230,6 +230,95 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_changing_multiple_partitions_ .produces_end_of_stream(); } +SEASTAR_THREAD_TEST_CASE(test_combined_reader_range_tombstone_change_merging) { + simple_schema s; + const auto schema = s.schema(); + tests::reader_concurrency_semaphore_wrapper semaphore; + auto permit = semaphore.make_permit(); + + auto rtc = [&] (uint32_t ckey, std::optional ts) { + return range_tombstone_change( + position_in_partition::before_key(s.make_ckey(ckey)), + ts ? tombstone(*ts, {}) : tombstone()); + }; + struct input { + std::vector rtcs; + }; + struct output { + std::vector rtcs; + }; + auto check = [&] (const char* desc, std::vector in, output out, std::experimental::source_location sl = std::experimental::source_location::current()) { + testlog.info("check() {} @ {}:{}", desc, sl.file_name(), sl.line()); + std::vector readers; + for (auto& i : in) { + std::deque fragments; + fragments.emplace_back(*schema, permit, partition_start(s.make_pkey(0), {})); + for (auto& rtc : i.rtcs) { + fragments.emplace_back(*schema, permit, std::move(rtc)); + } + fragments.emplace_back(*schema, permit, partition_end{}); + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(fragments))); + } + auto combined_reader = assert_that(make_combined_reader(schema, permit, std::move(readers))); + combined_reader.produces_partition_start(s.make_pkey(0)); + for (const auto& rtc : out.rtcs) { + combined_reader.produces_range_tombstone_change(rtc); + } + combined_reader.produces_partition_end(); + combined_reader.produces_end_of_stream(); + }; + + check("single stream", + { + input{{rtc(1, 100), rtc(2, {})}} + }, + output{{rtc(1, 100), rtc(2, {})}}); + + check("two streams, two rtc with same pos", + { + input{{rtc(1, 100), rtc(2, {})}}, + input{{rtc(1, 200), rtc(2, {})}} + }, + output{{rtc(1, 200), rtc(2, {})}}); + + check("two streams, alternating updates to both streams", + { + input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}}, + input{{rtc(1, 200), rtc(2, 100), rtc(4, {})}} + }, + output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}}); + + check("two streams, active tombstone terminated, fallback to overshadowed tombstone", + { + input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}}, + input{{rtc(1, 200), rtc(2, {})}} + }, + output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}}); + + check("three streams, active tombstone terminated, fallback to overshadowed tombstones", + { + input{{rtc(1, 100), rtc(4, 200), rtc(5, {})}}, + input{{rtc(1, 200), rtc(3, {}), rtc(4, {})}}, + input{{rtc(1, 300), rtc(2, {})}} + }, + output{{rtc(1, 300), rtc(2, 200), rtc(3, 100), rtc(4, 200), rtc(5, {})}}); + + check("three streams, tombstones not representing timestamp upgrades are omitted", + { + input{{rtc(1, 100), rtc(6, {})}}, + input{{rtc(2, 100), rtc(5, {})}}, + input{{rtc(3, 100), rtc(4, {})}} + }, + output{{rtc(1, 100), rtc(6, {})}}); + + check("two streams, new tombstone has smaller timestamp", + { + input{{rtc(1, 200), rtc(3, {})}}, + input{{rtc(2, 100), rtc(4, {})}}, + }, + output{{rtc(1, 200), rtc(3, 100), rtc(4, {})}}); +} + static mutation make_mutation_with_key(schema_ptr s, dht::decorated_key dk) { mutation m(s, std::move(dk)); m.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);