/* * Copyright (C) 2023-present ScyllaDB */ /* * SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0 */ /* NOTE(review): in this copy several #include directives have no header name and a number of template argument lists look stripped (for example "future" and "std::vector" with no arguments); confirm against version control before building. */ #include "test/lib/cql_test_env.hh" #include "streaming/stream_blob.hh" #include "message/messaging_service.hh" #include "test/lib/log.hh" #include "test/lib/sstable_utils.hh" #include #include #include #include #include #include #include #include #include #include #include #include "utils/io-wrappers.hh" /* Computes the SHA-256 of the file at `filename`: opens it read-only, feeds every chunk returned by the input stream into a CryptoPP::SHA256 hasher until an empty buffer is read, closes the stream, and returns the digest as hex text with two zero-padded digits per byte. */ future generate_file_hash(sstring filename) { auto f = co_await seastar::open_file_dma(filename, seastar::open_flags::ro); auto in = seastar::make_file_input_stream(std::move(f)); CryptoPP::SHA256 hash; unsigned char digest[CryptoPP::SHA256::DIGESTSIZE]; std::stringstream ss; while (true) { auto buf = co_await in.read(); if (buf.empty()) { break; } hash.Update((const unsigned char*)buf.get(), buf.size()); } co_await in.close(); hash.Final(digest); for (int i = 0; i < CryptoPP::SHA256::DIGESTSIZE; i++) { ss << std::hex << std::setw(2) << std::setfill('0') << (int)digest[i]; } co_return ss.str(); } /* Returns a file name obtained from std::tmpnam(); no file is created here. */ sstring generate_random_filename() { char filename[L_tmpnam]; std::tmpnam(filename); return filename; } /* Opens `filename` (rw | create) and writes `content_size` (default 1024) bytes taken from rand() % 256, one byte per write() call, then closes the output stream. srand(time(nullptr)) is called on every invocation. */ future<> write_random_content_to_file(const sstring& filename, size_t content_size = 1024) { auto f = co_await open_file_dma(filename, open_flags::rw | open_flags::create); auto ostream = co_await make_file_output_stream(std::move(f)); srand(time(nullptr)); for (size_t i = 0; i < content_size; ++i) { char c = rand() % 256; co_await ostream.write(&c, 1); } co_await ostream.close(); } using namespace streaming; /* Streams every file in `filelist` with tablet_stream_files() to shard 0 of the host returned by get_token_metadata().get_my_id(). On the first pass it registers, on all shards, a stream_blob handler that stores each incoming file as meta.filename + suffix and removes that file when the store result is not ok. With `unsupported_file_ops` set, each file is tagged file_ops(0xff55) instead of file_ops::stream_sstables. Returns true only when the try block ran to completion; exceptions thrown inside it are logged and swallowed. If the handler was registered it is unregistered on all shards before returning. */ static future do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vector filelist, const std::string& suffix, bool inject_error, bool unsupported_file_ops = false) { bool ret = false; bool verb_register = false; auto ops_id = file_stream_id::create_random_id(); auto& global_db = db.container(); auto& global_ms = ms.container(); int n_retries = 0; do { try { if (!verb_register) { co_await smp::invoke_on_all([&] { return 
global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { const auto& from = cinfo.retrieve_auxiliary("host_id"); auto sink = global_ms.local().make_sink_for_stream_blob(source); /* the future returned by stream_blob_handler is discarded; a failure is only logged by the handle_exception continuation, which also holds copies of sink, source and the messaging service pointer */ (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, /* output factory: opens meta.filename + suffix for writing and returns it together with a completion callback */ [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future { auto path = meta.filename + suffix; auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create); auto out = co_await make_file_output_stream(std::move(f)); co_return output_result{ [path = std::move(path)](store_result res) -> future<> { if (res != store_result::ok) { co_await remove_file(path); } }, std::move(out) }; }, inject_error).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) { testlog.warn("Failed to run stream blob handler: {}", eptr); }); return make_ready_future>(sink); }); }); } verb_register = true; auto table = table_id::create_random_id(); auto files = std::list(); auto hostid = db.get_token_metadata().get_my_id(); seastar::shard_id dst_shard_id = 0; co_await mark_tablet_stream_start(ops_id); auto targets = std::vector{node_and_shard{hostid, dst_shard_id}}; /* build one stream descriptor per input file; the source callback hands out an input stream over the already opened file */ for (const auto& filename : filelist) { auto fops = file_ops::stream_sstables; fops = unsupported_file_ops ? 
file_ops(0xff55) : fops; auto file = co_await open_file_dma(filename, open_flags::ro); auto& info = files.emplace_back(); info.filename = filename; info.fops = fops; info.source = [file = std::move(file)](const file_input_stream_options& foptions) mutable -> future> { co_return make_file_input_stream(std::move(file), foptions); }; } size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error); co_await mark_tablet_stream_done(ops_id); testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes); ret = true; } catch (seastar::rpc::stream_closed&) { testlog.warn("do_test_file_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++); /* NOTE(review): `continue` in a do { } while (false) loop jumps to the loop condition, which is false, so the body is not executed again after "Retrying send" - confirm whether a real retry was intended */ if (n_retries < 3) { testlog.info("Retrying send"); continue; } } catch (...) { testlog.warn("do_test_file_stream[{}] status=fail error={}", ops_id, std::current_exception()); } } while (false); if (verb_register) { co_await smp::invoke_on_all([&global_ms] { return global_ms.local().unregister_stream_blob(); }); } co_return ret; } /* Corrupts the Digest component of `sst`: reads the whole file as text, converts it with boost::lexical_cast, and writes the value plus one back at offset 0. Only the input stream is closed here; NOTE(review): `f` itself is not explicitly closed - confirm that is intended. */ static future<> corrupt_digest_component(sstables::shared_sstable sst) { auto f = co_await open_file_dma(sstables::test(sst).filename(component_type::Digest).native(), open_flags::rw); auto stream = make_file_input_stream(f); auto digest_str = co_await util::read_entire_stream_contiguous(stream); auto digest = boost::lexical_cast(digest_str); auto new_digest = to_sstring(digest + 1); // make it invalid co_await f.dma_write(0, new_digest.c_str(), new_digest.size()); co_await stream.close(); } /* Corrupts the Data component of `sst`: overwrites the first disk_write_dma_alignment() bytes at offset 0 with 0xba bytes from an aligned buffer, then closes the file. */ static future<> corrupt_data_component(sstables::shared_sstable sst) { auto f = co_await open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo); const auto align = f.memory_dma_alignment(); const auto len = f.disk_write_dma_alignment(); auto wbuf = seastar::temporary_buffer::aligned(align, len); std::fill(wbuf.get_write(), 
wbuf.get_write() + len, 0xba); co_await f.dma_write(0, wbuf.get(), len); co_await f.close(); } /* Flag type used below to choose between the default table definition and one with 'sstable_compression' set to an empty string. */ using compress_sstable = bool_class; /* Creates keyspace ks_test and table ks_test.cf (with 'sstable_compression' : '' when `compress` is false), inserts 10 rows, flushes, takes a storage snapshot, optionally runs `corruption_fn` on the first snapshot sstable, and streams the components of that sstable with tablet_stream_files() to shard 0 of the host returned by get_my_id(). The registered receive handler writes each file to meta.filename and removes it when the store result callback runs, whatever the result. Afterwards requires exception_caught (set when a malformed_sstable_exception whose what() contains `expected_error_msg` is caught) to be true when `corruption_fn` is set and false otherwise. Exceptions other than rpc::stream_closed and malformed_sstable_exception propagate to the caller. */ static future<> do_test_sstable_stream(cql_test_env& env, compress_sstable compress, std::function(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") { bool verb_register = false; auto ops_id = file_stream_id::create_random_id(); auto& db = env.local_db(); auto& ms = env.get_messaging_service().local(); auto& global_db = db.container(); auto& global_ms = ms.container(); int n_retries = 0; bool exception_caught = false; do { try { if (!verb_register) { co_await smp::invoke_on_all([&global_db, &global_ms] { return global_ms.local().register_stream_blob([&global_db, &global_ms](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source source) { const auto& from = cinfo.retrieve_auxiliary("host_id"); auto sink = global_ms.local().make_sink_for_stream_blob(source); /* the future returned by stream_blob_handler is discarded; a failure is only logged by the handle_exception continuation */ (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [](auto&, const streaming::stream_blob_meta& meta) -> future { auto path = meta.filename; auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create); auto out = co_await make_file_output_stream(std::move(f)); co_return output_result { [path = std::move(path)](store_result res) -> future<> { co_await remove_file(path); }, std::move(out) }; }, false).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) { testlog.warn("Failed to run stream blob handler: {}", eptr); }); return make_ready_future>(sink); }); }); } verb_register = true; co_await env.execute_cql("CREATE KEYSPACE ks_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"); if (compress) { co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int);"); } else { co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int) WITH compression = { 'sstable_compression' : 
'' };"); } for (int i = 0; i < 10; i++) { co_await env.execute_cql(format("INSERT INTO ks_test.cf (pk, v) VALUES ('key_{}', {});", i, i * 10)); } auto& table = db.find_column_family("ks_test", "cf"); co_await table.flush(); auto schema = table.schema(); auto range = dht::token_range::make_open_ended_both_sides(); auto sstables = co_await table.take_storage_snapshot(range); BOOST_REQUIRE_GT(sstables.size(), 0); auto table_id = schema->id(); auto files = std::list(); auto hostid = db.get_token_metadata().get_my_id(); seastar::shard_id dst_shard_id = 0; co_await mark_tablet_stream_start(ops_id); auto targets = std::vector{node_and_shard{hostid, dst_shard_id}}; auto permit = co_await db.obtain_reader_permit(table, "test_stream", db::no_timeout, {}); /* only the first sstable of the snapshot is (optionally corrupted and) streamed */ auto sst_snapshot = sstables.front(); if (corruption_fn) { co_await corruption_fn(sst_snapshot.sst); } auto sources = co_await sstables::create_stream_sources(sst_snapshot, permit); for (auto& source : sources) { auto& info = files.emplace_back(); info.filename = source->component_basename(); info.fops = file_ops::stream_sstables; info.source = [source = std::move(source)](const file_input_stream_options& foptions) mutable -> future> { co_return co_await source->input(foptions); }; } size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table_id, ops_id, service::null_topology_guard, false); co_await mark_tablet_stream_done(ops_id); testlog.info("do_test_sstable_stream[{}] status=ok stream_bytes={}", ops_id, stream_bytes); } catch (seastar::rpc::stream_closed&) { testlog.warn("do_test_sstable_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++); /* NOTE(review): `continue` in a do { } while (false) loop jumps to the loop condition, which is false, so the body is not executed again after "Retrying send" - confirm whether a real retry was intended */ if (n_retries < 3) { testlog.info("Retrying send"); continue; } } catch (const malformed_sstable_exception& e) { testlog.warn("do_test_sstable_stream[{}] status=fail error={}", ops_id, std::current_exception()); exception_caught = (sstring(e.what()).find(expected_error_msg) != sstring::npos); } } while (false); if 
(corruption_fn) { BOOST_REQUIRE(exception_caught); } else { BOOST_REQUIRE(!exception_caught); } /* NOTE(review): the checks above run before the handler is unregistered; presumably a failed BOOST_REQUIRE leaves this step unexecuted - confirm */ if (verb_register) { co_await smp::invoke_on_all([&global_ms] { return global_ms.local().unregister_stream_blob(); }); } } /* End-to-end file streaming check. Creates 10 temporary files with random content (the first is 1024 * base_size bytes, later ones get a rand()-derived size of at least 1 byte; base_size is 1024, or 1 when SEASTAR_DEBUG is defined), streams them inside a cql test env with ms_listen enabled, then compares the SHA-256 of every sent file with that of its ".rx" counterpart and removes the files. When `inject_error` is true, a received file that cannot be hashed is recorded as "SKIP" and accepted; otherwise the exception is rethrown. The boolean result of the streaming helper is not checked here. */ void do_test_file_stream(bool inject_error) { cql_test_config cfg; cfg.ms_listen = true; std::vector files; std::vector files_rx; std::vector hash_tx; std::vector hash_rx; size_t nr_files = 10; size_t file_size = 0; static const std::string suffix = ".rx"; while (files.size() != nr_files) { auto name = generate_random_filename(); files.push_back(name); files_rx.push_back(name + suffix); } size_t base_size = 1024; #ifdef SEASTAR_DEBUG base_size = 1; #endif for (auto& file : files) { if (file_size == 0) { file_size = 1 * 1024 * base_size; } else { file_size = (rand() % 10) * 1024 * base_size + rand() % base_size; } file_size = std::max(size_t(1), file_size); testlog.info("file_tx={} file_size={}", file, file_size); write_random_content_to_file(file, file_size).get(); } do_with_cql_env_thread([files, inject_error] (auto& e) { do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, suffix, inject_error).get(); }, cfg).get(); bool cleanup = true; for (auto& file : files) { auto hash = generate_file_hash(file).get(); testlog.info("file_tx={} hash={}", file, hash); hash_tx.push_back(hash); if (cleanup) { seastar::remove_file(file).get(); } } for (auto& file : files_rx) { sstring hash = "SKIP"; try { hash = generate_file_hash(file).get(); if (cleanup) { seastar::remove_file(file).get(); } } catch (...) 
{ if (!inject_error) { throw; } } hash_rx.push_back(hash); testlog.info("file_rx={} hash={}", file, hash); } BOOST_REQUIRE(hash_tx.size() == hash_rx.size()); for (size_t i = 0; i < hash_tx.size(); i++) { testlog.info("Check tx_hash={} rx_hash={}", hash_tx[i], hash_rx[i]); if (inject_error) { BOOST_REQUIRE(hash_tx[i] == hash_rx[i] || sstring("SKIP") == hash_rx[i]); } else { BOOST_REQUIRE(hash_tx[i] == hash_rx[i]); } } } /* Streams two 1024-byte random files with unsupported_file_ops = true (and an empty suffix) and requires the streaming helper to return false, then removes the two files. */ void do_test_unsupported_file_ops() { bool inject_error = false; bool unsupported_file_ops = true; cql_test_config cfg; cfg.ms_listen = true; std::vector files; size_t nr_files = 2; size_t file_size = 1024; while (files.size() != nr_files) { auto name = generate_random_filename(); files.push_back(name); } for (auto& file : files) { testlog.info("file_tx={} file_size={}", file, file_size); write_random_content_to_file(file, file_size).get(); } do_with_cql_env_thread([files, inject_error, unsupported_file_ops] (auto& e) { auto ok = do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, "", inject_error, unsupported_file_ops).get(); // Stream with a unsupported file ops should fail BOOST_REQUIRE(ok == false); }, cfg).get(); for (auto& file : files) { seastar::remove_file(file).get(); } } /* Test cases: plain file streaming, file streaming with error injection, and streaming with an unsupported file_ops value. */ SEASTAR_THREAD_TEST_CASE(test_file_stream) { bool inject_error = false; do_test_file_stream(inject_error); } SEASTAR_THREAD_TEST_CASE(test_file_stream_inject_error) { bool inject_error = true; do_test_file_stream(inject_error); } SEASTAR_THREAD_TEST_CASE(test_unsupported_file_ops) { do_test_unsupported_file_ops(); } /* Writes 11 random buffers of disk_write_dma_alignment() bytes each through a file built on a memory sink, truncates to n_bufs * wa - wa/3, and checks that the buffers collected by the sink total final_len bytes and equal the first final_len source bytes. */ SEASTAR_THREAD_TEST_CASE(test_sink_wrapper) { std::vector> bufs; auto sink = create_memory_sink(bufs); auto file = create_file_for_sink(std::move(sink)); auto wa = file.disk_write_dma_alignment(); std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dist(0u, 255u); constexpr auto n_bufs = 11; std::vector> src; src.reserve(n_bufs); for (int i = 0; i < n_bufs; ++i) { temporary_buffer buf(wa); 
std::generate(buf.get_write(), buf.get_write() + wa, [&] { return (char)dist(gen); }); src.emplace_back(std::move(buf)); } uint64_t pos = 0; for (auto& buf : src) { pos += file.dma_write(pos, buf.get(), buf.size()).get(); } auto final_len = n_bufs * wa - wa/3; file.truncate(final_len).get(); file.flush().get(); file.close().get(); auto res = std::accumulate(bufs.begin(), bufs.end(), size_t(0), [](size_t s, auto& buf) { return s + buf.size(); }); BOOST_REQUIRE_EQUAL(final_len, res); temporary_buffer lin1(wa*n_bufs), lin2(wa*n_bufs); /* std::accumulate is used here only to copy each buffer back to back into the linear buffer; its return value is ignored */ std::accumulate(src.begin(), src.end(), lin1.get_write(), [](char* dst, temporary_buffer& buf) { return std::copy(buf.begin(), buf.end(), dst); }); std::accumulate(bufs.begin(), bufs.end(), lin2.get_write(), [](char* dst, temporary_buffer& buf) { return std::copy(buf.begin(), buf.end(), dst); }); BOOST_REQUIRE(std::equal(lin1.begin(), lin1.begin() + final_len, lin2.begin())); } /* Same size and content check as test_sink_wrapper, but the data is written with the iovec overload of dma_write(), using up to 5 segments per call, each of random size in 1..wa capped by the bytes remaining. */ SEASTAR_THREAD_TEST_CASE(test_sink_wrapper_iovec) { std::vector> bufs; auto sink = create_memory_sink(bufs); auto file = create_file_for_sink(std::move(sink)); auto wa = file.disk_write_dma_alignment(); std::random_device rd; std::mt19937 gen(rd()); std::uniform_int_distribution dist(0u, 255u); constexpr auto n_bufs = 11; temporary_buffer src(wa * n_bufs); std::generate(src.get_write(), src.get_write() + src.size(), [&] { return (char)dist(gen); }); std::uniform_int_distribution sdist(1u, wa); auto* p = src.get_write(); uint64_t pos = 0, end = src.size(); while (pos < end) { std::vector iovs; auto rem = size_t(end - pos); auto p2 = p + pos; while (iovs.size() < 5 && rem != 0) { auto size = std::min(sdist(gen), rem); iovs.emplace_back(iovec{p2, size}); p2 += size; rem -= size; } /* advance by whatever dma_write reports as written; the next round starts from that offset */ auto written = file.dma_write(pos, std::move(iovs)).get(); pos += written; } auto final_len = src.size() - wa/3; file.truncate(final_len).get(); file.flush().get(); file.close().get(); auto res = std::accumulate(bufs.begin(), bufs.end(), size_t(0), [](size_t s, auto& buf) { return s + 
buf.size(); }); BOOST_REQUIRE_EQUAL(final_len, res); temporary_buffer lin(wa*n_bufs); std::accumulate(bufs.begin(), bufs.end(), lin.get_write(), [](char* dst, temporary_buffer& buf) { return std::copy(buf.begin(), buf.end(), dst); }); BOOST_REQUIRE(std::equal(lin.begin(), lin.begin() + final_len, src.begin())); } /* Runs do_test_sstable_stream() with the given arguments inside a cql test env that has ms_listen enabled. */ static void test_sstable_stream(compress_sstable compress, std::function(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") { cql_test_config cfg; cfg.ms_listen = true; do_with_cql_env_thread([&](cql_test_env& env) { do_test_sstable_stream(env, compress, corruption_fn, expected_error_msg).get(); }, cfg).get(); } /* sstable streaming cases, each for a compressed and an uncompressed table: no corruption, corrupted Data component (expects "failed checksum"), corrupted Digest component (expects "Digest mismatch"). */ SEASTAR_THREAD_TEST_CASE(test_sstable_stream_compressed) { test_sstable_stream(compress_sstable::yes); } SEASTAR_THREAD_TEST_CASE(test_sstable_stream_uncompressed) { test_sstable_stream(compress_sstable::no); } SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_compressed) { test_sstable_stream(compress_sstable::yes, corrupt_data_component, "failed checksum"); } SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_uncompressed) { test_sstable_stream(compress_sstable::no, corrupt_data_component, "failed checksum"); } SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) { test_sstable_stream(compress_sstable::yes, corrupt_digest_component, "Digest mismatch"); } SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) { test_sstable_stream(compress_sstable::no, corrupt_digest_component, "Digest mismatch"); }