diff --git a/test/boost/sstable_compaction_test.cc b/test/boost/sstable_compaction_test.cc index 6edd3c226c..7db3a94aa6 100644 --- a/test/boost/sstable_compaction_test.cc +++ b/test/boost/sstable_compaction_test.cc @@ -66,6 +66,7 @@ #include "db/config.hh" #include "mutation_writer/partition_based_splitting_writer.hh" #include "compaction/table_state.hh" +#include "mutation_rebuilder.hh" #include #include @@ -2149,10 +2150,10 @@ SEASTAR_TEST_CASE(sstable_cleanup_correctness_test) { }); } -std::vector write_corrupt_sstable(test_env& env, sstable& sst, reader_permit permit, - std::function write_to_secondary) { +std::vector write_corrupt_sstable(test_env& env, sstable& sst, reader_permit permit, + std::function write_to_secondary) { auto schema = sst.get_schema(); - std::vector corrupt_fragments; + std::vector corrupt_fragments; const auto ts = api::timestamp_type{1}; @@ -2184,7 +2185,7 @@ std::vector write_corrupt_sstable(test_env& env, sstable& sst testlog.trace("Writing partition {}", pkey.with_schema(*schema)); - write_to_secondary(mutation_fragment(*schema, permit, partition_start(dkey, {})), is_corrupt); + write_to_secondary(mutation_fragment_v2(*schema, permit, partition_start(dkey, {})), is_corrupt); corrupt_fragments.emplace_back(*schema, permit, partition_start(dkey, {})); writer.consume_new_partition(dkey); @@ -2193,7 +2194,7 @@ std::vector write_corrupt_sstable(test_env& env, sstable& sst testlog.trace("Writing row {}", sr.position()); - write_to_secondary(mutation_fragment(*schema, permit, static_row(*schema, sr)), is_corrupt); + write_to_secondary(mutation_fragment_v2(*schema, permit, static_row(*schema, sr)), is_corrupt); corrupt_fragments.emplace_back(*schema, permit, static_row(*schema, sr)); writer.consume(std::move(sr)); } @@ -2204,7 +2205,7 @@ std::vector write_corrupt_sstable(test_env& env, sstable& sst testlog.trace("Writing row {}", cr.position()); - write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), is_corrupt); + write_to_secondary(mutation_fragment_v2(*schema, permit, clustering_row(*schema, cr)), is_corrupt); corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, cr)); writer.consume(clustering_row(*schema, cr)); @@ -2212,7 +2213,7 @@ std::vector write_corrupt_sstable(test_env& env, sstable& sst if (i == (rows_count / 2)) { auto bad_cr = make_clustering_row(i - 2); testlog.trace("Writing out-of-order row {}", bad_cr.position()); - write_to_secondary(mutation_fragment(*schema, permit, clustering_row(*schema, cr)), true); + write_to_secondary(mutation_fragment_v2(*schema, permit, clustering_row(*schema, cr)), true); corrupt_fragments.emplace_back(*schema, permit, clustering_row(*schema, bad_cr)); writer.consume(std::move(bad_cr)); } @@ -2220,7 +2221,7 @@ std::vector write_corrupt_sstable(test_env& env, sstable& sst testlog.trace("Writing partition_end"); - write_to_secondary(mutation_fragment(*schema, permit, partition_end{}), is_corrupt); + write_to_secondary(mutation_fragment_v2(*schema, permit, partition_end{}), is_corrupt); corrupt_fragments.emplace_back(*schema, permit, partition_end{}); writer.consume_end_of_partition(); }; @@ -2268,14 +2269,11 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { testlog.info("Writing sstable {}", sst->get_filename()); - const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional()] (mutation_fragment&& mf, bool) mutable { - if (mf.is_partition_start()) { - mut.emplace(schema, mf.as_partition_start().key()); - } else if (mf.is_end_of_partition()) { - scrubbed_mt->apply(std::move(*mut)); - mut.reset(); + const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable { + if (mf.is_end_of_partition()) { + scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream()); } else { - mut->apply(std::move(mf)); + std::move(mf).consume(mut_builder); } }); @@ -2298,10 +2296,10 @@ SEASTAR_TEST_CASE(sstable_scrub_validate_mode_test) { BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); - auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { - auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit())); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); for (const auto& mf : mfs) { - testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); } r.produces_end_of_stream(); @@ -2467,10 +2465,10 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { return env.make_sstable(schema, tmp.path().string(), (*gen)++); }; - std::vector scrubbed_fragments; + std::vector scrubbed_fragments; auto sst = sst_gen(); - const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&] (mutation_fragment&& mf, bool is_corrupt) { + const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&] (mutation_fragment_v2&& mf, bool is_corrupt) { if (!is_corrupt) { scrubbed_fragments.emplace_back(std::move(mf)); } @@ -2497,10 +2495,10 @@ SEASTAR_TEST_CASE(sstable_scrub_skip_mode_test) { BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); - auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { - auto r = assert_that(sst->as_mutation_source().make_reader(schema, permit)); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, permit)); for (const auto& mf : mfs) { - testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); } r.produces_end_of_stream(); @@ -2567,14 +2565,11 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { testlog.info("Writing sstable {}", sst->get_filename()); - const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional()] (mutation_fragment&& mf, bool) mutable { - if (mf.is_partition_start()) { - mut.emplace(schema, mf.as_partition_start().key()); - } else if (mf.is_end_of_partition()) { - scrubbed_mt->apply(std::move(*mut)); - mut.reset(); + const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable { + if (mf.is_end_of_partition()) { + scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream()); } else { - mut->apply(std::move(mf)); + std::move(mf).consume(mut_builder); } }); @@ -2597,10 +2592,10 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); - auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { - auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit())); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); for (const auto& mf : mfs) { - testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); } r.produces_end_of_stream(); @@ -2630,11 +2625,11 @@ SEASTAR_TEST_CASE(sstable_scrub_segregate_mode_test) { testlog.info("Scrub resulted in {} sstables", table->in_strategy_sstables().size()); BOOST_REQUIRE(table->in_strategy_sstables().size() > 1); { - auto sst_reader = assert_that(table->as_mutation_source().make_reader(schema, env.make_reader_permit())); - auto mt_reader = scrubbed_mt->as_data_source().make_reader(schema, env.make_reader_permit()); + auto sst_reader = assert_that(table->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); + auto mt_reader = scrubbed_mt->as_data_source().make_reader_v2(schema, env.make_reader_permit()); auto mt_reader_close = deferred_close(mt_reader); while (auto mf_opt = mt_reader().get()) { - testlog.trace("Expecting {}", mutation_fragment::printer(*schema, *mf_opt)); + testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, *mf_opt)); sst_reader.produces(*schema, *mf_opt); } sst_reader.produces_end_of_stream(); @@ -2682,14 +2677,11 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { testlog.info("Writing sstable {}", sst->get_filename()); - const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut = std::optional()] (mutation_fragment&& mf, bool) mutable { - if (mf.is_partition_start()) { - mut.emplace(schema, mf.as_partition_start().key()); - } else if (mf.is_end_of_partition()) { - scrubbed_mt->apply(std::move(*mut)); - mut.reset(); + const auto corrupt_fragments = write_corrupt_sstable(env, *sst, permit, [&, mut_builder = mutation_rebuilder_v2(schema)] (mutation_fragment_v2&& mf, bool) mutable { + if (mf.is_end_of_partition()) { + scrubbed_mt->apply(*std::move(mut_builder).consume_end_of_stream()); } else { - mut->apply(std::move(mf)); + std::move(mf).consume(mut_builder); } }); @@ -2712,10 +2704,10 @@ SEASTAR_TEST_CASE(sstable_scrub_quarantine_mode_test) { BOOST_REQUIRE(table->in_strategy_sstables().size() == 1); BOOST_REQUIRE(table->in_strategy_sstables().front() == sst); - auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { - auto r = assert_that(sst->as_mutation_source().make_reader(schema, env.make_reader_permit())); + auto verify_fragments = [&] (sstables::shared_sstable sst, const std::vector& mfs) { + auto r = assert_that(sst->as_mutation_source().make_reader_v2(schema, env.make_reader_permit())); for (const auto& mf : mfs) { - testlog.trace("Expecting {}", mutation_fragment::printer(*schema, mf)); + testlog.trace("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); } r.produces_end_of_stream(); @@ -2792,7 +2784,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { using expected_partitions_type = std::map; expected_partitions_type expected_partitions{dht::decorated_key::less_comparator(schema)}; - std::deque all_fragments; + std::deque all_fragments; size_t double_partition_end = 0; size_t missing_partition_end = 0; @@ -2817,12 +2809,12 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { auto ck = ss.make_ckey(tests::random::get_int(0, 8)); testlog.trace("Generating clustering row {}", ck); - all_fragments.emplace_back(*schema, permit, ss.make_row(permit, ck, "cv")); + all_fragments.emplace_back(*schema, permit, ss.make_row_v2(permit, ck, "cv")); expected_rows.clustering_rows.insert(ck); } else { testlog.trace("Generating static row"); - all_fragments.emplace_back(*schema, permit, ss.make_static_row(permit, "sv")); + all_fragments.emplace_back(*schema, permit, ss.make_static_row_v2(permit, "sv")); expected_rows.has_static_row = true; } } @@ -2850,21 +2842,21 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { testlog.info("Generated {} partitions (with {} double and {} missing partition ends), {} rows and {} fragments total", expected_partitions.size(), double_partition_end, missing_partition_end, rows, all_fragments.size()); } - auto copy_fragments = [&schema, &semaphore] (const std::deque& frags) { + auto copy_fragments = [&schema, &semaphore] (const std::deque& frags) { auto permit = semaphore.make_permit(); - std::deque copied_fragments; + std::deque copied_fragments; for (const auto& frag : frags) { copied_fragments.emplace_back(*schema, permit, frag); } return copied_fragments; }; - std::list> segregated_fragment_streams; + std::list> segregated_fragment_streams; mutation_writer::segregate_by_partition( make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(all_fragments)), sstables::compaction_type_options::scrub::mode::segregate), mutation_writer::segregate_config{default_priority_class(), 100000}, - [&schema, &segregated_fragment_streams] (flat_mutation_reader rd) { + [&schema, &segregated_fragment_streams] (flat_mutation_reader_v2 rd) { return async([&schema, &segregated_fragment_streams, rd = std::move(rd)] () mutable { auto close = deferred_close(rd); auto& fragments = segregated_fragment_streams.emplace_back(); @@ -2892,7 +2884,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { readers.reserve(segregated_fragment_streams.size()); for (const auto& segregated_fragment_stream : segregated_fragment_streams) { - readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)))); + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))); } assert_that(make_combined_reader(schema, permit, std::move(readers))).has_monotonic_positions(); @@ -2904,7 +2896,7 @@ SEASTAR_THREAD_TEST_CASE(test_scrub_segregate_stack) { readers.reserve(segregated_fragment_streams.size()); for (const auto& segregated_fragment_stream : segregated_fragment_streams) { - readers.emplace_back(upgrade_to_v2(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream)))); + readers.emplace_back(make_flat_mutation_reader_from_fragments(schema, permit, copy_fragments(segregated_fragment_stream))); } auto rd = assert_that(make_combined_reader(schema, permit, std::move(readers))); @@ -2932,8 +2924,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { tests::reader_concurrency_semaphore_wrapper semaphore; auto permit = semaphore.make_permit(); - std::deque corrupt_fragments; - std::deque scrubbed_fragments; + std::deque corrupt_fragments; + std::deque scrubbed_fragments; const auto ts = api::timestamp_type{1}; auto local_keys = make_local_keys(5, schema); @@ -2941,7 +2933,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto make_partition_start = [&, schema] (unsigned pk) { auto pkey = partition_key::from_deeply_exploded(*schema, { local_keys.at(pk) }); auto dkey = dht::decorate_key(*schema, pkey); - return mutation_fragment(*schema, permit, partition_start(std::move(dkey), {})); + return mutation_fragment_v2(*schema, permit, partition_start(std::move(dkey), {})); }; auto make_static_row = [&, schema, ts] { @@ -2949,7 +2941,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->static_column_at(0); auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, permit, static_row(*schema, std::move(r))); + return mutation_fragment_v2(*schema, permit, static_row(*schema, std::move(r))); }; auto make_clustering_row = [&, schema, ts] (unsigned i) { @@ -2957,12 +2949,12 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto cdef = schema->regular_column_at(0); auto ac = atomic_cell::make_live(*cdef.type, ts, cdef.type->decompose(data_value(1))); r.apply(cdef, atomic_cell_or_collection{std::move(ac)}); - return mutation_fragment(*schema, permit, + return mutation_fragment_v2(*schema, permit, clustering_row(clustering_key::from_single_value(*schema, int32_type->decompose(data_value(int(i)))), {}, {}, std::move(r))); }; - auto add_fragment = [&, schema] (mutation_fragment mf, bool add_to_scrubbed = true) { - corrupt_fragments.emplace_back(mutation_fragment(*schema, permit, mf)); + auto add_fragment = [&, schema] (mutation_fragment_v2 mf, bool add_to_scrubbed = true) { + corrupt_fragments.emplace_back(mutation_fragment_v2(*schema, permit, mf)); if (add_to_scrubbed) { scrubbed_fragments.emplace_back(std::move(mf)); } @@ -2982,7 +2974,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(0)); add_fragment(make_clustering_row(1)); add_fragment(make_static_row(), false); // out-of-order static row - add_fragment(mutation_fragment(*schema, permit, partition_end{})); + add_fragment(mutation_fragment_v2(*schema, permit, partition_end{})); // Partition 1 - out-of-order add_fragment(make_partition_start(1), false); @@ -2991,7 +2983,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { add_fragment(make_clustering_row(1), false); add_fragment(make_clustering_row(2), false); add_fragment(make_clustering_row(3), false); - add_fragment(mutation_fragment(*schema, permit, partition_end{}), false); + add_fragment(mutation_fragment_v2(*schema, permit, partition_end{}), false); // Partition 3 add_fragment(make_partition_start(3)); @@ -3005,7 +2997,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_reader_test) { auto r = assert_that(make_scrubbing_reader(make_flat_mutation_reader_from_fragments(schema, permit, std::move(corrupt_fragments)), compaction_type_options::scrub::mode::skip)); for (const auto& mf : scrubbed_fragments) { - testlog.info("Expecting {}", mutation_fragment::printer(*schema, mf)); + testlog.info("Expecting {}", mutation_fragment_v2::printer(*schema, mf)); r.produces(*schema, mf); } r.produces_end_of_stream();