Merge 'sstables: Add digest checking in the validation path of the sstable layer' from Nikos Dragazis

This PR builds upon the PR for checksum validation (#20207) to further enhance scrub's corruption detection capabilities by validating digests as well. The digest (full checksum) is the checksum over the entire data, as opposed to per-chunk checksums which apply to individual chunks. Until now, digests were not examined on any code paths. This PR integrates digest checking into the compressed/checksummed data sources as an optional feature and enables it only through the validation path of the sstable layer (`sstable::validate()`). The validation path is used by the following tools:

* scrub in validate mode
* `sstable validate`

All other reads, including normal user reads, are unaffected by this change.

The PR consists of:
* Extensions to the compressed and checksummed data sources to support digest checking. The data sources receive the expected digest as a parameter and calculate the actual digest incrementally across multiple get() calls. The check happens on the get() call that reaches EOF and results to an exception if the digest is invalid. A digest check requires reading the whole file range. Therefore, a partial read or skip() is treated as an internal error.
* A new shareable digest component loaded on demand by the validation code. No lifecycle management.
* Grouping of old scrub/validate tests for compressed and uncompressed SSTables to reduce code duplication.
* scrub/validate tests for SSTables with valid checksums but invalid digests, and SSTables with no digests at all.
* scrub/validate tests with 3.x Cassandra SSTables to ensure compatibility.

Refs #19058.

New feature, no backport is needed.

Closes scylladb/scylladb#20720

* github.com:scylladb/scylladb:
  test: Test scrub/validate with SSTables from Cassandra
  compaction: Make quarantine optional for perform_sstable_scrub()
  test: Make random schema optional in scrub_test_framework
  test: Add tests for invalid digests
  test: Merge scrub/validate tests for compressed and uncompressed cases
  sstables: Verify digests on validation path
  sstables: Check if digest component exists
  sstables: Add digest in the SSTable components
  sstables: Add digest check in compressed data source
  sstables: Add digest check in checksummed data source
This commit is contained in:
Avi Kivity
2024-10-09 21:33:08 +03:00
44 changed files with 511 additions and 140 deletions

View File

@@ -1740,9 +1740,13 @@ compaction_manager::rewrite_sstables(table_state& t, sstables::compaction_type_o
namespace compaction {
class validate_sstables_compaction_task_executor : public sstables_task_executor {
compaction_manager::quarantine_invalid_sstables _quarantine_sstables;
public:
validate_sstables_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping, table_state* t, tasks::task_id parent_id, std::vector<sstables::shared_sstable> sstables)
validate_sstables_compaction_task_executor(compaction_manager& mgr, throw_if_stopping do_throw_if_stopping,
table_state* t, tasks::task_id parent_id, std::vector<sstables::shared_sstable> sstables,
compaction_manager::quarantine_invalid_sstables quarantine_sstables)
: sstables_task_executor(mgr, do_throw_if_stopping, t, sstables::compaction_type::Scrub, "Scrub compaction in validate mode", std::move(sstables), parent_id)
, _quarantine_sstables(quarantine_sstables)
{}
protected:
@@ -1771,7 +1775,7 @@ private:
sst->get_sstable_level(),
sstables::compaction_descriptor::default_max_sstable_bytes,
sst->run_identifier(),
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate));
sstables::compaction_type_options::make_scrub(sstables::compaction_type_options::scrub::mode::validate, _quarantine_sstables));
co_return co_await sstables::compact_sstables(std::move(desc), _compaction_data, *_compacting_table, _progress_monitor);
} catch (sstables::compaction_stopped_exception&) {
// ignore, will be handled by can_proceed()
@@ -1801,14 +1805,14 @@ static std::vector<sstables::shared_sstable> get_all_sstables(table_state& t) {
return s;
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(table_state& t, tasks::task_info info) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(table_state& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables) {
auto gh = start_compaction(t);
if (!gh) {
co_return compaction_stats_opt{};
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables));
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.id, std::move(all_sstables), quarantine_sstables);
}
namespace compaction {
@@ -2139,7 +2143,7 @@ compaction_manager::maybe_split_sstable(sstables::shared_sstable sst, table_stat
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub(table_state& t, sstables::compaction_type_options::scrub opts, tasks::task_info info) {
auto scrub_mode = opts.operation_mode;
if (scrub_mode == sstables::compaction_type_options::scrub::mode::validate) {
return perform_sstable_scrub_validate_mode(t, info);
return perform_sstable_scrub_validate_mode(t, info, opts.quarantine_sstables);
}
owned_ranges_ptr owned_ranges_ptr = {};
sstring option_desc = fmt::format("mode: {};\nquarantine_mode: {}\n", opts.operation_mode, opts.quarantine_operation_mode);

View File

@@ -228,7 +228,8 @@ private:
// similar-sized compaction.
void postpone_compaction_for_table(compaction::table_state* t);
future<compaction_stats_opt> perform_sstable_scrub_validate_mode(compaction::table_state& t, tasks::task_info info);
using quarantine_invalid_sstables = sstables::compaction_type_options::scrub::quarantine_invalid_sstables;
future<compaction_stats_opt> perform_sstable_scrub_validate_mode(compaction::table_state& t, tasks::task_info info, quarantine_invalid_sstables quarantine_sstables);
future<> update_static_shares(float shares);
using get_candidates_func = std::function<future<std::vector<sstables::shared_sstable>>()>;

View File

@@ -16,6 +16,7 @@
#include "types.hh"
#include "exceptions.hh"
#include "checksum_utils.hh"
#include "data_source_types.hh"
namespace sstables {
@@ -23,11 +24,13 @@ extern logging::logger sstlog;
// File data source implementation for SSTables with attached checksum
// data and no compression
template <ChecksumUtils ChecksumType>
template <ChecksumUtils ChecksumType, bool check_digest>
class checksummed_file_data_source_impl : public data_source_impl {
std::optional<input_stream<char>> _input_stream;
const checksum& _checksum;
[[no_unique_address]] digest_members<check_digest> _digests;
uint64_t _chunk_size_trailing_zeros;
uint64_t _file_len;
uint64_t _underlying_pos;
uint64_t _pos;
uint64_t _beg_pos;
@@ -35,8 +38,10 @@ class checksummed_file_data_source_impl : public data_source_impl {
public:
checksummed_file_data_source_impl(file f, uint64_t file_len,
const checksum& checksum, uint64_t pos, size_t len,
file_input_stream_options options)
file_input_stream_options options,
std::optional<uint32_t> digest)
: _checksum(checksum)
, _file_len(file_len)
, _pos(pos)
, _beg_pos(pos)
, _end_pos(pos + len)
@@ -53,16 +58,27 @@ public:
on_internal_error(sstlog, format("Invalid chunk size: {}", chunk_size));
}
_chunk_size_trailing_zeros = count_trailing_zeros(chunk_size);
if (_pos > file_len) {
if (_pos > _file_len) {
on_internal_error(sstlog, "attempt to read beyond end");
}
if (len == 0 || _pos == file_len) {
if (len == 0 || _pos == _file_len) {
// Nothing to read
_end_pos = _pos;
return;
}
if (len > file_len - _pos) {
_end_pos = file_len;
if (len > _file_len - _pos) {
_end_pos = _file_len;
}
if constexpr (check_digest) {
if (!digest) {
on_internal_error(sstlog, "Requested digest check but no digest was provided.");
}
if (_end_pos - _pos < _file_len) {
on_internal_error(sstlog, seastar::format(
"Cannot check digest with a partial read: current pos={}, end pos={}, file len={}",
_pos, _end_pos, _file_len));
}
_digests = {*digest, ChecksumType::init_checksum()};
}
auto start = _beg_pos & ~(chunk_size - 1);
auto end = (_end_pos & ~(chunk_size - 1)) + chunk_size;
@@ -88,9 +104,20 @@ public:
if (expected_checksum != actual_checksum) {
throw sstables::malformed_sstable_exception(format("Checksummed chunk of size {} at file offset {} failed checksum: expected={}, actual={}", buf.size(), _underlying_pos, expected_checksum, actual_checksum));
}
if constexpr (check_digest) {
_digests.actual_digest = checksum_combine_or_feed<ChecksumType>(_digests.actual_digest, actual_checksum, buf.begin(), buf.size());
}
buf.trim_front(_pos & (chunk_size - 1));
_pos += buf.size();
_underlying_pos += chunk_size;
if constexpr (check_digest) {
if (_pos == _file_len && _digests.expected_digest != _digests.actual_digest) {
throw malformed_sstable_exception(seastar::format("Digest mismatch: expected={}, actual={}", _digests.expected_digest, _digests.actual_digest));
}
}
return buf;
});
}
@@ -103,6 +130,9 @@ public:
}
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
if constexpr (check_digest) {
on_internal_error(sstlog, "Tried to skip on a data source for which digest check has been requested.");
}
auto chunk_size = _checksum.chunk_size;
if (_pos + n > _end_pos) {
on_internal_error(sstlog, format("Skipping over the end position is disallowed: current pos={}, end pos={}, skip len={}", _pos, _end_pos, n));
@@ -120,37 +150,44 @@ public:
}
};
template <ChecksumUtils ChecksumType>
template <ChecksumUtils ChecksumType, bool check_digest>
class checksummed_file_data_source : public data_source {
public:
checksummed_file_data_source(file f, uint64_t file_len, const checksum& checksum,
uint64_t offset, size_t len, file_input_stream_options options)
: data_source(std::make_unique<checksummed_file_data_source_impl<ChecksumType>>(
std::move(f), file_len, checksum, offset, len, std::move(options)))
uint64_t offset, size_t len, file_input_stream_options options,
std::optional<uint32_t> digest)
: data_source(std::make_unique<checksummed_file_data_source_impl<ChecksumType, check_digest>>(
std::move(f), file_len, checksum, offset, len, std::move(options), digest))
{}
};
template <ChecksumUtils ChecksumType>
inline input_stream<char> make_checksummed_file_input_stream(
file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
size_t len, file_input_stream_options options)
size_t len, file_input_stream_options options, std::optional<uint32_t> digest)
{
return input_stream<char>(checksummed_file_data_source<ChecksumType>(
std::move(f), file_len, checksum, offset, len, std::move(options)));
if (digest) {
return input_stream<char>(checksummed_file_data_source<ChecksumType, true>(
std::move(f), file_len, checksum, offset, len, std::move(options), digest));
}
return input_stream<char>(checksummed_file_data_source<ChecksumType, false>(
std::move(f), file_len, checksum, offset, len, std::move(options), digest));
}
input_stream<char> make_checksummed_file_k_l_format_input_stream(
file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
size_t len, file_input_stream_options options)
size_t len, file_input_stream_options options, std::optional<uint32_t> digest)
{
return make_checksummed_file_input_stream<adler32_utils>(std::move(f), file_len, checksum, offset, len, std::move(options));
return make_checksummed_file_input_stream<adler32_utils>(std::move(f), file_len,
checksum, offset, len, std::move(options), digest);
}
input_stream<char> make_checksummed_file_m_format_input_stream(
file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
size_t len, file_input_stream_options options)
size_t len, file_input_stream_options options, std::optional<uint32_t> digest)
{
return make_checksummed_file_input_stream<crc32_utils>(std::move(f), file_len, checksum, offset, len, std::move(options));
return make_checksummed_file_input_stream<crc32_utils>(std::move(f), file_len,
checksum, offset, len, std::move(options), digest);
}
}

