Add 'test_sstable_stream' to verify the SSTable file streaming integrity check. The new tests cover both compressed and uncompressed SSTables and include: - Checksum mismatch detection verification - Digest mismatch detection verification
534 lines
20 KiB
C++
534 lines
20 KiB
C++
/*
|
|
* Copyright (C) 2023-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "test/lib/cql_test_env.hh"
#include "streaming/stream_blob.hh"
#include "message/messaging_service.hh"
#include "test/lib/log.hh"
#include "test/lib/sstable_utils.hh"

#include <boost/lexical_cast.hpp>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/future.hh>
#include <seastar/core/file.hh>
#include <seastar/core/fstream.hh>
#include <cstdio>
#include <sstream>
#include <stdexcept>
#include <cryptopp/sha.h>
#include "utils/io-wrappers.hh"
|
|
|
|
/// Computes the SHA-256 digest of the file `filename`.
///
/// The file is opened read-only and consumed through a file input stream
/// until an empty buffer signals the end of the data.
///
/// \return the digest rendered as lowercase hexadecimal, two characters per byte.
future<sstring> generate_file_hash(sstring filename) {
    auto file = co_await seastar::open_file_dma(filename, seastar::open_flags::ro);
    auto input = seastar::make_file_input_stream(std::move(file));
    CryptoPP::SHA256 sha;

    // Feed the hasher chunk by chunk; an empty chunk means end of stream.
    auto chunk = co_await input.read();
    while (!chunk.empty()) {
        sha.Update(reinterpret_cast<const unsigned char*>(chunk.get()), chunk.size());
        chunk = co_await input.read();
    }
    co_await input.close();

    unsigned char digest[CryptoPP::SHA256::DIGESTSIZE];
    sha.Final(digest);

    // std::hex and std::setfill are sticky, std::setw applies to the next value only.
    std::stringstream hex;
    hex << std::hex << std::setfill('0');
    for (const unsigned char byte : digest) {
        hex << std::setw(2) << static_cast<int>(byte);
    }
    co_return hex.str();
}
|
|
|
|
/// Returns a temporary file name produced by std::tmpnam().
///
/// Only the name is generated here; callers create the file themselves.
///
/// \throws std::runtime_error if std::tmpnam() reports failure. Previously
///         the result was ignored and an uninitialized buffer would have
///         been returned in that case.
sstring generate_random_filename() {
    char buffer[L_tmpnam];
    if (std::tmpnam(buffer) == nullptr) {
        throw std::runtime_error("std::tmpnam() failed to generate a temporary file name");
    }
    return buffer;
}
|
|
|
|
/// Creates (or opens) `filename` and writes `content_size` bytes produced by
/// rand() into it, one byte at a time through a file output stream.
///
/// \param filename      path of the file to write
/// \param content_size  number of bytes to write (default 1024)
future<> write_random_content_to_file(const sstring& filename, size_t content_size = 1024) {
    // Seed rand() once only. Re-seeding with time(nullptr) on every call made
    // all files written within the same second carry the same byte sequence
    // (each one a prefix of the longest), which weakens the comparison of
    // transmitted and received files.
    static const bool seeded = [] {
        srand(time(nullptr));
        return true;
    }();
    (void)seeded;

    auto f = co_await open_file_dma(filename, open_flags::rw | open_flags::create);
    auto ostream = co_await make_file_output_stream(std::move(f));
    for (size_t i = 0; i < content_size; ++i) {
        const char c = static_cast<char>(rand() % 256);
        co_await ostream.write(&c, 1);
    }
    co_await ostream.close();
}
|
|
|
|
// Lets the code below name streaming:: entities without the namespace
// qualifier (e.g. file_stream_id, stream_blob_info, tablet_stream_files).
using namespace streaming;
|
|
|
|
/// Streams every file in `filelist` to shard 0 of the local node using the
/// stream-blob RPC verb and reports whether the transfer succeeded.
///
/// The function registers a stream-blob handler on all shards; the handler
/// stores each incoming file under `meta.filename + suffix` and removes that
/// file again when the store result is not `ok`. The handler is unregistered
/// before returning.
///
/// \param db                    local database instance
/// \param ms                    local messaging service instance
/// \param filelist              paths of the files to send
/// \param suffix                appended to the file name on the receiving side
/// \param inject_error          forwarded to stream_blob_handler and tablet_stream_files
/// \param unsupported_file_ops  when true the files are tagged with the bogus
///                              file_ops value 0xff55 instead of stream_sstables
/// \return true if tablet_stream_files() and mark_tablet_stream_done() completed
///         without throwing, false otherwise (the exception is logged).
static future<bool>
do_test_file_stream(replica::database& db, netw::messaging_service& ms, std::vector<sstring> filelist, const std::string& suffix, bool inject_error, bool unsupported_file_ops = false) {
    bool ret = false;
    bool verb_register = false;
    auto ops_id = file_stream_id::create_random_id();
    auto& global_db = db.container();
    auto& global_ms = ms.container();
    int n_retries = 0;

    // The retry is driven by an explicit flag. The previous
    // `do { ... continue; ... } while (false)` form never retried: `continue`
    // in a do-while jumps to the loop condition, which was `false`.
    bool retry = true;
    while (retry) {
        retry = false;
        try {
            // Register the receiving side only once, even when the send is retried.
            if (!verb_register) {
                co_await smp::invoke_on_all([&] {
                    return global_ms.local().register_stream_blob([&](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
                        const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
                        auto sink = global_ms.local().make_sink_for_stream_blob(source);
                        // The handler runs in the background; failures are only logged.
                        (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [&suffix](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
                            auto path = meta.filename + suffix;
                            auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create);
                            auto out = co_await make_file_output_stream(std::move(f));
                            co_return output_result{
                                // Drop the received file unless it was stored successfully.
                                [path = std::move(path)](store_result res) -> future<> {
                                    if (res != store_result::ok) {
                                        co_await remove_file(path);
                                    }
                                },
                                std::move(out)
                            };
                        }, inject_error).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) {
                            testlog.warn("Failed to run stream blob handler: {}", eptr);
                        });
                        return make_ready_future<rpc::sink<streaming::stream_blob_cmd_data>>(sink);
                    });
                });
            }
            verb_register = true;
            auto table = table_id::create_random_id();
            auto files = std::list<stream_blob_info>();
            auto hostid = db.get_token_metadata().get_my_id();
            seastar::shard_id dst_shard_id = 0;
            co_await mark_tablet_stream_start(ops_id);
            auto targets = std::vector<node_and_shard>{node_and_shard{hostid, dst_shard_id}};
            for (const auto& filename : filelist) {
                auto fops = file_ops::stream_sstables;
                fops = unsupported_file_ops ? file_ops(0xff55) : fops;
                auto file = co_await open_file_dma(filename, open_flags::ro);
                auto& info = files.emplace_back();
                info.filename = filename;
                info.fops = fops;
                info.source = [file = std::move(file)](const file_input_stream_options& foptions) mutable -> future<input_stream<char>> {
                    co_return make_file_input_stream(std::move(file), foptions);
                };
            }
            size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table, ops_id, service::null_topology_guard, inject_error);
            co_await mark_tablet_stream_done(ops_id);
            testlog.info("do_test_file_stream[{}] status=ok files={} stream_bytes={}", ops_id, filelist.size(), stream_bytes);
            ret = true;
        } catch (seastar::rpc::stream_closed&) {
            testlog.warn("do_test_file_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++);
            // Up to three attempts in total.
            if (n_retries < 3) {
                testlog.info("Retrying send");
                retry = true;
            }
        } catch (...) {
            testlog.warn("do_test_file_stream[{}] status=fail error={}", ops_id, std::current_exception());
        }
    }

    if (verb_register) {
        co_await smp::invoke_on_all([&global_ms] {
            return global_ms.local().unregister_stream_blob();
        });
    }
    co_return ret;
}
|
|
|
|
/// Makes the Digest component of `sst` disagree with its data: the stored
/// value is parsed as a uint32_t, incremented by one and written back at
/// offset 0 of the Digest file.
static future<> corrupt_digest_component(sstables::shared_sstable sst) {
    const auto digest_path = sstables::test(sst).filename(component_type::Digest).native();
    auto digest_file = co_await open_file_dma(digest_path, open_flags::rw);

    auto reader = make_file_input_stream(digest_file);
    const auto stored_text = co_await util::read_entire_stream_contiguous(reader);
    const auto stored_digest = boost::lexical_cast<uint32_t>(stored_text);

    // make it invalid
    const auto corrupted_text = to_sstring<bytes>(stored_digest + 1);
    co_await digest_file.dma_write(0, corrupted_text.c_str(), corrupted_text.size());
    co_await reader.close();
}
|
|
|
|
/// Overwrites the beginning of the Data component of `sst` with one
/// write-alignment-sized block filled with the byte 0xba.
static future<> corrupt_data_component(sstables::shared_sstable sst) {
    auto data_file = co_await open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo);

    // The buffer is allocated with the file's memory DMA alignment and sized
    // to its disk write DMA alignment.
    const auto mem_alignment = data_file.memory_dma_alignment();
    const auto block_size = data_file.disk_write_dma_alignment();
    auto garbage = seastar::temporary_buffer<char>::aligned(mem_alignment, block_size);
    std::fill_n(garbage.get_write(), block_size, 0xba);

    co_await data_file.dma_write(0, garbage.get(), block_size);
    co_await data_file.close();
}
|
|
|
|
// Strongly typed yes/no flag selecting whether the test table is created with
// the default compression or with compression disabled.
using compress_sstable = bool_class<struct compress_sstable_tag>;
|
|
/// Creates a one-table keyspace, flushes ten rows into an sstable, optionally
/// corrupts that sstable and then streams its components to shard 0 of the
/// local node through the stream-blob RPC verb.
///
/// \param env                 CQL test environment
/// \param compress            selects the default table compression (yes) or
///                            `sstable_compression = ''` (no)
/// \param corruption_fn       if set, applied to the first sstable of the
///                            snapshot before the stream sources are created
/// \param expected_error_msg  substring that must appear in the
///                            malformed_sstable_exception message
///
/// When `corruption_fn` is set the test requires that a
/// malformed_sstable_exception containing `expected_error_msg` was caught;
/// otherwise it requires that no such exception was seen.
///
/// The stream-blob verb is unregistered before any assertion is evaluated and
/// before an unexpected exception is propagated, so a failing test no longer
/// leaves the handlers registered.
static future<>
do_test_sstable_stream(cql_test_env& env, compress_sstable compress, std::function<future<>(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") {
    bool verb_register = false;
    auto ops_id = file_stream_id::create_random_id();
    auto& db = env.local_db();
    auto& ms = env.get_messaging_service().local();
    auto& global_db = db.container();
    auto& global_ms = ms.container();
    int n_retries = 0;
    bool exception_caught = false;
    // Any exception that is neither stream_closed nor malformed_sstable_exception;
    // rethrown once the verb has been unregistered.
    std::exception_ptr unexpected_error;
    do {
        try {
            if (!verb_register) {
                co_await smp::invoke_on_all([&global_db, &global_ms] {
                    return global_ms.local().register_stream_blob([&global_db, &global_ms](const rpc::client_info& cinfo, streaming::stream_blob_meta meta, rpc::source<streaming::stream_blob_cmd_data> source) {
                        const auto& from = cinfo.retrieve_auxiliary<locator::host_id>("host_id");
                        auto sink = global_ms.local().make_sink_for_stream_blob(source);
                        // The handler runs in the background; failures are only logged.
                        (void)stream_blob_handler(global_db.local(), global_ms.local(), from, meta, sink, source, [](auto&, const streaming::stream_blob_meta& meta) -> future<output_result> {
                            auto path = meta.filename;
                            auto f = co_await open_file_dma(path, open_flags::wo|open_flags::create);
                            auto out = co_await make_file_output_stream(std::move(f));
                            co_return output_result {
                                // The received file is removed whatever the store result is.
                                [path = std::move(path)](store_result res) -> future<> {
                                    co_await remove_file(path);
                                },
                                std::move(out)
                            };
                        }, false).handle_exception([sink, source, ms = global_ms.local().shared_from_this()] (std::exception_ptr eptr) {
                            testlog.warn("Failed to run stream blob handler: {}", eptr);
                        });
                        return make_ready_future<rpc::sink<streaming::stream_blob_cmd_data>>(sink);
                    });
                });
            }
            verb_register = true;

            co_await env.execute_cql("CREATE KEYSPACE ks_test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
            if (compress) {
                co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int);");
            } else {
                co_await env.execute_cql("CREATE TABLE ks_test.cf (pk text PRIMARY KEY, v int) WITH compression = { 'sstable_compression' : '' };");
            }

            for (int i = 0; i < 10; i++) {
                co_await env.execute_cql(format("INSERT INTO ks_test.cf (pk, v) VALUES ('key_{}', {});", i, i * 10));
            }

            auto& table = db.find_column_family("ks_test", "cf");
            co_await table.flush();
            auto schema = table.schema();

            auto range = dht::token_range::make_open_ended_both_sides();
            auto sstables = co_await table.take_storage_snapshot(range);

            BOOST_REQUIRE_GT(sstables.size(), 0);

            auto table_id = schema->id();
            auto files = std::list<stream_blob_info>();
            auto hostid = db.get_token_metadata().get_my_id();
            seastar::shard_id dst_shard_id = 0;

            co_await mark_tablet_stream_start(ops_id);
            auto targets = std::vector<node_and_shard>{node_and_shard{hostid, dst_shard_id}};
            auto permit = co_await db.obtain_reader_permit(table, "test_stream", db::no_timeout, {});

            auto sst_snapshot = sstables.front();
            if (corruption_fn) {
                co_await corruption_fn(sst_snapshot.sst);
            }
            auto sources = co_await sstables::create_stream_sources(sst_snapshot, permit);

            for (auto& source : sources) {
                auto& info = files.emplace_back();
                info.filename = source->component_basename();
                info.fops = file_ops::stream_sstables;
                info.source = [source = std::move(source)](const file_input_stream_options& foptions) mutable -> future<input_stream<char>> {
                    co_return co_await source->input(foptions);
                };
            }

            size_t stream_bytes = co_await tablet_stream_files(ms, std::move(files), targets, table_id, ops_id, service::null_topology_guard, false);
            co_await mark_tablet_stream_done(ops_id);
            testlog.info("do_test_sstable_stream[{}] status=ok stream_bytes={}", ops_id, stream_bytes);
        } catch (seastar::rpc::stream_closed&) {
            testlog.warn("do_test_sstable_stream[{}] status=fail error={} retry={}", ops_id, std::current_exception(), n_retries++);
            // NOTE(review): `continue` inside `do { } while (false)` jumps to the
            // loop condition and therefore leaves the loop, so no retry actually
            // happens. It is kept as is because the body above creates the
            // keyspace and table and cannot simply be executed a second time.
            if (n_retries < 3) {
                testlog.info("Retrying send");
                continue;
            }
        } catch (const malformed_sstable_exception& e) {
            testlog.warn("do_test_sstable_stream[{}] status=fail error={}", ops_id, std::current_exception());
            exception_caught = (sstring(e.what()).find(expected_error_msg) != sstring::npos);
        } catch (...) {
            // co_await is not allowed inside a handler: remember the exception,
            // clean up below, then rethrow.
            unexpected_error = std::current_exception();
        }
    } while (false);

    // Clean up first so that neither a failed assertion nor an unexpected
    // exception can skip the unregistration.
    if (verb_register) {
        co_await smp::invoke_on_all([&global_ms] {
            return global_ms.local().unregister_stream_blob();
        });
    }

    if (unexpected_error) {
        std::rethrow_exception(unexpected_error);
    }

    if (corruption_fn) {
        BOOST_REQUIRE(exception_caught);
    } else {
        BOOST_REQUIRE(!exception_caught);
    }
}
|
|
|
|
void do_test_file_stream(bool inject_error) {
|
|
cql_test_config cfg;
|
|
cfg.ms_listen = true;
|
|
std::vector<sstring> files;
|
|
std::vector<sstring> files_rx;
|
|
std::vector<sstring> hash_tx;
|
|
std::vector<sstring> hash_rx;
|
|
size_t nr_files = 10;
|
|
size_t file_size = 0;
|
|
static const std::string suffix = ".rx";
|
|
|
|
while (files.size() != nr_files) {
|
|
auto name = generate_random_filename();
|
|
files.push_back(name);
|
|
files_rx.push_back(name + suffix);
|
|
}
|
|
|
|
size_t base_size = 1024;
|
|
|
|
#ifdef SEASTAR_DEBUG
|
|
base_size = 1;
|
|
#endif
|
|
|
|
for (auto& file : files) {
|
|
if (file_size == 0) {
|
|
file_size = 1 * 1024 * base_size;
|
|
} else {
|
|
file_size = (rand() % 10) * 1024 * base_size + rand() % base_size;
|
|
}
|
|
file_size = std::max(size_t(1), file_size);
|
|
testlog.info("file_tx={} file_size={}", file, file_size);
|
|
write_random_content_to_file(file, file_size).get();
|
|
}
|
|
|
|
do_with_cql_env_thread([files, inject_error] (auto& e) {
|
|
do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, suffix, inject_error).get();
|
|
}, cfg).get();
|
|
|
|
bool cleanup = true;
|
|
for (auto& file : files) {
|
|
auto hash = generate_file_hash(file).get();
|
|
testlog.info("file_tx={} hash={}", file, hash);
|
|
hash_tx.push_back(hash);
|
|
if (cleanup) {
|
|
seastar::remove_file(file).get();
|
|
}
|
|
}
|
|
for (auto& file : files_rx) {
|
|
sstring hash = "SKIP";
|
|
try {
|
|
hash = generate_file_hash(file).get();
|
|
if (cleanup) {
|
|
seastar::remove_file(file).get();
|
|
}
|
|
} catch (...) {
|
|
if (!inject_error) {
|
|
throw;
|
|
}
|
|
}
|
|
hash_rx.push_back(hash);
|
|
testlog.info("file_rx={} hash={}", file, hash);
|
|
}
|
|
|
|
BOOST_REQUIRE(hash_tx.size() == hash_rx.size());
|
|
for (size_t i = 0; i < hash_tx.size(); i++) {
|
|
testlog.info("Check tx_hash={} rx_hash={}", hash_tx[i], hash_rx[i]);
|
|
if (inject_error) {
|
|
BOOST_REQUIRE(hash_tx[i] == hash_rx[i] || sstring("SKIP") == hash_rx[i]);
|
|
} else {
|
|
BOOST_REQUIRE(hash_tx[i] == hash_rx[i]);
|
|
}
|
|
}
|
|
}
|
|
|
|
void do_test_unsupported_file_ops() {
|
|
bool inject_error = false;
|
|
bool unsupported_file_ops = true;
|
|
|
|
cql_test_config cfg;
|
|
cfg.ms_listen = true;
|
|
std::vector<sstring> files;
|
|
size_t nr_files = 2;
|
|
size_t file_size = 1024;
|
|
|
|
while (files.size() != nr_files) {
|
|
auto name = generate_random_filename();
|
|
files.push_back(name);
|
|
}
|
|
|
|
for (auto& file : files) {
|
|
testlog.info("file_tx={} file_size={}", file, file_size);
|
|
write_random_content_to_file(file, file_size).get();
|
|
}
|
|
|
|
do_with_cql_env_thread([files, inject_error, unsupported_file_ops] (auto& e) {
|
|
auto ok = do_test_file_stream(e.local_db(), e.get_messaging_service().local(), files, "", inject_error, unsupported_file_ops).get();
|
|
// Stream with a unsupported file ops should fail
|
|
BOOST_REQUIRE(ok == false);
|
|
}, cfg).get();
|
|
|
|
for (auto& file : files) {
|
|
seastar::remove_file(file).get();
|
|
}
|
|
}
|
|
|
|
// Plain file streaming: every received file must match the sent one.
SEASTAR_THREAD_TEST_CASE(test_file_stream) {
    do_test_file_stream(/* inject_error = */ false);
}
|
|
|
|
// File streaming with error injection enabled: missing received files are tolerated.
SEASTAR_THREAD_TEST_CASE(test_file_stream_inject_error) {
    do_test_file_stream(/* inject_error = */ true);
}
|
|
|
|
// Streaming with an unknown file_ops value must be reported as a failure.
SEASTAR_THREAD_TEST_CASE(test_unsupported_file_ops) {
    // All the work, including the assertion, lives in the helper.
    do_test_unsupported_file_ops();
}
|
|
|
|
// Writes eleven alignment-sized random buffers through a file created by
// create_file_for_sink() on top of a memory sink, truncates the file to a
// length that is not a multiple of the alignment, and checks that the sink
// received exactly that many bytes with the same content as was written.
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper) {
    std::vector<temporary_buffer<char>> bufs;

    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    constexpr auto n_bufs = 11;

    // Source data: n_bufs buffers of `wa` random bytes each.
    std::vector<temporary_buffer<char>> src;
    src.reserve(n_bufs);
    for (int i = 0; i != n_bufs; ++i) {
        temporary_buffer<char> chunk(wa);
        char* out = chunk.get_write();
        char* const out_end = out + wa;
        for (; out != out_end; ++out) {
            *out = (char)dist(gen);
        }
        src.emplace_back(std::move(chunk));
    }

    // Write the buffers back to back.
    uint64_t pos = 0;
    for (auto& chunk : src) {
        const auto written = file.dma_write(pos, chunk.get(), chunk.size()).get();
        pos += written;
    }

    // Cut off part of the last block, then flush and close.
    auto final_len = n_bufs * wa - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    // Total number of bytes that reached the sink.
    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }

    BOOST_REQUIRE_EQUAL(final_len, res);

    // Linearize both sides and compare the first final_len bytes.
    temporary_buffer<char> lin1(wa*n_bufs), lin2(wa*n_bufs);

    char* dst1 = lin1.get_write();
    for (auto& chunk : src) {
        dst1 = std::copy(chunk.begin(), chunk.end(), dst1);
    }
    char* dst2 = lin2.get_write();
    for (auto& received : bufs) {
        dst2 = std::copy(received.begin(), received.end(), dst2);
    }

    BOOST_REQUIRE(std::equal(lin1.begin(), lin1.begin() + final_len, lin2.begin()));
}
|
|
|
|
// Same check as test_sink_wrapper, but the data is written with the iovec
// overload of dma_write(): each call passes up to five fragments whose sizes
// are drawn at random from [1, wa].
SEASTAR_THREAD_TEST_CASE(test_sink_wrapper_iovec) {
    std::vector<temporary_buffer<char>> bufs;

    auto file = create_file_for_sink(create_memory_sink(bufs));
    auto wa = file.disk_write_dma_alignment();

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<unsigned> dist(0u, 255u);

    // One contiguous source buffer of n_bufs * wa random bytes.
    constexpr auto n_bufs = 11;
    temporary_buffer<char> src(wa * n_bufs);
    char* const src_begin = src.get_write();
    for (char* out = src_begin; out != src_begin + src.size(); ++out) {
        *out = (char)dist(gen);
    }

    std::uniform_int_distribution<size_t> sdist(1u, wa);

    const uint64_t end = src.size();
    for (uint64_t pos = 0; pos < end; ) {
        // Build the next batch of fragments, starting at the current position.
        std::vector<iovec> iovs;
        char* cursor = src_begin + pos;
        for (auto rem = size_t(end - pos); iovs.size() < 5 && rem != 0; ) {
            const auto size = std::min(sdist(gen), rem);
            iovs.emplace_back(iovec{cursor, size});
            cursor += size;
            rem -= size;
        }

        // Advance by what was actually written.
        pos += file.dma_write(pos, std::move(iovs)).get();
    }

    // Cut off part of the last block, then flush and close.
    auto final_len = src.size() - wa/3;
    file.truncate(final_len).get();
    file.flush().get();
    file.close().get();

    // Total number of bytes that reached the sink.
    size_t res = 0;
    for (auto& received : bufs) {
        res += received.size();
    }

    BOOST_REQUIRE_EQUAL(final_len, res);

    // Linearize the received buffers and compare with the source.
    temporary_buffer<char> lin(wa*n_bufs);
    char* dst = lin.get_write();
    for (auto& received : bufs) {
        dst = std::copy(received.begin(), received.end(), dst);
    }

    BOOST_REQUIRE(std::equal(lin.begin(), lin.begin() + final_len, src.begin()));
}
|
|
|
|
/// Runs do_test_sstable_stream() inside a CQL test environment whose
/// messaging service is listening.
///
/// \param compress            forwarded to do_test_sstable_stream()
/// \param corruption_fn       forwarded; optional sstable corruption step
/// \param expected_error_msg  forwarded; substring expected in the error message
static void test_sstable_stream(compress_sstable compress, std::function<future<>(shared_sstable)> corruption_fn = nullptr, const sstring& expected_error_msg = "") {
    cql_test_config cfg;
    cfg.ms_listen = true;

    // The references stay valid: the environment is run to completion before returning.
    auto run_test = [&compress, &corruption_fn, &expected_error_msg](cql_test_env& env) {
        do_test_sstable_stream(env, compress, corruption_fn, expected_error_msg).get();
    };
    do_with_cql_env_thread(std::move(run_test), cfg).get();
}
|
|
|
|
// Streaming an intact compressed sstable must not raise a malformed-sstable error.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_compressed) {
    const auto compression = compress_sstable::yes;
    test_sstable_stream(compression);
}
|
|
|
|
// Streaming an intact uncompressed sstable must not raise a malformed-sstable error.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_uncompressed) {
    const auto compression = compress_sstable::no;
    test_sstable_stream(compression);
}
|
|
|
|
// A corrupted Data component of a compressed sstable must be reported as a checksum failure.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_compressed) {
    const auto compression = compress_sstable::yes;
    test_sstable_stream(compression, corrupt_data_component, "failed checksum");
}
|
|
|
|
// A corrupted Data component of an uncompressed sstable must be reported as a checksum failure.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_checksum_mismatched_uncompressed) {
    const auto compression = compress_sstable::no;
    test_sstable_stream(compression, corrupt_data_component, "failed checksum");
}
|
|
|
|
// A corrupted Digest component of a compressed sstable must be reported as a digest mismatch.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_compressed) {
    const auto compression = compress_sstable::yes;
    test_sstable_stream(compression, corrupt_digest_component, "Digest mismatch");
}
|
|
|
|
// A corrupted Digest component of an uncompressed sstable must be reported as a digest mismatch.
SEASTAR_THREAD_TEST_CASE(test_sstable_stream_digest_mismatched_uncompressed) {
    const auto compression = compress_sstable::no;
    test_sstable_stream(compression, corrupt_digest_component, "Digest mismatch");
}
|