Files
scylladb/test/boost/sstable_test.cc
Taras Veretilnyk a3912cf7f1 sstables: add tests for component digest validation on corrupted SSTables
Add tests that verify SSTable component digest validation detects
corruption on load. Each test writes an SSTable, corrupts a specific
component file by flipping a bit, then asserts that reloading the
SSTable throws malformed_sstable_exception with the expected digest
mismatch message.
2026-03-10 19:24:05 +01:00

1086 lines
46 KiB
C++

/*
* Copyright (C) 2015-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/sstring.hh>
#include <seastar/core/future-util.hh>
#include <seastar/core/align.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/util/closeable.hh>
#include "sstables/checksum_utils.hh"
#include "sstables/generation_type.hh"
#include "sstables/sstables.hh"
#include "sstables/key.hh"
#include "sstables/open_info.hh"
#include "sstables/version.hh"
#include "test/lib/exception_utils.hh"
#include "test/lib/random_schema.hh"
#include "test/lib/sstable_utils.hh"
#include "test/lib/random_utils.hh"
#include "test/lib/reader_concurrency_semaphore.hh"
#include "test/lib/scylla_test_case.hh"
#include "test/lib/test_utils.hh"
#include "schema/schema.hh"
#include "sstables/compressor.hh"
#include "replica/database.hh"
#include "test/boost/sstable_test.hh"
#include "test/lib/tmpdir.hh"
#include "partition_slice_builder.hh"
#include "sstables/sstable_mutation_reader.hh"
#include "sstables/binary_search.hh"
#include <boost/range/combine.hpp>
using namespace sstables;
// Builds a `bytes` value from the raw characters of `s`. Each character is
// reinterpreted as int8_t; no encoding conversion takes place.
bytes as_bytes(const sstring& s) {
    const auto* first = reinterpret_cast<const int8_t*>(s.data());
    return bytes(first, s.size());
}
// Loads the sstable found in `dir` using schema `s` inside a fresh test_env.
// The returned future fails if loading fails; the loaded sstable is discarded.
future<> test_using_working_sst(schema_ptr s, sstring dir) {
    auto load_only = [schema = std::move(s), path = std::move(dir)] (test_env& env) {
        // Only a successful load matters here, hence the explicit discard.
        (void)env.reusable_sst(std::move(schema), std::move(path)).get();
    };
    return test_env::do_with_async(std::move(load_only));
}
// The uncompressed reference sstable must load without errors.
SEASTAR_TEST_CASE(uncompressed_data) {
    auto schema = uncompressed_schema();
    return test_using_working_sst(std::move(schema), uncompressed_dir());
}
static auto make_schema_for_compressed_sstable() {
return schema_builder("ks", "cf").with_column("pk", utf8_type, column_kind::partition_key).build();
}
// The compressed reference sstable must load without errors.
SEASTAR_TEST_CASE(compressed_data) {
    return test_using_working_sst(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed");
}
// The composite-key reference sstable must load without errors.
SEASTAR_TEST_CASE(composite_index) {
    auto schema = composite_schema();
    return test_using_working_sst(std::move(schema), "test/resource/sstables/composite");
}
// Loads generation `gen` of the sstable in `dir` with schema `s` inside a
// fresh test_env and invokes `func(env, sst)` on it.
//
// Returns a future holding whatever `func` returns (future<> when it returns
// void).
//
// `func` is a forwarding reference, so it is captured with std::forward:
// rvalue callables are moved into the closure, lvalue callables are copied
// and the caller's object is left intact.
template<std::invocable<test_env&, sstable_ptr> Func>
inline future<std::invoke_result_t<Func, test_env&, sstable_ptr>>
test_using_reusable_sst(schema_ptr s, sstring dir, sstables::generation_type::int_t gen, Func&& func) {
    using ret_type = std::invoke_result_t<Func, test_env&, sstable_ptr>;
    return test_env::do_with_async_returning<ret_type>(
            [s = std::move(s), dir = std::move(dir), gen, func = std::forward<Func>(func)] (test_env& env) {
        auto sst = env.reusable_sst(std::move(s), std::move(dir), generation_from_value(gen)).get();
        return func(env, std::move(sst));
    });
}
// Reads all index entries of generation 1 of the sstable in `path` and
// returns their partition keys, in the order the entries were read.
future<std::vector<partition_key>> index_read(schema_ptr schema, sstring path) {
    auto collect_keys = [] (test_env& env, sstable_ptr ptr) {
        auto entries = sstables::test(ptr).read_indexes(env.make_reader_permit()).get();
        std::vector<partition_key> keys;
        keys.reserve(entries.size());
        for (const sstables::test::index_entry& entry : entries) {
            keys.push_back(entry.key);
        }
        return keys;
    };
    return test_using_reusable_sst(std::move(schema), std::move(path), 1, std::move(collect_keys));
}
// The uncompressed reference sstable is expected to hold 4 index entries.
SEASTAR_TEST_CASE(simple_index_read) {
    const auto keys = co_await index_read(uncompressed_schema(), uncompressed_dir());
    BOOST_REQUIRE(keys.size() == 4);
}
// The composite-key reference sstable is expected to hold 20 index entries.
SEASTAR_TEST_CASE(composite_index_read) {
    const auto keys = co_await index_read(composite_schema(), "test/resource/sstables/composite");
    BOOST_REQUIRE(keys.size() == 20);
}
// Reads summary entry number `Position` from the given sstable and requires
// that its position equals `EntryPosition` and its key is `EntryKeySize`
// bytes long.
template<uint64_t Position, uint64_t EntryPosition, uint64_t EntryKeySize>
future<> summary_query(schema_ptr schema, sstring path, sstables::generation_type::int_t generation) {
    auto check_entry = [] (test_env& env, sstable_ptr ptr) {
        auto entry = sstables::test(ptr).read_summary_entry(Position).get();
        BOOST_REQUIRE(entry.position == EntryPosition);
        BOOST_REQUIRE(entry.key.size() == EntryKeySize);
    };
    return test_using_reusable_sst(std::move(schema), path, generation, std::move(check_entry));
}
// Runs summary_query() for a summary index that is expected to be out of
// bounds and requires that the lookup fails with std::out_of_range.
//
// Previously the exception was swallowed but never asserted, so the test
// also passed when the lookup unexpectedly succeeded. Any other exception
// type still propagates and fails the test.
template<uint64_t Position, uint64_t EntryPosition, uint64_t EntryKeySize>
future<> summary_query_fail(schema_ptr schema, sstring path, sstables::generation_type::int_t generation) {
    bool out_of_range_seen = false;
    try {
        co_await summary_query<Position, EntryPosition, EntryKeySize>(std::move(schema), std::move(path), generation);
    } catch (const std::out_of_range&) {
        out_of_range_seen = true;
    }
    BOOST_REQUIRE(out_of_range_seen);
}
// Summary entry 0 of uncompressed generation 1: position 0, 5-byte key.
SEASTAR_TEST_CASE(small_summary_query_ok) {
    constexpr uint64_t summary_idx = 0;
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 1);
}
// Summary index 2 is queried through summary_query_fail(), i.e. it is
// expected to be out of range for uncompressed generation 1.
SEASTAR_TEST_CASE(small_summary_query_fail) {
    constexpr uint64_t summary_idx = 2;
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query_fail<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 1);
}
// A "negative" index (-2 wrapped around to a huge uint64_t) is queried
// through summary_query_fail() on uncompressed generation 1.
SEASTAR_TEST_CASE(small_summary_query_negative_fail) {
    constexpr uint64_t summary_idx = -uint64_t(2);
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query_fail<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 1);
}
// Summary entry 0 of the "bigsummary" sstable (generation 76): position 0,
// 182-byte key.
SEASTAR_TEST_CASE(big_summary_query_0) {
    constexpr uint64_t summary_idx = 0;
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 182;
    return summary_query<summary_idx, data_pos, key_len>(uncompressed_schema(), "test/resource/sstables/bigsummary", 76);
}
// Summary entry 32 of the "bigsummary" sstable (generation 76): position
// 0xc4000, 182-byte key.
SEASTAR_TEST_CASE(big_summary_query_32) {
    constexpr uint64_t summary_idx = 32;
    constexpr uint64_t data_pos = 0xc4000;
    constexpr uint64_t key_len = 182;
    return summary_query<summary_idx, data_pos, key_len>(uncompressed_schema(), "test/resource/sstables/bigsummary", 76);
}
// The sstable used by the following tests (generation 2) is just a copy of
// uncompressed's generation 1, but with the Summary removed (and removed from
// the TOC as well). We should reconstruct it in this case, so the queries
// should still go through.
// Same expectation as small_summary_query_ok, but against generation 2 of
// the uncompressed sstable.
SEASTAR_TEST_CASE(missing_summary_query_ok) {
    constexpr uint64_t summary_idx = 0;
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 2);
}
// Same expectation as small_summary_query_fail, but against generation 2 of
// the uncompressed sstable.
SEASTAR_TEST_CASE(missing_summary_query_fail) {
    constexpr uint64_t summary_idx = 2;
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query_fail<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 2);
}
// Same expectation as small_summary_query_negative_fail, but against
// generation 2 of the uncompressed sstable.
SEASTAR_TEST_CASE(missing_summary_query_negative_fail) {
    constexpr uint64_t summary_idx = -uint64_t(2);
    constexpr uint64_t data_pos = 0;
    constexpr uint64_t key_len = 5;
    return summary_query_fail<summary_idx, data_pos, key_len>(uncompressed_schema(), uncompressed_dir(), 2);
}
// TODO: only one interval is generated with size-based sampling. Test it with an sstable that will actually result
// in two intervals.
// The test below is compiled out (#if 0) until the TODO above is addressed.
#if 0
SEASTAR_TEST_CASE(missing_summary_interval_1_query_ok) {
return summary_query<1, 19, 6>(uncompressed_schema(1), uncompressed_dir(), 2);
}
#endif
// Checks the summary of uncompressed generation 2: exactly one entry, with
// first key "vinna" and last key "finna".
SEASTAR_TEST_CASE(missing_summary_first_last_sane) {
    auto verify_summary = [] (test_env& env, shared_sstable ptr) {
        const auto& rebuilt = ptr->get_summary();
        BOOST_REQUIRE(rebuilt.header.size == 1);
        BOOST_REQUIRE(rebuilt.positions.size() == 1);
        BOOST_REQUIRE(rebuilt.entries.size() == 1);
        BOOST_REQUIRE(bytes_view(rebuilt.first_key) == as_bytes("vinna"));
        BOOST_REQUIRE(bytes_view(rebuilt.last_key) == as_bytes("finna"));
    };
    return test_using_reusable_sst(uncompressed_schema(), uncompressed_dir(), 2, std::move(verify_summary));
}
// Loads sstable `generation` from `load_dir`, stores it into `write_dir`
// under a freshly generated generation, and returns {loaded, stored}.
static future<std::pair<sstable_ptr, sstable_ptr>> do_write_sst(test_env& env, schema_ptr schema, sstring load_dir, sstring write_dir, sstables::generation_type generation) {
    auto loaded = co_await env.reusable_sst(std::move(schema), load_dir, generation);
    sstable_generation_generator next_generation;
    auto stored = co_await sstables::test(loaded).store(write_dir, next_generation());
    co_return std::make_pair(std::move(loaded), std::move(stored));
}
// Runs do_write_sst() in a fresh test_env and returns the generations of the
// loaded and the stored sstable, in that order.
static future<std::pair<sstables::generation_type, sstables::generation_type>> write_sst_info(schema_ptr schema, sstring load_dir, sstring write_dir, sstables::generation_type generation) {
    std::pair<sstables::generation_type, sstables::generation_type> generations;
    // `generations` lives in this coroutine's frame and the lambda is awaited
    // below, so capturing it by reference is safe.
    auto body = [schema = std::move(schema), load_dir = std::move(load_dir), write_dir = std::move(write_dir),
                 generation = std::move(generation), &generations] (test_env& env) {
        auto ssts = do_write_sst(env, std::move(schema), std::move(load_dir), std::move(write_dir), std::move(generation)).get();
        generations = std::make_pair(ssts.first->generation(), ssts.second->generation());
    };
    co_await test_env::do_with_async(std::move(body));
    co_return generations;
}
// Re-stores the compressed reference sstable into a temporary directory and
// requires that the given component file of the stored copy compares equal
// (via tests::compare_files) to the one in the source directory.
static future<> check_component_integrity(component_type component) {
    tmpdir scratch;
    auto source_gen = sstables::generation_type(1);
    auto generations = co_await write_sst_info(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed", scratch.path().string(), source_gen);
    // Only the generation of the stored copy (second element) is needed.
    auto original_file = sstable::filename("test/resource/sstables/compressed", "ks", "cf", la, source_gen, big, component);
    auto rewritten_file = sstable::filename(scratch.path().string(), "ks", "cf", la, generations.second, big, component);
    auto identical = co_await tests::compare_files(original_file, rewritten_file);
    BOOST_REQUIRE(identical);
}
// The CompressionInfo component must survive a load/store round trip unchanged.
SEASTAR_TEST_CASE(check_compressed_info_func) {
    constexpr auto component = component_type::CompressionInfo;
    return check_component_integrity(component);
}
// Loads sstable `load_gen` from `dir`, stores a copy into the test_env's
// temporary directory and hands both (loaded, stored) to `func`.
future<>
write_and_validate_sst(schema_ptr s, sstring dir, sstables::generation_type load_gen, noncopyable_function<void (shared_sstable sst1, shared_sstable sst2)> func) {
    auto body = [schema = std::move(s), source_dir = std::move(dir), load_gen, validate = std::move(func)] (test_env& env) mutable {
        auto ssts = do_write_sst(env, schema, source_dir, env.tempdir().path().native(), load_gen).get();
        validate(std::move(ssts.first), std::move(ssts.second));
    };
    return test_env::do_with_async(std::move(body));
}
// After a load/store round trip, the summary read back from the stored copy
// must match the original: header bytes, positions, entries, first and last key.
SEASTAR_TEST_CASE(check_summary_func) {
    auto compare_summaries = [] (shared_sstable sst1, shared_sstable sst2) {
        // Read the summary of the stored copy before comparing.
        sstables::test(sst2).read_summary().get();
        const summary& original = sst1->get_summary();
        const summary& stored = sst2->get_summary();
        BOOST_REQUIRE(::memcmp(&original.header, &stored.header, sizeof(summary::header)) == 0);
        BOOST_REQUIRE(original.positions == stored.positions);
        BOOST_REQUIRE(original.entries == stored.entries);
        BOOST_REQUIRE(original.first_key.value == stored.first_key.value);
        BOOST_REQUIRE(original.last_key.value == stored.last_key.value);
    };
    return write_and_validate_sst(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed",
            sstables::generation_type(1), std::move(compare_summaries));
}
// The Filter component must survive a load/store round trip unchanged.
SEASTAR_TEST_CASE(check_filter_func) {
    constexpr auto component = component_type::Filter;
    return check_component_integrity(component);
}
// After a load/store round trip, the statistics of the stored copy must have
// the same number of offsets and contents as the original, and each pair of
// corresponding offset elements must agree on its `second` member.
SEASTAR_TEST_CASE(check_statistics_func) {
    auto compare_statistics = [] (shared_sstable sst1, shared_sstable sst2) {
        // Read the statistics of the stored copy before comparing.
        sstables::test(sst2).read_statistics().get();
        const auto& original = sst1->get_statistics();
        const auto& stored = sst2->get_statistics();
        BOOST_REQUIRE(original.offsets.elements.size() == stored.offsets.elements.size());
        BOOST_REQUIRE(original.contents.size() == stored.contents.size());
        for (auto&& zipped : boost::combine(original.offsets.elements, stored.offsets.elements)) {
            const auto& lhs = boost::get<0>(zipped);
            const auto& rhs = boost::get<1>(zipped);
            BOOST_REQUIRE(lhs.second == rhs.second);
        }
        // TODO: compare the field contents from both sstables.
    };
    return write_and_validate_sst(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed",
            sstables::generation_type(1), std::move(compare_statistics));
}
// After a load/store round trip, the component set read from the stored
// copy's TOC must equal the original's.
SEASTAR_TEST_CASE(check_toc_func) {
    auto compare_components = [] (shared_sstable sst1, shared_sstable sst2) {
        // Read the TOC of the stored copy before comparing.
        sstables::test(sst2).read_toc().get();
        auto& original = sstables::test(sst1).get_components();
        auto& stored = sstables::test(sst2).get_components();
        BOOST_REQUIRE(original == stored);
    };
    return write_and_validate_sst(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed",
            sstables::generation_type(1), std::move(compare_components));
}
// Reading 6 bytes at data offset 97 of the uncompressed sstable must yield "gustaf".
SEASTAR_TEST_CASE(uncompressed_random_access_read) {
    auto read_and_check = [] (auto& env, auto sstp) {
        const auto chunk = sstp->data_read(97, 6, env.make_reader_permit()).get();
        BOOST_REQUIRE(sstring(chunk.get(), chunk.size()) == "gustaf");
    };
    return test_using_reusable_sst(uncompressed_schema(), uncompressed_dir(), 1, std::move(read_and_check));
}
// Reading 6 bytes at data offset 97 of the compressed sstable must yield "gustaf".
SEASTAR_TEST_CASE(compressed_random_access_read) {
    auto read_and_check = [] (auto& env, auto sstp) {
        const auto chunk = sstp->data_read(97, 6, env.make_reader_permit()).get();
        BOOST_REQUIRE(sstring(chunk.get(), chunk.size()) == "gustaf");
    };
    return test_using_reusable_sst(make_schema_for_compressed_sstable(), "test/resource/sstables/compressed", 1,
            std::move(read_and_check));
}
// Builds the partition key {"2": "2"} (a map<bytes, bytes>) and requires that
// binary search over the summary entries finds it at index 0.
SEASTAR_TEST_CASE(find_key_map) {
    auto lookup = [] (auto& env, auto sstp) {
        schema_ptr s = map_schema();
        auto& summary = sstables::test(sstp)._summary();

        auto map_key = to_bytes("2");
        auto map_val = to_bytes("2");
        std::vector<std::pair<data_value, data_value>> map;
        map.emplace_back(data_value(map_key), data_value(map_val));

        auto map_type = map_type_impl::get_instance(bytes_type, bytes_type, true);
        std::vector<data_value> exploded;
        exploded.push_back(make_map_value(map_type, map));

        auto key = sstables::key::from_deeply_exploded(*s, exploded);
        const auto found_at = sstables::binary_search(s->get_partitioner(), summary.entries, key);
        BOOST_REQUIRE(found_at == 0);
    };
    return test_using_reusable_sst(map_schema(), "test/resource/sstables/map_pk", 1, std::move(lookup));
}
// Builds the partition key {"1", "2"} (a set<bytes>) and requires that binary
// search over the summary entries finds it at index 0.
SEASTAR_TEST_CASE(find_key_set) {
    auto lookup = [] (auto& env, auto sstp) {
        schema_ptr s = set_schema();
        auto& summary = sstables::test(sstp)._summary();

        std::vector<data_value> set;
        set.push_back(data_value(bytes("1")));
        set.push_back(data_value(bytes("2")));

        auto set_type = set_type_impl::get_instance(bytes_type, true);
        std::vector<data_value> exploded;
        exploded.push_back(make_set_value(set_type, set));

        auto key = sstables::key::from_deeply_exploded(*s, exploded);
        const auto found_at = sstables::binary_search(s->get_partitioner(), summary.entries, key);
        BOOST_REQUIRE(found_at == 0);
    };
    return test_using_reusable_sst(set_schema(), "test/resource/sstables/set_pk", 1, std::move(lookup));
}
// Builds the partition key ["1", "2"] (a list<bytes>) and requires that
// binary search over the summary entries finds it at index 0.
SEASTAR_TEST_CASE(find_key_list) {
    return test_using_reusable_sst(list_schema(), "test/resource/sstables/list_pk", 1, [] (auto& env, auto sstp) {
        // Use the same schema the sstable was loaded with. This used to be
        // set_schema(), a copy-paste leftover from find_key_set.
        schema_ptr s = list_schema();
        auto& summary = sstables::test(sstp)._summary();
        std::vector<data_value> kk;
        std::vector<data_value> list;
        bytes b1("1");
        bytes b2("2");
        list.push_back(data_value(b1));
        list.push_back(data_value(b2));
        auto list_type = list_type_impl::get_instance(bytes_type, true);
        kk.push_back(make_list_value(list_type, list));
        auto key = sstables::key::from_deeply_exploded(*s, kk);
        BOOST_REQUIRE(sstables::binary_search(s->get_partitioner(), summary.entries, key) == 0);
    });
}
// Builds a two-component composite partition key and requires that binary
// search over the summary entries finds it at index 0.
SEASTAR_TEST_CASE(find_key_composite) {
    auto lookup = [] (auto& env, auto sstp) {
        schema_ptr s = composite_schema();
        auto& summary = sstables::test(sstp)._summary();

        std::vector<data_value> exploded;
        exploded.push_back(data_value(bytes("HCG8Ee7ENWqfCXipk4-Ygi2hzrbfHC8pTtH3tEmV3d9p2w8gJPuMN_-wp1ejLRf4kNEPEgtgdHXa6NoFE7qUig==")));
        exploded.push_back(data_value(bytes("VJizqYxC35YpLaPEJNt_4vhbmKJxAg54xbiF1UkL_9KQkqghVvq34rZ6Lm8eRTi7JNJCXcH6-WtNUSFJXCOfdg==")));

        auto key = sstables::key::from_deeply_exploded(*s, exploded);
        const auto found_at = sstables::binary_search(s->get_partitioner(), summary.entries, key);
        BOOST_REQUIRE(found_at == 0);
    };
    return test_using_reusable_sst(composite_schema(), "test/resource/sstables/composite", 1, std::move(lookup));
}
// Every summary entry of the "bigsummary" sstable must be found by binary
// search at exactly the index it occupies.
SEASTAR_TEST_CASE(all_in_place) {
    auto check_positions = [] (auto& env, auto sstp) {
        auto& summary = sstables::test(sstp)._summary();
        int expected_idx = 0;
        for (auto& entry : summary.entries) {
            auto key = sstables::key::from_bytes(bytes(entry.key));
            const auto found_at = sstables::binary_search(sstp->get_schema()->get_partitioner(), summary.entries, key);
            BOOST_REQUIRE(found_at == expected_idx);
            ++expected_idx;
        }
    };
    return test_using_reusable_sst(uncompressed_schema(), "test/resource/sstables/bigsummary", 76, std::move(check_positions));
}
// Every index entry of the uncompressed sstable must be found by binary
// search at exactly the index it occupies in the index list.
SEASTAR_TEST_CASE(full_index_search) {
    auto check_positions = [] (auto& env, auto sstp) {
        auto entries = sstables::test(sstp).read_indexes(env.make_reader_permit()).get();
        int expected_idx = 0;
        for (auto& entry : entries) {
            auto key = key::from_partition_key(*sstp->get_schema(), entry.key);
            const auto found_at = sstables::binary_search(sstp->get_schema()->get_partitioner(), entries, key);
            BOOST_REQUIRE(found_at == expected_idx);
            ++expected_idx;
        }
    };
    return test_using_reusable_sst(uncompressed_schema(), uncompressed_dir(), 1, std::move(check_positions));
}
// Looks up a composite key that is not a summary entry and requires the
// binary search to return -2.
SEASTAR_TEST_CASE(not_find_key_composite_bucket0) {
    auto lookup = [] (auto& env, auto sstp) {
        schema_ptr s = composite_schema();
        auto& summary = sstables::test(sstp)._summary();

        std::vector<data_value> exploded;
        exploded.push_back(data_value(bytes("ZEunFCoqAidHOrPiU3U6UAvUU01IYGvT3kYtYItJ1ODTk7FOsEAD-dqmzmFNfTDYvngzkZwKrLxthB7ItLZ4HQ==")));
        exploded.push_back(data_value(bytes("K-GpWx-QtyzLb12z5oNS0C03d3OzNyBKdYJh1XjHiC53KudoqdoFutHUMFLe6H9Emqv_fhwIJEKEb5Csn72f9A==")));

        auto key = sstables::key::from_deeply_exploded(*s, exploded);
        // (result + 1) * -1 -1 = 0
        const auto result = sstables::binary_search(s->get_partitioner(), summary.entries, key);
        BOOST_REQUIRE(result == -2);
    };
    return test_using_reusable_sst(composite_schema(), "test/resource/sstables/composite", 1, std::move(lookup));
}
// See CASSANDRA-7593. This sstable writes 0 in the range_start. We need to handle that case as well
// Reads the partition "todata" from the "wrongrange" sstable; the read only
// has to complete without throwing, its result is discarded.
SEASTAR_TEST_CASE(wrong_range) {
    auto read_one = [] (auto& env, auto sstp) {
        auto s = columns_schema();
        auto range = dht::partition_range::make_singular(make_dkey(uncompressed_schema(), "todata"));
        auto rd = sstp->make_reader(s, env.make_reader_permit(), range, s->full_slice());
        auto close_rd = deferred_close(rd);
        (void)read_mutation_from_mutation_reader(rd).get();
    };
    return test_using_reusable_sst(uncompressed_schema(), "test/resource/sstables/wrongrange", 114, std::move(read_one));
}
// Rewrites the Statistics component of `sstp` so that its sstable level
// becomes `new_level`, unlinks the original sstable and returns the rewritten
// sstable freshly reloaded from `dir_path`.
//
// `update_sstable_id` is forwarded to link_with_rewritten_component().
//
// The reload uses the sstable's own schema. It used to hard-code
// uncompressed_schema(), which handed callers with any other schema an
// sstable object carrying the wrong schema.
//
// NOTE: `dir_path` is taken by reference by a coroutine, so the caller must
// keep the string alive until the returned future resolves.
future<sstable_ptr> mutate_sstable_level(test_env& env, sstable_ptr sstp, const std::string& dir_path, uint32_t new_level, bool update_sstable_id = false) {
    auto modifier = [new_level] (sstables::sstable& sst) {
        sst.mutate_sstable_level(new_level);
    };
    auto creator = [&env, &dir_path] (shared_sstable sstp) {
        return env.make_sstable(sstp->get_schema(), dir_path, sstp->get_version());
    };
    auto new_sst = co_await sstp->link_with_rewritten_component(std::move(creator), component_type::Statistics, modifier, update_sstable_id);
    co_await sstp->unlink();
    // Drop the reference to the unlinked original before reloading.
    sstp = std::move(new_sst);
    sstp = co_await env.reusable_sst(sstp->get_schema(), dir_path, sstp->generation());
    co_return sstp;
}
// Writes an `me` sstable with random content, rewrites its level to 10 via
// mutate_sstable_level() and checks the level after reloading from disk.
SEASTAR_TEST_CASE(statistics_rewrite) {
    return test_env::do_with_async([] (test_env& env) {
        auto spec = tests::make_random_schema_specification(
                "ks",
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8));
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *spec};
        auto schema = rnd_schema.schema();
        const auto mutations = tests::generate_random_mutations(rnd_schema, 2).get();

        auto sst = make_sstable_containing(env.make_sstable(schema, sstable::version_types::me), mutations);
        // The sstable's directory is the parent of its TOC file.
        auto dir_path = std::filesystem::path(fmt::to_string(sst->toc_filename())).parent_path().string();

        BOOST_REQUIRE(sst->get_sstable_level() != 10);
        sst = mutate_sstable_level(env, sst, dir_path, 10).get();
        // Reload with the real schema before checking the persisted level.
        sst = env.reusable_sst(schema, dir_path, sst->generation()).get();
        BOOST_REQUIRE(sst->get_sstable_level() == 10);
    });
}
// Tests for reading a large partition for which the index contains a
// "promoted index", i.e., a sample of the column names inside the partition,
// with which we can avoid reading the entire partition when we look only
// for a specific subset of columns. The test sstable for the read test was
// generated in Cassandra.
// Schema "try1"."data" (t1 partition key, t2 clustering key, t3 regular, all
// utf8, non-compact storage). Built once per thread and cached.
static schema_ptr large_partition_schema() {
    static thread_local schema_ptr cached =
            schema_builder("try1", "data", generate_legacy_id("try1", "data"))
                .with_column("t1", utf8_type, column_kind::partition_key)
                .with_column("t2", utf8_type, column_kind::clustering_key)
                .with_column("t3", utf8_type)
                .build(schema_builder::compact_storage::no);
    return cached;
}
// Loads generation 3 of the "large_partition" test sstable in the given
// sstable format `version`.
static future<shared_sstable> load_large_partition_sst(test_env& env, const sstables::sstable::version_types version) {
    auto schema = large_partition_schema();
    // Resolve the directory before `schema` is moved into reusable_sst().
    auto sst_dir = get_test_dir("large_partition", schema);
    return env.reusable_sst(std::move(schema), std::move(sst_dir), 3, version);
}
// This is a rudimentary test that reads an sstable exported from Cassandra
// which contains a promoted index. It just checks that the promoted index
// is read from disk, as an unparsed array, and doesn't actually use it to
// search for anything.
// For every sstable version that has a summary and index, the large-partition
// sstable must expose exactly one index entry with a non-empty promoted index.
SEASTAR_TEST_CASE(promoted_index_read) {
    return for_each_sstable_version([] (const sstables::sstable::version_types version) {
        if (has_summary_and_index(version)) {
            return test_env::do_with_async([version] (test_env& env) {
                auto sstp = load_large_partition_sst(env, version).get();
                const auto entries = sstables::test(sstp).read_indexes(env.make_reader_permit()).get();
                BOOST_REQUIRE(entries.size() == 1);
                BOOST_REQUIRE(entries.front().promoted_index_size > 0);
            });
        }
        // This test is so basic that updating it to support `ms` sstables is not worth the effort.
        return make_ready_future<>();
    });
}
// Use an empty string for ck1, ck2, or both, for unbounded ranges.
// Builds a partition slice restricted to the clustering range [ck1, ck2].
// An empty string for either bound leaves that side of the range unbounded.
static query::partition_slice make_partition_slice(const schema& s, sstring ck1, sstring ck2) {
    using bound = query::clustering_range::bound;
    auto to_bound = [&s] (const sstring& ck) -> std::optional<bound> {
        if (ck.empty()) {
            return std::nullopt;
        }
        return bound(clustering_key_prefix::from_single_value(s, utf8_type->decompose(ck)));
    };
    std::optional<bound> lower = to_bound(ck1);
    std::optional<bound> upper = to_bound(ck2);
    return partition_slice_builder(s)
            .with_range(query::clustering_range(lower, upper))
            .build();
}
// Count the number of CQL rows in one partition between clustering key
// prefix ck1 to ck2.
// Counts the clustering rows of partition `key` whose clustering key falls in
// the range given by ck1/ck2 (see make_partition_slice()). Returns 0 when the
// reader produces no fragments at all.
static future<int> count_rows(test_env& env, sstable_ptr sstp, schema_ptr s, sstring key, sstring ck1, sstring ck2) {
    return seastar::async([&env, sstp, s, key, ck1, ck2] () mutable {
        auto slice = make_partition_slice(*s, ck1, ck2);
        auto range = dht::partition_range::make_singular(make_dkey(s, key.c_str()));
        auto rd = sstp->make_reader(s, env.make_reader_permit(), range, slice);
        auto close_rd = deferred_close(rd);
        // The first fragment is consumed without being counted; if there is
        // none, the partition was not found.
        if (!rd().get()) {
            return 0;
        }
        int count = 0;
        for (auto frag = rd().get(); frag; frag = rd().get()) {
            if (frag->is_clustering_row()) {
                ++count;
            }
        }
        return count;
    });
}
// Count the number of CQL rows in one partition
// Counts all clustering rows of partition `key` (full slice). Returns 0 when
// the reader produces no fragments at all.
static future<int> count_rows(test_env& env, sstable_ptr sstp, schema_ptr s, sstring key) {
    return seastar::async([&env, sstp, s, key] () mutable {
        auto range = dht::partition_range::make_singular(make_dkey(s, key.c_str()));
        auto rd = sstp->make_reader(s, env.make_reader_permit(), range, s->full_slice());
        auto close_rd = deferred_close(rd);
        // The first fragment is consumed without being counted; if there is
        // none, the partition was not found.
        if (!rd().get()) {
            return 0;
        }
        int count = 0;
        for (auto frag = rd().get(); frag; frag = rd().get()) {
            if (frag->is_clustering_row()) {
                ++count;
            }
        }
        return count;
    });
}
// Count the number of CQL rows between clustering key prefix ck1 to ck2
// in all partitions in the sstable (using sstable::read_range_rows).
// Counts the clustering rows within the ck1/ck2 clustering range across all
// partitions of the sstable (full partition range).
static future<int> count_rows(test_env& env, sstable_ptr sstp, schema_ptr s, sstring ck1, sstring ck2) {
    return seastar::async([&env, sstp, s, ck1, ck2] () mutable {
        auto slice = make_partition_slice(*s, ck1, ck2);
        auto reader = sstp->make_reader(s, env.make_reader_permit(), query::full_partition_range, slice);
        auto close_reader = deferred_close(reader);
        int count = 0;
        // Outer loop: one iteration per partition; the fragment fetched by
        // the loop header itself is not counted.
        for (auto frag = reader().get(); frag; frag = reader().get()) {
            frag = reader().get();
            BOOST_REQUIRE(frag);
            // Inner loop: walk the partition until its end-of-partition fragment.
            for (; !frag->is_end_of_partition(); frag = reader().get()) {
                if (frag->is_clustering_row()) {
                    ++count;
                }
            }
        }
        return count;
    });
}
// This test reads, using sstable::read_row(), a slice (a range of clustering
// rows) from one large partition in an sstable written in Cassandra.
// This large partition includes 13520 clustering rows, and spans about
// 700 KB on disk. When we ask to read only a part of it, the promoted index
// (included in this sstable) may be used to allow reading only a part of the
// partition from disk. This test doesn't directly verify that the promoted
// index is actually used - and can work even without promoted index
// support - but can be used to check that adding promoted index read support
// did not break anything.
// To verify that the promoted index was actually used to reduce the size
// of read from disk, add printouts to the row reading code.
// Counts clustering rows of partition "v1" of the large-partition sstable for
// a series of clustering ranges, for every sstable version.
SEASTAR_TEST_CASE(sub_partition_read) {
    schema_ptr s = large_partition_schema();
    return for_each_sstable_version([s] (const sstables::sstable::version_types version) {
        return test_env::do_with_async([s, version] (test_env& env) {
            auto sstp = load_large_partition_sst(env, version).get();
            // Number of rows of partition "v1" between clustering keys lo and
            // hi; an empty string leaves that side unbounded.
            auto rows_between = [&] (sstring lo, sstring hi) {
                return count_rows(env, sstp, s, "v1", std::move(lo), std::move(hi)).get();
            };

            // there should be 5 rows (out of 13520 = 20*26*26) in this range:
            // 18wX, 18wY, 18wZ, 18xA, 18xB.
            BOOST_REQUIRE(rows_between("18wX", "18xB") == 5);

            // There should be 26*26*2 rows in this range. It spans two
            // promoted-index blocks, so we get to test that case.
            BOOST_REQUIRE(rows_between("13aB", "15aA") == 2*26*26);

            // There should be 26*26*9 rows in this range. It spans many
            // promoted-index blocks.
            BOOST_REQUIRE(rows_between("10aB", "19aA") == 9*26*26);

            // All rows, 20*26*26 of them, are in this range. It spans all
            // the promoted-index blocks, but the range is still bounded
            // on both sides
            BOOST_REQUIRE(rows_between("0", "z") == 20*26*26);

            // range that is outside (after) the actual range of the data.
            // No rows should match.
            BOOST_REQUIRE(rows_between("y", "z") == 0);

            // range that is outside (before) the actual range of the data.
            // No rows should match.
            BOOST_REQUIRE(rows_between("_a", "_b") == 0);

            // half-infinite range
            BOOST_REQUIRE(rows_between("", "10aA") == (1*26*26 + 1));

            // half-infinite range
            BOOST_REQUIRE(rows_between("10aA", "") == 19*26*26);

            // count all rows, but giving an explicit all-encompassing filter
            BOOST_REQUIRE(rows_between("", "") == 20*26*26);

            // count all rows, without a filter
            const auto unfiltered = count_rows(env, sstp, s, "v1").get();
            BOOST_REQUIRE(unfiltered == 20*26*26);
        });
    });
}
// Same as previous test, just using read_range_rows instead of read_row
// to read parts of potentially more than one partition (in this particular
// sstable, there is actually just one partition).
// Counts the rows in clustering range 18wX..18xB across all partitions of the
// large-partition sstable, for every sstable version; 5 rows are expected.
SEASTAR_TEST_CASE(sub_partitions_read) {
    schema_ptr s = large_partition_schema();
    auto per_version = [s] (const sstables::sstable::version_types version) {
        return test_env::do_with_async([s, version] (test_env& env) {
            auto sstp = load_large_partition_sst(env, version).get();
            const auto matched = count_rows(env, sstp, s, "18wX", "18xB").get();
            BOOST_REQUIRE(matched == 5);
        });
    };
    return for_each_sstable_version(std::move(per_version));
}
// Writes two chunk-sized buffers through an LZ4-compressed (m-format) output
// stream into a temporary file, then re-opens the file several times and
// checks that combinations of read and skip() on the compressed input stream
// return the expected data and end with EOF.
SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
return seastar::async([] {
tests::reader_concurrency_semaphore_wrapper semaphore;
tmpdir tmp;
auto file_path = (tmp.path() / "test").string();
file f = open_file_dma(file_path, open_flags::create | open_flags::wo).get();
file_input_stream_options opts;
opts.read_ahead = 0;
// The compression chunk length is set to the input stream buffer size (in KB).
compression_parameters cp({
{ compression_parameters::SSTABLE_COMPRESSION, "LZ4Compressor" },
{ compression_parameters::CHUNK_LENGTH_KB, std::to_string(opts.buffer_size/1024) },
});
sstables::compression c;
// this initializes "c"
auto os = make_file_output_stream(f, file_output_stream_options()).get();
auto out = make_compressed_file_m_format_output_stream(std::move(os), &c, cp, make_lz4_sstable_compressor_for_tests());
// Make sure that amount of written data is a multiple of chunk_len so that we hit #2143.
temporary_buffer<char> buf1(c.uncompressed_chunk_length());
strcpy(buf1.get_write(), "buf1");
temporary_buffer<char> buf2(c.uncompressed_chunk_length());
strcpy(buf2.get_write(), "buf2");
// Track the total number of uncompressed bytes written; it bounds the input stream below.
size_t uncompressed_size = 0;
out.write(buf1.get(), buf1.size()).get();
uncompressed_size += buf1.size();
out.write(buf2.get(), buf2.size()).get();
uncompressed_size += buf2.size();
out.close().get();
auto compressed_size = seastar::file_size(file_path).get();
c.update(compressed_size);
// Re-opens the file read-only and returns a fresh compressed input stream
// covering uncompressed offsets [0, uncompressed_size).
auto make_is = [&] {
f = open_file_dma(file_path, open_flags::ro).get();
auto stream_creator = [f](uint64_t pos, uint64_t len, file_input_stream_options options)->future<input_stream<char>> {
co_return input_stream<char>(make_file_data_source(std::move(f), pos, len, std::move(options)));
};
return make_compressed_file_m_format_input_stream(stream_creator, &c, 0, uncompressed_size, opts, semaphore.make_permit(), std::nullopt);
};
// Reads exactly buf.size() bytes and requires them to equal `buf`.
auto expect = [] (input_stream<char>& in, const temporary_buffer<char>& buf) {
auto b = in.read_exactly(buf.size()).get();
BOOST_REQUIRE(b == buf);
};
// Requires that the next read returns an empty buffer.
auto expect_eof = [] (input_stream<char>& in) {
auto b = in.read().get();
BOOST_REQUIRE(b.empty());
};
// Case 1: plain sequential read of both buffers.
{
auto in = make_is();
expect(in, buf1);
expect(in, buf2);
expect_eof(in);
}
// Case 2: zero-length skip before reading anything.
{
auto in = make_is();
in.skip(0).get();
expect(in, buf1);
expect(in, buf2);
expect_eof(in);
}
// Case 3: zero-length skip between the two buffers.
{
auto in = make_is();
expect(in, buf1);
in.skip(0).get();
expect(in, buf2);
expect_eof(in);
}
// Case 4: read the first buffer, then skip opts.buffer_size bytes and expect EOF.
{
auto in = make_is();
expect(in, buf1);
in.skip(opts.buffer_size).get();
expect_eof(in);
}
// Case 5: skip twice opts.buffer_size in a single call and expect EOF.
{
auto in = make_is();
in.skip(opts.buffer_size * 2).get();
expect_eof(in);
}
// Case 6: skip opts.buffer_size twice in a row and expect EOF.
{
auto in = make_is();
in.skip(opts.buffer_size).get();
in.skip(opts.buffer_size).get();
expect_eof(in);
}
});
}
// Test that sstables::key_view::tri_compare(const schema& s, partition_key_view other)
// should correctly compare empty keys. The fact we did this incorrectly was
// noticed while fixing #9375, and a separate issue on it is #10178.
// Checks sstables::key_view::tri_compare() against partition_key_view for
// empty and non-empty single-column keys, and that the legacy form of an
// empty partition key is an empty range.
BOOST_AUTO_TEST_CASE(test_empty_key_view_comparison) {
    auto schema_holder = schema_builder("", "")
            .with_column("p", bytes_type, column_kind::partition_key)
            .build();
    const schema& s = *schema_holder;

    // Empty key, in both sstable and partition-key representations.
    sstables::key empty_sst_key = sstables::key::from_deeply_exploded(s, {data_value(bytes(""))});
    sstables::key_view empty_sst_view = empty_sst_key;
    partition_key empty_pk = partition_key::from_deeply_exploded(s, {data_value(bytes(""))});
    partition_key_view empty_pk_view = empty_pk;

    // Non-empty key "hello", in both representations.
    sstables::key hello_sst_key = sstables::key::from_deeply_exploded(s, {data_value(bytes("hello"))});
    sstables::key_view hello_sst_view = hello_sst_key;
    partition_key hello_pk = partition_key::from_deeply_exploded(s, {data_value(bytes("hello"))});
    partition_key_view hello_pk_view = hello_pk;

    // Two empty keys should be equal (this check failed in #10178)
    BOOST_CHECK_EQUAL(std::strong_ordering::equal,
            empty_sst_view.tri_compare(s, empty_pk_view));

    // For completeness, compare also an empty key to a non-empty key, and
    // two equal non-empty keys. An empty key is supposed to be less-than
    // a non-empty key.
    BOOST_CHECK_EQUAL(std::strong_ordering::less,
            empty_sst_view.tri_compare(s, hello_pk_view));
    BOOST_CHECK_EQUAL(std::strong_ordering::greater,
            hello_sst_view.tri_compare(s, empty_pk_view));
    BOOST_CHECK_EQUAL(std::strong_ordering::equal,
            hello_sst_view.tri_compare(s, hello_pk_view));

    // The underlying cause of #10178 was that legacy_form() returned
    // a legacy_compound_view<> which, despite being empty, did not
    // have begin()==end(). So let's reproduce that directly:
    auto legacy = empty_pk_view.legacy_form(s);
    BOOST_CHECK_EQUAL(0, legacy.size());
    BOOST_CHECK(legacy.begin() == legacy.end());
}
// Test that sstables::parse_path is able to parse the paths of sstables
// Feeds well-formed sstable component paths to parse_path() and checks the
// parsed keyspace, table, generation, version, format and component.
BOOST_AUTO_TEST_CASE(test_parse_path_good) {
    struct expectation {
        std::string_view path;
        std::string_view keyspace;
        std::string_view table;
        sstables::entry_descriptor descriptor;
    };
    const expectation cases[] = {
        // Integer generation, Data component.
        {
            "/scylla/system/truncated-38c19fd0fb863310a4b70d0cc66628aa/mc-2-big-Data.db",
            "system",
            "truncated",
            entry_descriptor{
                generation_type{2},
                sstable_version_types::mc,
                sstable_format_types::big,
                component_type::Data
            }
        },
        // Integer generation, Summary component.
        {
            "/scylla/system/scylla_local-2972ec7ffb2038ddaac1d876f2e3fcbd/mc-3-big-Summary.db",
            "system",
            "scylla_local",
            entry_descriptor{
                generation_type{3},
                sstable_version_types::mc,
                sstable_format_types::big,
                component_type::Summary
            }
        },
        // String-form generation, `me` version.
        {
            "/scylla/system_distributed/cdc_generation_timestamps-fdf455c4cfec3e009719d7a45436c89d/me-3g9p_0938_0ecz429c6f019i7yuf-big-Index.db",
            "system_distributed",
            "cdc_generation_timestamps",
            entry_descriptor{
                generation_type::from_string("3g9p_0938_0ecz429c6f019i7yuf"),
                sstable_version_types::me,
                sstable_format_types::big,
                component_type::Index
            }
        },
        // String-form generation, `md` version.
        {
            "/system_schema/columns-24101c25a2ae3af787c1b40ee1aca33f/md-3g9r_04ux_4be4w2d7t8bg6u7nok-big-Statistics.db",
            "system_schema",
            "columns",
            entry_descriptor{
                generation_type::from_string("3g9r_04ux_4be4w2d7t8bg6u7nok"),
                sstable_version_types::md,
                sstable_format_types::big,
                component_type::Statistics
            }
        },
        // Unrecognized component name is expected to parse as Unknown.
        {
            "/system_schema/columns-24101c25a2ae3af787c1b40ee1aca33f/md-3g9r_04ux_4be4w2d7t8bg6u7nok-big-ReallyBigData.db",
            "system_schema",
            "columns",
            entry_descriptor{
                generation_type::from_string("3g9r_04ux_4be4w2d7t8bg6u7nok"),
                sstable_version_types::md,
                sstable_format_types::big,
                component_type::Unknown
            }
        }
    };
    for (const auto& expected : cases) {
        auto [parsed_desc, parsed_ks, parsed_cf] = parse_path(expected.path);
        BOOST_CHECK_EQUAL(parsed_ks, expected.keyspace);
        BOOST_CHECK_EQUAL(parsed_cf, expected.table);
        BOOST_CHECK_EQUAL(expected.descriptor.generation, parsed_desc.generation);
        BOOST_CHECK_EQUAL(expected.descriptor.version, parsed_desc.version);
        BOOST_CHECK_EQUAL(expected.descriptor.format, parsed_desc.format);
        BOOST_CHECK_EQUAL(expected.descriptor.component, parsed_desc.component);
    }
}
// Checks that sstables::parse_path() rejects malformed sstable paths by
// throwing (any std::exception) instead of returning a descriptor.
BOOST_AUTO_TEST_CASE(test_parse_path_bad) {
    static constexpr std::string_view malformed_paths[] = {
        "",
        "/",
        "=",
        "hmm",
        "//-/-",
        "mc-2-big-Data.db",
        "truncated~38c19fd0fb863310a4b70d0cc66628aa/",
        "truncated-38c19fd0fb863310a4b70d0cc66628aa/mc-2-big-Data.db",
        "truncated~38c19fd0fb863310a4b70d0cc66628aa/mc-2-big-Data.db",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/404.db",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/mc/foo/bar.db",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/mc/-------",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/mc-2-big-Data.db",
        "/scylla/system/truncated-38c19fd0fb863310a4b70d0cc66628aa/zz-2-big-Data.db",
        "/scylla/system/truncated-38c19fd0fb863310a4b70d0cc66628aa/mc-x-big-Data.db",
        "/scylla/system/truncated-38c19fd0fb863310a4b70d0cc66628aa/mc-i~am~not~an~id-big-Data.db",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/mc-2--Data.db",
        "/scylla/system/truncated~38c19fd0fb863310a4b70d0cc66628aa/mc-2-grand-Data.db",
    };
    for (const std::string_view candidate : malformed_paths) {
        BOOST_CHECK_THROW(parse_path(candidate), std::exception);
    }
}
// Shorthand for the flag type handed to tests::make_random_schema_specification()
// by the digest test helpers below. Judging by its name and by the
// "*_compressed" / "*_compression" test cases that pass `yes`, it selects
// whether the generated sstable is compressed.
using compress_sstable = tests::random_schema_specification::compress_sstable;
// Writes an sstable with random content, reopens it from disk and verifies
// that the digest loaded for `component` equals crc32_utils::checksum() of
// that component's file contents.
//
// component           component whose persisted digest is checked; it must be
//                     present in the written sstable
// version             sstable version to write
// compress            forwarded to the random schema specification
// rewrite_statistics  when set, the sstable is first passed through
//                     mutate_sstable_level() and the test requires that both
//                     the component digest and the sstable identifier changed
//
// Independently of `component`, the Scylla component digest is checked as
// well; it is computed over the file minus its trailing sizeof(uint32_t) bytes.
static future<> test_component_digest_persistence(component_type component, sstable::version_types version, compress_sstable compress = compress_sstable::no, bool rewrite_statistics = false) {
    return test_env::do_with_async([component, version, compress, rewrite_statistics] (test_env& env) mutable {
        auto spec = tests::make_random_schema_specification(
                "ks",
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8),
                compress);
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *spec};
        auto schema = rnd_schema.schema();
        const auto mutations = tests::generate_random_mutations(rnd_schema, 2).get();

        auto sst_written = make_sstable_containing(env.make_sstable(schema, version), mutations);

        // The component under test has to exist in the freshly written sstable.
        auto& written_components = sstables::test(sst_written).get_components();
        const bool component_present = written_components.find(component) != written_components.end();
        BOOST_REQUIRE(component_present);

        const auto toc_path = fmt::to_string(sst_written->toc_filename());
        auto descriptor = sstables::parse_path(toc_path, schema->ks_name(), schema->cf_name());
        const auto sstable_dir = std::filesystem::path(toc_path).parent_path().string();

        if (rewrite_statistics) {
            const auto id_before = sst_written->sstable_identifier();
            const std::optional<uint32_t> digest_before = sst_written->get_component_digest(component);
            BOOST_REQUIRE(digest_before.has_value());

            sst_written = mutate_sstable_level(env, sst_written, sstable_dir, 10, true).get();
            // The sstable is reopened by generation below, so follow the
            // generation of the sstable returned by the rewrite.
            descriptor.generation = sst_written->generation();

            const auto digest_after = sst_written->get_component_digest(component);
            BOOST_REQUIRE(digest_after.has_value());
            BOOST_REQUIRE(digest_before.value() != digest_after.value());
            BOOST_REQUIRE_NE(id_before, sst_written->sstable_identifier());
        }

        // Drop the in-memory sstable so the checks below only see what is on disk.
        sst_written = nullptr;

        auto sst_reopened = env.make_sstable(schema, sstable_dir, descriptor.generation, descriptor.version, descriptor.format);
        sst_reopened->load(schema->get_sharder()).get();

        const auto loaded_digest = sst_reopened->get_component_digest(component);
        BOOST_REQUIRE(loaded_digest.has_value());

        // Reads the whole file backing the given component of the reopened sstable.
        const auto read_component_file = [&sst_reopened] (component_type type) {
            auto file = open_file_dma(sstables::test(sst_reopened).filename(type).native(), open_flags::ro).get();
            auto in = make_file_input_stream(file);
            auto close_in = deferred_close(in);
            return util::read_entire_stream_contiguous(in).get();
        };

        const auto component_bytes = read_component_file(component);
        const auto expected_digest = crc32_utils::checksum(component_bytes.begin(), component_bytes.size());
        BOOST_REQUIRE_EQUAL(expected_digest, loaded_digest.value());

        // calculate scylla component digest, by reading file_size - sizeof(uint32_t)
        const auto scylla_bytes = read_component_file(sstables::component_type::Scylla);
        const auto expected_scylla_digest = crc32_utils::checksum(scylla_bytes.begin(), scylla_bytes.size() - sizeof(uint32_t));
        BOOST_REQUIRE_EQUAL(expected_scylla_digest, sst_reopened->get_component_digest(sstables::component_type::Scylla).value());
    });
}
// The Index component digest of an "me" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_index) {
    const auto component = component_type::Index;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// The Partitions component digest of an "ms" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_partitions) {
    const auto component = component_type::Partitions;
    const auto version = sstable::version_types::ms;
    return test_component_digest_persistence(component, version);
}
// The Rows component digest of an "ms" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_rows) {
    const auto component = component_type::Rows;
    const auto version = sstable::version_types::ms;
    return test_component_digest_persistence(component, version);
}
// The Summary component digest of an "me" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_summary) {
    const auto component = component_type::Summary;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// The Filter component digest of an "me" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_filter) {
    const auto component = component_type::Filter;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// The CompressionInfo component digest must match the on-disk file after a reload.
// The sstable is written with compress_sstable::yes.
SEASTAR_TEST_CASE(test_digest_persistence_compression) {
    const auto component = component_type::CompressionInfo;
    const auto version = sstable::version_types::me;
    const auto compress = compress_sstable::yes;
    return test_component_digest_persistence(component, version, compress);
}
// The TOC component digest of an "me" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_toc) {
    const auto component = component_type::TOC;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// The Statistics component digest of an "me" sstable must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_statistics) {
    const auto component = component_type::Statistics;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// Same as the plain Statistics check, but with the rewrite_statistics path
// enabled: the sstable goes through mutate_sstable_level() before the reload,
// and the digest is required to change and then match the rewritten file.
SEASTAR_TEST_CASE(test_digest_persistence_statistics_rewrite) {
    const auto component = component_type::Statistics;
    const auto version = sstable::version_types::me;
    const auto compress = compress_sstable::no;
    const bool rewrite_statistics = true;
    return test_component_digest_persistence(component, version, compress, rewrite_statistics);
}
// The Data component digest of an "me" sstable (default compress_sstable::no)
// must match the on-disk file after a reload.
SEASTAR_TEST_CASE(test_digest_persistence_data) {
    const auto component = component_type::Data;
    const auto version = sstable::version_types::me;
    return test_component_digest_persistence(component, version);
}
// The Data component digest must match the on-disk file after a reload when
// the sstable is written with compress_sstable::yes.
SEASTAR_TEST_CASE(test_digest_persistence_data_compressed) {
    const auto component = component_type::Data;
    const auto version = sstable::version_types::me;
    const auto compress = compress_sstable::yes;
    return test_component_digest_persistence(component, version, compress);
}
// Corrupts the on-disk file of one component of `sst` by flipping the lowest
// bit of the file's last byte. The file length is left unchanged.
//
// sst        sstable whose component file is modified in place
// component  which component file to corrupt; the file must not be empty
//
// The file is opened for DMA I/O, so the aligned block containing the last
// byte is read, patched in memory and written back as a whole. The file is
// then truncated to its original size, since the block write may have
// extended it. The function blocks on futures with .get(), so it is meant to
// be called from a seastar thread (the callers run under do_with_async()).
static void corrupt_sstable(sstables::shared_sstable sst, component_type component) {
    const auto path = sstables::test(sst).filename(component).native();
    const auto size = seastar::file_size(path).get();
    // An empty file has no byte to flip, and `size - 1` below would wrap around.
    BOOST_REQUIRE_GT(size, 0u);

    auto f = open_file_dma(path, open_flags::rw).get();
    auto close_f = deferred_close(f);
    const auto mem_align = f.memory_dma_alignment();
    const auto dma_align = f.disk_write_dma_alignment();

    // Start of the aligned block that holds the last byte of the file.
    const auto block_offset = align_down(size - 1, dma_align);
    auto buf = seastar::temporary_buffer<char>::aligned(mem_align, dma_align);
    f.dma_read(block_offset, buf.get_write(), dma_align).get();

    // Flip one bit in the last byte of the file to corrupt it minimally.
    // XOR is used rather than an increment: adding 1 can carry into higher
    // bits (0x01 -> 0x02, 0xFF -> 0x00) and change more than one bit.
    // Using a single-bit flip avoids creating values that overflow
    // during parsing.
    buf.get_write()[size - 1 - block_offset] ^= 1;

    f.dma_write(block_offset, buf.get(), dma_align).get();
    // Writing a whole block may have grown the file; restore the original length.
    f.truncate(size).get();
}
// Writes an sstable with random content, corrupts the file of `component`
// with corrupt_sstable(), and verifies that loading the sstable from disk
// throws malformed_sstable_exception whose message contains `expected_message`.
//
// component         component to corrupt; it must have a digest in the written sstable
// version           sstable version to write
// expected_message  substring expected in the exception message
// compress          forwarded to the random schema specification
static future<> test_component_digest_validation(component_type component, sstable::version_types version, sstring expected_message, compress_sstable compress = compress_sstable::no) {
    return test_env::do_with_async([component, version, expected_message = std::move(expected_message), compress] (test_env& env) mutable {
        auto spec = tests::make_random_schema_specification(
                "ks",
                std::uniform_int_distribution<size_t>(1, 4),
                std::uniform_int_distribution<size_t>(2, 4),
                std::uniform_int_distribution<size_t>(2, 8),
                std::uniform_int_distribution<size_t>(2, 8),
                compress);
        auto rnd_schema = tests::random_schema{tests::random::get_int<uint32_t>(), *spec};
        auto schema = rnd_schema.schema();
        const auto mutations = tests::generate_random_mutations(rnd_schema, 2).get();

        auto sst = make_sstable_containing(env.make_sstable(schema, version), mutations);
        // Without a recorded digest there would be nothing to validate on load.
        const auto recorded_digest = sst->get_component_digest(component);
        BOOST_REQUIRE(recorded_digest.has_value());

        const auto toc_path = fmt::to_string(sst->toc_filename());
        const auto descriptor = sstables::parse_path(toc_path, schema->ks_name(), schema->cf_name());
        const auto sstable_dir = std::filesystem::path(toc_path).parent_path().string();

        corrupt_sstable(sst, component);

        // Loading the sstable should detect the digest mismatch
        auto sst_corrupted = env.make_sstable(schema, sstable_dir, descriptor.generation, descriptor.version, descriptor.format);
        const auto mentions_expected_message = exception_predicate::message_contains(expected_message);
        BOOST_REQUIRE_EXCEPTION(sst_corrupted->load(schema->get_sharder()).get(), malformed_sstable_exception,
                mentions_expected_message);
    });
}
// Corrupting the Statistics component must make the load fail with a Statistics digest mismatch.
SEASTAR_TEST_CASE(test_digest_validation_statistics) {
    const auto component = component_type::Statistics;
    const auto version = sstable::version_types::me;
    const auto expected_error = "Statistics digest mismatch";
    return test_component_digest_validation(component, version, expected_error);
}
// Corrupting the Filter component must make the load fail with a Filter digest mismatch.
SEASTAR_TEST_CASE(test_digest_validation_filter) {
    const auto component = component_type::Filter;
    const auto version = sstable::version_types::me;
    const auto expected_error = "Filter digest mismatch";
    return test_component_digest_validation(component, version, expected_error);
}
// Corrupting the CompressionInfo component must make the load fail with a
// CompressionInfo digest mismatch. The sstable is written with compress_sstable::yes.
SEASTAR_TEST_CASE(test_digest_validation_compression) {
    const auto component = component_type::CompressionInfo;
    const auto version = sstable::version_types::me;
    const auto expected_error = "CompressionInfo digest mismatch";
    const auto compress = compress_sstable::yes;
    return test_component_digest_validation(component, version, expected_error, compress);
}
// Corrupting the TOC component must make the load fail with a TOC digest mismatch.
SEASTAR_TEST_CASE(test_digest_validation_toc) {
    const auto component = component_type::TOC;
    const auto version = sstable::version_types::me;
    const auto expected_error = "TOC digest mismatch";
    return test_component_digest_validation(component, version, expected_error);
}
// Corrupting the Scylla component must make the load fail with a Scylla digest mismatch.
SEASTAR_TEST_CASE(test_digest_validation_scylla) {
    const auto component = component_type::Scylla;
    const auto version = sstable::version_types::me;
    const auto expected_error = "Scylla digest mismatch";
    return test_component_digest_validation(component, version, expected_error);
}