test/boost/sstable_compaction_test: migrate scrub tests to v2

This commit is contained in:
Botond Dénes
2022-01-10 15:05:22 +02:00
parent da0c5adcc3
commit 0e1bdca71b

View File

@@ -66,6 +66,7 @@
#include "db/config.hh"
#include "mutation_writer/partition_based_splitting_writer.hh"
#include "compaction/table_state.hh"
#include "mutation_rebuilder.hh"
#include <stdio.h>
#include <ftw.h>
@@ -2149,10 +2150,10 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) {
});
}
std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst, reader_permit permit,
std::function<void(mutation_fragment&&, bool)> write_to_secondary) {
std::vector<mutation_fragment_v2> write_corrupt_sstable(test_env& env, sstable& sst, reader_permit permit,
std::function<void(mutation_fragment_v2&&, bool)> write_to_secondary) {
auto schema = sst.get_schema();
std::vector<mutation_fragment> corrupt_fragments;
std::vector<mutation_fragment_v2> corrupt_fragments;
const auto ts = api::timestamp_type{1};
@@ -2184,7 +2185,7 @@ std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst
testlog.trace("Writing partition {}", pkey.with_schema(*schema));
write_to_secondary(mutation_fragment(*schema, permit, partition_start(dkey, {})), is_corrupt);
write_to_secondary(mutation_fragment_v2(*schema, permit, partition_start(dkey, {})), is_corrupt);
corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {}));
writer.consume_new_partition(dkey);
@@ -2193,7 +2194,7 @@ std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst
testlog.trace("Writing row {}", sr.position());
write_to_secondary(mutation_fragment(*schema, permit, static_row(*schema, sr)), is_corrupt);
write_to_secondary(mutation_fragment_v2(*schema, permit, static_row(*schema, sr)), is_corrupt);
corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr));
writer.consume(std::move(sr));
}
@@ -2204,7 +2205,7 @@ std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst
testlog.trace("Writing row {}", cr.position());
write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), is_corrupt);
write_to_secondary(mutation_fragment_v2(*schema, permit, clustering_row(*schema, cr)), is_corrupt);
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr));
writer.consume(clustering_row(*schema, cr));
@@ -2212,7 +2213,7 @@ std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst
if (i == (rows_count / 2)) {
auto bad_cr = make_clustering_row(i - 2);
testlog.trace("Writing out-of-order row {}", bad_cr.position());
write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), true);
write_to_secondary(mutation_fragment_v2(*schema, permit, clustering_row(*schema, cr)), true);
corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr));
writer.consume(std::move(bad_cr));
}
@@ -2220,7 +2221,7 @@ std::vector<mutation_fragment> write_corrupt_sstable(test_env& env, sstable& sst
testlog.trace("Writing partition_end");
write_to_secondary(mutation_fragment(*schema, permit, partition_end{}), is_corrupt);
write_to_secondary(mutation_fragment_v2(*schema, permit, partition_end{}), is_corrupt);
corrupt_fragments.emplace_back(*schema, permit, partition_end{});
writer.consume_end_of_partition();
};
@@ -2268,14 +2269,11 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
testlog.info("Writing sstable {}", sst->get_filename());
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional<mutation>()] (mutation_fragment&& mf, bool) mutable {
if (mf.is_partition_start()) {
mut.emplace(schema, mf.as_partition_start().key());
} else if (mf.is_end_of_partition()) {
scrubbed_mt->apply(std::move(*mut));
mut.reset();
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable {
if (mf.is_end_of_partition()) {
scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream());
} else {
mut->apply(std::move(mf));
std::move(mf).consume(mut_builder);
}
});
@@ -2298,10 +2296,10 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) {
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit()));
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
for (const auto& mf : mfs) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();
@@ -2467,10 +2465,10 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
return env.make_sstable(schema, tmp.path().string(), (*gen)++);
};
std::vector<mutation_fragment> scrubbed_fragments;
std::vector<mutation_fragment_v2> scrubbed_fragments;
auto sst = sst_gen();
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&] (mutation_fragment&& mf, bool is_corrupt) {
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&] (mutation_fragment_v2&& mf, bool is_corrupt) {
if (!is_corrupt) {
scrubbed_fragments.emplace_back(std::move(mf));
}
@@ -2497,10 +2495,10 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) {
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, permit));
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, permit));
for (const auto& mf : mfs) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();
@@ -2567,14 +2565,11 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
testlog.info("Writing sstable {}", sst->get_filename());
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional<mutation>()] (mutation_fragment&& mf, bool) mutable {
if (mf.is_partition_start()) {
mut.emplace(schema, mf.as_partition_start().key());
} else if (mf.is_end_of_partition()) {
scrubbed_mt->apply(std::move(*mut));
mut.reset();
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable {
if (mf.is_end_of_partition()) {
scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream());
} else {
mut->apply(std::move(mf));
std::move(mf).consume(mut_builder);
}
});
@@ -2597,10 +2592,10 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit()));
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
for (const auto& mf : mfs) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();
@@ -2630,11 +2625,11 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) {
testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size());
BOOST_REQUIRE(table->in_strategy_sstables().size() > 1);
{
auto sst_reader = assert_that(table->as_mutation_source().make_reader(schema, env.make_reader_permit()));
auto mt_reader = scrubbed_mt->as_data_source().make_reader(schema, env.make_reader_permit());
auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit());
auto mt_reader_close = deferred_close(mt_reader);
while (auto mf_opt = mt_reader().get()) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, *mf_opt));
testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, *mf_opt));
sst_reader.produces(*schema, *mf_opt);
}
sst_reader.produces_end_of_stream();
@@ -2682,14 +2677,11 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
testlog.info("Writing sstable {}", sst->get_filename());
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional<mutation>()] (mutation_fragment&& mf, bool) mutable {
if (mf.is_partition_start()) {
mut.emplace(schema, mf.as_partition_start().key());
} else if (mf.is_end_of_partition()) {
scrubbed_mt->apply(std::move(*mut));
mut.reset();
const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable {
if (mf.is_end_of_partition()) {
scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream());
} else {
mut->apply(std::move(mf));
std::move(mf).consume(mut_builder);
}
});
@@ -2712,10 +2704,10 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) {
BOOST_REQUIRE(table->in_strategy_sstables().size() == 1);
BOOST_REQUIRE(table->in_strategy_sstables().front() == sst);
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit()));
auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector<mutation_fragment_v2>& mfs) {
auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit()));
for (const auto& mf : mfs) {
testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf));
testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();
@@ -2792,7 +2784,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
using expected_partitions_type = std::map<dht::decorated_key, expected_rows_type, dht::decorated_key::less_comparator>;
expected_partitions_type expected_partitions{dht::decorated_key::less_comparator(schema)};
std::deque<mutation_fragment> all_fragments;
std::deque<mutation_fragment_v2> all_fragments;
size_t double_partition_end = 0;
size_t missing_partition_end = 0;
@@ -2817,12 +2809,12 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
auto ck = ss.make_ckey(tests::random::get_int<uint32_t>(0, 8));
testlog.trace("Generating clustering row {}", ck);
all_fragments.emplace_back(*schema, permit, ss.make_row(permit, ck, "cv"));
all_fragments.emplace_back(*schema, permit, ss.make_row_v2(permit, ck, "cv"));
expected_rows.clustering_rows.insert(ck);
} else {
testlog.trace("Generating static row");
all_fragments.emplace_back(*schema, permit, ss.make_static_row(permit, "sv"));
all_fragments.emplace_back(*schema, permit, ss.make_static_row_v2(permit, "sv"));
expected_rows.has_static_row = true;
}
}
@@ -2850,21 +2842,21 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
testlog.info("Generated {} partitions (with {} double and {} missing partition ends), {} rows and {} fragments total", expected_partitions.size(), double_partition_end, missing_partition_end, rows, all_fragments.size());
}
auto copy_fragments = [&schema, &semaphore] (const std::deque<mutation_fragment>& frags) {
auto copy_fragments = [&schema, &semaphore] (const std::deque<mutation_fragment_v2>& frags) {
auto permit = semaphore.make_permit();
std::deque<mutation_fragment> copied_fragments;
std::deque<mutation_fragment_v2> copied_fragments;
for (const auto& frag : frags) {
copied_fragments.emplace_back(*schema, permit, frag);
}
return copied_fragments;
};
std::list<std::deque<mutation_fragment>> segregated_fragment_streams;
std::list<std::deque<mutation_fragment_v2>> segregated_fragment_streams;
mutation_writer::segregate_by_partition(
make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), sstables::compaction_type_options::scrub::mode::segregate),
mutation_writer::segregate_config{default_priority_class(), 100000},
[&schema, &segregated_fragment_streams] (flat_mutation_reader rd) {
[&schema, &segregated_fragment_streams] (flat_mutation_reader_v2 rd) {
return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable {
auto close = deferred_close(rd);
auto& fragments = segregated_fragment_streams.emplace_back();
@@ -2892,7 +2884,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))));
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)));
}
assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions();
@@ -2904,7 +2896,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) {
readers.reserve(segregated_fragment_streams.size());
for (const auto& segregated_fragment_stream : segregated_fragment_streams) {
readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))));
readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)));
}
auto rd = assert_that(make_combined_reader(schema, permit, std::move(readers)));
@@ -2932,8 +2924,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
tests::reader_concurrency_semaphore_wrapper semaphore;
auto permit = semaphore.make_permit();
std::deque<mutation_fragment> corrupt_fragments;
std::deque<mutation_fragment> scrubbed_fragments;
std::deque<mutation_fragment_v2> corrupt_fragments;
std::deque<mutation_fragment_v2> scrubbed_fragments;
const auto ts = api::timestamp_type{1};
auto local_keys = make_local_keys(5, schema);
@@ -2941,7 +2933,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto make_partition_start = [&, schema] (unsigned pk) {
auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) });
auto dkey = dht::decorate_key(*schema, pkey);
return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {}));
return mutation_fragment_v2(*schema, permit, partition_start(std::move(dkey), {}));
};
auto make_static_row = [&, schema, ts] {
@@ -2949,7 +2941,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto cdef = schema->static_column_at(0);
auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
return mutation_fragment(*schema, permit, static_row(*schema, std::move(r)));
return mutation_fragment_v2(*schema, permit, static_row(*schema, std::move(r)));
};
auto make_clustering_row = [&, schema, ts] (unsigned i) {
@@ -2957,12 +2949,12 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto cdef = schema->regular_column_at(0);
auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1)));
r.apply(cdef, atomic_cell_or_collection{std::move(ac)});
return mutation_fragment(*schema, permit,
return mutation_fragment_v2(*schema, permit,
clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r)));
};
auto add_fragment = [&, schema] (mutation_fragment mf, bool add_to_scrubbed = true) {
corrupt_fragments.emplace_back(mutation_fragment(*schema, permit, mf));
auto add_fragment = [&, schema] (mutation_fragment_v2 mf, bool add_to_scrubbed = true) {
corrupt_fragments.emplace_back(mutation_fragment_v2(*schema, permit, mf));
if (add_to_scrubbed) {
scrubbed_fragments.emplace_back(std::move(mf));
}
@@ -2982,7 +2974,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(0));
add_fragment(make_clustering_row(1));
add_fragment(make_static_row(), false); // out-of-order static row
add_fragment(mutation_fragment(*schema, permit, partition_end{}));
add_fragment(mutation_fragment_v2(*schema, permit, partition_end{}));
// Partition 1 - out-of-order
add_fragment(make_partition_start(1), false);
@@ -2991,7 +2983,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
add_fragment(make_clustering_row(1), false);
add_fragment(make_clustering_row(2), false);
add_fragment(make_clustering_row(3), false);
add_fragment(mutation_fragment(*schema, permit, partition_end{}), false);
add_fragment(mutation_fragment_v2(*schema, permit, partition_end{}), false);
// Partition 3
add_fragment(make_partition_start(3));
@@ -3005,7 +2997,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) {
auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)),
compaction_type_options::scrub::mode::skip));
for (const auto& mf : scrubbed_fragments) {
testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf));
testlog.info("Expecting {}", mutation_fragment_v2::printer(*schema, mf));
r.produces(*schema, mf);
}
r.produces_end_of_stream();