Merge 'Validate checksums for uncompressed SSTables' from Nikos Dragazis
This PR introduces a new file data source implementation for uncompressed SSTables that validates the checksum of each chunk as it is read. Unlike for compressed SSTables, checksum validation for uncompressed SSTables is active only for scrub/validate reads, not for normal user reads, to avoid any performance regression. It consists of: * A new file data source for uncompressed SSTables. * Integration of checksums into SSTable's shareable components. The validation code loads the component on demand and manages its lifecycle with shared pointers. * A new `integrity_check` flag to enable the new file data source for uncompressed SSTables. The flag is currently enabled only through the validation path, i.e., it does not affect normal user reads. * New scrub tests for both compressed and uncompressed SSTables, as well as improvements to the existing ones. * A change in the JSON response of `scylla validate-checksums` to report when an uncompressed SSTable cannot be validated due to a lack of checksums (no `CRC.db` in `TOC.txt`). Refs #19058. New feature, no backport is needed.
Closes scylladb/scylladb#20207 * github.com:scylladb/scylladb: test: Add test to validate SSTables with no checksums tools: Fix typo in help message of scylla validate-checksums sstables: Allow validate_checksums() to report missing checksums test: Add test for concurrent scrub/validate operations test: Add scrub/validate tests for uncompressed SSTables test/lib: Add option to create uncompressed random schemas test: Add test for scrub/validate with file-level corruption test: Check validation errors in scrub tests sstables: Enable checksum validation for uncompressed SSTables sstables: Expose integrity option via crawling mutation readers sstables: Expose integrity option via data_consume_rows() sstables: Add option for integrity check in data streams sstables: Remove unused variable sstables: Add checksum in the SSTable components sstables: Introduce checksummed file data source implementation sstables: Replace assert with on_internal_error
This commit is contained in:
@@ -859,6 +859,7 @@ scylla_core = (['message/messaging_service.cc',
|
||||
'sstables/kl/reader.cc',
|
||||
'sstables/sstable_version.cc',
|
||||
'sstables/compress.cc',
|
||||
'sstables/checksummed_data_source.cc',
|
||||
'sstables/sstable_mutation_reader.cc',
|
||||
'compaction/compaction.cc',
|
||||
'compaction/compaction_strategy.cc',
|
||||
|
||||
@@ -581,14 +581,20 @@ During normal reads, ScyllaDB validates the per-chunk checksum for compressed SS
|
||||
The digest and the per-chunk checksum of uncompressed SStables are currently not checked on any code paths.
|
||||
|
||||
This operation reads the entire ``Data.db`` and validates both kinds of checksums against the data.
|
||||
Errors found are logged to stderr. The output contains a bool for each SStable that is true if the SStable matches all checksums.
|
||||
Errors found are logged to stderr. The output contains an object for each SStable that indicates if the SStable has checksums (false only for uncompressed SStables
|
||||
for which ``CRC.db`` is not present in ``TOC.txt``), and if the SStable matches all checksums.
|
||||
|
||||
The content is dumped in JSON, using the following schema:
|
||||
|
||||
.. code-block:: none
|
||||
:class: hide-copy-button
|
||||
|
||||
$ROOT := { "$sstable_path": Bool, ... }
|
||||
$ROOT := { "$sstable_path": $RESULT, ... }
|
||||
|
||||
$RESULT := {
|
||||
"has_checksums": Bool,
|
||||
"valid_checksums": Bool, // optional
|
||||
}
|
||||
|
||||
decompress
|
||||
^^^^^^^^^^
|
||||
|
||||
@@ -2,6 +2,7 @@ add_library(sstables STATIC)
|
||||
target_sources(sstables
|
||||
PRIVATE
|
||||
compress.cc
|
||||
checksummed_data_source.cc
|
||||
integrity_checked_file_impl.cc
|
||||
kl/reader.cc
|
||||
metadata_collector.cc
|
||||
|
||||
156
sstables/checksummed_data_source.cc
Normal file
156
sstables/checksummed_data_source.cc
Normal file
@@ -0,0 +1,156 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include <seastar/core/bitops.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
#include <seastar/core/future.hh>
|
||||
#include <seastar/core/temporary_buffer.hh>
|
||||
|
||||
#include "types.hh"
|
||||
#include "exceptions.hh"
|
||||
#include "checksum_utils.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
extern logging::logger sstlog;
|
||||
|
||||
// File data source implementation for SSTables with attached checksum
|
||||
// data and no compression
|
||||
template <ChecksumUtils ChecksumType>
|
||||
class checksummed_file_data_source_impl : public data_source_impl {
|
||||
std::optional<input_stream<char>> _input_stream;
|
||||
const checksum& _checksum;
|
||||
uint64_t _chunk_size_trailing_zeros;
|
||||
uint64_t _underlying_pos;
|
||||
uint64_t _pos;
|
||||
uint64_t _beg_pos;
|
||||
uint64_t _end_pos;
|
||||
public:
|
||||
checksummed_file_data_source_impl(file f, uint64_t file_len,
|
||||
const checksum& checksum, uint64_t pos, size_t len,
|
||||
file_input_stream_options options)
|
||||
: _checksum(checksum)
|
||||
, _pos(pos)
|
||||
, _beg_pos(pos)
|
||||
, _end_pos(pos + len)
|
||||
{
|
||||
// _beg_pos and _end_pos specify positions in the stream.
|
||||
// These are not necessarily aligned on chunk boundaries.
|
||||
// To be able to verify the checksums, we need to translate
|
||||
// them into a range of chunks that contain the given
|
||||
// address range, and open a file input stream to read that
|
||||
// range. The _underlying_pos always points to the current
|
||||
// chunk-aligned position of the file input stream.
|
||||
auto chunk_size = checksum.chunk_size;
|
||||
if (chunk_size == 0 || (chunk_size & (chunk_size - 1)) != 0) {
|
||||
on_internal_error(sstlog, format("Invalid chunk size: {}", chunk_size));
|
||||
}
|
||||
_chunk_size_trailing_zeros = count_trailing_zeros(chunk_size);
|
||||
if (_pos > file_len) {
|
||||
on_internal_error(sstlog, "attempt to read beyond end");
|
||||
}
|
||||
if (len == 0 || _pos == file_len) {
|
||||
// Nothing to read
|
||||
_end_pos = _pos;
|
||||
return;
|
||||
}
|
||||
if (len > file_len - _pos) {
|
||||
_end_pos = file_len;
|
||||
}
|
||||
auto start = _beg_pos & ~(chunk_size - 1);
|
||||
auto end = (_end_pos & ~(chunk_size - 1)) + chunk_size;
|
||||
_input_stream = make_file_input_stream(std::move(f), start, end - start, std::move(options));
|
||||
_underlying_pos = start;
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<char>> get() override {
|
||||
auto chunk_size = _checksum.chunk_size;
|
||||
if (_pos >= _end_pos) {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
// Read the next chunk. We need to skip part of the first
|
||||
// chunk, but then continue to read from beginning of chunks.
|
||||
// Also, we need to take into account that the last chunk can
|
||||
// be smaller than `chunk_size`.
|
||||
if (_pos != _beg_pos && (_pos & (chunk_size - 1)) != 0) {
|
||||
throw std::runtime_error(format("Checksummed reader not aligned to chunk boundary: pos={}, chunk_size={}", _pos, chunk_size));
|
||||
}
|
||||
return _input_stream->read_exactly(chunk_size).then([this, chunk_size](temporary_buffer<char> buf) {
|
||||
auto expected_checksum = _checksum.checksums[_pos >> _chunk_size_trailing_zeros];
|
||||
auto actual_checksum = ChecksumType::checksum(buf.get(), buf.size());
|
||||
if (expected_checksum != actual_checksum) {
|
||||
throw sstables::malformed_sstable_exception(format("Checksummed chunk of size {} at file offset {} failed checksum: expected={}, actual={}", buf.size(), _underlying_pos, expected_checksum, actual_checksum));
|
||||
}
|
||||
buf.trim_front(_pos & (chunk_size - 1));
|
||||
_pos += buf.size();
|
||||
_underlying_pos += chunk_size;
|
||||
return buf;
|
||||
});
|
||||
}
|
||||
|
||||
virtual future<> close() override {
|
||||
if (!_input_stream) {
|
||||
return make_ready_future<>();
|
||||
}
|
||||
return _input_stream->close();
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
||||
auto chunk_size = _checksum.chunk_size;
|
||||
if (_pos + n > _end_pos) {
|
||||
on_internal_error(sstlog, format("Skipping over the end position is disallowed: current pos={}, end pos={}, skip len={}", _pos, _end_pos, n));
|
||||
}
|
||||
_pos += n;
|
||||
if (_pos == _end_pos) {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
auto underlying_n = (_pos & ~(chunk_size - 1)) - _underlying_pos;
|
||||
_beg_pos = _pos;
|
||||
_underlying_pos += underlying_n;
|
||||
return _input_stream->skip(underlying_n).then([] {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
template <ChecksumUtils ChecksumType>
|
||||
class checksummed_file_data_source : public data_source {
|
||||
public:
|
||||
checksummed_file_data_source(file f, uint64_t file_len, const checksum& checksum,
|
||||
uint64_t offset, size_t len, file_input_stream_options options)
|
||||
: data_source(std::make_unique<checksummed_file_data_source_impl<ChecksumType>>(
|
||||
std::move(f), file_len, checksum, offset, len, std::move(options)))
|
||||
{}
|
||||
};
|
||||
|
||||
template <ChecksumUtils ChecksumType>
|
||||
inline input_stream<char> make_checksummed_file_input_stream(
|
||||
file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
|
||||
size_t len, file_input_stream_options options)
|
||||
{
|
||||
return input_stream<char>(checksummed_file_data_source<ChecksumType>(
|
||||
std::move(f), file_len, checksum, offset, len, std::move(options)));
|
||||
}
|
||||
|
||||
// Checksummed input stream factory that verifies chunks with adler32_utils.
input_stream<char> make_checksummed_file_k_l_format_input_stream(
        file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
        size_t len, file_input_stream_options options)
{
    using checksum_algorithm = adler32_utils;
    return make_checksummed_file_input_stream<checksum_algorithm>(
            std::move(f), file_len, checksum, offset, len, std::move(options));
}
|
||||
|
||||
// Checksummed input stream factory that verifies chunks with crc32_utils.
input_stream<char> make_checksummed_file_m_format_input_stream(
        file f, uint64_t file_len, const checksum& checksum, uint64_t offset,
        size_t len, file_input_stream_options options)
{
    using checksum_algorithm = crc32_utils;
    return make_checksummed_file_input_stream<checksum_algorithm>(
            std::move(f), file_len, checksum, offset, len, std::move(options));
}
|
||||
|
||||
}
|
||||
25
sstables/checksummed_data_source.hh
Normal file
25
sstables/checksummed_data_source.hh
Normal file
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (C) 2024-present ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* SPDX-License-Identifier: AGPL-3.0-or-later
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/seastar.hh>
|
||||
#include <seastar/core/iostream.hh>
|
||||
|
||||
#include "sstables/types.hh"
|
||||
|
||||
namespace sstables {
|
||||
|
||||
// Creates an input stream over an uncompressed data file that verifies the
// checksum of every chunk it reads, using the adler32_utils checksum.
//
// f        - data file to read from
// file_len - total length of the data file
// checksum - chunk size and expected per-chunk checksums; the stream keeps
//            a reference to it, so it must outlive the returned stream
// offset   - position of the first byte to read
// len      - number of bytes to read (clamped to the end of the file)
// options  - options for the underlying file input stream
//
// Reading a chunk whose checksum does not match throws
// sstables::malformed_sstable_exception.
input_stream<char> make_checksummed_file_k_l_format_input_stream(file f,
|
||||
uint64_t file_len, const sstables::checksum& checksum,
|
||||
uint64_t offset, size_t len, class file_input_stream_options options);
|
||||
|
||||
// Creates an input stream over an uncompressed data file that verifies the
// checksum of every chunk it reads, using the crc32_utils checksum.
//
// f        - data file to read from
// file_len - total length of the data file
// checksum - chunk size and expected per-chunk checksums; the stream keeps
//            a reference to it, so it must outlive the returned stream
// offset   - position of the first byte to read
// len      - number of bytes to read (clamped to the end of the file)
// options  - options for the underlying file input stream
//
// Reading a chunk whose checksum does not match throws
// sstables::malformed_sstable_exception.
input_stream<char> make_checksummed_file_m_format_input_stream(file f,
|
||||
uint64_t file_len, const sstables::checksum& checksum,
|
||||
uint64_t offset, size_t len, class file_input_stream_options options);
|
||||
}
|
||||
@@ -14,6 +14,7 @@
|
||||
#include <seastar/core/bitops.hh>
|
||||
#include <seastar/core/byteorder.hh>
|
||||
#include <seastar/core/fstream.hh>
|
||||
#include <seastar/core/on_internal_error.hh>
|
||||
|
||||
#include "../compress.hh"
|
||||
#include "compress.hh"
|
||||
@@ -387,7 +388,7 @@ public:
|
||||
// Uncompress the next chunk. We need to skip part of the first
|
||||
// chunk, but then continue to read from beginning of chunks.
|
||||
if (_pos != _beg_pos && addr.offset != 0) {
|
||||
throw std::runtime_error("compressed reader out of sync");
|
||||
throw std::runtime_error(format("compressed reader not aligned to chunk boundary: pos={} offset={}", _pos, addr.offset));
|
||||
}
|
||||
if (!addr.chunk_len) {
|
||||
throw sstables::malformed_sstable_exception(format("compressed chunk_len must be greater than zero, chunk_start={}", addr.chunk_start));
|
||||
@@ -436,8 +437,10 @@ public:
|
||||
}
|
||||
|
||||
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
||||
if (_pos + n > _end_pos) {
|
||||
on_internal_error(sstables::sstlog, format("Skipping over the end position is disallowed: current pos={}, end pos={}, skip len={}", _pos, _end_pos, n));
|
||||
}
|
||||
_pos += n;
|
||||
SCYLLA_ASSERT(_pos <= _end_pos);
|
||||
if (_pos == _end_pos) {
|
||||
return make_ready_future<temporary_buffer<char>>();
|
||||
}
|
||||
|
||||
@@ -1347,7 +1347,7 @@ private:
|
||||
sstable::disk_read_range drr{begin, *end};
|
||||
auto last_end = _fwd_mr ? _sst->data_size() : drr.end;
|
||||
_read_enabled = bool(drr);
|
||||
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end);
|
||||
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end, sstable::integrity_check::no);
|
||||
}
|
||||
|
||||
_monitor.on_read_started(_context->reader_position());
|
||||
@@ -1548,10 +1548,11 @@ public:
|
||||
crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& mon)
|
||||
read_monitor& mon,
|
||||
sstable::integrity_check integrity)
|
||||
: mp_row_consumer_reader_k_l(std::move(schema), permit, std::move(sst))
|
||||
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), std::move(trace_state), streamed_mutation::forwarding::no, _sst)
|
||||
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer))
|
||||
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, integrity))
|
||||
, _monitor(mon) {
|
||||
_monitor.on_read_started(_context->reader_position());
|
||||
}
|
||||
@@ -1594,9 +1595,10 @@ mutation_reader make_crawling_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor) {
|
||||
read_monitor& monitor,
|
||||
sstable::integrity_check integrity) {
|
||||
return make_mutation_reader<crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
|
||||
std::move(trace_state), monitor);
|
||||
std::move(trace_state), monitor, integrity);
|
||||
}
|
||||
|
||||
} // namespace kl
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "readers/mutation_reader_fwd.hh"
|
||||
#include "readers/mutation_reader.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
namespace sstables {
|
||||
namespace kl {
|
||||
@@ -45,7 +46,8 @@ mutation_reader make_crawling_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor);
|
||||
read_monitor& monitor,
|
||||
sstable::integrity_check integrity);
|
||||
|
||||
} // namespace kl
|
||||
} // namespace sstables
|
||||
|
||||
@@ -1544,7 +1544,7 @@ private:
|
||||
sstable::disk_read_range drr{begin, *end};
|
||||
auto last_end = _fwd_mr ? _sst->data_size() : drr.end;
|
||||
_read_enabled = bool(drr);
|
||||
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end);
|
||||
_context = data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, std::move(drr), last_end, sstable::integrity_check::no);
|
||||
}
|
||||
|
||||
_monitor.on_read_started(_context->reader_position());
|
||||
@@ -1748,10 +1748,11 @@ public:
|
||||
mx_crawling_sstable_mutation_reader(shared_sstable sst, schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& mon)
|
||||
read_monitor& mon,
|
||||
sstable::integrity_check integrity)
|
||||
: mp_row_consumer_reader_mx(std::move(schema), permit, std::move(sst))
|
||||
, _consumer(this, _schema, std::move(permit), _schema->full_slice(), std::move(trace_state), streamed_mutation::forwarding::no, _sst)
|
||||
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer))
|
||||
, _context(data_consume_rows<DataConsumeRowsContext>(*_schema, _sst, _consumer, integrity))
|
||||
, _monitor(mon) {
|
||||
_monitor.on_read_started(_context->reader_position());
|
||||
}
|
||||
@@ -1792,9 +1793,10 @@ mutation_reader make_crawling_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor) {
|
||||
read_monitor& monitor,
|
||||
sstable::integrity_check integrity) {
|
||||
return make_mutation_reader<mx_crawling_sstable_mutation_reader>(std::move(sstable), std::move(schema), std::move(permit),
|
||||
std::move(trace_state), monitor);
|
||||
std::move(trace_state), monitor, integrity);
|
||||
}
|
||||
|
||||
void mp_row_consumer_reader_mx::on_next_partition(dht::decorated_key key, tombstone tomb) {
|
||||
@@ -2040,7 +2042,7 @@ future<uint64_t> validate(
|
||||
sstables::read_monitor& monitor) {
|
||||
auto schema = sstable->get_schema();
|
||||
validating_consumer consumer(schema, permit, sstable, std::move(error_handler));
|
||||
auto context = data_consume_rows<data_consume_rows_context_m<validating_consumer>>(*schema, sstable, consumer);
|
||||
auto context = data_consume_rows<data_consume_rows_context_m<validating_consumer>>(*schema, sstable, consumer, sstable::integrity_check::yes);
|
||||
|
||||
std::optional<sstables::index_reader> idx_reader;
|
||||
idx_reader.emplace(sstable, permit, tracing::trace_state_ptr{}, sstables::use_caching::no, false);
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
#include "readers/mutation_reader_fwd.hh"
|
||||
#include "readers/mutation_reader.hh"
|
||||
#include "sstables/progress_monitor.hh"
|
||||
#include "sstables/sstables.hh"
|
||||
|
||||
namespace sstables {
|
||||
namespace mx {
|
||||
@@ -48,7 +49,8 @@ mutation_reader make_crawling_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor);
|
||||
read_monitor& monitor,
|
||||
sstable::integrity_check integrity);
|
||||
|
||||
// Validate the content of the sstable with the mutation_fragment_stream_validator,
|
||||
// additionally cross checking that the content is laid out as expected by the
|
||||
|
||||
@@ -9,6 +9,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
|
||||
#include "compress.hh"
|
||||
#include "sstables/types.hh"
|
||||
#include "utils/i_filter.hh"
|
||||
@@ -22,6 +24,7 @@ struct shareable_components {
|
||||
sstables::summary summary;
|
||||
sstables::statistics statistics;
|
||||
std::optional<sstables::scylla_metadata> scylla_metadata;
|
||||
weak_ptr<sstables::checksum> checksum;
|
||||
};
|
||||
|
||||
} // namespace sstables
|
||||
|
||||
@@ -108,14 +108,15 @@ position_in_partition_view get_slice_upper_bound(const schema& s, const query::p
|
||||
// The amount of this excessive read is controlled by read ahead
|
||||
// heuristics which learn from the usefulness of previous read aheads.
|
||||
template <typename DataConsumeRowsContext>
|
||||
inline std::unique_ptr<DataConsumeRowsContext> data_consume_rows(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer, sstable::disk_read_range toread, uint64_t last_end) {
|
||||
inline std::unique_ptr<DataConsumeRowsContext> data_consume_rows(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer,
|
||||
sstable::disk_read_range toread, uint64_t last_end, sstable::integrity_check integrity) {
|
||||
// Although we were only asked to read until toread.end, we'll not limit
|
||||
// the underlying file input stream to this end, but rather to last_end.
|
||||
// This potentially enables read-ahead beyond end, until last_end, which
|
||||
// can be beneficial if the user wants to fast_forward_to() on the
|
||||
// returned context, and may make small skips.
|
||||
auto input = sst->data_stream(toread.start, last_end - toread.start,
|
||||
consumer.permit(), consumer.trace_state(), sst->_partition_range_history);
|
||||
consumer.permit(), consumer.trace_state(), sst->_partition_range_history, sstable::raw_stream::no, integrity);
|
||||
return std::make_unique<DataConsumeRowsContext>(s, std::move(sst), consumer, std::move(input), toread.start, toread.end - toread.start);
|
||||
}
|
||||
|
||||
@@ -155,9 +156,10 @@ inline std::unique_ptr<DataConsumeRowsContext> data_consume_single_partition(con
|
||||
|
||||
// Like data_consume_rows() with bounds, but iterates over whole range
|
||||
template <typename DataConsumeRowsContext>
|
||||
inline std::unique_ptr<DataConsumeRowsContext> data_consume_rows(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer) {
|
||||
auto data_size = sst->data_size();
|
||||
return data_consume_rows<DataConsumeRowsContext>(s, std::move(sst), consumer, {0, data_size}, data_size);
|
||||
inline std::unique_ptr<DataConsumeRowsContext> data_consume_rows(const schema& s, shared_sstable sst, typename DataConsumeRowsContext::consumer& consumer,
|
||||
sstable::integrity_check integrity) {
|
||||
auto data_size = sst->data_size();
|
||||
return data_consume_rows<DataConsumeRowsContext>(s, std::move(sst), consumer, {0, data_size}, data_size, integrity);
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
|
||||
@@ -47,6 +47,7 @@
|
||||
#include "metadata_collector.hh"
|
||||
#include "progress_monitor.hh"
|
||||
#include "compress.hh"
|
||||
#include "checksummed_data_source.hh"
|
||||
#include "index_reader.hh"
|
||||
#include "downsampling.hh"
|
||||
#include <boost/algorithm/string.hpp>
|
||||
@@ -1898,14 +1899,43 @@ sstable_writer sstable::get_writer(const schema& s, uint64_t estimated_partition
|
||||
|
||||
future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
|
||||
std::function<void(sstring)> error_handler, sstables::read_monitor& monitor) {
|
||||
auto handle_sstable_exception = [&error_handler](const malformed_sstable_exception& e, uint64_t& errors) -> std::exception_ptr {
|
||||
std::exception_ptr ex;
|
||||
try {
|
||||
error_handler(format("unrecoverable error: {}", e));
|
||||
++errors;
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
return ex;
|
||||
};
|
||||
|
||||
uint64_t errors = 0;
|
||||
std::exception_ptr ex;
|
||||
lw_shared_ptr<checksum> checksum;
|
||||
try {
|
||||
checksum = co_await read_checksum();
|
||||
} catch (const malformed_sstable_exception& e) {
|
||||
ex = handle_sstable_exception(e, errors);
|
||||
}
|
||||
if (ex) {
|
||||
co_return coroutine::exception(std::move(ex));
|
||||
}
|
||||
if (errors) {
|
||||
co_return errors;
|
||||
}
|
||||
co_await utils::get_local_injector().inject("sstable_validate/pause", [] (auto& handler) {
|
||||
sstlog.info("sstable_validate/pause init");
|
||||
auto ret = handler.wait_for_message(std::chrono::steady_clock::now() + std::chrono::seconds{5});
|
||||
sstlog.info("sstable_validate/pause done");
|
||||
return ret;
|
||||
});
|
||||
|
||||
if (_version >= sstable_version_types::mc) {
|
||||
co_return co_await mx::validate(shared_from_this(), std::move(permit), abort, std::move(error_handler), monitor);
|
||||
}
|
||||
|
||||
auto reader = make_crawling_reader(_schema, permit, nullptr, monitor);
|
||||
|
||||
uint64_t errors = 0;
|
||||
std::exception_ptr ex;
|
||||
auto reader = make_crawling_reader(_schema, permit, nullptr, monitor, integrity_check::yes);
|
||||
|
||||
try {
|
||||
auto validator = mutation_fragment_stream_validator(*_schema);
|
||||
@@ -1937,12 +1967,7 @@ future<uint64_t> sstable::validate(reader_permit permit, abort_source& abort,
|
||||
++errors;
|
||||
}
|
||||
} catch (const malformed_sstable_exception& e) {
|
||||
try {
|
||||
error_handler(format("unrecoverable error: {}", e));
|
||||
++errors;
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
ex = handle_sstable_exception(e, errors);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
@@ -2308,11 +2333,12 @@ sstable::make_crawling_reader(
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state,
|
||||
read_monitor& monitor) {
|
||||
read_monitor& monitor,
|
||||
integrity_check integrity) {
|
||||
if (_version >= version_types::mc) {
|
||||
return mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor);
|
||||
return mx::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
|
||||
}
|
||||
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor);
|
||||
return kl::make_crawling_reader(shared_from_this(), std::move(schema), std::move(permit), std::move(trace_state), monitor, integrity);
|
||||
}
|
||||
|
||||
static std::tuple<entry_descriptor, sstring, sstring> make_entry_descriptor(const std::filesystem::path& sst_path, sstring* const provided_ks, sstring* const provided_cf) {
|
||||
@@ -2410,7 +2436,8 @@ component_type sstable::component_from_sstring(version_types v, const sstring &s
|
||||
}
|
||||
|
||||
input_stream<char> sstable::data_stream(uint64_t pos, size_t len,
|
||||
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history, raw_stream raw) {
|
||||
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history, raw_stream raw,
|
||||
integrity_check integrity) {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = sstable_buffer_size;
|
||||
options.read_ahead = 4;
|
||||
@@ -2421,8 +2448,9 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len,
|
||||
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
|
||||
}
|
||||
|
||||
input_stream<char> stream;
|
||||
if (_components->compression && raw == raw_stream::no) {
|
||||
// Disabling integrity checks is not supported by compressed
|
||||
// file input streams. `integrity` is ignored.
|
||||
if (_version >= sstable_version_types::mc) {
|
||||
return make_compressed_file_m_format_input_stream(f, &_components->compression,
|
||||
pos, len, std::move(options), permit);
|
||||
@@ -2431,7 +2459,17 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len,
|
||||
pos, len, std::move(options), permit);
|
||||
}
|
||||
}
|
||||
|
||||
if (_components->checksum && integrity == integrity_check::yes) {
|
||||
auto checksum = get_checksum();
|
||||
auto file_len = data_size();
|
||||
if (_version >= sstable_version_types::mc) {
|
||||
return make_checksummed_file_m_format_input_stream(f, file_len,
|
||||
*checksum, pos, len, std::move(options));
|
||||
} else {
|
||||
return make_checksummed_file_k_l_format_input_stream(f, file_len,
|
||||
*checksum, pos, len, std::move(options));
|
||||
}
|
||||
}
|
||||
return make_file_input_stream(f, pos, len, std::move(options));
|
||||
}
|
||||
|
||||
@@ -2564,10 +2602,15 @@ future<uint32_t> sstable::read_digest() {
|
||||
co_return boost::lexical_cast<uint32_t>(digest_str);
|
||||
}
|
||||
|
||||
future<checksum> sstable::read_checksum() {
|
||||
sstables::checksum checksum;
|
||||
|
||||
co_await do_read_simple(component_type::CRC, [&] (version_types v, file crc_file) -> future<> {
|
||||
future<lw_shared_ptr<checksum>> sstable::read_checksum() {
|
||||
if (_components->checksum) {
|
||||
co_return _components->checksum->shared_from_this();
|
||||
}
|
||||
if (!has_component(component_type::CRC)) {
|
||||
co_return nullptr;
|
||||
}
|
||||
auto checksum = make_lw_shared<sstables::checksum>();
|
||||
co_await do_read_simple(component_type::CRC, [checksum, this] (version_types v, file crc_file) -> future<> {
|
||||
file_input_stream_options options;
|
||||
options.buffer_size = 4096;
|
||||
|
||||
@@ -2580,12 +2623,12 @@ future<checksum> sstable::read_checksum() {
|
||||
|
||||
auto buf = co_await crc_stream.read_exactly(size);
|
||||
check_buf_size(buf, size);
|
||||
checksum.chunk_size = net::ntoh(read_unaligned<uint32_t>(buf.get()));
|
||||
checksum->chunk_size = net::ntoh(read_unaligned<uint32_t>(buf.get()));
|
||||
|
||||
buf = co_await crc_stream.read_exactly(size);
|
||||
while (!buf.empty()) {
|
||||
check_buf_size(buf, size);
|
||||
checksum.checksums.push_back(net::ntoh(read_unaligned<uint32_t>(buf.get())));
|
||||
checksum->checksums.push_back(net::ntoh(read_unaligned<uint32_t>(buf.get())));
|
||||
buf = co_await crc_stream.read_exactly(size);
|
||||
}
|
||||
} catch (...) {
|
||||
@@ -2594,17 +2637,19 @@ future<checksum> sstable::read_checksum() {
|
||||
|
||||
co_await crc_stream.close();
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
_components->checksum = checksum->weak_from_this();
|
||||
});
|
||||
|
||||
co_return checksum;
|
||||
co_return std::move(checksum);
|
||||
}
|
||||
|
||||
future<bool> validate_checksums(shared_sstable sst, reader_permit permit) {
|
||||
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit) {
|
||||
const auto digest = co_await sst->read_digest();
|
||||
|
||||
auto data_stream = sst->data_stream(0, sst->ondisk_data_size(), permit, nullptr, nullptr, sstable::raw_stream::yes);
|
||||
|
||||
auto valid = true;
|
||||
auto ret = validate_checksums_result::valid;
|
||||
std::exception_ptr ex;
|
||||
|
||||
try {
|
||||
@@ -2616,10 +2661,13 @@ future<bool> validate_checksums(shared_sstable sst, reader_permit permit) {
|
||||
}
|
||||
} else {
|
||||
auto checksum = co_await sst->read_checksum();
|
||||
if (sst->get_version() >= sstable_version_types::mc) {
|
||||
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, checksum, digest);
|
||||
if (!checksum) {
|
||||
sstlog.warn("No checksums available for SSTable: {}", sst->get_filename());
|
||||
ret = validate_checksums_result::no_checksum;
|
||||
} else if (sst->get_version() >= sstable_version_types::mc) {
|
||||
valid = co_await do_validate_uncompressed<crc32_utils>(data_stream, *checksum, digest);
|
||||
} else {
|
||||
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, checksum, digest);
|
||||
valid = co_await do_validate_uncompressed<adler32_utils>(data_stream, *checksum, digest);
|
||||
}
|
||||
}
|
||||
} catch (...) {
|
||||
@@ -2629,7 +2677,10 @@ future<bool> validate_checksums(shared_sstable sst, reader_permit permit) {
|
||||
co_await data_stream.close();
|
||||
maybe_rethrow_exception(std::move(ex));
|
||||
|
||||
co_return valid;
|
||||
if (!valid) {
|
||||
ret = validate_checksums_result::invalid;
|
||||
}
|
||||
co_return ret;
|
||||
}
|
||||
|
||||
void sstable::set_first_and_last_keys() {
|
||||
|
||||
@@ -190,6 +190,7 @@ public:
|
||||
using format_types = sstable_format_types;
|
||||
using manager_list_link_type = bi::list_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
using manager_set_link_type = bi::set_member_hook<bi::link_mode<bi::auto_unlink>>;
|
||||
using integrity_check = bool_class<class integrity_check_tag>;
|
||||
public:
|
||||
sstable(schema_ptr schema,
|
||||
sstring table_dir,
|
||||
@@ -287,7 +288,8 @@ public:
|
||||
schema_ptr schema,
|
||||
reader_permit permit,
|
||||
tracing::trace_state_ptr trace_state = {},
|
||||
read_monitor& monitor = default_read_monitor());
|
||||
read_monitor& monitor = default_read_monitor(),
|
||||
integrity_check integrity = integrity_check::no);
|
||||
|
||||
// Returns mutation_source containing all writes contained in this sstable.
|
||||
// The mutation_source shares ownership of this sstable.
|
||||
@@ -724,9 +726,13 @@ public:
|
||||
//
|
||||
// When created with `raw_stream::yes`, the sstable data file will be
|
||||
// streamed as-is, without decompressing (if compressed).
|
||||
//
|
||||
// When created with `integrity_check::yes`, the integrity mechanisms
|
||||
// of the underlying data streams will be enabled.
|
||||
using raw_stream = bool_class<class raw_stream_tag>;
|
||||
input_stream<char> data_stream(uint64_t pos, size_t len,
|
||||
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history, raw_stream raw = raw_stream::no);
|
||||
reader_permit permit, tracing::trace_state_ptr trace_state, lw_shared_ptr<file_input_stream_history> history,
|
||||
raw_stream raw = raw_stream::no, integrity_check integrity = integrity_check::no);
|
||||
|
||||
// Read exactly the specific byte range from the data file (after
|
||||
// uncompression, if the file is compressed). This can be used to read
|
||||
@@ -922,6 +928,10 @@ public:
|
||||
return _components->summary;
|
||||
}
|
||||
|
||||
const lw_shared_ptr<const checksum> get_checksum() const {
|
||||
return _components->checksum ? _components->checksum->shared_from_this() : nullptr;
|
||||
}
|
||||
|
||||
// Gets ratio of droppable tombstone. A tombstone is considered droppable here
|
||||
// for cells and tombstones expired before the time point "GC before", which
|
||||
// is the point before which expiring data can be purged.
|
||||
@@ -986,19 +996,19 @@ public:
|
||||
friend class sstables_manager;
|
||||
template <typename DataConsumeRowsContext>
|
||||
friend std::unique_ptr<DataConsumeRowsContext>
|
||||
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t);
|
||||
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range, uint64_t, sstable::integrity_check);
|
||||
template <typename DataConsumeRowsContext>
|
||||
friend std::unique_ptr<DataConsumeRowsContext>
|
||||
data_consume_single_partition(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, disk_read_range);
|
||||
template <typename DataConsumeRowsContext>
|
||||
friend std::unique_ptr<DataConsumeRowsContext>
|
||||
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&);
|
||||
data_consume_rows(const schema&, shared_sstable, typename DataConsumeRowsContext::consumer&, sstable::integrity_check);
|
||||
friend void lw_shared_ptr_deleter<sstables::sstable>::dispose(sstable* s);
|
||||
gc_clock::time_point get_gc_before_for_drop_estimation(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
|
||||
gc_clock::time_point get_gc_before_for_fully_expire(const gc_clock::time_point& compaction_time, const tombstone_gc_state& gc_state, const schema_ptr& s) const;
|
||||
|
||||
future<uint32_t> read_digest();
|
||||
future<checksum> read_checksum();
|
||||
future<lw_shared_ptr<checksum>> read_checksum();
|
||||
};
|
||||
|
||||
// Validate checksums
|
||||
@@ -1022,9 +1032,17 @@ public:
|
||||
// This method validates both the full checksum and the per-chunk checksum
|
||||
// for the entire Data.db.
|
||||
//
|
||||
// Returns true if all checksums are valid.
|
||||
// Returns `valid` if all checksums are valid.
|
||||
// Returns `invalid` if at least one checksum is invalid.
|
||||
// Returns `no_checksum` if the sstable is uncompressed and does not have
|
||||
// a CRC component (CRC.db is missing from TOC.txt).
|
||||
// Validation errors are logged individually.
|
||||
future<bool> validate_checksums(shared_sstable sst, reader_permit permit);
|
||||
enum class validate_checksums_result {
|
||||
invalid = 0,
|
||||
valid = 1,
|
||||
no_checksum = 2
|
||||
};
|
||||
future<validate_checksums_result> validate_checksums(shared_sstable sst, reader_permit permit);
|
||||
|
||||
struct index_sampling_state {
|
||||
static constexpr size_t default_summary_byte_cost = 2000;
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "disk_types.hh"
|
||||
#include <seastar/core/enum.hh>
|
||||
#include <seastar/core/weak_ptr.hh>
|
||||
#include "bytes.hh"
|
||||
#include "gc_clock.hh"
|
||||
#include "locator/host_id.hh"
|
||||
@@ -616,10 +617,20 @@ struct scylla_metadata {
|
||||
static constexpr int DEFAULT_CHUNK_SIZE = 65536;
|
||||
|
||||
// checksums are generated using adler32 algorithm.
|
||||
struct checksum {
|
||||
struct checksum : public weakly_referencable<checksum>, enable_lw_shared_from_this<checksum> {
|
||||
uint32_t chunk_size;
|
||||
utils::chunked_vector<uint32_t> checksums;
|
||||
|
||||
checksum()
|
||||
: chunk_size(0)
|
||||
, checksums()
|
||||
{}
|
||||
|
||||
explicit checksum(uint32_t chunk_size, utils::chunked_vector<uint32_t> checksums)
|
||||
: chunk_size(chunk_size)
|
||||
, checksums(std::move(checksums))
|
||||
{}
|
||||
|
||||
template <typename Describer>
|
||||
auto describe_type(sstable_version_types v, Describer f) { return f(chunk_size, checksums); }
|
||||
};
|
||||
|
||||
@@ -146,7 +146,7 @@ class checksummed_file_writer : public file_writer {
|
||||
public:
|
||||
checksummed_file_writer(data_sink out, size_t buffer_size, sstring filename)
|
||||
: file_writer(make_checksummed_file_output_stream<ChecksumType>(std::move(out), _c, _full_checksum), std::move(filename))
|
||||
, _c({uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size))})
|
||||
, _c(uint32_t(std::min(size_t(DEFAULT_CHUNK_SIZE), buffer_size)), {})
|
||||
, _full_checksum(ChecksumType::init_checksum()) {}
|
||||
|
||||
// Since we are exposing a reference to _full_checksum, we delete the move
|
||||
|
||||
@@ -76,6 +76,7 @@ public:
|
||||
virtual std::vector<data_type> clustering_key_columns(std::mt19937& engine) override { return _underlying_spec->clustering_key_columns(engine); }
|
||||
virtual std::vector<data_type> regular_columns(std::mt19937& engine) override { return _underlying_spec->regular_columns(engine); }
|
||||
virtual std::vector<data_type> static_columns(std::mt19937& engine) override { return _underlying_spec->static_columns(engine); }
|
||||
virtual compress_sstable& compress() override { return _underlying_spec->compress(); };
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
@@ -66,6 +66,7 @@
|
||||
#include "test/lib/random_utils.hh"
|
||||
#include "test/lib/key_utils.hh"
|
||||
#include "test/lib/test_utils.hh"
|
||||
#include "test/lib/eventually.hh"
|
||||
#include "readers/from_mutations_v2.hh"
|
||||
#include "readers/from_fragments_v2.hh"
|
||||
#include "readers/combined.hh"
|
||||
@@ -2013,6 +2014,7 @@ static void verify_fragments(std::vector<sstables::shared_sstable> ssts, reader_
|
||||
r.produces_end_of_stream();
|
||||
};
|
||||
|
||||
using compress_sstable = tests::random_schema_specification::compress_sstable;
|
||||
|
||||
// A framework for scrub-related tests.
|
||||
// Lives in a seastar thread
|
||||
@@ -2027,14 +2029,15 @@ private:
|
||||
tests::random_schema _random_schema;
|
||||
|
||||
public:
|
||||
scrub_test_framework()
|
||||
scrub_test_framework(compress_sstable compress)
|
||||
: _seed(tests::random::get_int<uint32_t>())
|
||||
, _random_schema_spec(tests::make_random_schema_specification(
|
||||
"scrub_test_framework",
|
||||
std::uniform_int_distribution<size_t>(2, 4),
|
||||
std::uniform_int_distribution<size_t>(2, 4),
|
||||
std::uniform_int_distribution<size_t>(2, 8),
|
||||
std::uniform_int_distribution<size_t>(2, 8)))
|
||||
std::uniform_int_distribution<size_t>(2, 8),
|
||||
compress))
|
||||
, _random_schema(_seed, *_random_schema_spec)
|
||||
{
|
||||
_env.start().get();
|
||||
@@ -2097,8 +2100,8 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test) {
|
||||
scrub_test_framework test;
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content) {
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
@@ -2113,19 +2116,56 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
// No way to really test validation besides observing the log messages.
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file) {
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
auto muts = tests::generate_random_mutations(
|
||||
test.random_schema(),
|
||||
tests::uncompactible_timestamp_generator(test.seed()),
|
||||
tests::no_expiry_expiry_generator(),
|
||||
std::uniform_int_distribution<size_t>(10, 10)).get();
|
||||
|
||||
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
// Corrupt the data to cause an invalid checksum.
|
||||
auto f = open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo).get();
|
||||
const auto wbuf_align = f.memory_dma_alignment();
|
||||
const auto wbuf_len = f.disk_write_dma_alignment();
|
||||
auto wbuf = seastar::temporary_buffer<char>::aligned(wbuf_align, wbuf_len);
|
||||
std::fill(wbuf.get_write(), wbuf.get_write() + wbuf_len, 0xba);
|
||||
f.dma_write(0, wbuf.get(), wbuf_len).get();
|
||||
f.close().get();
|
||||
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable) {
|
||||
scrub_test_framework test;
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
@@ -2135,15 +2175,160 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
// No way to really test validation besides observing the log messages.
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_content_uncompressed) {
|
||||
scrub_test_framework test(compress_sstable::no);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
auto muts = tests::generate_random_mutations(
|
||||
test.random_schema(),
|
||||
tests::uncompactible_timestamp_generator(test.seed()),
|
||||
tests::no_expiry_expiry_generator(),
|
||||
std::uniform_int_distribution<size_t>(10, 10)).get();
|
||||
std::swap(*muts.begin(), *(muts.begin() + 1));
|
||||
|
||||
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_corrupted_file_uncompressed) {
|
||||
scrub_test_framework test(compress_sstable::no);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
auto muts = tests::generate_random_mutations(
|
||||
test.random_schema(),
|
||||
tests::uncompactible_timestamp_generator(test.seed()),
|
||||
tests::no_expiry_expiry_generator(),
|
||||
std::uniform_int_distribution<size_t>(10, 10)).get();
|
||||
|
||||
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
// Corrupt the data to cause an invalid checksum.
|
||||
auto f = open_file_dma(sstables::test(sst).filename(component_type::Data).native(), open_flags::wo).get();
|
||||
const auto wbuf_align = f.memory_dma_alignment();
|
||||
const auto wbuf_len = f.disk_write_dma_alignment();
|
||||
auto wbuf = seastar::temporary_buffer<char>::aligned(wbuf_align, wbuf_len);
|
||||
std::fill(wbuf.get_write(), wbuf.get_write() + wbuf_len, 0xba);
|
||||
f.dma_write(0, wbuf.get(), wbuf_len).get();
|
||||
f.close().get();
|
||||
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_GT(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(sst->is_quarantined());
|
||||
BOOST_REQUIRE(in_strategy_sstables(ts).empty());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_valid_sstable_uncompressed) {
|
||||
scrub_test_framework test(compress_sstable::no);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
auto muts = tests::generate_random_mutations(test.random_schema()).get();
|
||||
|
||||
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
auto stats = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{}).get();
|
||||
|
||||
BOOST_REQUIRE(stats.has_value());
|
||||
BOOST_REQUIRE_EQUAL(stats->validation_errors, 0);
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
BOOST_REQUIRE(!sst->get_checksum());
|
||||
});
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_validate_mode_test_multiple_instances_uncompressed) {
|
||||
#ifndef SCYLLA_ENABLE_ERROR_INJECTION
|
||||
fmt::print("Skipping test as it depends on error injection. Please run in mode where it's enabled (debug,dev).\n");
|
||||
return;
|
||||
#endif
|
||||
scrub_test_framework test(compress_sstable::no);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
auto muts = tests::generate_random_mutations(test.random_schema()).get();
|
||||
|
||||
test.run(schema, muts, [] (table_for_tests& table, compaction::table_state& ts, std::vector<sstables::shared_sstable> sstables) {
|
||||
BOOST_REQUIRE(sstables.size() == 1);
|
||||
auto sst = sstables.front();
|
||||
|
||||
sstables::compaction_type_options::scrub opts = {
|
||||
.operation_mode = sstables::compaction_type_options::scrub::mode::validate,
|
||||
};
|
||||
|
||||
utils::get_local_injector().enable("sstable_validate/pause");
|
||||
|
||||
auto scrub1 = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{});
|
||||
BOOST_REQUIRE(eventually_true([sst] {
|
||||
auto checksum = sst->get_checksum();
|
||||
return checksum != nullptr;
|
||||
}));
|
||||
auto checksum1 = sst->get_checksum();
|
||||
|
||||
auto scrub2 = table->get_compaction_manager().perform_sstable_scrub(ts, opts, tasks::task_info{});
|
||||
BOOST_REQUIRE(eventually_true([sst] {
|
||||
auto checksum = sst->get_checksum();
|
||||
return checksum != nullptr;
|
||||
}));
|
||||
auto checksum2 = sst->get_checksum();
|
||||
|
||||
// Scrub instances use the same checksum component.
|
||||
BOOST_REQUIRE(checksum1);
|
||||
BOOST_REQUIRE(checksum2);
|
||||
BOOST_REQUIRE(checksum1 == checksum2);
|
||||
checksum1.release();
|
||||
checksum2.release();
|
||||
|
||||
utils::get_local_injector().receive_message("sstable_validate/pause");
|
||||
when_all_succeed(std::move(scrub1), std::move(scrub2)).get();
|
||||
|
||||
BOOST_REQUIRE(!sst->is_quarantined());
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).size(), 1);
|
||||
BOOST_REQUIRE_EQUAL(in_strategy_sstables(ts).front(), sst);
|
||||
// Checksum component released after scrub instances terminate.
|
||||
BOOST_REQUIRE(sst->get_checksum() == nullptr);
|
||||
|
||||
utils::get_local_injector().disable("sstable_validate/pause");
|
||||
});
|
||||
}
|
||||
|
||||
@@ -2274,8 +2459,8 @@ SEASTAR_TEST_CASE(sstable_validate_test) {
|
||||
f.dma_write(0, wbuf.get(), wbuf_len).get();
|
||||
f.close().get();
|
||||
|
||||
auto valid = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE_EQUAL(valid, false);
|
||||
auto res = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(res == validate_checksums_result::invalid);
|
||||
|
||||
|
||||
const auto errors = sst->validate(permit, abort, error_handler{count}).get();
|
||||
@@ -2286,7 +2471,7 @@ SEASTAR_TEST_CASE(sstable_validate_test) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
|
||||
scrub_test_framework test;
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
@@ -2310,7 +2495,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_abort_mode_test) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
|
||||
scrub_test_framework test;
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
@@ -2358,7 +2543,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_skip_mode_test) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
|
||||
scrub_test_framework test;
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
@@ -2398,7 +2583,7 @@ SEASTAR_THREAD_TEST_CASE(sstable_scrub_segregate_mode_test) {
|
||||
}
|
||||
|
||||
SEASTAR_THREAD_TEST_CASE(sstable_scrub_quarantine_mode_test) {
|
||||
scrub_test_framework test;
|
||||
scrub_test_framework test(compress_sstable::yes);
|
||||
|
||||
auto schema = test.schema();
|
||||
|
||||
|
||||
@@ -2812,12 +2812,12 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
|
||||
|
||||
sst->load(sst->get_schema()->get_sharder()).get();
|
||||
|
||||
bool valid;
|
||||
validate_checksums_result res;
|
||||
|
||||
testlog.info("Validating intact {}", sst->get_filename());
|
||||
|
||||
valid = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(valid);
|
||||
res = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(res == validate_checksums_result::valid);
|
||||
|
||||
auto sst_file = open_file_dma(test(sst).filename(sstables::component_type::Data).native(), open_flags::wo).get();
|
||||
auto close_sst_file = defer([&sst_file] { sst_file.close().get(); });
|
||||
@@ -2831,8 +2831,8 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
|
||||
sst_file.dma_write(sst->ondisk_data_size() / 2, buf.begin(), buf.size()).get();
|
||||
}
|
||||
|
||||
valid = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(!valid);
|
||||
res = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(res == validate_checksums_result::invalid);
|
||||
|
||||
testlog.info("Validating truncated {}", sst->get_filename());
|
||||
|
||||
@@ -2840,8 +2840,15 @@ SEASTAR_TEST_CASE(test_validate_checksums) {
|
||||
sst_file.truncate(sst->ondisk_data_size() / 2).get();
|
||||
}
|
||||
|
||||
valid = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(!valid);
|
||||
res = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(res == validate_checksums_result::invalid);
|
||||
|
||||
if (compression_params == no_compression_params) {
|
||||
testlog.info("Validating with no checksums {}", sst->get_filename());
|
||||
sstables::test(sst).rewrite_toc_without_component(component_type::CRC);
|
||||
auto res = sstables::validate_checksums(sst, permit).get();
|
||||
BOOST_REQUIRE(res == validate_checksums_result::no_checksum);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
@@ -85,7 +85,7 @@ template <typename... Args>
|
||||
sstables::shared_sstable
|
||||
make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state, sstables::generation_type generation) {
|
||||
// Unlike the previous helper, we'll assume we're in a thread here. It's less flexible
|
||||
// but the users are usually in a thread, and rewrite_toc_without_scylla_component requires
|
||||
// but the users are usually in a thread, and rewrite_toc_without_component requires
|
||||
// a thread. We could fix that, but deferring that for now.
|
||||
auto s = table.schema();
|
||||
auto mt = make_lw_shared<replica::memtable>(s);
|
||||
@@ -102,7 +102,7 @@ make_sstable_for_all_shards(replica::table& table, sstables::sstable_state state
|
||||
// it came from Cassandra
|
||||
testlog.debug("make_sstable_for_all_shards: {}: rewriting TOC", sst->get_filename());
|
||||
sstables::test(sst).remove_component(sstables::component_type::Scylla).get();
|
||||
sstables::test(sst).rewrite_toc_without_scylla_component();
|
||||
sstables::test(sst).rewrite_toc_without_component(sstables::component_type::Scylla);
|
||||
return sst;
|
||||
}
|
||||
|
||||
|
||||
@@ -132,6 +132,7 @@ class default_random_schema_specification : public random_schema_specification {
|
||||
std::uniform_int_distribution<size_t> _regular_column_count_dist;
|
||||
std::uniform_int_distribution<size_t> _static_column_count_dist;
|
||||
type_generator _type_generator;
|
||||
compress_sstable _compress;
|
||||
|
||||
private:
|
||||
static unsigned generate_unique_id(std::mt19937& engine, std::unordered_set<unsigned>& used_ids) {
|
||||
@@ -172,13 +173,15 @@ public:
|
||||
std::uniform_int_distribution<size_t> partition_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> clustering_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> regular_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist)
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist,
|
||||
compress_sstable compress)
|
||||
: random_schema_specification(std::move(keyspace_name))
|
||||
, _partition_column_count_dist(partition_column_count_dist)
|
||||
, _clustering_column_count_dist(clustering_column_count_dist)
|
||||
, _regular_column_count_dist(regular_column_count_dist)
|
||||
, _static_column_count_dist(static_column_count_dist)
|
||||
, _type_generator(*this) {
|
||||
, _type_generator(*this)
|
||||
, _compress(compress) {
|
||||
SCYLLA_ASSERT(_partition_column_count_dist.a() > 0);
|
||||
}
|
||||
virtual sstring table_name(std::mt19937& engine) override {
|
||||
@@ -199,6 +202,9 @@ public:
|
||||
virtual std::vector<data_type> static_columns(std::mt19937& engine) override {
|
||||
return generate_types(engine, _static_column_count_dist, type_generator::is_multi_cell::yes, false);
|
||||
}
|
||||
virtual compress_sstable& compress() override {
|
||||
return _compress;
|
||||
}
|
||||
};
|
||||
|
||||
} // anonymous namespace
|
||||
@@ -208,9 +214,10 @@ std::unique_ptr<random_schema_specification> make_random_schema_specification(
|
||||
std::uniform_int_distribution<size_t> partition_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> clustering_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> regular_column_count_dist,
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist) {
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist,
|
||||
random_schema_specification::compress_sstable compress) {
|
||||
return std::make_unique<default_random_schema_specification>(std::move(keyspace_name), partition_column_count_dist, clustering_column_count_dist,
|
||||
regular_column_count_dist, static_column_count_dist);
|
||||
regular_column_count_dist, static_column_count_dist, compress);
|
||||
}
|
||||
|
||||
namespace {
|
||||
@@ -830,6 +837,9 @@ schema_ptr build_random_schema(uint32_t seed, random_schema_specification& spec)
|
||||
builder.with_column(to_bytes(format("v{}", r)), std::move(regular_columns[r]), column_kind::regular_column);
|
||||
}
|
||||
|
||||
if (spec.compress() == random_schema_specification::compress_sstable::no) {
|
||||
builder.set_compressor_params(compression_parameters::no_compression());
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,9 @@ class cql_test_env;
|
||||
namespace tests {
|
||||
|
||||
class random_schema_specification {
|
||||
public:
|
||||
using compress_sstable = bool_class<class compress_sstable_tag>;
|
||||
private:
|
||||
sstring _keyspace_name;
|
||||
public:
|
||||
explicit random_schema_specification(sstring keyspace_name) : _keyspace_name(std::move(keyspace_name)) { }
|
||||
@@ -34,6 +37,7 @@ public:
|
||||
virtual std::vector<data_type> clustering_key_columns(std::mt19937& engine) = 0;
|
||||
virtual std::vector<data_type> regular_columns(std::mt19937& engine) = 0;
|
||||
virtual std::vector<data_type> static_columns(std::mt19937& engine) = 0;
|
||||
virtual compress_sstable& compress() = 0;
|
||||
};
|
||||
|
||||
/// Helper class that can generate a subset of all valid combination of types.
|
||||
@@ -68,7 +72,8 @@ std::unique_ptr<random_schema_specification> make_random_schema_specification(
|
||||
std::uniform_int_distribution<size_t> partition_column_count_dist = std::uniform_int_distribution<size_t>(1, 4),
|
||||
std::uniform_int_distribution<size_t> clustering_column_count_dist = std::uniform_int_distribution<size_t>(0, 4),
|
||||
std::uniform_int_distribution<size_t> regular_column_count_dist = std::uniform_int_distribution<size_t>(1, 4),
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist = std::uniform_int_distribution<size_t>(0, 4));
|
||||
std::uniform_int_distribution<size_t> static_column_count_dist = std::uniform_int_distribution<size_t>(0, 4),
|
||||
random_schema_specification::compress_sstable compress = random_schema_specification::compress_sstable::yes);
|
||||
|
||||
/// Generate values for any type.
|
||||
///
|
||||
|
||||
@@ -184,8 +184,9 @@ public:
|
||||
_sst->_shards.push_back(this_shard_id());
|
||||
}
|
||||
|
||||
void rewrite_toc_without_scylla_component() {
|
||||
_sst->_recognized_components.erase(component_type::Scylla);
|
||||
void rewrite_toc_without_component(component_type component) {
|
||||
SCYLLA_ASSERT(component != component_type::TOC);
|
||||
_sst->_recognized_components.erase(component);
|
||||
remove_file(_sst->filename(component_type::TOC)).get();
|
||||
_sst->_storage->open(*_sst);
|
||||
_sst->seal_sstable(false).get();
|
||||
|
||||
@@ -1576,9 +1576,26 @@ void validate_checksums_operation(schema_ptr schema, reader_permit permit, const
|
||||
json_writer writer(json_output_stream);
|
||||
writer.StartStream();
|
||||
for (auto& sst : sstables) {
|
||||
const auto valid = sstables::validate_checksums(sst, permit).get();
|
||||
const auto res = sstables::validate_checksums(sst, permit).get();
|
||||
writer.Key(sst->get_filename());
|
||||
writer.Bool(valid);
|
||||
writer.StartObject();
|
||||
writer.Key("has_checksums");
|
||||
switch (res) {
|
||||
case validate_checksums_result::valid:
|
||||
writer.Bool(true);
|
||||
writer.Key("valid_checksums");
|
||||
writer.Bool(true);
|
||||
break;
|
||||
case validate_checksums_result::invalid:
|
||||
writer.Bool(true);
|
||||
writer.Key("valid_checksums");
|
||||
writer.Bool(false);
|
||||
break;
|
||||
case validate_checksums_result::no_checksum:
|
||||
writer.Bool(false);
|
||||
break;
|
||||
}
|
||||
writer.EndObject();
|
||||
}
|
||||
writer.EndStream();
|
||||
fmt::print(std::cout, "{}", json_output_stream.view());
|
||||
@@ -2874,8 +2891,7 @@ for more information on this operation, including what the different modes do.
|
||||
{{"validate-checksums",
|
||||
"Validate the checksums of the sstable(s)",
|
||||
R"(
|
||||
Validate both the whole-file and the per-chunk checksums checksums of the data
|
||||
component.
|
||||
Validate both the whole-file and the per-chunk checksums of the data component.
|
||||
|
||||
See https://docs.scylladb.com/operating-scylla/admin-tools/scylla-sstable#validate-checksums
|
||||
for more information on this operation.
|
||||
|
||||
Reference in New Issue
Block a user