diff --git a/test/boost/mutation_reader_test.cc b/test/boost/mutation_reader_test.cc index a771acec8c..70ec160b5c 100644 --- a/test/boost/mutation_reader_test.cc +++ b/test/boost/mutation_reader_test.cc @@ -51,6 +51,7 @@ #include "schema_builder.hh" #include "cell_locking.hh" #include "sstables/sstables.hh" +#include "sstables/sstable_set_impl.hh" #include "database.hh" #include "partition_slice_builder.hh" #include "schema_registry.hh" @@ -3666,6 +3667,72 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) { } } +SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) { + sstables::test_env::do_with_async([] (sstables::test_env& env) { + storage_service_for_tests ssft; + clustering_order_merger_test_generator g; + + auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) { + return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd); + }; + + auto make_tested = [s = g._s, pos = dht::ring_position(g.decorated_pk())] + (const time_series_sstable_set& sst_set, + const std::unordered_set& included_gens, streamed_mutation::forwarding fwd) { + auto q = sst_set.make_min_position_reader_queue( + [s, pos, fwd] (sstable& sst) { + return sst.read_row_flat(s, tests::make_permit(), pos, + s->full_slice(), seastar::default_priority_class(), nullptr, fwd); + }, + [included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); }); + return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q)); + }; + + auto seed = tests::random::get_int(); + std::cout << "test_clustering_order_merger_sstable_set seed: " << seed << std::endl; + auto engine = std::mt19937(seed); + std::bernoulli_distribution dist(0.9); + + for (int run = 0; run < 100; ++run) { + auto scenario = g.generate_scenario(engine); + + auto tmp = tmpdir(); + time_series_sstable_set sst_set(g._s); + mutation merged(g._s, g._pk); + std::unordered_set included_gens; + int64_t gen = 0; + for (auto& mb: scenario.readers_data) { + sst_set.insert(make_sstable_containing([s = g._s, &env, &tmp, gen = ++gen] () { + return env.make_sstable(std::move(s), tmp.path().string(), gen, + sstables::sstable::version_types::md, sstables::sstable::format_types::big); + }, {mb.m})); + + if (dist(engine)) { + included_gens.insert(gen); + merged += mb.m; + } + } + + if (included_gens.empty()) { + for (auto fwd: {streamed_mutation::forwarding::no, streamed_mutation::forwarding::yes}) { + assert_that(make_tested(sst_set, included_gens, fwd)).produces_end_of_stream(); + } + + continue; + } + + { + auto fwd = streamed_mutation::forwarding::no; + compare_readers(*g._s, make_authority(merged, fwd), make_tested(sst_set, included_gens, fwd)); + } + + auto fwd = streamed_mutation::forwarding::yes; + compare_readers(*g._s, make_authority(std::move(merged), fwd), make_tested(sst_set, included_gens, fwd), scenario.fwd_ranges); + } + + }).get(); +} + SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) { // The clustering combined reader (based on `clustering_order_reader_merger`) supports only // single partition readers, but the mutation source test framework tests the provided source