/*
 * Copyright (C) 2022-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
 */

#include "replica/memtable.hh"
#include "readers/from_fragments.hh"
#include "repair/hash.hh"
#include "repair/row.hh"
#include "repair/writer.hh"
#include "repair/reader.hh"
#include "repair/row_level.hh"
#include "test/lib/mutation_source_test.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/cql_test_env.hh"
#include "service/storage_proxy.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
// NOTE(review): the three system includes below lost their <...> payloads in
// extraction; reconstructed from usage in this file (coroutines,
// SEASTAR_TEST_CASE, BOOST_AUTO_TEST_SUITE) -- confirm against the original.
#include <seastar/core/coroutine.hh>

#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include <boost/test/unit_test.hpp>

#include "test/lib/sstable_utils.hh"
#include "readers/mutation_fragment_v1_stream.hh"
#include "schema/schema_registry.hh"
#include "utils/chunked_vector.hh"
#include "repair/incremental.hh"

BOOST_AUTO_TEST_SUITE(repair_test)

// Helper mutation_fragment_queue that stores the received stream of
// mutation_fragments in a passed in deque of mutation_fragment_v2.
// This allows easy reader construction to verify what was sent to the queue
class test_mutation_fragment_queue_impl : public mutation_fragment_queue::impl, enable_lw_shared_from_this<test_mutation_fragment_queue_impl> {
    // Not owned: the test owns the deque and inspects it after the queue was fed.
    std::deque<mutation_fragment_v2>& _fragments;
public:
    test_mutation_fragment_queue_impl(std::deque<mutation_fragment_v2>& fragments)
        : mutation_fragment_queue::impl()
        , _fragments(fragments)
    {}

    // Record the fragment instead of writing it anywhere; always ready.
    virtual future<> push(mutation_fragment_v2 mf) override {
        _fragments.push_back(std::move(mf));
        return make_ready_future();
    }

    // No-op: the test queue has no underlying writer to abort.
    virtual void abort(std::exception_ptr ep) override {}

    // No-op: end-of-stream needs no signalling, the deque is read directly.
    virtual void push_end_of_stream() override {}
};

// Builds a mutation_fragment_queue backed by the test impl above; every
// fragment pushed into the queue ends up appended to `fragments`.
mutation_fragment_queue make_test_mutation_fragment_queue(schema_ptr s, reader_permit permit, std::deque<mutation_fragment_v2>& fragments) {
    return mutation_fragment_queue(std::move(s), std::move(permit), seastar::make_shared<test_mutation_fragment_queue_impl>(fragments));
}

