From b69bc97673eaef2b3eeda032e82cb09d5beb686a Mon Sep 17 00:00:00 2001 From: Petr Gusev Date: Wed, 21 Jun 2023 11:54:22 +0400 Subject: [PATCH] repair_test: add test_reader_with_different_strategies --- test/boost/repair_test.cc | 105 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/test/boost/repair_test.cc b/test/boost/repair_test.cc index 7a6bc0fd0f..e131b7e3ed 100644 --- a/test/boost/repair_test.cc +++ b/test/boost/repair_test.cc @@ -12,9 +12,12 @@ #include "repair/hash.hh" #include "repair/row.hh" #include "repair/writer.hh" +#include "repair/reader.hh" #include "repair/row_level.hh" #include "test/lib/mutation_source_test.hh" #include "test/lib/random_utils.hh" +#include "test/lib/cql_test_env.hh" +#include "service/storage_proxy.hh" #include "test/lib/reader_concurrency_semaphore.hh" #include "test/lib/scylla_test_case.hh" #include "readers/mutation_fragment_v1_stream.hh" @@ -133,3 +136,105 @@ SEASTAR_TEST_CASE(flush_repair_rows_on_wire_to_sstable) { }); } +SEASTAR_TEST_CASE(test_reader_with_different_strategies) { + // The test generates random mutations and persists them into the database. + // It then tries to read them back with different repair_reader read_strategies. + // Two cases are considered - when remote_sharder is different from + // the local one and when it's the same. + // In the first case we compare the output of multishard_split and multishard_filter + // readers, in the second case we exercise the local read strategy and + // compare its output to both multishard_split and multishard_filter. + + return do_with_cql_env([] (cql_test_env& e) -> future<> { + random_mutation_generator gen{random_mutation_generator::generate_counters::no, local_shard_only::no}; + co_await e.db().invoke_on_all([gs = global_schema_ptr(gen.schema())](replica::database& db) -> future<> { + co_await db.add_column_family_and_make_directory(gs.get()); + db.find_column_family(gs.get()).mark_ready_for_writes(); + }); + auto& cf = e.local_db().find_column_family(gen.schema()); + const auto& local_sharder = cf.schema()->get_sharder(); + + const auto token_max = std::numeric_limits::max(); + const auto token_min = std::numeric_limits::min(); + auto min_token = token_max; + auto max_token = token_min; + { + auto mutations = gen(100); + for (const auto& m: mutations) { + const auto t = m.token().raw(); + min_token = std::min(min_token, t); + max_token = std::max(max_token, t); + } + auto& storage_proxy = e.get_storage_proxy().local(); + co_await storage_proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr()); + } + + auto do_check = [&](const dht::sharder& remote_sharder, + repair_reader::read_strategy strategy1, repair_reader::read_strategy strategy2) -> future<> + { + const auto& s = *cf.schema(); + // local strategy can read only from the current shard + const auto remote_shard = strategy1 == repair_reader::read_strategy::local || strategy2 == repair_reader::read_strategy::local + ? this_shard_id() + : tests::random::get_int(remote_sharder.shard_count() - 1); + const auto random_range = std::invoke([&] { + const auto left = tests::random::get_int(token_min, max_token); + const auto right = tests::random::get_int(left == token_max ? token_max : left + 1, token_max); + return dht::token_range::make( + {dht::token::from_int64(left), tests::random::with_probability(0.5)}, + {dht::token::from_int64(right), tests::random::with_probability(0.5)}); + }); + auto read_all = [&](repair_reader::read_strategy strategy) -> future> { + auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), + random_range, remote_sharder, remote_shard, 0, strategy); + std::vector result; + while (auto mf = co_await reader.read_mutation_fragment()) { + result.push_back(std::move(*mf)); + } + co_await reader.on_end_of_stream(); + co_await reader.close(); + co_return result; + }; + auto dump = [&](std::ostream& target, const std::vector& fragments) { + for (const auto& f: fragments) { + ::fmt::print(target, "\n{}", mutation_fragment::printer(s, f)); + } + }; + + const auto data1 = co_await read_all(strategy1); + const auto data2 = co_await read_all(strategy2); + std::optional mismatch; + if (data1.size() != data2.size()) { + mismatch = ::format("size1 {} != size2 {}", data1.size(), data2.size()); + } else { + for (int i = 0; i < data1.size(); ++i) { + const auto& mf1 = data1[i]; + const auto& mf2 = data2[i]; + if (!mf1.equal(s, mf2)) { + mismatch = ::format("{} != {}", + mutation_fragment::printer(s, mf1), mutation_fragment::printer(s, mf2)); + break; + } + } + } + if (mismatch) { + std::cout << "data1:"; + dump(std::cout, data1); + std::cout << "\ndata2:"; + dump(std::cout, data2); + std::cout << std::endl; + BOOST_FAIL(::format("s1={}, s2={}, mismatch={}", strategy1, strategy2, *mismatch)); + } + }; + + co_await do_check(dht::sharder(local_sharder.shard_count() + 1, local_sharder.sharding_ignore_msb()), + repair_reader::read_strategy::multishard_split, + repair_reader::read_strategy::multishard_filter); + co_await do_check(local_sharder, + repair_reader::read_strategy::local, + repair_reader::read_strategy::multishard_filter); + co_await do_check(local_sharder, + repair_reader::read_strategy::local, + repair_reader::read_strategy::multishard_split); + }); +}