View File

@@ -17,9 +17,13 @@ namespace sstables {
input_stream<char> make_checksummed_file_k_l_format_input_stream(file f,
uint64_t file_len, const sstables::checksum& checksum,
uint64_t offset, size_t len, class file_input_stream_options options);
uint64_t offset, size_t len,
class file_input_stream_options options,
std::optional<uint32_t> digest);
input_stream<char> make_checksummed_file_m_format_input_stream(file f,
uint64_t file_len, const sstables::checksum& checksum,
uint64_t offset, size_t len, class file_input_stream_options options);
uint64_t offset, size_t len,
class file_input_stream_options options,
std::optional<uint32_t> digest);
}

View File

@@ -24,6 +24,7 @@
#include "utils/assert.hh"
#include "utils/class_registrator.hh"
#include "reader_permit.hh"
#include "data_source_types.hh"
namespace sstables {
@@ -337,12 +338,21 @@ compression::locate(uint64_t position, const compression::segmented_offsets::acc
}
template <ChecksumUtils ChecksumType>
// For SSTables 2.x (formats 'ka' and 'la'), the full checksum is a combination of checksums of compressed chunks.
// For SSTables 3.x (format 'mc'), however, it is supposed to contain the full checksum of the file written so
// the per-chunk checksums also count.
enum class compressed_checksum_mode {
checksum_chunks_only,
checksum_all,
};
template <ChecksumUtils ChecksumType, bool check_digest, compressed_checksum_mode mode>
class compressed_file_data_source_impl : public data_source_impl {
std::optional<input_stream<char>> _input_stream;
sstables::compression* _compression_metadata;
sstables::compression::segmented_offsets::accessor _offsets;
sstables::local_compression _compression;
[[no_unique_address]] sstables::digest_members<check_digest> _digests;
reader_permit _permit;
uint64_t _underlying_pos;
uint64_t _pos;
@@ -350,19 +360,20 @@ class compressed_file_data_source_impl : public data_source_impl {
uint64_t _end_pos;
public:
compressed_file_data_source_impl(file f, sstables::compression* cm,
uint64_t pos, size_t len, file_input_stream_options options, reader_permit permit)
uint64_t pos, size_t len, file_input_stream_options options,
reader_permit permit, std::optional<uint32_t> digest)
: _compression_metadata(cm)
, _offsets(_compression_metadata->offsets.get_accessor())
, _compression(*cm)
, _permit(std::move(permit))
{
_beg_pos = pos;
_pos = _beg_pos = pos;
if (pos > _compression_metadata->uncompressed_file_length()) {
throw std::runtime_error("attempt to uncompress beyond end");
}
if (len == 0 || pos == _compression_metadata->uncompressed_file_length()) {
// Nothing to read
_end_pos = _pos = _beg_pos;
_end_pos = _pos;
return;
}
if (len <= _compression_metadata->uncompressed_file_length() - pos) {
@@ -370,6 +381,17 @@ public:
} else {
_end_pos = _compression_metadata->uncompressed_file_length();
}
if constexpr (check_digest) {
if (!digest) {
on_internal_error(sstables::sstlog, "Requested digest check but no digest was provided.");
}
if (_end_pos - _pos < _compression_metadata->uncompressed_file_length()) {
on_internal_error(sstables::sstlog, seastar::format(
"Cannot check digest with a partial read: current pos={}, end pos={}, uncompressed file len={}",
_pos, _end_pos, _compression_metadata->uncompressed_file_length()));
}
_digests = {*digest, ChecksumType::init_checksum()};
}
// _beg_pos and _end_pos specify positions in the compressed stream.
// We need to translate them into a range of uncompressed chunks,
// and open a file_input_stream to read that range.
@@ -380,7 +402,6 @@ public:
end.chunk_start + end.chunk_len - start.chunk_start,
std::move(options));
_underlying_pos = start.chunk_start;
_pos = _beg_pos;
}
virtual future<temporary_buffer<char>> get() override {
if (_pos >= _end_pos) {
@@ -412,6 +433,15 @@ public:
throw sstables::malformed_sstable_exception(format("compressed chunk of size {} at file offset {} failed checksum, expected={}, actual={}", addr.chunk_len, _underlying_pos, expected_checksum, actual_checksum));
}
if constexpr (check_digest) {
_digests.actual_digest = checksum_combine_or_feed<ChecksumType>(_digests.actual_digest, actual_checksum, buf.get(), compressed_len);
if constexpr (mode == compressed_checksum_mode::checksum_all) {
uint32_t be_actual_checksum = cpu_to_be(actual_checksum);
_digests.actual_digest = ChecksumType::checksum(_digests.actual_digest,
reinterpret_cast<const char*>(&be_actual_checksum), sizeof(be_actual_checksum));
}
}
// We know that the uncompressed data will take exactly
// chunk_length bytes (or less, if reading the last chunk).
temporary_buffer<char> out(
@@ -426,6 +456,12 @@ public:
_pos += out.size();
_underlying_pos += addr.chunk_len;
if constexpr (check_digest) {
if (_pos == _compression_metadata->uncompressed_file_length()
&& _digests.expected_digest != _digests.actual_digest) {
throw sstables::malformed_sstable_exception(seastar::format("Digest mismatch: expected={}, actual={}", _digests.expected_digest, _digests.actual_digest));
}
}
return make_tracked_temporary_buffer(std::move(out), std::move(res_units));
});
});
@@ -439,6 +475,9 @@ public:
}
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
if constexpr (check_digest) {
on_internal_error(sstables::sstlog, "Tried to skip on a data source for which digest check has been requested.");
}
if (_pos + n > _end_pos) {
on_internal_error(sstables::sstlog, format("Skipping over the end position is disallowed: current pos={}, end pos={}, skip len={}", _pos, _end_pos, n));
}
@@ -456,33 +495,31 @@ public:
}
};
template <ChecksumUtils ChecksumType>
template <ChecksumUtils ChecksumType, bool check_digest, compressed_checksum_mode mode>
class compressed_file_data_source : public data_source {
public:
compressed_file_data_source(file f, sstables::compression* cm,
uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit)
: data_source(std::make_unique<compressed_file_data_source_impl<ChecksumType>>(
std::move(f), cm, offset, len, std::move(options), std::move(permit)))
uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest)
: data_source(std::make_unique<compressed_file_data_source_impl<ChecksumType, check_digest, mode>>(
std::move(f), cm, offset, len, std::move(options), std::move(permit), digest))
{}
};
template <ChecksumUtils ChecksumType>
template <ChecksumUtils ChecksumType, compressed_checksum_mode mode>
inline input_stream<char> make_compressed_file_input_stream(
file f, sstables::compression *cm, uint64_t offset, size_t len,
file_input_stream_options options, reader_permit permit)
file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest)
{
return input_stream<char>(compressed_file_data_source<ChecksumType>(
std::move(f), cm, offset, len, std::move(options), std::move(permit)));
if (digest) [[unlikely]] {
return input_stream<char>(compressed_file_data_source<ChecksumType, true, mode>(
std::move(f), cm, offset, len, std::move(options), std::move(permit), digest));
}
return input_stream<char>(compressed_file_data_source<ChecksumType, false, mode>(
std::move(f), cm, offset, len, std::move(options), std::move(permit), digest));
}
// For SSTables 2.x (formats 'ka' and 'la'), the full checksum is a combination of checksums of compressed chunks.
// For SSTables 3.x (format 'mc'), however, it is supposed to contain the full checksum of the file written so
// the per-chunk checksums also count.
enum class compressed_checksum_mode {
checksum_chunks_only,
checksum_all,
};
// compressed_file_data_sink_impl works as a filter for a file output stream,
// where the buffer flushed will be compressed and its checksum computed, then
// the result passed to a regular output stream.
@@ -584,15 +621,19 @@ inline output_stream<char> make_compressed_file_output_stream(output_stream<char
input_stream<char> sstables::make_compressed_file_k_l_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options, reader_permit permit)
class file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest)
{
return make_compressed_file_input_stream<adler32_utils>(std::move(f), cm, offset, len, std::move(options), std::move(permit));
return make_compressed_file_input_stream<adler32_utils, compressed_checksum_mode::checksum_chunks_only>(
std::move(f), cm, offset, len, std::move(options), std::move(permit), digest);
}
input_stream<char> sstables::make_compressed_file_m_format_input_stream(file f,
sstables::compression *cm, uint64_t offset, size_t len,
class file_input_stream_options options, reader_permit permit) {
return make_compressed_file_input_stream<crc32_utils>(std::move(f), cm, offset, len, std::move(options), std::move(permit));
class file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest) {
return make_compressed_file_input_stream<crc32_utils, compressed_checksum_mode::checksum_all>(
std::move(f), cm, offset, len, std::move(options), std::move(permit), digest);
}
output_stream<char> sstables::make_compressed_file_m_format_output_stream(output_stream<char> out,

View File

@@ -369,11 +369,13 @@ compressor_ptr get_sstable_compressor(const compression&);
// sstable alive, and the compression metadata is only a part of it.
input_stream<char> make_compressed_file_k_l_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options, reader_permit permit);
class file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest);
input_stream<char> make_compressed_file_m_format_input_stream(file f,
sstables::compression* cm, uint64_t offset, size_t len,
class file_input_stream_options options, reader_permit permit);
class file_input_stream_options options, reader_permit permit,
std::optional<uint32_t> digest);
output_stream<char> make_compressed_file_m_format_output_stream(output_stream<char> out,
sstables::compression* cm,

View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <cstdint>
namespace sstables {
template<bool check_digest>
struct digest_members {
uint32_t expected_digest;
uint32_t actual_digest;
};
template<>
struct digest_members<false> {};
}

View File

@@ -25,6 +25,7 @@ struct shareable_components {
sstables::statistics statistics;
std::optional<sstables::scylla_metadata> scylla_metadata;
weak_ptr<sstables::checksum> checksum;
std::optional<uint32_t> digest;
};
} // namespace sstables