// repair_writer::impl abstracts away underlying writer that will receive
// mutation fragments sent to the underlying queue. This implementation
// receives the queue as its dependency and delegates all related work
// to the queue.
class test_repair_writer_impl : public repair_writer::impl { mutation_fragment_queue _queue; public: test_repair_writer_impl(mutation_fragment_queue queue) : _queue(std::move(queue)) {} virtual future<> wait_for_writer_done() override { return make_ready_future(); } virtual mutation_fragment_queue& queue() override { return _queue; } virtual void create_writer(lw_shared_ptr writer) override { // Empty implementation. The queue received in constructor // should already contain a fully initialised consumer. } }; // Creates a helper repair_writer object that will fill in the passed in // deque of fragments as it receives repair_rows during flush lw_shared_ptr make_test_repair_writer(schema_ptr schema, reader_permit permit, std::deque& fragments) { mutation_fragment_queue mq = make_test_mutation_fragment_queue(schema, permit, fragments); return make_lw_shared(std::move(schema), std::move(permit), std::make_unique(std::move(mq))); } repair_rows_on_wire make_random_repair_rows_on_wire(random_mutation_generator& gen, schema_ptr s, reader_permit permit, lw_shared_ptr m) { repair_rows_on_wire input; utils::chunked_vector muts = gen(100); for (mutation& mut : muts) { partition_key pk = mut.key(); auto m2 = make_memtable(s, {mut}); m->apply(mut); auto reader = mutation_fragment_v1_stream(m2->make_mutation_reader(s, permit)); auto close_reader = deferred_close(reader); utils::chunked_vector mfs; reader.consume_pausable([s, &mfs](mutation_fragment mf) { if ((mf.is_partition_start() && !mf.as_partition_start().partition_tombstone()) || mf.is_end_of_partition()) { // Stream of mutations coming from the wire doesn't contain partition_end // fragments. partition_start can be sent only if it contains a tombstone. 
return stop_iteration::no; } mfs.push_back(freeze(*s, std::move(mf))); return stop_iteration::no; }).get(); input.push_back(partition_key_and_mutation_fragments(pk, std::move(mfs))); } return input; }; SEASTAR_TEST_CASE(flush_repair_rows_on_wire_to_sstable) { // The basic premise of repairing is applying missing mutations from other nodes // to the current one and vice versa. The missing mutations are passed on the // wire in the form of repair_rows_on_wire objects. // // This test exercises the path of receiving rows on wire and flushing them // to disk. repair_rows_on_wire is optimised for wire transfer and not for // internal manipulation of the data and writing to disk, so they are converted // to a friendlier representation of std::list. Such list is then // flushed to disk. // // The test generates a random stream of mutations, converts them to repair_rows_on_wire, // converts them to std::list and verifies that if they were flushed // to disk, they would produce the original stream. return seastar::async([&] { tests::reader_concurrency_semaphore_wrapper semaphore; reader_permit permit = semaphore.make_permit(); random_mutation_generator gen{random_mutation_generator::generate_counters::no}; schema_ptr s = gen.schema(); auto m = make_lw_shared(s); repair_rows_on_wire input = make_random_repair_rows_on_wire(gen, s, permit, m); std::deque fragments; lw_shared_ptr writer = make_test_repair_writer(s, permit, fragments); uint64_t seed = tests::random::get_int(); std::list repair_rows = to_repair_rows_list(std::move(input), s, seed, repair_master::yes, permit, repair_hasher(seed, s)).get(); flush_rows(s, repair_rows, writer); writer->wait_for_writer_done().get(); compare_readers(*s, m->make_mutation_reader(s, permit), make_mutation_reader_from_fragments(s, permit, std::move(fragments))); }); } SEASTAR_TEST_CASE(test_reader_with_different_strategies) { // The test generates random mutations and persists them into the database. 
// It then tries to read them back with different repair_reader read_strategies. // Two cases are considered - when remote_sharder is different from // the local one and when it's the same. // In the first case we compare the output of multishard_split and multishard_filter // readers, in the second case we exercise the local read strategy and // compare its output to both multishard_split and multishard_filter. return do_with_cql_env([] (cql_test_env& e) -> future<> { random_mutation_generator gen{random_mutation_generator::generate_counters::no, local_shard_only::no}; co_await e.db().invoke_on_all([gs = global_schema_ptr(gen.schema())](replica::database& db) -> future<> { co_await db.add_column_family_and_make_directory(gs.get(), replica::database::is_new_cf::yes); }); auto& cf = e.local_db().find_column_family(gen.schema()); const auto& local_sharder = cf.schema()->get_sharder(); const auto token_max = std::numeric_limits::max(); const auto token_min = std::numeric_limits::min(); auto min_token = token_max; auto max_token = token_min; { auto mutations = gen(100); for (const auto& m: mutations) { const auto t = m.token().raw(); min_token = std::min(min_token, t); max_token = std::max(max_token, t); } auto& storage_proxy = e.get_storage_proxy().local(); co_await storage_proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr()); } auto do_check = [&](const dht::static_sharder& remote_sharder, repair_reader::read_strategy strategy1, repair_reader::read_strategy strategy2) -> future<> { const auto& s = *cf.schema(); // local strategy can read only from the current shard const auto remote_shard = strategy1 == repair_reader::read_strategy::local || strategy2 == repair_reader::read_strategy::local ? this_shard_id() : tests::random::get_int(remote_sharder.shard_count() - 1); const auto random_range = std::invoke([&] { const auto left = tests::random::get_int(token_min, max_token); const auto right = tests::random::get_int(left == token_max ? 
token_max : left + 1, token_max); return dht::token_range::make( {dht::token::from_int64(left), tests::random::with_probability(0.5)}, {dht::token::from_int64(right), tests::random::with_probability(0.5)}); }); auto read_all = [&](repair_reader::read_strategy strategy) -> future> { auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), random_range, remote_sharder, remote_shard, 0, strategy, gc_clock::now(), incremental_repair_meta()); std::vector result; while (auto mf = co_await reader.read_mutation_fragment()) { result.push_back(std::move(*mf)); } co_await reader.on_end_of_stream(); co_await reader.close(); co_return result; }; auto dump = [&](std::ostream& target, const std::vector& fragments) { for (const auto& f: fragments) { ::fmt::print(target, "\n{}", mutation_fragment::printer(s, f)); } }; const auto data1 = co_await read_all(strategy1); const auto data2 = co_await read_all(strategy2); std::optional mismatch; if (data1.size() != data2.size()) { mismatch = ::format("size1 {} != size2 {}", data1.size(), data2.size()); } else { for (unsigned i = 0; i < data1.size(); ++i) { const auto& mf1 = data1[i]; const auto& mf2 = data2[i]; if (!mf1.equal(s, mf2)) { mismatch = ::format("{} != {}", mutation_fragment::printer(s, mf1), mutation_fragment::printer(s, mf2)); break; } } } if (mismatch) { std::cout << "data1:"; dump(std::cout, data1); std::cout << "\ndata2:"; dump(std::cout, data2); std::cout << std::endl; BOOST_FAIL(::format("s1={}, s2={}, mismatch={}", strategy1, strategy2, *mismatch)); } }; co_await do_check(dht::static_sharder(local_sharder.shard_count() + 1, local_sharder.sharding_ignore_msb()), repair_reader::read_strategy::multishard_split, repair_reader::read_strategy::multishard_filter); co_await do_check(local_sharder, repair_reader::read_strategy::local, repair_reader::read_strategy::multishard_filter); co_await do_check(local_sharder, repair_reader::read_strategy::local, repair_reader::read_strategy::multishard_split); }); } 
static future<> corrupt_data_component(sstables::shared_sstable sst) { auto f = co_await open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo); const auto align = f.memory_dma_alignment(); const auto len = f.disk_write_dma_alignment(); auto wbuf = seastar::temporary_buffer::aligned(align, len); std::fill(wbuf.get_write(), wbuf.get_write() + len, 0xba); co_await f.dma_write(0, wbuf.get(), len); co_await f.close(); } static future<> run_repair_reader_corruption_test(random_mutation_generator::compress compress, const sstring& expected_error_msg) { return do_with_cql_env([=](cql_test_env& e) -> future<> { random_mutation_generator gen{random_mutation_generator::generate_counters::no, local_shard_only::no, random_mutation_generator::generate_uncompactable::no, std::nullopt, "ks", "cf", compress}; co_await e.db().invoke_on_all([gs = global_schema_ptr(gen.schema())](replica::database& db) -> future<> { co_await db.add_column_family_and_make_directory(gs.get(), replica::database::is_new_cf::yes); }); auto& cf = e.local_db().find_column_family(gen.schema()); const auto& local_sharder = cf.schema()->get_sharder(); auto mutations = gen(30); auto& storage_proxy = e.get_storage_proxy().local(); co_await storage_proxy.mutate_locally(std::move(mutations), tracing::trace_state_ptr()); co_await cf.flush(); auto sstables = cf.get_sstables(); BOOST_REQUIRE_GT(sstables->size(), 0); auto sst = *sstables->begin(); co_await corrupt_data_component(sst); bool caught_expected_error = false; auto test_range = dht::token_range::make_open_ended_both_sides(); auto reader = repair_reader(e.db(), cf, cf.schema(), make_reader_permit(e), test_range, local_sharder, 0, 0, repair_reader::read_strategy::local, gc_clock::now(), incremental_repair_meta()); try { while (auto mf = co_await reader.read_mutation_fragment()) { // Read until error occurs } co_await reader.on_end_of_stream(); } catch (const malformed_sstable_exception& e) { caught_expected_error = 
(sstring(e.what()).find(expected_error_msg) != sstring::npos); } co_await reader.close(); BOOST_REQUIRE(caught_expected_error); }); } SEASTAR_TEST_CASE(test_repair_reader_checksum_mismatch_compressed) { return run_repair_reader_corruption_test( random_mutation_generator::compress::yes, "failed checksum" ); } SEASTAR_TEST_CASE(test_repair_reader_checksum_mismatch_uncompressed) { return run_repair_reader_corruption_test( random_mutation_generator::compress::no, "failed checksum" ); } SEASTAR_TEST_CASE(repair_rows_size_considers_external_memory) { return seastar::async([&] { tests::reader_concurrency_semaphore_wrapper semaphore; reader_permit permit = semaphore.make_permit(); random_mutation_generator gen{random_mutation_generator::generate_counters::no}; schema_ptr s = gen.schema(); auto m = make_lw_shared(s); auto r = *make_random_repair_rows_on_wire(gen, s, permit, m).begin(); uint64_t seed = tests::random::get_int(); auto& frozen_mf = *r.get_mutation_fragments().begin(); auto mf = make_lw_shared(frozen_mf.unfreeze(*s, permit)); auto dk_ptr = make_lw_shared(*s, dht::decorate_key(*s, r.get_key()), seed); position_in_partition pos(mf->position()); repair_sync_boundary boundary{dk_ptr->dk, pos}; auto fmf_size = frozen_mf.representation().size(); // Test that frozen mutation fragment memory is counted. repair_row row{frozen_mf, std::nullopt, nullptr, std::nullopt, is_dirty_on_master::no, nullptr}; BOOST_REQUIRE_EQUAL(row.size(), fmf_size + sizeof(repair_row)); // Test that mutation fragment memory is counted. repair_row row_with_mf{frozen_mf, std::nullopt, nullptr, std::nullopt, is_dirty_on_master::no, mf}; BOOST_REQUIRE_EQUAL(row_with_mf.size(), fmf_size + mf->memory_usage() + sizeof(repair_row)); // Test that boundary memory is counted. 
repair_row row_with_boundary{frozen_mf, pos, dk_ptr, std::nullopt, is_dirty_on_master::no, nullptr}; BOOST_REQUIRE_EQUAL(row_with_boundary.size(), fmf_size + boundary.pk.external_memory_usage() + boundary.position.external_memory_usage() + sizeof(repair_row)); }); } BOOST_AUTO_TEST_SUITE_END()