Files
scylladb/test/boost/file_stream_test.cc
Taras Veretilnyk 3003669c96 file_stream_test: add sstable file streaming integrity verification test cases
Add 'test_sstable_stream' to verify the SSTable file streaming integrity check.
The new tests cover both compressed and uncompressed SSTables and include:
- Checksum mismatch detection verification
- Digest mismatch detection verification
2025-11-21 12:52:35 +01:00

534 lines
20 KiB
C++

/*
* Copyright (C) 2023-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include "test/lib/cql_test_env.hh"
#include "streaming/stream_blob.hh"
#include "message/messaging_service.hh"
#include "test/lib/log.hh"
#include "test/lib/sstable_utils.hh"
#include <boost/lexical_cast.hpp>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <cstdio>
#include <sstream>
#include <stdexcept>
#include <cryptopp/sha.h>
#include "utils/io-wrappers.hh"
// Reads the file at `filename` to the end, feeding every chunk into SHA-256,
// and returns the digest as a lowercase hexadecimal string (two characters
// per digest byte).
future<sstring> generate_file_hash(sstring filename) {
    auto opened = co_await seastar::open_file_dma(filename, seastar::open_flags::ro);
    auto input = seastar::make_file_input_stream(std::move(opened));

    CryptoPP::SHA256 sha;
    // An empty buffer from read() marks end of stream.
    auto chunk = co_await input.read();
    while (!chunk.empty()) {
        sha.Update(reinterpret_cast<const unsigned char*>(chunk.get()), chunk.size());
        chunk = co_await input.read();
    }
    co_await input.close();

    unsigned char raw[CryptoPP::SHA256::DIGESTSIZE];
    sha.Final(raw);

    std::stringstream hex;
    // std::hex and the fill character are sticky; the width must be set per byte.
    hex << std::hex << std::setfill('0');
    for (const unsigned char byte : raw) {
        hex << std::setw(2) << static_cast<int>(byte);
    }
    co_return hex.str();
}
// Returns a temporary-file path produced by std::tmpnam().
//
// std::tmpnam() returns a null pointer when it cannot produce a name; the
// buffer contents are then unspecified, so throw std::runtime_error instead
// of returning whatever happens to be in it.
//
// NOTE(review): tmpnam() only picks a name and does not create the file, so
// another process could claim the same path before the caller opens it.
sstring generate_random_filename() {
    char filename[L_tmpnam];
    if (std::tmpnam(filename) == nullptr) {
        throw std::runtime_error("std::tmpnam failed to generate a temporary file name");
    }
    return filename;
}
// Creates (or opens) `filename` and writes `content_size` pseudo-random bytes
// to it, one byte per write() call on a file output stream.
//
// The rand() generator is seeded from the wall clock only on the first call.
// Re-seeding on every call with a one-second-resolution clock made files
// written within the same second start with identical byte sequences.
//
// The output stream is closed on both the success and the failure path; the
// first exception encountered (from write() or close()) is rethrown.
future<> write_random_content_to_file(const sstring& filename, size_t content_size = 1024) {
    static const bool seeded = [] {
        srand(time(nullptr));
        return true;
    }();
    (void)seeded;

    auto f = co_await open_file_dma(filename, open_flags::rw | open_flags::create);
    auto ostream = co_await make_file_output_stream(std::move(f));

    std::exception_ptr failure;
    try {
        for (size_t i = 0; i < content_size; ++i) {
            const char c = static_cast<char>(rand() % 256);
            co_await ostream.write(&c, 1);
        }
    } catch (...) {
        // co_await is not allowed inside a handler, so remember the error
        // and close the stream after leaving the catch block.
        failure = std::current_exception();
    }
    try {
        co_await ostream.close();
    } catch (...) {
        if (!failure) {
            failure = std::current_exception();
        }
    }
    if (failure) {
        std::rethrow_exception(std::move(failure));
    }
}
// Lets the tests below refer to the contents of the `streaming` namespace
// without qualification.
using namespace streaming;
// Streams every file in `filelist` to shard 0 of the local node through the
// stream_blob RPC verb and reports whether the transfer completed.
//
// The verb handler is registered on all shards the first time through; it
// stores each incoming blob at `meta.filename + suffix` and removes that file
// again when the store result is not `ok`. With `unsupported_file_ops` set,
// every file is tagged with the bogus file_ops value 0xff55 instead of
// file_ops::stream_sstables. `inject_error` is forwarded to both
// stream_blob_handler() and tablet_stream_files().
//
// A seastar::rpc::stream_closed failure is retried, for at most three
// attempts in total. Any other exception is logged and ends the run. The
// original loop was `do { ... continue; ... } while (false)`; `continue` in a
// do-while jumps to the (false) condition, so no retry ever took place.
//
// Returns true only when tablet_stream_files() and mark_tablet_stream_done()
// finished without throwing. The verb is unregistered on all shards before
// returning if it was registered.
static future<bool>
do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vector<sstring> filelist, const std::string& suffix, bool inject_error, bool unsupported_file_ops = false) {
    bool ret = false;
    bool verb_register = false;
    auto ops_id = file_stream_id::create_random_id();
    auto& global_db = db.container();
    auto& global_ms = ms.container();
    int n_retries = 0;
    bool retry = false;
    do {
        retry = false;
        try {
            if (!verb_register) {
                co_await smp::invoke_on_all([&] {
                    return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
                        const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
                        auto sink = global_ms.local().make_sink_for_stream_blob(source);
                        // The handler runs detached; failures are only logged.
                        (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
                            auto path = meta.filename + suffix;
                            auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create);
                            auto out = co_await make_file_output_stream(std::move(f));
                            co_return output_result{
                                [path = std::move(path)](store_result res) -> future<> {
                                    if (res != store_result::ok) {
                                        co_await remove_file(path);
                                    }
                                },
                                std::move(out)
                            };
                        }, inject_error).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) {
                            testlog.warn("Failed to run stream blob handler: {}", eptr);
                        });
                        return make_ready_future<rpc::sink<streaming::stream_blob_cmd_data>>(sink);
                    });
                });
            }
            verb_register = true;
            auto table = table_id::create_random_id();
            auto files = std::list<stream_blob_info>();
            auto hostid = db.get_token_metadata().get_my_id();
            seastar::shard_id dst_shard_id = 0;
            co_await mark_tablet_stream_start(ops_id);
            auto targets = std::vector<node_and_shard>{node_and_shard{hostid, dst_shard_id}};
            // The blob list is rebuilt on every attempt because it is moved
            // into tablet_stream_files() below.
            for (const auto& filename : filelist) {
                auto fops = file_ops::stream_sstables;
                fops = unsupported_file_ops ? file_ops(0xff55) : fops;
                auto file = co_await open_file_dma(filename, open_flags::ro);
                auto& info = files.emplace_back();
                info.filename = filename;
                info.fops = fops;
                info.source = [file = std::move(file)](const file_input_stream_options& foptions) mutable -> future<input_stream<char>> {
                    co_return make_file_input_stream(std::move(file), foptions);
                };
            }
            size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
            co_await mark_tablet_stream_done(ops_id);
            testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
            ret = true;
        } catch (seastar::rpc::stream_closed&) {
            testlog.warn("do_test_file_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++);
            if (n_retries < 3) {
                testlog.info("Retrying send");
                retry = true;
            }
        } catch (...) {
            testlog.warn("do_test_file_stream[{}] status=fail error={}", ops_id, std::current_exception());
        }
    } while (retry);
    if (verb_register) {
        co_await smp::invoke_on_all([&global_ms] {
            return global_ms.local().unregister_stream_blob();
        });
    }
    co_return ret;
}
// Invalidates the Digest component of `sst`: reads the stored value as a
// decimal uint32_t, adds one, and writes the new text back at offset 0.
static future<> corrupt_digest_component(sstables::shared_sstable sst) {
    const auto digest_path = sstables::test(sst).filename(component_type::Digest).native();
    auto digest_file = co_await open_file_dma(digest_path, open_flags::rw);
    auto reader = make_file_input_stream(digest_file);

    const auto stored_text = co_await util::read_entire_stream_contiguous(reader);
    const auto stored_digest = boost::lexical_cast<uint32_t>(stored_text);
    const auto bogus_text = to_sstring<bytes>(stored_digest + 1); // any different value is invalid

    co_await digest_file.dma_write(0, bogus_text.c_str(), bogus_text.size());
    // The stream is closed only after the write through the same file handle.
    co_await reader.close();
}
// Overwrites the first disk_write_dma_alignment() bytes of the Data component
// of `sst` with the byte 0xba, using a DMA-aligned buffer.
static future<> corrupt_data_component(sstables::shared_sstable sst) {
    auto data_file = co_await open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo);

    const auto mem_alignment = data_file.memory_dma_alignment();
    const auto block_len = data_file.disk_write_dma_alignment();
    auto junk = seastar::temporary_buffer<char>::aligned(mem_alignment, block_len);

    auto* const first = junk.get_write();
    std::fill(first, first + block_len, 0xba);

    co_await data_file.dma_write(0, junk.get(), block_len);
    co_await data_file.close();
}
// Named boolean flag type. do_test_sstable_stream() creates the test table
// with default options for compress_sstable::yes and with
// 'sstable_compression' set to '' for compress_sstable::no.
using compress_sstable = bool_class<struct compress_sstable_tag>;
// Creates ks_test.cf, writes ten rows, flushes, and streams the components of
// the first SSTable in the storage snapshot to shard 0 of the local node
// through the stream_blob RPC verb.
//
// `compress` selects the table definition (default options vs.
// 'sstable_compression' = ''). When `corruption_fn` is set it is applied to
// the SSTable before the stream sources are created, and the run must end in
// a malformed_sstable_exception whose message contains `expected_error_msg`.
// Without `corruption_fn` no such exception may be observed.
//
// The receiving handler writes each component to `meta.filename` and removes
// the file afterwards, whatever the store result.
//
// A seastar::rpc::stream_closed failure is retried, for at most three
// attempts in total. The original `do { ... continue; ... } while (false)`
// never retried, because `continue` jumps to the false loop condition. Now
// that the body can run more than once, keyspace/table creation and the
// corruption step are guarded so they happen only once.
//
// The verb is unregistered before the final assertions so that a failed
// BOOST_REQUIRE does not skip the cleanup.
//
// NOTE(review): in the no-corruption case a run that exhausts its retries
// still passes, since only `exception_caught` is checked. Consider asserting
// that streaming actually succeeded.
static future<>
do_test_sstable_stream(cql_test_env& env, compress_sstable compress, std::function<future<>(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") {
    bool verb_register = false;
    bool table_populated = false;
    bool corruption_applied = false;
    auto ops_id = file_stream_id::create_random_id();
    auto& db = env.local_db();
    auto& ms = env.get_messaging_service().local();
    auto& global_db = db.container();
    auto& global_ms = ms.container();
    int n_retries = 0;
    bool exception_caught = false;
    bool retry = false;
    do {
        retry = false;
        try {
            if (!verb_register) {
                co_await smp::invoke_on_all([&global_db, &global_ms] {
                    return global_ms.local().register_stream_blob([&global_db, &global_ms](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
                        const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
                        auto sink = global_ms.local().make_sink_for_stream_blob(source);
                        // The handler runs detached; failures are only logged.
                        (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
                            auto path = meta.filename;
                            auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create);
                            auto out = co_await make_file_output_stream(std::move(f));
                            co_return output_result {
                                [path = std::move(path)](store_result res) -> future<> {
                                    co_await remove_file(path);
                                },
                                std::move(out)
                            };
                        }, false).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) {
                            testlog.warn("Failed to run stream blob handler: {}", eptr);
                        });
                        return make_ready_future<rpc::sink<streaming::stream_blob_cmd_data>>(sink);
                    });
                });
            }
            verb_register = true;
            if (!table_populated) {
                // Schema creation is not repeatable, so do it on the first attempt only.
                co_await env.execute_cql("CREATE KEYSPACE ks_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
                if (compress) {
                    co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int);");
                } else {
                    co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int) WITH compression = { 'sstable_compression' : '' };");
                }
                for (int i = 0; i < 10; i++) {
                    co_await env.execute_cql(format("INSERT INTO ks_test.cf (pk, v) VALUES ('key_{}', {});", i, i * 10));
                }
                table_populated = true;
            }
            auto& table = db.find_column_family("ks_test", "cf");
            co_await table.flush();
            auto schema = table.schema();
            auto range = dht::token_range::make_open_ended_both_sides();
            auto sstables = co_await table.take_storage_snapshot(range);
            BOOST_REQUIRE_GT(sstables.size(), 0);
            auto table_id = schema->id();
            auto files = std::list<stream_blob_info>();
            auto hostid = db.get_token_metadata().get_my_id();
            seastar::shard_id dst_shard_id = 0;
            co_await mark_tablet_stream_start(ops_id);
            auto targets = std::vector<node_and_shard>{node_and_shard{hostid, dst_shard_id}};
            auto permit = co_await db.obtain_reader_permit(table, "test_stream", db::no_timeout, {});
            auto sst_snapshot = sstables.front();
            if (corruption_fn && !corruption_applied) {
                co_await corruption_fn(sst_snapshot.sst);
                corruption_applied = true;
            }
            // Sources are moved into the blob list, so they are recreated on every attempt.
            auto sources = co_await sstables::create_stream_sources(sst_snapshot, permit);
            for (auto& source : sources) {
                auto& info = files.emplace_back();
                info.filename = source->component_basename();
                info.fops = file_ops::stream_sstables;
                info.source = [source = std::move(source)](const file_input_stream_options& foptions) mutable -> future<input_stream<char>> {
                    co_return co_await source->input(foptions);
                };
            }
            size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table_id, ops_id, service::null_topology_guard, false);
            co_await mark_tablet_stream_done(ops_id);
            testlog.info("do_test_sstable_stream[{}] status=ok stream_bytes={}", ops_id, stream_bytes);
        } catch (seastar::rpc::stream_closed&) {
            testlog.warn("do_test_sstable_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++);
            if (n_retries < 3) {
                testlog.info("Retrying send");
                retry = true;
            }
        } catch (const malformed_sstable_exception& e) {
            testlog.warn("do_test_sstable_stream[{}] status=fail error={}", ops_id, std::current_exception());
            exception_caught = (sstring(e.what()).find(expected_error_msg) != sstring::npos);
        }
    } while (retry);
    if (verb_register) {
        co_await smp::invoke_on_all([&global_ms] {
            return global_ms.local().unregister_stream_blob();
        });
    }
    if (corruption_fn) {
        BOOST_REQUIRE(exception_caught);
    } else {
        BOOST_REQUIRE(!exception_caught);
    }
}
// End-to-end file streaming check.
//
// Creates ten temporary files with random content, streams them inside a CQL
// test environment (receiver stores each one at "<name>.rx"), then compares
// the SHA-256 of every sent file with its received counterpart. Both the
// sent and the received files are removed while hashing.
//
// With `inject_error` set, a received file that cannot be hashed is recorded
// as "SKIP" and accepted; otherwise every hash pair must match.
void do_test_file_stream(bool inject_error) {
    static const std::string suffix = ".rx";
    constexpr size_t nr_files = 10;
#ifdef SEASTAR_DEBUG
    const size_t base_size = 1;
#else
    const size_t base_size = 1024;
#endif

    cql_test_config cfg;
    cfg.ms_listen = true;

    // Pick the names of the files to send and of their received copies.
    std::vector<sstring> files;
    std::vector<sstring> files_rx;
    for (size_t n = 0; n < nr_files; ++n) {
        auto name = generate_random_filename();
        files.push_back(name);
        files_rx.push_back(name + suffix);
    }

    // The first file has a fixed size; the rest get a random size of at least one byte.
    bool first_file = true;
    for (auto& file : files) {
        size_t file_size = first_file
                ? 1 * 1024 * base_size
                : (rand() % 10) * 1024 * base_size + rand() % base_size;
        first_file = false;
        file_size = std::max(size_t(1), file_size);
        testlog.info("file_tx={} file_size={}", file, file_size);
        write_random_content_to_file(file, file_size).get();
    }

    do_with_cql_env_thread([files, inject_error] (auto& env) {
        do_test_file_stream(env.local_db(), env.get_messaging_service().local(), files, suffix, inject_error).get();
    }, cfg).get();

    const bool cleanup = true;

    std::vector<sstring> hash_tx;
    for (auto& file : files) {
        auto hash = generate_file_hash(file).get();
        testlog.info("file_tx={} hash={}", file, hash);
        hash_tx.push_back(hash);
        if (cleanup) {
            seastar::remove_file(file).get();
        }
    }

    std::vector<sstring> hash_rx;
    for (auto& file : files_rx) {
        sstring hash = "SKIP";
        try {
            hash = generate_file_hash(file).get();
            if (cleanup) {
                seastar::remove_file(file).get();
            }
        } catch (...) {
            // A missing or unreadable received file is tolerated only when errors were injected.
            if (!inject_error) {
                throw;
            }
        }
        hash_rx.push_back(hash);
        testlog.info("file_rx={} hash={}", file, hash);
    }

    BOOST_REQUIRE(hash_tx.size() == hash_rx.size());
    for (size_t i = 0; i < hash_tx.size(); i++) {
        testlog.info("Check tx_hash={} rx_hash={}", hash_tx[i], hash_rx[i]);
        if (!inject_error) {
            BOOST_REQUIRE(hash_tx[i] == hash_rx[i]);
        } else {
            BOOST_REQUIRE(hash_tx[i] == hash_rx[i] || sstring("SKIP") == hash_rx[i]);
        }
    }
}
// Streams two 1024-byte random files tagged with an unsupported file_ops
// value and requires the streaming helper to report failure. The source
// files are removed afterwards.
void do_test_unsupported_file_ops() {
    bool inject_error = false;
    bool unsupported_file_ops = true;
    size_t nr_files = 2;
    size_t file_size = 1024;

    cql_test_config cfg;
    cfg.ms_listen = true;

    std::vector<sstring> files;
    for (size_t n = 0; n < nr_files; ++n) {
        files.push_back(generate_random_filename());
    }
    for (auto& path : files) {
        testlog.info("file_tx={} file_size={}", path, file_size);
        write_random_content_to_file(path, file_size).get();
    }

    do_with_cql_env_thread([files, inject_error, unsupported_file_ops] (auto& env) {
        auto ok = do_test_file_stream(env.local_db(), env.get_messaging_service().local(), files, "", inject_error, unsupported_file_ops).get();
        // Stream with a unsupported file ops should fail
        BOOST_REQUIRE(ok == false);
    }, cfg).get();

    for (auto& path : files) {
        seastar::remove_file(path).get();
    }
}
// File streaming round trip without error injection: every received file
// must hash identically to the one sent.
SEASTAR_THREAD_TEST_CASE(test_file_stream) {
    do_test_file_stream(/*inject_error=*/false);
}
// File streaming round trip with error injection enabled: a received file
// may be missing, but any file that is present must match its source.
SEASTAR_THREAD_TEST_CASE(test_file_stream_inject_error) {
    do_test_file_stream(/*inject_error=*/true);
}
// Streaming files tagged with an unsupported file_ops value must be reported
// as a failure; see do_test_unsupported_file_ops().
SEASTAR_THREAD_TEST_CASE(test_unsupported_file_ops) {
do_test_unsupported_file_ops();
}
// Writes eleven alignment-sized random buffers through a file that wraps an
// in-memory sink, truncates the file to a third of an alignment unit short
// of the total, and checks that the sink received exactly the truncated
// length and that the bytes match what was written.
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper) {
    constexpr auto n_bufs = 11;

    std::vector<temporary_buffer<char>> bufs;
    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    // Build the source data: n_bufs buffers of `wa` random bytes each.
    std::vector<temporary_buffer<char>> src;
    src.reserve(n_bufs);
    for (int i = 0; i < n_bufs; ++i) {
        temporary_buffer<char> chunk(wa);
        auto* out = chunk.get_write();
        for (auto* const stop = out + wa; out != stop; ++out) {
            *out = (char)dist(gen);
        }
        src.emplace_back(std::move(chunk));
    }

    // Write the buffers back to back, advancing by what each write reports.
    uint64_t pos = 0;
    for (auto& chunk : src) {
        pos += file.dma_write(pos, chunk.get(), chunk.size()).get();
    }

    auto final_len = n_bufs * wa - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }
    BOOST_REQUIRE_EQUAL(final_len, res);

    // Flatten both buffer lists and compare the first final_len bytes.
    temporary_buffer<char> lin1(wa*n_bufs), lin2(wa*n_bufs);
    auto flatten = [](std::vector<temporary_buffer<char>>& parts, char* dst) {
        for (auto& part : parts) {
            dst = std::copy(part.begin(), part.end(), dst);
        }
    };
    flatten(src, lin1.get_write());
    flatten(bufs, lin2.get_write());
    BOOST_REQUIRE(std::equal(lin1.begin(), lin1.begin() + final_len, lin2.begin()));
}
// Same scenario as the plain-buffer sink test, but the data is written with
// the iovec overload of dma_write(): each call carries up to five iovecs of
// random length between 1 and the write alignment.
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper_iovec) {
    constexpr auto n_bufs = 11;

    std::vector<temporary_buffer<char>> bufs;
    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    // One contiguous block of random source data.
    temporary_buffer<char> src(wa * n_bufs);
    {
        auto* out = src.get_write();
        for (auto* const stop = out + src.size(); out != stop; ++out) {
            *out = (char)dist(gen);
        }
    }

    std::uniform_int_distribution<size_t> sdist(1u, wa);
    auto* base = src.get_write();
    const uint64_t end = src.size();
    for (uint64_t pos = 0; pos < end; ) {
        // Slice the remaining data into at most five randomly sized iovecs.
        std::vector<iovec> iovs;
        auto rem = size_t(end - pos);
        auto cursor = base + pos;
        while (iovs.size() < 5 && rem != 0) {
            auto len = std::min(sdist(gen), rem);
            iovs.emplace_back(iovec{cursor, len});
            cursor += len;
            rem -= len;
        }
        // Advance by what the write reports.
        pos += file.dma_write(pos, std::move(iovs)).get();
    }

    auto final_len = src.size() - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }
    BOOST_REQUIRE_EQUAL(final_len, res);

    // Flatten what the sink received and compare it with the source.
    temporary_buffer<char> lin(wa*n_bufs);
    char* dst = lin.get_write();
    for (auto& received : bufs) {
        dst = std::copy(received.begin(), received.end(), dst);
    }
    BOOST_REQUIRE(std::equal(lin.begin(), lin.begin() + final_len, src.begin()));
}
// Runs do_test_sstable_stream() inside a CQL test environment whose
// messaging service is listening, and blocks until it finishes.
static void test_sstable_stream(compress_sstable compress, std::function<future<>(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") {
    cql_test_config cfg;
    cfg.ms_listen = true;

    // Reference captures are safe here: the environment is waited on before returning.
    auto scenario = [compress, &corruption_fn, &expected_error_msg](cql_test_env& env) {
        do_test_sstable_stream(env, compress, corruption_fn, expected_error_msg).get();
    };
    do_with_cql_env_thread(std::move(scenario), cfg).get();
}
// Streams an uncorrupted SSTable from a table created with default
// compression options; no malformed-sstable error may be observed.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_compressed) {
test_sstable_stream(compress_sstable::yes);
}
// Streams an uncorrupted SSTable from a table created with
// 'sstable_compression' set to ''; no malformed-sstable error may be observed.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_uncompressed) {
test_sstable_stream(compress_sstable::no);
}
// Overwrites the start of the Data component (table created with default
// compression options) and expects streaming to fail with a malformed-sstable
// error whose message contains "failed checksum".
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_compressed) {
test_sstable_stream(compress_sstable::yes, corrupt_data_component, "failed checksum");
}
// Overwrites the start of the Data component (table created with
// 'sstable_compression' set to '') and expects streaming to fail with a
// malformed-sstable error whose message contains "failed checksum".
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_uncompressed) {
test_sstable_stream(compress_sstable::no, corrupt_data_component, "failed checksum");
}
// Rewrites the Digest component with a different value (table created with
// default compression options) and expects streaming to fail with a
// malformed-sstable error whose message contains "Digest mismatch".
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) {
test_sstable_stream(compress_sstable::yes, corrupt_digest_component, "Digest mismatch");
}
// Rewrites the Digest component with a different value (table created with
// 'sstable_compression' set to '') and expects streaming to fail with a
// malformed-sstable error whose message contains "Digest mismatch".
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) {
test_sstable_stream(compress_sstable::no, corrupt_digest_component, "Digest mismatch");
}