View File

@@ -1935,6 +1935,7 @@ future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
lw_shared_ptr<checksum> checksum;
try {
checksum = co_await read_checksum();
co_await read_digest();
} catch (const malformed_sstable_exception& e) {
ex = handle_sstable_exception(e, errors);
}
@@ -2468,15 +2469,18 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len,
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
}
std::optional<uint32_t> digest;
if (integrity == integrity_check::yes) {
digest = get_digest();
}
if (_components->compression && raw == raw_stream::no) {
// Disabling integrity checks is not supported by compressed
// file input streams. `integrity` is ignored.
if (_version >= sstable_version_types::mc) {
return make_compressed_file_m_format_input_stream(f, &_components->compression,
pos, len, std::move(options), permit);
return make_compressed_file_m_format_input_stream(f, &_components->compression,
pos, len, std::move(options), permit, digest);
} else {
return make_compressed_file_k_l_format_input_stream(f, &_components->compression,
pos, len, std::move(options), permit);
pos, len, std::move(options), permit, digest);
}
}
if (_components->checksum && integrity == integrity_check::yes) {
@@ -2484,10 +2488,10 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len,
auto file_len = data_size();
if (_version >= sstable_version_types::mc) {
return make_checksummed_file_m_format_input_stream(f, file_len,
*checksum, pos, len, std::move(options));
*checksum, pos, len, std::move(options), digest);
} else {
return make_checksummed_file_k_l_format_input_stream(f, file_len,
*checksum, pos, len, std::move(options));
*checksum, pos, len, std::move(options), digest);
}
}
return make_file_input_stream(f, pos, len, std::move(options));
@@ -2598,7 +2602,13 @@ static future<bool> do_validate_uncompressed(input_stream<char>& stream, const c
co_return valid;
}
future<uint32_t> sstable::read_digest() {
future<std::optional<uint32_t>> sstable::read_digest() {
if (_components->digest) {
co_return *_components->digest;
}
if (!has_component(component_type::Digest)) {
co_return std::nullopt;
}
sstring digest_str;
co_await do_read_simple(component_type::Digest, [&] (version_types v, file digest_file) -> future<> {
@@ -2619,7 +2629,8 @@ future<uint32_t> sstable::read_digest() {
maybe_rethrow_exception(std::move(ex));
});
co_return boost::lexical_cast<uint32_t>(digest_str);
_components->digest = boost::lexical_cast<uint32_t>(digest_str);
co_return _components->digest;
}
future<lw_shared_ptr<checksum>> sstable::read_checksum() {
@@ -2665,6 +2676,9 @@ future<lw_shared_ptr<checksum>> sstable::read_checksum() {
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit) {
const auto digest = co_await sst->read_digest();
if (!digest) {
throw std::runtime_error(seastar::format("No digest available for SSTable: {}", sst->get_filename()));
}
auto data_stream = sst->data_stream(0, sst->ondisk_data_size(), permit, nullptr, nullptr, sstable::raw_stream::yes);
@@ -2675,9 +2689,9 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
try {
if (sst->get_compression()) {
if (sst->get_version() >= sstable_version_types::mc) {
valid = co_await do_validate_compressed<crc32_utils>(data_stream, sst->get_compression(), true, digest);
valid = co_await do_validate_compressed<crc32_utils>(data_stream, sst->get_compression(), true, *digest);
} else {
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, digest);
valid = co_await do_validate_compressed<adler32_utils>(data_stream, sst->get_compression(), false, *digest);
}
} else {
auto checksum = co_await sst->read_checksum();
@@ -2685,9 +2699,9 @@ future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_
sstlog.warn("No checksums available for SSTable: {}", sst->get_filename());
ret = validate_checksums_result::no_checksum;
} else if (sst->get_version() >= sstable_version_types::mc) {
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, *checksum, digest);
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, *checksum, *digest);
} else {
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, *checksum, digest);
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, *checksum, *digest);
}
}
} catch (...) {

View File

@@ -928,6 +928,10 @@ public:
return _components->checksum ? _components->checksum->shared_from_this() : nullptr;
}
std::optional<uint32_t> get_digest() const {
return _components->digest;
}
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
// for cells and tombstones expired before the time point "GC before", which
// is the point before which expiring data can be purged.
@@ -1007,7 +1011,7 @@ public:
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
future<uint32_t> read_digest();
future<std::optional<uint32_t>> read_digest();
future<lw_shared_ptr<checksum>> read_checksum();
};

