test: mutation_writer_test: add exception safety test for segregate_by_partition()

This commit is contained in:
Botond Dénes
2021-10-08 13:57:35 +03:00
parent 2ca6552909
commit 0d744fd3fa

View File

@@ -495,3 +495,54 @@ SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_bucket_l
});
}).get();
}
SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_exception_safety) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto random_spec = tests::make_random_schema_specification(
get_name(),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 2),
std::uniform_int_distribution<size_t>(1, 2),
std::uniform_int_distribution<size_t>(0, 1));
auto random_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *random_spec};
const auto max_buckets = 10u;
auto input_mutations = tests::generate_random_mutations(random_schema).get();
std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen());
while (input_mutations.size() < max_buckets * 4) {
input_mutations.push_back(input_mutations.at(tests::random::get_int<uint32_t>(input_mutations.size() - 1)));
}
auto& injector = memory::local_failure_injector();
uint64_t i = 0;
do {
auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), input_mutations);
testlog.trace("i={}", i);
try {
injector.fail_after(i++);
mutation_writer::segregate_by_partition(std::move(reader), max_buckets, [&injector] (flat_mutation_reader rd) {
try {
injector.on_alloc_point();
} catch (...) {
(void)rd.close();
throw;
}
// We are not interested in the various ways the below can fail.
memory::scoped_critical_alloc_section _;
return async([rd = std::move(rd)] () mutable {
auto close_reader = deferred_close(rd);
try {
while (auto mfopt = rd().get());
} catch (...) {
}
});
}).get();
injector.cancel();
} catch (const std::bad_alloc&) {
}
} while (injector.failed());
}