diff --git a/test/boost/mutation_writer_test.cc b/test/boost/mutation_writer_test.cc index a55e9abcde..5dbc66ef8f 100644 --- a/test/boost/mutation_writer_test.cc +++ b/test/boost/mutation_writer_test.cc @@ -495,3 +495,54 @@ SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_bucket_l }); }).get(); } + +SEASTAR_THREAD_TEST_CASE(test_partition_based_splitting_mutation_writer_exception_safety) { + tests::reader_concurrency_semaphore_wrapper semaphore; + auto random_spec = tests::make_random_schema_specification( + get_name(), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 2), + std::uniform_int_distribution(1, 2), + std::uniform_int_distribution(0, 1)); + + auto random_schema = tests::random_schema{tests::random::get_int(), *random_spec}; + + const auto max_buckets = 10u; + + auto input_mutations = tests::generate_random_mutations(random_schema).get(); + std::shuffle(input_mutations.begin(), input_mutations.end(), tests::random::gen()); + while (input_mutations.size() < max_buckets * 4) { + input_mutations.push_back(input_mutations.at(tests::random::get_int(input_mutations.size() - 1))); + } + + auto& injector = memory::local_failure_injector(); + uint64_t i = 0; + do { + auto reader = flat_mutation_reader_from_mutations(semaphore.make_permit(), input_mutations); + testlog.trace("i={}", i); + try { + injector.fail_after(i++); + + mutation_writer::segregate_by_partition(std::move(reader), max_buckets, [&injector] (flat_mutation_reader rd) { + try { + injector.on_alloc_point(); + } catch (...) { + (void)rd.close(); + throw; + } + // We are not interested in the various ways the below can fail. + memory::scoped_critical_alloc_section _; + return async([rd = std::move(rd)] () mutable { + auto close_reader = deferred_close(rd); + try { + while (auto mfopt = rd().get()); + } catch (...) { + } + }); + }).get(); + + injector.cancel(); + } catch (const std::bad_alloc&) { + } + } while (injector.failed()); +}