View File

@@ -13,6 +13,7 @@
#include <seastar/core/align.hh>
#include <seastar/core/aligned_buffer.hh>
#include <seastar/util/closeable.hh>
#include <seastar/util/short_streams.hh>
#include <seastar/core/coroutine.hh>
#include "sstables/generation_type.hh"
@@ -2192,6 +2193,8 @@ using compress_sstable = tests::random_schema_specification::compress_sstable;
// A framework for scrub-related tests.
// Lives in a seastar thread
enum class random_schema { no, yes };
template <random_schema create_random_schema>
class scrub_test_framework {
public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
@@ -2274,8 +2277,53 @@ public:
}
};
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content) {
scrub_test_framework test(compress_sstable::yes);
template <>
class scrub_test_framework<random_schema::no> {
public:
using test_func = std::function<void(table_for_tests&, compaction::table_state&, std::vector<sstables::shared_sstable>)>;
private:
sharded<test_env> _env;
public:
scrub_test_framework()
{
_env.start().get();
}
~scrub_test_framework() {
_env.stop().get();
}
test_env& env() { return _env.local(); }
void run(schema_ptr schema, shared_sstable sst, test_func func) {
auto& env = this->env();
auto table = env.make_table_for_tests(schema);
auto close_cf = deferred_stop(table);
table->start();
table->add_sstable_and_update_cache(sst).get();
bool found_sstable = false;
foreach_table_state_with_thread(table, [&] (compaction::table_state& ts) {
auto sstables = in_strategy_sstables(ts);
if (sstables.empty()) {
return;
}
BOOST_REQUIRE(sstables.size() == 1);
BOOST_REQUIRE(sstables.front() == sst);
found_sstable = true;
func(table, ts, sstables);
}).get();
BOOST_REQUIRE(found_sstable);
}
};
void scrub_validate_corrupted_content(compress_sstable compress) {
scrub_test_framework<random_schema::yes> test(compress);
auto schema = test.schema();
@@ -2302,8 +2350,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content) {
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file) {
scrub_test_framework test(compress_sstable::yes);
void scrub_validate_corrupted_file(compress_sstable compress) {
scrub_test_framework<random_schema::yes> test(compress);
auto schema = test.schema();
@@ -2338,60 +2386,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file) {
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable) {
scrub_test_framework test(compress_sstable::yes);
auto schema = test.schema();
auto muts = tests::generate_random_mutations(test.random_schema()).get();
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content_uncompressed) {
scrub_test_framework test(compress_sstable::no);
auto schema = test.schema();
auto muts = tests::generate_random_mutations(
test.random_schema(),
tests::uncompactible_timestamp_generator(test.seed()),
tests::no_expiry_expiry_generator(),
std::uniform_int_distribution<size_t>(10, 10)).get();
std::swap(*muts.begin(), *(muts.begin() + 1));
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_uncompressed) {
scrub_test_framework test(compress_sstable::no);
void scrub_validate_corrupted_digest(compress_sstable compress) {
scrub_test_framework<random_schema::yes> test(compress);
auto schema = test.schema();
@@ -2405,14 +2401,16 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_uncompr
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
// Corrupt the data to cause an invalid checksum.
auto f = open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo).get();
const auto wbuf_align = f.memory_dma_alignment();
const auto wbuf_len = f.disk_write_dma_alignment();
auto wbuf = seastar::temporary_buffer<char>::aligned(wbuf_align, wbuf_len);
std::fill(wbuf.get_write(), wbuf.get_write() + wbuf_len, 0xba);
f.dma_write(0, wbuf.get(), wbuf_len).get();
f.close().get();
// This test is about corrupted data with valid per-chunk checksums.
// This kind of corruption should be detected by the digest check.
// Triggering this is not trivial, so we corrupt the Digest file instead.
auto f = open_file_dma(sstables::test(sst).filename(component_type::Digest).native(), open_flags::rw).get();
auto stream = make_file_input_stream(f);
auto close_stream = deferred_close(stream);
auto digest_str = util::read_entire_stream_contiguous(stream).get();
auto digest = boost::lexical_cast<uint32_t>(digest_str);
auto new_digest = to_sstring<bytes>(digest + 1); // a random invalid digest
f.dma_write(0, new_digest.c_str(), new_digest.size()).get();
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
@@ -2426,8 +2424,8 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_uncompr
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable_uncompressed) {
scrub_test_framework test(compress_sstable::no);
void scrub_validate_no_digest(compress_sstable compress) {
scrub_test_framework<random_schema::yes> test(compress);
auto schema = test.schema();
@@ -2437,6 +2435,10 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable_uncompre
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
// Checksum and digest checking should be orthogonal.
// Ensure that per-chunk checksums are properly checked when digest is missing.
sstables::test(sst).rewrite_toc_without_component(component_type::Digest);
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
@@ -2448,15 +2450,90 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable_uncompre
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE(!sst->get_checksum());
// Corrupt the data to cause an invalid checksum.
auto f = open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo).get();
auto close_f = deferred_close(f);
const auto wbuf_align = f.memory_dma_alignment();
const auto wbuf_len = f.disk_write_dma_alignment();
auto wbuf = seastar::temporary_buffer<char>::aligned(wbuf_align, wbuf_len);
std::fill(wbuf.get_write(), wbuf.get_write() + wbuf_len, 0xba);
f.dma_write(0, wbuf.get(), wbuf_len).get();
stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_GT(stats->validation_errors, 0);
BOOST_REQUIRE(sst->is_quarantined());
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
});
}
void scrub_validate_valid(compress_sstable compress) {
scrub_test_framework<random_schema::yes> test(compress);
auto schema = test.schema();
auto muts = tests::generate_random_mutations(test.random_schema()).get();
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
sstables::compaction_type_options::scrub opts = {
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
};
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(stats.has_value());
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content) {
for (const auto& compress : {compress_sstable::no, compress_sstable::yes}) {
testlog.info("Validating {}compressed SSTable with content-level corruption...", compress == compress_sstable::no ? "un" : "");
scrub_validate_corrupted_content(compress);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file) {
for (const auto& compress : {compress_sstable::no, compress_sstable::yes}) {
testlog.info("Validating {}compressed SSTable with invalid checksums...", compress == compress_sstable::no ? "un" : "");
scrub_validate_corrupted_file(compress);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_digest) {
for (const auto& compress : {compress_sstable::no, compress_sstable::yes}) {
testlog.info("Validating {}compressed SSTable with invalid digest...", compress == compress_sstable::no ? "un" : "");
scrub_validate_corrupted_digest(compress);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_no_digest) {
for (const auto& compress : {compress_sstable::no, compress_sstable::yes}) {
testlog.info("Validating {}compressed SSTable with no digest...", compress == compress_sstable::no ? "un" : "");
scrub_validate_no_digest(compress);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable) {
for (const auto& compress : {compress_sstable::no, compress_sstable::yes}) {
testlog.info("Validating {}compressed SSTable...", compress == compress_sstable::no ? "un" : "");
scrub_validate_valid(compress);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_multiple_instances_uncompressed) {
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
return;
#endif
scrub_test_framework test(compress_sstable::no);
scrub_test_framework<random_schema::yes> test(compress_sstable::no);
auto schema = test.schema();
@@ -2506,6 +2583,132 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_multiple_instances_unc
});
}
// Following tests run scrub in validate mode with SSTables produced by Cassandra.
// The purpose is to verify compatibility.
//
// The SSTables live in the source tree under:
// test/resource/sstables/3.x/{uncompressed,lz4}/partition_key_with_values_of_different_types and
// test/resource/sstables/3.x/{uncompressed,lz4}/integrity_check
//
// The former are pre-existing SSTables that we use to test the valid case.
//
// The latter were tailor-made to cover the invalid case by triggering the checksum and digest checks.
// The SSTables were produced with the following schema:
//
// CREATE KEYSPACE test_ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
//
// CREATE TABLE test_ks.test_table ( pk INT,
// bool_val BOOLEAN,
// double_val DOUBLE,
// float_val FLOAT,
// int_val INT,
// long_val BIGINT,
// timestamp_val TIMESTAMP,
// timeuuid_val TIMEUUID,
// uuid_val UUID,
// text_val TEXT,
// PRIMARY KEY(pk))
// WITH compression = {<compression_params>};
//
// where <compression_params> is one of the following:
// {'enabled': false} for the uncompressed case,
// {'chunk_length_in_kb': '4', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'} for the compressed case.
static schema_builder make_cassandra_schema_builder() {
return schema_builder("test_ks", "test_table")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("bool_val", boolean_type)
.with_column("double_val", double_type)
.with_column("float_val", float_type)
.with_column("int_val", int32_type)
.with_column("long_val", long_type)
.with_column("timestamp_val", timestamp_type)
.with_column("timeuuid_val", timeuuid_type)
.with_column("uuid_val", uuid_type)
.with_column("text_val", utf8_type);
}
void scrub_validate_cassandra_compat(const compression_parameters& cp, sstring sstable_dir,
generation_type::int_t gen, sstable::version_types version, bool valid) {
scrub_test_framework<random_schema::no> test;
auto schema = make_cassandra_schema_builder()
.set_compressor_params(cp)
.build();
auto sst = test.env().reusable_sst(schema, sstable_dir, gen, version).get();
test.run(schema, sst, [valid] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
BOOST_REQUIRE(sstables.size() == 1);
auto sst = sstables.front();
using scrub = sstables::compaction_type_options::scrub;
sstables::compaction_type_options::scrub opts = {
.operation_mode = scrub::mode::validate,
.quarantine_sstables = scrub::quarantine_invalid_sstables::no,
};
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
BOOST_REQUIRE(stats.has_value());
if (valid) {
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
} else {
BOOST_REQUIRE_GT(stats->validation_errors, 0);
}
BOOST_REQUIRE(!sst->is_quarantined());
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
BOOST_REQUIRE(!sst->get_checksum());
});
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable_cassandra_compat) {
for (const auto& [cp, subdir] : {
std::pair{compression_parameters::no_compression(), "uncompressed"},
{compression_parameters(compressor::lz4), "lz4"}
}) {
testlog.info("Validating {}compressed SSTable from Cassandra...", cp.get_compressor() ? "" : "un");
scrub_validate_cassandra_compat(
cp,
seastar::format("test/resource/sstables/3.x/{}/partition_key_with_values_of_different_types", subdir),
1,
sstable::version_types::mc,
true
);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_cassandra_compat) {
for (const auto& [cp, subdir] : {
std::pair{compression_parameters::no_compression(), "uncompressed"},
{compression_parameters(compressor::lz4), "lz4"}
}) {
testlog.info("Validating {}compressed SSTable from Cassandra with invalid checksums...", cp.get_compressor() ? "" : "un");
scrub_validate_cassandra_compat(
cp,
seastar::format("test/resource/sstables/3.x/{}/integrity_check/invalid_checksums", subdir),
1,
sstable::version_types::me,
false
);
}
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_digest_cassandra_compat) {
for (const auto& [cp, subdir] : {
std::pair{compression_parameters::no_compression(), "uncompressed"},
{compression_parameters(compressor::lz4), "lz4"}
}) {
testlog.info("Validating {}compressed SSTable from Cassandra with invalid digest...", cp.get_compressor() ? "" : "un");
scrub_validate_cassandra_compat(
cp,
seastar::format("test/resource/sstables/3.x/{}/integrity_check/invalid_digest", subdir),
1,
sstable::version_types::me,
false
);
}
}
SEASTAR_TEST_CASE(sstable_validate_test) {
return test_env::do_with_async([] (test_env& env) {
auto schema = schema_builder("ks", get_name())
@@ -2645,7 +2848,7 @@ SEASTAR_TEST_CASE(sstable_validate_test) {
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
scrub_test_framework test(compress_sstable::yes);
scrub_test_framework<random_schema::yes> test(compress_sstable::yes);
auto schema = test.schema();
@@ -2669,7 +2872,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
scrub_test_framework test(compress_sstable::yes);
scrub_test_framework<random_schema::yes> test(compress_sstable::yes);
auto schema = test.schema();
@@ -2717,7 +2920,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
scrub_test_framework test(compress_sstable::yes);
scrub_test_framework<random_schema::yes> test(compress_sstable::yes);
auto schema = test.schema();
@@ -2757,7 +2960,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
}
SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
scrub_test_framework test(compress_sstable::yes);
scrub_test_framework<random_schema::yes> test(compress_sstable::yes);
auto schema = test.schema();

View File

@@ -676,7 +676,7 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
auto make_is = [&] {
f = open_file_dma(file_path, open_flags::ro).get();
return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts, semaphore.make_permit());
return make_compressed_file_m_format_input_stream(f, &c, 0, uncompressed_size, opts, semaphore.make_permit(), std::nullopt);
};
auto expect = [] (input_stream<char>& in, const temporary_buffer<char>& buf) {

View File

@@ -0,0 +1,8 @@
Statistics.db
Index.db
CompressionInfo.db
Filter.db
Data.db
Summary.db
Digest.crc32
TOC.txt

View File

@@ -0,0 +1,8 @@
Statistics.db
Index.db
CompressionInfo.db
Filter.db
Data.db
Summary.db
Digest.crc32
TOC.txt

View File

@@ -0,0 +1,8 @@
Statistics.db
Index.db
Filter.db
CRC.db
Data.db
Summary.db
Digest.crc32
TOC.txt

View File

@@ -0,0 +1,8 @@
Statistics.db
Index.db
Filter.db
CRC.db
Data.db
Summary.db
Digest.crc32
TOC.txt