test/boost/mutation_reader_test: add test_combined_reader_range_tombstone_change_merging

Stressing the range tombstone change merging logic.
This commit is contained in:
Botond Dénes
2021-12-13 13:15:14 +02:00
parent e1bbc4a480
commit 7f331cee01

View File

@@ -230,6 +230,95 @@ SEASTAR_THREAD_TEST_CASE(combined_reader_galloping_changing_multiple_partitions_
.produces_end_of_stream();
}
SEASTAR_THREAD_TEST_CASE(test_combined_reader_range_tombstone_change_merging) {
simple_schema s;
const auto schema = s.schema();
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
auto rtc = [&] (uint32_t ckey, std::optional<api::timestamp_type> ts) {
return range_tombstone_change(
position_in_partition::before_key(s.make_ckey(ckey)),
ts ? tombstone(*ts, {}) : tombstone());
};
struct input {
std::vector<range_tombstone_change> rtcs;
};
struct output {
std::vector<range_tombstone_change> rtcs;
};
auto check = [&] (const char* desc, std::vector<input> in, output out, std::experimental::source_location sl = std::experimental::source_location::current()) {
testlog.info("check() {} @ {}:{}", desc, sl.file_name(), sl.line());
std::vector<flat_mutation_reader_v2> readers;
for (auto& i : in) {
std::deque<mutation_fragment_v2> fragments;
fragments.emplace_back(*schema, permit, partition_start(s.make_pkey(0), {}));
for (auto& rtc : i.rtcs) {
fragments.emplace_back(*schema, permit, std::move(rtc));
}
fragments.emplace_back(*schema, permit, partition_end{});
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, std::move(fragments)));
}
auto combined_reader = assert_that(make_combined_reader(schema, permit, std::move(readers)));
combined_reader.produces_partition_start(s.make_pkey(0));
for (const auto& rtc : out.rtcs) {
combined_reader.produces_range_tombstone_change(rtc);
}
combined_reader.produces_partition_end();
combined_reader.produces_end_of_stream();
};
check("single stream",
{
input{{rtc(1, 100), rtc(2, {})}}
},
output{{rtc(1, 100), rtc(2, {})}});
check("two streams, two rtc with same pos",
{
input{{rtc(1, 100), rtc(2, {})}},
input{{rtc(1, 200), rtc(2, {})}}
},
output{{rtc(1, 200), rtc(2, {})}});
check("two streams, alternating updates to both streams",
{
input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}},
input{{rtc(1, 200), rtc(2, 100), rtc(4, {})}}
},
output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}});
check("two streams, active tombstone terminated, fallback to overshadowed tombstone",
{
input{{rtc(1, 100), rtc(3, 200), rtc(4, {})}},
input{{rtc(1, 200), rtc(2, {})}}
},
output{{rtc(1, 200), rtc(2, 100), rtc(3, 200), rtc(4, {})}});
check("three streams, active tombstone terminated, fallback to overshadowed tombstones",
{
input{{rtc(1, 100), rtc(4, 200), rtc(5, {})}},
input{{rtc(1, 200), rtc(3, {}), rtc(4, {})}},
input{{rtc(1, 300), rtc(2, {})}}
},
output{{rtc(1, 300), rtc(2, 200), rtc(3, 100), rtc(4, 200), rtc(5, {})}});
check("three streams, tombstones not representing timestamp upgrades are omitted",
{
input{{rtc(1, 100), rtc(6, {})}},
input{{rtc(2, 100), rtc(5, {})}},
input{{rtc(3, 100), rtc(4, {})}}
},
output{{rtc(1, 100), rtc(6, {})}});
check("two streams, new tombstone has smaller timestamp",
{
input{{rtc(1, 200), rtc(3, {})}},
input{{rtc(2, 100), rtc(4, {})}},
},
output{{rtc(1, 200), rtc(3, 100), rtc(4, {})}});
}
static mutation make_mutation_with_key(schema_ptr s, dht::decorated_key dk) {
mutation m(s, std::move(dk));
m.set_clustered_cell(clustering_key::make_empty(), "v", data_value(bytes("v1")), 1);