repair_test: add test_reader_with_different_strategies
This commit is contained in:
@@ -12,9 +12,12 @@
|
||||
#include "repair/hash.hh"
|
||||
#include "repair/row.hh"
|
||||
#include "repair/writer.hh"
|
||||
#include "repair/reader.hh"
|
||||
#include "repair/row_level.hh"
|
||||
#include "test/lib/mutation_source_test.hh"
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/cql_test_env.hh"
|
||||
#include "service/storage_proxy.hh"
|
||||
#include "test/lib/reader_concurrency_semaphore.hh"
|
||||
#include "test/lib/scylla_test_case.hh"
|
||||
#include "readers/mutation_fragment_v1_stream.hh"
|
||||
@@ -133,3 +136,105 @@ SEASTAR_TEST_CASE(flush_repair_rows_on_wire_to_sstable) {
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_TEST_CASE(test_reader_with_different_strategies) {
|
||||
// The test generates random mutations and persists them into the database.
|
||||
// It then tries to read them back with different repair_reader read_strategies.
|
||||
// Two cases are considered - when remote_sharder is different from
|
||||
// the local one and when it's the same.
|
||||
// In the first case we compare the output of multishard_split and multishard_filter
|
||||
// readers, in the second case we exercise the local read strategy and
|
||||
// compare its output to both multishard_split and multishard_filter.
|
||||
|
||||
return do_with_cql_env([] (cql_test_env& e) -> future<> {
|
||||
random_mutation_generator gen{random_mutation_generator::generate_counters::no, local_shard_only::no};
|
||||
co_await e.db().invoke_on_all([gs = global_schema_ptr(gen.schema())](replica::database& db) -> future<> {
|
||||
co_await db.add_column_family_and_make_directory(gs.get());
|
||||
db.find_column_family(gs.get()).mark_ready_for_writes();
|
||||
});
|
||||
auto& cf = e.local_db().find_column_family(gen.schema());
|
||||
const auto& local_sharder = cf.schema()->get_sharder();
|
||||
|
||||
const auto token_max = std::numeric_limits<int64_t>::max();
|
||||
const auto token_min = std::numeric_limits<int64_t>::min();
|
||||
auto min_token = token_max;
|
||||
auto max_token = token_min;
|
||||
{
|
||||
auto mutations = gen(100);
|
||||
for (const auto& m: mutations) {
|
||||
const auto t = m.token().raw();
|
||||
min_token = std::min(min_token, t);
|
||||
max_token = std::max(max_token, t);
|
||||
}
|
||||
auto& storage_proxy = e.get_storage_proxy().local();
|
||||
co_await storage_proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr());
|
||||
}
|
||||
|
||||
auto do_check = [&](const dht::sharder& remote_sharder,
|
||||
repair_reader::read_strategy strategy1, repair_reader::read_strategy strategy2) -> future<>
|
||||
{
|
||||
const auto& s = *cf.schema();
|
||||
// local strategy can read only from the current shard
|
||||
const auto remote_shard = strategy1 == repair_reader::read_strategy::local || strategy2 == repair_reader::read_strategy::local
|
||||
? this_shard_id()
|
||||
: tests::random::get_int(remote_sharder.shard_count() - 1);
|
||||
const auto random_range = std::invoke([&] {
|
||||
const auto left = tests::random::get_int(token_min, max_token);
|
||||
const auto right = tests::random::get_int(left == token_max ? token_max : left + 1, token_max);
|
||||
return dht::token_range::make(
|
||||
{dht::token::from_int64(left), tests::random::with_probability(0.5)},
|
||||
{dht::token::from_int64(right), tests::random::with_probability(0.5)});
|
||||
});
|
||||
auto read_all = [&](repair_reader::read_strategy strategy) -> future<std::vector<mutation_fragment>> {
|
||||
auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e),
|
||||
random_range, remote_sharder, remote_shard, 0, strategy);
|
||||
std::vector<mutation_fragment> result;
|
||||
while (auto mf = co_await reader.read_mutation_fragment()) {
|
||||
result.push_back(std::move(*mf));
|
||||
}
|
||||
co_await reader.on_end_of_stream();
|
||||
co_await reader.close();
|
||||
co_return result;
|
||||
};
|
||||
auto dump = [&](std::ostream& target, const std::vector<mutation_fragment>& fragments) {
|
||||
for (const auto& f: fragments) {
|
||||
::fmt::print(target, "\n{}", mutation_fragment::printer(s, f));
|
||||
}
|
||||
};
|
||||
|
||||
const auto data1 = co_await read_all(strategy1);
|
||||
const auto data2 = co_await read_all(strategy2);
|
||||
std::optional<sstring> mismatch;
|
||||
if (data1.size() != data2.size()) {
|
||||
mismatch = ::format("size1 {} != size2 {}", data1.size(), data2.size());
|
||||
} else {
|
||||
for (int i = 0; i < data1.size(); ++i) {
|
||||
const auto& mf1 = data1[i];
|
||||
const auto& mf2 = data2[i];
|
||||
if (!mf1.equal(s, mf2)) {
|
||||
mismatch = ::format("{} != {}",
|
||||
mutation_fragment::printer(s, mf1), mutation_fragment::printer(s, mf2));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (mismatch) {
|
||||
std::cout << "data1:";
|
||||
dump(std::cout, data1);
|
||||
std::cout << "\ndata2:";
|
||||
dump(std::cout, data2);
|
||||
std::cout << std::endl;
|
||||
BOOST_FAIL(::format("s1={}, s2={}, mismatch={}", strategy1, strategy2, *mismatch));
|
||||
}
|
||||
};
|
||||
|
||||
co_await do_check(dht::sharder(local_sharder.shard_count() + 1, local_sharder.sharding_ignore_msb()),
|
||||
repair_reader::read_strategy::multishard_split,
|
||||
repair_reader::read_strategy::multishard_filter);
|
||||
co_await do_check(local_sharder,
|
||||
repair_reader::read_strategy::local,
|
||||
repair_reader::read_strategy::multishard_filter);
|
||||
co_await do_check(local_sharder,
|
||||
repair_reader::read_strategy::local,
|
||||
repair_reader::read_strategy::multishard_split);
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user