mutation_reader_test: test clustering_order_reader_merger with time_series_sstable_set

This commit is contained in:
Kamil Braun
2020-11-19 17:38:59 +01:00
parent d0548aa77f
commit b41139a07f

View File

@@ -51,6 +51,7 @@
#include "schema_builder.hh"
#include "cell_locking.hh"
#include "sstables/sstables.hh"
#include "sstables/sstable_set_impl.hh"
#include "database.hh"
#include "partition_slice_builder.hh"
#include "schema_registry.hh"
@@ -3666,6 +3667,72 @@ SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_in_memory) {
}
}
SEASTAR_THREAD_TEST_CASE(test_clustering_order_merger_sstable_set) {
sstables::test_env::do_with_async([] (sstables::test_env& env) {
storage_service_for_tests ssft;
clustering_order_merger_test_generator g;
auto make_authority = [] (mutation mut, streamed_mutation::forwarding fwd) {
return flat_mutation_reader_from_mutations(tests::make_permit(), {std::move(mut)}, fwd);
};
auto make_tested = [s = g._s, pos = dht::ring_position(g.decorated_pk())]
(const time_series_sstable_set& sst_set,
const std::unordered_set<int64_t>& included_gens, streamed_mutation::forwarding fwd) {
auto q = sst_set.make_min_position_reader_queue(
[s, pos, fwd] (sstable& sst) {
return sst.read_row_flat(s, tests::make_permit(), pos,
s->full_slice(), seastar::default_priority_class(), nullptr, fwd);
},
[included_gens] (const sstable& sst) { return included_gens.contains(sst.generation()); });
return make_clustering_combined_reader(s, tests::make_permit(), fwd, std::move(q));
};
auto seed = tests::random::get_int<uint32_t>();
std::cout << "test_clustering_order_merger_sstable_set seed: " << seed << std::endl;
auto engine = std::mt19937(seed);
std::bernoulli_distribution dist(0.9);
for (int run = 0; run < 100; ++run) {
auto scenario = g.generate_scenario(engine);
auto tmp = tmpdir();
time_series_sstable_set sst_set(g._s);
mutation merged(g._s, g._pk);
std::unordered_set<int64_t> included_gens;
int64_t gen = 0;
for (auto& mb: scenario.readers_data) {
sst_set.insert(make_sstable_containing([s = g._s, &env, &tmp, gen = ++gen] () {
return env.make_sstable(std::move(s), tmp.path().string(), gen,
sstables::sstable::version_types::md, sstables::sstable::format_types::big);
}, {mb.m}));
if (dist(engine)) {
included_gens.insert(gen);
merged += mb.m;
}
}
if (included_gens.empty()) {
for (auto fwd: {streamed_mutation::forwarding::no, streamed_mutation::forwarding::yes}) {
assert_that(make_tested(sst_set, included_gens, fwd)).produces_end_of_stream();
}
continue;
}
{
auto fwd = streamed_mutation::forwarding::no;
compare_readers(*g._s, make_authority(merged, fwd), make_tested(sst_set, included_gens, fwd));
}
auto fwd = streamed_mutation::forwarding::yes;
compare_readers(*g._s, make_authority(std::move(merged), fwd), make_tested(sst_set, included_gens, fwd), scenario.fwd_ranges);
}
}).get();
}
SEASTAR_THREAD_TEST_CASE(clustering_combined_reader_mutation_source_test) {
// The clustering combined reader (based on `clustering_order_reader_merger`) supports only
// single partition readers, but the mutation source test framework tests the provided source