Implement compressed_raw_file_data_source that streams compressed chunks without decompression while verifying checksums and calculating digests. Extends raw_stream enum to support compressed_chunks mode. This data_source implementation will be used in the next commits for file based streaming.
737 lines
32 KiB
C++
737 lines
32 KiB
C++
/*
|
|
* Copyright (C) 2015-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include <stdexcept>
|
|
#include <cstdlib>
|
|
|
|
#include <seastar/core/align.hh>
|
|
#include <seastar/core/bitops.hh>
|
|
#include <seastar/core/byteorder.hh>
|
|
#include <seastar/core/fstream.hh>
|
|
#include <seastar/core/on_internal_error.hh>
|
|
|
|
#include "compress.hh"
|
|
#include "compressor.hh"
|
|
#include "exceptions.hh"
|
|
#include "unimplemented.hh"
|
|
#include "segmented_compress_params.hh"
|
|
#include "utils/assert.hh"
|
|
#include "utils/class_registrator.hh"
|
|
#include "reader_permit.hh"
|
|
#include "data_source_types.hh"
|
|
|
|
namespace sstables {
|
|
|
|
extern logging::logger sstlog;
|
|
|
|
// Selects which flavour of bit mask make_mask() produces.
enum class mask_type : uint8_t {
    set,    // bits belonging to the field are 1, every other bit is 0
    clear   // bits belonging to the field are 0, every other bit is 1
};

// Builds a 64-bit mask for a field of `size_bits` bits whose least
// significant bit sits at bit position `offset`.
//
// size_bits cannot be >= 64, and the field must fit inside 64 bits
// (offset + size_bits <= 64).
//
// Returns the mask itself for mask_type::set and its complement for
// mask_type::clear.
static inline uint64_t make_mask(uint8_t size_bits, uint8_t offset, mask_type t) noexcept {
    // The one being shifted must be 64 bits wide: with a plain `1` the
    // whole expression is evaluated in `int`, which overflows (undefined
    // behaviour) as soon as the field is wider than 30 bits or the shifted
    // result no longer fits in an int.
    const uint64_t mask = ((uint64_t(1) << size_bits) - 1) << offset;
    return t == mask_type::set ? mask : ~mask;
}
|
|
|
|
/*
 * ----> memory addresses
 * MSB           LSB
 * | | | | |3|2|1|0|  CPU integer (big or little endian byte order)
 *          -------
 *             |
 *       +-----+ << shift = prefix bits
 *       |
 *    -------
 *  7 6 5 4 3 2 1 0   index*
 * | |3|2|1|0| | | |  raw storage (unaligned, little endian byte order)
 *  = ------- =====
 *  |           |
 *  |           +-> prefix bits
 *  +-> suffix bits
 *
 * |0|1|1|1|1|0|0|0|  read/write mask
 *
 * * On big endian systems the indices in storage are reversed and
 *   run left to right: 0 1 .. 6 7. To avoid differences in the
 *   encoding logic on machines with different native byte orders
 *   reads and writes to storage must be explicitly little endian.
 */
|
|
// Describes where a bit field lives inside a 64-bit word: how far it is
// shifted from bit 0 and the mask that goes with it.
struct bit_displacement {
    // Number of bits between bit 0 of the word and the start of the field.
    uint64_t shift;
    // Mask associated with the field (either flavour of mask_type).
    uint64_t mask;
};
|
|
|
|
// Computes the shift and mask needed to access a field of `size_bits` bits
// that starts `prefix_bits` bits into a word. `t` selects whether the mask
// has the field bits set or cleared.
inline bit_displacement displacement_for(uint64_t prefix_bits, uint8_t size_bits, mask_type t) {
    bit_displacement result;
    result.shift = prefix_bits;
    result.mask = make_mask(size_bits, prefix_bits, t);
    return result;
}
|
|
|
|
std::pair<bucket_info, segment_info> params_for_chunk_size(uint32_t chunk_size) {
|
|
const uint8_t chunk_size_log2 = log2ceil(chunk_size);
|
|
|
|
auto it = std::ranges::find_if(bucket_infos, [&] (const bucket_info& bi) {
|
|
return bi.chunk_size_log2 == chunk_size_log2;
|
|
});
|
|
|
|
// This scenario should be so rare that we only fall back to a safe
|
|
// set of parameters, not optimal ones.
|
|
if (it == bucket_infos.end()) {
|
|
const uint8_t data_size = bucket_infos.front().best_data_size_log2;
|
|
return {{chunk_size_log2, data_size, (8 * bucket_size - 56) / data_size},
|
|
{chunk_size_log2, data_size, uint8_t(1)}};
|
|
}
|
|
|
|
auto b = *it;
|
|
auto s = *std::ranges::find_if(segment_infos, [&] (const segment_info& si) {
|
|
return si.data_size_log2 == b.best_data_size_log2 && si.chunk_size_log2 == b.chunk_size_log2;
|
|
});
|
|
|
|
return {std::move(b), std::move(s)};
|
|
}
|
|
|
|
uint64_t compression::segmented_offsets::read(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits) const {
|
|
const uint64_t offset_byte = offset_bits / 8;
|
|
uint64_t value = seastar::read_le<uint64_t>(_storage[bucket_index].storage.get() + offset_byte);
|
|
|
|
const auto displacement = displacement_for(offset_bits % 8, size_bits, mask_type::set);
|
|
|
|
value &= displacement.mask;
|
|
value >>= displacement.shift;
|
|
|
|
return value;
|
|
}
|
|
|
|
void compression::segmented_offsets::write(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits, uint64_t value) {
|
|
const uint64_t offset_byte = offset_bits / 8;
|
|
|
|
uint64_t old_value = seastar::read_le<uint64_t>(_storage[bucket_index].storage.get() + offset_byte);
|
|
|
|
const auto displacement = displacement_for(offset_bits % 8, size_bits, mask_type::clear);
|
|
|
|
value <<= displacement.shift;
|
|
|
|
if ((~displacement.mask | value) != ~displacement.mask) {
|
|
throw std::invalid_argument(format("{}: to-be-written value would overflow the allocated bits", __FUNCTION__));
|
|
}
|
|
|
|
old_value &= displacement.mask;
|
|
value |= old_value;
|
|
|
|
seastar::write_le(_storage[bucket_index].storage.get() + offset_byte, value);
|
|
}
|
|
|
|
void compression::segmented_offsets::state::update_position_trackers(std::size_t index, uint16_t segment_size_bits,
|
|
uint32_t segments_per_bucket, uint8_t grouped_offsets) {
|
|
if (_current_index != index - 1) {
|
|
_current_index = index;
|
|
const uint64_t current_segment_index = _current_index / grouped_offsets;
|
|
_current_bucket_segment_index = current_segment_index % segments_per_bucket;
|
|
_current_segment_relative_index = _current_index % grouped_offsets;
|
|
_current_bucket_index = current_segment_index / segments_per_bucket;
|
|
_current_segment_offset_bits = (_current_bucket_segment_index % segments_per_bucket) * segment_size_bits;
|
|
} else {
|
|
++_current_index;
|
|
++_current_segment_relative_index;
|
|
|
|
// Crossed segment boundary.
|
|
if (_current_segment_relative_index == grouped_offsets) {
|
|
++_current_bucket_segment_index;
|
|
_current_segment_relative_index = 0;
|
|
|
|
// Crossed bucket boundary.
|
|
if (_current_bucket_segment_index == segments_per_bucket) {
|
|
++_current_bucket_index;
|
|
_current_bucket_segment_index = 0;
|
|
_current_segment_offset_bits = 0;
|
|
} else {
|
|
_current_segment_offset_bits += segment_size_bits;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void compression::segmented_offsets::init(uint32_t chunk_size) {
|
|
if (chunk_size == 0) {
|
|
throw sstables::malformed_sstable_exception("Segmented offsets chunk size is zero.");
|
|
}
|
|
|
|
_chunk_size = chunk_size;
|
|
|
|
const auto params = params_for_chunk_size(chunk_size);
|
|
|
|
sstlog.trace(
|
|
"{} {}(): chunk size {} (log2)",
|
|
fmt::ptr(this),
|
|
__FUNCTION__,
|
|
static_cast<int>(params.first.chunk_size_log2));
|
|
|
|
_grouped_offsets = params.second.grouped_offsets;
|
|
_segment_base_offset_size_bits = params.second.data_size_log2;
|
|
_segmented_offset_size_bits = static_cast<uint64_t>(log2ceil((_chunk_size + 64) * (_grouped_offsets - 1)));
|
|
_segment_size_bits = _segment_base_offset_size_bits + (_grouped_offsets - 1) * _segmented_offset_size_bits;
|
|
_segments_per_bucket = params.first.segments_per_bucket;
|
|
}
|
|
|
|
uint64_t compression::segmented_offsets::at(std::size_t i, compression::segmented_offsets::state& s) const {
|
|
if (i >= _size) {
|
|
throw std::out_of_range(format("{}: index {} is out of range", __FUNCTION__, i));
|
|
}
|
|
|
|
s.update_position_trackers(i, _segment_size_bits, _segments_per_bucket, _grouped_offsets);
|
|
const uint64_t bucket_base_offset = _storage[s._current_bucket_index].base_offset;
|
|
const uint64_t segment_base_offset = bucket_base_offset + read(s._current_bucket_index, s._current_segment_offset_bits, _segment_base_offset_size_bits);
|
|
|
|
if (s._current_segment_relative_index == 0) {
|
|
return segment_base_offset;
|
|
}
|
|
|
|
return segment_base_offset
|
|
+ read(s._current_bucket_index,
|
|
s._current_segment_offset_bits + _segment_base_offset_size_bits + (s._current_segment_relative_index - 1) * _segmented_offset_size_bits,
|
|
_segmented_offset_size_bits);
|
|
}
|
|
|
|
void compression::segmented_offsets::push_back(uint64_t offset, compression::segmented_offsets::state& s) {
|
|
s.update_position_trackers(_size, _segment_size_bits, _segments_per_bucket, _grouped_offsets);
|
|
|
|
if (s._current_bucket_index == _storage.size()) {
|
|
_storage.push_back(bucket{_last_written_offset, std::unique_ptr<char[]>(new char[bucket_size])});
|
|
}
|
|
|
|
const uint64_t bucket_base_offset = _storage[s._current_bucket_index].base_offset;
|
|
|
|
if (s._current_segment_relative_index == 0) {
|
|
write(s._current_bucket_index, s._current_segment_offset_bits, _segment_base_offset_size_bits, offset - bucket_base_offset);
|
|
} else {
|
|
const uint64_t segment_base_offset = bucket_base_offset + read(s._current_bucket_index, s._current_segment_offset_bits, _segment_base_offset_size_bits);
|
|
write(s._current_bucket_index,
|
|
s._current_segment_offset_bits + _segment_base_offset_size_bits + (s._current_segment_relative_index - 1) * _segmented_offset_size_bits,
|
|
_segmented_offset_size_bits,
|
|
offset - segment_base_offset);
|
|
}
|
|
_last_written_offset = offset;
|
|
++_size;
|
|
}
|
|
|
|
// Installs `c` as the compressor. When `c` is non-null, its name (without
// the standard prefix) is recorded in `name` and all of its options except
// the compression-class option itself are appended to `options`.
void compression::set_compressor(compressor_ptr c) {
    if (c) {
        unqualified_name short_name(compression_parameters::name_prefix, c->name());
        const sstring& short_name_str = short_name;
        name.value = bytes(short_name_str.begin(), short_name_str.end());

        for (auto& [opt_key, opt_value] : c->options()) {
            if (opt_key == compression_parameters::SSTABLE_COMPRESSION) {
                // Already captured through `name`.
                continue;
            }
            options.elements.push_back({
                {bytes(opt_key.begin(), opt_key.end())},
                {bytes(opt_value.begin(), opt_value.end())}
            });
        }
    }
    _compressor = std::move(c);
}
|
|
|
|
void compression::discard_hidden_options() {
|
|
auto is_hidden_option = [] (const option& o) -> bool {
|
|
auto k_str = std::string_view(reinterpret_cast<const char*>(o.key.value.data()), o.key.value.size());
|
|
return compressor::is_hidden_option_name(k_str);
|
|
};
|
|
decltype(options) filtered_options;
|
|
for (auto& e : options.elements) {
|
|
if (!is_hidden_option(e)) {
|
|
filtered_options.elements.emplace_back(std::move(e));
|
|
}
|
|
}
|
|
options = std::move(filtered_options);
|
|
}
|
|
|
|
// Returns the installed compressor. Asserts that one has been set.
compressor& compression::get_compressor() const {
    SCYLLA_ASSERT(_compressor);
    return *_compressor;
}
|
|
|
|
void compression::update(uint64_t compressed_file_length) {
|
|
_compressed_file_length = compressed_file_length;
|
|
}
|
|
|
|
// locate() takes a byte position in the uncompressed stream, and finds the
|
|
// the location of the compressed chunk on disk which contains it, and the
|
|
// offset in this chunk.
|
|
// locate() may only be used for offsets of actual bytes, and in particular
|
|
// the end-of-file position (one past the last byte) MUST not be used. If the
|
|
// caller wants to read from the end of file, it should simply read nothing.
|
|
compression::chunk_and_offset
|
|
compression::locate(uint64_t position, const compression::segmented_offsets::accessor& accessor) {
|
|
auto ucl = uncompressed_chunk_length();
|
|
auto chunk_index = position / ucl;
|
|
decltype(ucl) chunk_offset = position % ucl;
|
|
auto chunk_start = accessor.at(chunk_index);
|
|
auto chunk_end = (chunk_index + 1 == offsets.size())
|
|
? _compressed_file_length
|
|
: accessor.at(chunk_index + 1);
|
|
return { chunk_start, chunk_end - chunk_start, chunk_offset };
|
|
}
|
|
|
|
std::map<sstring, sstring> options_from_compression(const compression& c) {
|
|
std::map<sstring, sstring> result;
|
|
result.emplace(compression_parameters::SSTABLE_COMPRESSION, sstring(c.name.value.begin(), c.name.value.end()));
|
|
result.emplace(compression_parameters::CHUNK_LENGTH_KB, to_sstring(c.chunk_len / 1024));
|
|
for (const auto& [k, v] : c.options.elements) {
|
|
auto k_str = sstring(k.value.begin(), k.value.end());
|
|
auto v_str = sstring(v.value.begin(), v.value.end());
|
|
if (compressor::is_hidden_option_name(k_str)) {
|
|
continue;
|
|
}
|
|
result.emplace(std::move(k_str), std::move(v_str));
|
|
}
|
|
return result;
|
|
}
|
|
|
|
} // namespace sstables
|
|
|
|
// Controls what goes into the full-file checksum of a compressed file.
//
// For SSTables 2.x (formats 'ka' and 'la') the full checksum is a combination
// of the checksums of the compressed chunks. For SSTables 3.x (format 'mc')
// it is the checksum of everything written to the file, so the per-chunk
// checksum bytes are included as well.
enum class compressed_checksum_mode {
    // Only the compressed chunk payloads contribute.
    checksum_chunks_only,
    // Chunk payloads and the stored per-chunk checksums contribute.
    checksum_all,
};
|
|
|
|
template <ChecksumUtils ChecksumType, bool check_digest, compressed_checksum_mode mode>
|
|
class compressed_file_data_source_impl : public data_source_impl {
|
|
std::function<future<input_stream<char>>()> _stream_creator;
|
|
std::optional<input_stream<char>> _input_stream;
|
|
sstables::compression* _compression_metadata;
|
|
sstables::compression::segmented_offsets::accessor _offsets;
|
|
[[no_unique_address]] sstables::digest_members<check_digest> _digests;
|
|
reader_permit _permit;
|
|
uint64_t _underlying_pos;
|
|
uint64_t _pos;
|
|
uint64_t _beg_pos;
|
|
uint64_t _end_pos;
|
|
public:
|
|
compressed_file_data_source_impl(sstables::stream_creator_fn stream_creator, sstables::compression* cm,
|
|
uint64_t pos, size_t len, file_input_stream_options options,
|
|
reader_permit permit, std::optional<uint32_t> digest)
|
|
: _compression_metadata(cm)
|
|
, _offsets(_compression_metadata->offsets.get_accessor())
|
|
, _permit(std::move(permit))
|
|
{
|
|
_pos = _beg_pos = pos;
|
|
if (pos > _compression_metadata->uncompressed_file_length()) {
|
|
throw std::runtime_error("attempt to uncompress beyond end");
|
|
}
|
|
if (len == 0 || pos == _compression_metadata->uncompressed_file_length()) {
|
|
// Nothing to read
|
|
_end_pos = _pos;
|
|
return;
|
|
}
|
|
if (len <= _compression_metadata->uncompressed_file_length() - pos) {
|
|
_end_pos = pos + len;
|
|
} else {
|
|
_end_pos = _compression_metadata->uncompressed_file_length();
|
|
}
|
|
if constexpr (check_digest) {
|
|
if (!digest) {
|
|
on_internal_error(sstables::sstlog, "Requested digest check but no digest was provided.");
|
|
}
|
|
if (_end_pos - _pos < _compression_metadata->uncompressed_file_length()) {
|
|
sstables::sstlog.debug("Compressed reader cannot calculate digest with partial read: current pos={}, end pos={}, uncompressed file len={}. Disabling digest check.",
|
|
_pos, _end_pos, _compression_metadata->uncompressed_file_length());
|
|
_digests = {false};
|
|
} else {
|
|
_digests = {true, *digest, ChecksumType::init_checksum()};
|
|
}
|
|
}
|
|
// _beg_pos and _end_pos specify positions in the compressed stream.
|
|
// We need to translate them into a range of uncompressed chunks,
|
|
// and open a file_input_stream to read that range.
|
|
auto start = _compression_metadata->locate(_beg_pos, _offsets);
|
|
auto end = _compression_metadata->locate(_end_pos - 1, _offsets);
|
|
_stream_creator = [stream_creator{std::move(stream_creator)}, start = start.chunk_start, length = end.chunk_start + end.chunk_len - start.chunk_start, options] mutable {
|
|
return stream_creator(start, length, std::move(options));
|
|
};
|
|
_underlying_pos = start.chunk_start;
|
|
}
|
|
virtual future<temporary_buffer<char>> get() override {
|
|
if (_pos >= _end_pos) {
|
|
co_return temporary_buffer<char>();
|
|
}
|
|
|
|
if (!_input_stream) {
|
|
_input_stream = co_await _stream_creator();
|
|
}
|
|
auto addr = _compression_metadata->locate(_pos, _offsets);
|
|
// Uncompress the next chunk. We need to skip part of the first
|
|
// chunk, but then continue to read from beginning of chunks.
|
|
if (_pos != _beg_pos && addr.offset != 0) {
|
|
throw std::runtime_error(format("compressed reader not aligned to chunk boundary: pos={} offset={}", _pos, addr.offset));
|
|
}
|
|
if (!addr.chunk_len) {
|
|
throw sstables::malformed_sstable_exception(format("compressed chunk_len must be greater than zero, chunk_start={}", addr.chunk_start));
|
|
}
|
|
auto buf = co_await _input_stream->read_exactly(addr.chunk_len);
|
|
if (buf.size() != addr.chunk_len) {
|
|
throw sstables::malformed_sstable_exception(format("compressed reader hit premature end-of-file at file offset {}, expected chunk_len={}, actual={}", _underlying_pos, addr.chunk_len, buf.size()));
|
|
}
|
|
auto res_units = co_await _permit.request_memory(_compression_metadata->uncompressed_chunk_length());
|
|
// The last 4 bytes of the chunk are the adler32/crc32 checksum
|
|
// of the rest of the (compressed) chunk.
|
|
auto compressed_len = addr.chunk_len - 4;
|
|
// FIXME: Do not always calculate checksum - Cassandra has a
|
|
// probability (defaulting to 1.0, but still...)
|
|
auto expected_checksum = read_be<uint32_t>(buf.get() + compressed_len);
|
|
auto actual_checksum = ChecksumType::checksum(buf.get(), compressed_len);
|
|
if (expected_checksum != actual_checksum) {
|
|
throw sstables::malformed_sstable_exception(format("compressed chunk of size {} at file offset {} failed checksum, expected={}, actual={}", addr.chunk_len, _underlying_pos, expected_checksum, actual_checksum));
|
|
}
|
|
|
|
if constexpr (check_digest) {
|
|
if (_digests.can_calculate_digest) {
|
|
_digests.actual_digest = checksum_combine_or_feed<ChecksumType>(_digests.actual_digest, actual_checksum, buf.get(), compressed_len);
|
|
if constexpr (mode == compressed_checksum_mode::checksum_all) {
|
|
uint32_t be_actual_checksum = cpu_to_be(actual_checksum);
|
|
_digests.actual_digest = ChecksumType::checksum(_digests.actual_digest,
|
|
reinterpret_cast<const char*>(&be_actual_checksum), sizeof(be_actual_checksum));
|
|
}
|
|
}
|
|
}
|
|
|
|
// We know that the uncompressed data will take exactly
|
|
// chunk_length bytes (or less, if reading the last chunk).
|
|
temporary_buffer<char> out(
|
|
_compression_metadata->uncompressed_chunk_length());
|
|
// The compressed data is the whole chunk, minus the last 4
|
|
// bytes (which contain the checksum verified above).
|
|
|
|
auto len = _compression_metadata->get_compressor().uncompress(buf.get(), compressed_len, out.get_write(), out.size());
|
|
|
|
out.trim(len);
|
|
out.trim_front(addr.offset);
|
|
_pos += out.size();
|
|
_underlying_pos += addr.chunk_len;
|
|
|
|
if constexpr (check_digest) {
|
|
if (_digests.can_calculate_digest
|
|
&& _pos == _compression_metadata->uncompressed_file_length()
|
|
&& _digests.expected_digest != _digests.actual_digest) {
|
|
throw sstables::malformed_sstable_exception(seastar::format("Digest mismatch: expected={}, actual={}", _digests.expected_digest, _digests.actual_digest));
|
|
}
|
|
}
|
|
co_return make_tracked_temporary_buffer(std::move(out), std::move(res_units));
|
|
}
|
|
|
|
virtual future<> close() override {
|
|
if (!_input_stream) {
|
|
return make_ready_future<>();
|
|
}
|
|
return _input_stream->close();
|
|
}
|
|
|
|
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
|
if constexpr (check_digest) {
|
|
if (_digests.can_calculate_digest) {
|
|
sstables::sstlog.debug("Compressed reader cannot calculate digest with skipped data: current pos={}, end pos={}, skip len={}. Disabling digest check.", _pos, _end_pos, n);
|
|
_digests.can_calculate_digest = false;
|
|
}
|
|
}
|
|
if (_pos + n > _end_pos) {
|
|
on_internal_error(sstables::sstlog, format("Skipping over the end position is disallowed: current pos={}, end pos={}, skip len={}", _pos, _end_pos, n));
|
|
}
|
|
_pos += n;
|
|
if (_pos == _end_pos) {
|
|
co_return temporary_buffer<char>();
|
|
}
|
|
auto addr = _compression_metadata->locate(_pos, _offsets);
|
|
auto underlying_n = addr.chunk_start - _underlying_pos;
|
|
_underlying_pos = addr.chunk_start;
|
|
_beg_pos = _pos;
|
|
if (!_input_stream) {
|
|
_input_stream = co_await _stream_creator();
|
|
}
|
|
co_await _input_stream->skip(underlying_n);
|
|
co_return temporary_buffer<char>();
|
|
}
|
|
};
|
|
|
|
template <bool check_digest>
|
|
class compressed_raw_file_data_source_impl : public data_source_impl {
|
|
std::function<future<input_stream<char>>()> _stream_creator;
|
|
std::optional<input_stream<char>> _input_stream;
|
|
sstables::compression* _compression_metadata;
|
|
sstables::compression::segmented_offsets::accessor _offsets;
|
|
[[no_unique_address]] sstables::digest_members<check_digest> _digests;
|
|
reader_permit _permit;
|
|
uint64_t _pos{0};
|
|
uint64_t _current_chunk_index{0};
|
|
|
|
private:
|
|
uint64_t get_chunk_len(uint64_t chunk_index) const {
|
|
auto chunk_start = _offsets.at(chunk_index);
|
|
auto chunk_end = (chunk_index + 1 == _compression_metadata->offsets.size())
|
|
? _compression_metadata->compressed_file_length()
|
|
: _offsets.at(chunk_index + 1);
|
|
auto len = chunk_end - chunk_start;
|
|
return len;
|
|
}
|
|
|
|
public:
|
|
compressed_raw_file_data_source_impl(sstables::stream_creator_fn stream_creator, sstables::compression* cm,
|
|
file_input_stream_options options,
|
|
reader_permit permit, std::optional<uint32_t> digest)
|
|
: _compression_metadata(cm)
|
|
, _offsets(_compression_metadata->offsets.get_accessor())
|
|
, _permit(std::move(permit))
|
|
{
|
|
if constexpr (check_digest) {
|
|
if (!digest) {
|
|
on_internal_error(sstables::sstlog, "Requested digest check but no digest was provided.");
|
|
}
|
|
_digests = {true, *digest, crc32_utils::init_checksum()};
|
|
}
|
|
|
|
_stream_creator = [stream_creator{std::move(stream_creator)}, start = _pos, length = _compression_metadata->compressed_file_length(), options] mutable {
|
|
return stream_creator(start, length, std::move(options));
|
|
};
|
|
}
|
|
|
|
virtual future<temporary_buffer<char>> get() override {
|
|
if (_pos >= _compression_metadata->compressed_file_length()) {
|
|
co_return temporary_buffer<char>();
|
|
}
|
|
|
|
if (!_input_stream) {
|
|
_input_stream = co_await _stream_creator();
|
|
}
|
|
|
|
auto chunk_len = get_chunk_len(_current_chunk_index);
|
|
if (!chunk_len) {
|
|
throw sstables::malformed_sstable_exception(format("compressed raw reader chunk_len must be greater than zero, pos={}", _pos));
|
|
}
|
|
|
|
auto res_units = co_await _permit.request_memory(chunk_len);
|
|
auto buf = co_await _input_stream->read_exactly(chunk_len);
|
|
if (buf.size() != chunk_len) {
|
|
throw sstables::malformed_sstable_exception(format("compressed raw reader hit premature end-of-file at file offset {}, expected chunk_len={}, actual={}", _pos, chunk_len, buf.size()));
|
|
}
|
|
|
|
auto compressed_len = chunk_len - 4;
|
|
auto expected_checksum = read_be<uint32_t>(buf.get() + compressed_len);
|
|
auto actual_checksum = crc32_utils::checksum(buf.get(), compressed_len);
|
|
if (expected_checksum != actual_checksum) {
|
|
throw sstables::malformed_sstable_exception(format("compressed chunk of size {} at file offset {} failed checksum, expected={}, actual={}", chunk_len, _pos, expected_checksum, actual_checksum));
|
|
}
|
|
|
|
if constexpr (check_digest) {
|
|
if (_digests.can_calculate_digest) {
|
|
_digests.actual_digest = checksum_combine_or_feed<crc32_utils>(_digests.actual_digest, actual_checksum, buf.get(), compressed_len);
|
|
uint32_t be_actual_checksum = cpu_to_be(actual_checksum);
|
|
_digests.actual_digest = crc32_utils::checksum(_digests.actual_digest,
|
|
reinterpret_cast<const char*>(&be_actual_checksum), sizeof(be_actual_checksum));
|
|
}
|
|
}
|
|
|
|
_current_chunk_index++;
|
|
_pos += buf.size();
|
|
|
|
if constexpr (check_digest) {
|
|
if (_digests.can_calculate_digest
|
|
&& _current_chunk_index == _compression_metadata->offsets.size()
|
|
&& _digests.expected_digest != _digests.actual_digest) {
|
|
throw sstables::malformed_sstable_exception(seastar::format("Digest mismatch: expected={}, actual={}", _digests.expected_digest, _digests.actual_digest));
|
|
}
|
|
}
|
|
|
|
co_return make_tracked_temporary_buffer(std::move(buf), std::move(res_units));
|
|
}
|
|
|
|
virtual future<> close() override {
|
|
if (!_input_stream) {
|
|
return make_ready_future<>();
|
|
}
|
|
return _input_stream->close();
|
|
}
|
|
|
|
virtual future<temporary_buffer<char>> skip(uint64_t n) override {
|
|
throw std::runtime_error("compressed raw file data source does not support skip()");
|
|
}
|
|
};
|
|
|
|
template <bool check_digest>
|
|
class compressed_raw_file_data_source : public data_source {
|
|
public:
|
|
compressed_raw_file_data_source(sstables::stream_creator_fn stream_creator, sstables::compression* cm,
|
|
file_input_stream_options options, reader_permit permit, std::optional<uint32_t> digest)
|
|
: data_source(std::make_unique<compressed_raw_file_data_source_impl<check_digest>>(
|
|
std::move(stream_creator), cm, std::move(options), std::move(permit), digest))
|
|
{}
|
|
};
|
|
|
|
template <ChecksumUtils ChecksumType, bool check_digest, compressed_checksum_mode mode>
|
|
class compressed_file_data_source : public data_source {
|
|
public:
|
|
compressed_file_data_source(sstables::stream_creator_fn stream_creator, sstables::compression* cm,
|
|
uint64_t offset, size_t len, file_input_stream_options options, reader_permit permit,
|
|
std::optional<uint32_t> digest)
|
|
: data_source(std::make_unique<compressed_file_data_source_impl<ChecksumType, check_digest, mode>>(
|
|
std::move(stream_creator), cm, offset, len, std::move(options), std::move(permit), digest))
|
|
{}
|
|
};
|
|
|
|
template <ChecksumUtils ChecksumType, compressed_checksum_mode mode>
|
|
inline input_stream<char> make_compressed_file_input_stream(sstables::stream_creator_fn stream_creator, sstables::compression *cm, uint64_t offset, size_t len,
|
|
file_input_stream_options options, reader_permit permit,
|
|
std::optional<uint32_t> digest)
|
|
{
|
|
if (digest) [[unlikely]] {
|
|
return input_stream<char>(compressed_file_data_source<ChecksumType, true, mode>(
|
|
std::move(stream_creator), cm, offset, len, std::move(options), std::move(permit), digest));
|
|
}
|
|
return input_stream<char>(compressed_file_data_source<ChecksumType, false, mode>(
|
|
std::move(stream_creator), cm, offset, len, std::move(options), std::move(permit), digest));
|
|
}
|
|
|
|
// compressed_file_data_sink_impl works as a filter for a file output stream,
|
|
// where the buffer flushed will be compressed and its checksum computed, then
|
|
// the result passed to a regular output stream.
|
|
template <typename ChecksumType, compressed_checksum_mode mode>
|
|
requires ChecksumUtils<ChecksumType>
|
|
class compressed_file_data_sink_impl : public data_sink_impl {
|
|
output_stream<char> _out;
|
|
sstables::compression* _compression_metadata;
|
|
sstables::compression::segmented_offsets::writer _offsets;
|
|
size_t _pos = 0;
|
|
uint32_t _full_checksum;
|
|
public:
|
|
compressed_file_data_sink_impl(output_stream<char> out, sstables::compression* cm)
|
|
: _out(std::move(out))
|
|
, _compression_metadata(cm)
|
|
, _offsets(_compression_metadata->offsets.get_writer())
|
|
, _full_checksum(ChecksumType::init_checksum())
|
|
{}
|
|
|
|
private:
|
|
future<> do_put(temporary_buffer<char> buf) {
|
|
auto output_len = _compression_metadata->get_compressor().compress_max_size(buf.size());
|
|
|
|
// account space for checksum that goes after compressed data.
|
|
temporary_buffer<char> compressed(output_len + 4);
|
|
|
|
// compress flushed data.
|
|
auto len = _compression_metadata->get_compressor().compress(buf.get(), buf.size(), compressed.get_write(), output_len);
|
|
if (len > output_len) {
|
|
return make_exception_future(std::runtime_error("possible overflow during compression"));
|
|
}
|
|
|
|
// total length of the uncompressed data.
|
|
_compression_metadata->set_uncompressed_file_length(_compression_metadata->uncompressed_file_length() + buf.size());
|
|
|
|
_offsets.push_back(_pos);
|
|
// account compressed data + 32-bit checksum.
|
|
_pos += len + 4;
|
|
_compression_metadata->set_compressed_file_length(_pos);
|
|
|
|
// compute 32-bit checksum for compressed data.
|
|
uint32_t per_chunk_checksum = ChecksumType::checksum(compressed.get(), len);
|
|
_full_checksum = checksum_combine_or_feed<ChecksumType>(_full_checksum, per_chunk_checksum, compressed.get(), len);
|
|
|
|
// write checksum into buffer after compressed data.
|
|
write_be<uint32_t>(compressed.get_write() + len, per_chunk_checksum);
|
|
|
|
if constexpr (mode == compressed_checksum_mode::checksum_all) {
|
|
uint32_t be_per_chunk_checksum = cpu_to_be(per_chunk_checksum);
|
|
_full_checksum = ChecksumType::checksum(_full_checksum,
|
|
reinterpret_cast<const char*>(&be_per_chunk_checksum), sizeof(be_per_chunk_checksum));
|
|
}
|
|
|
|
_compression_metadata->set_full_checksum(_full_checksum);
|
|
|
|
compressed.trim(len + 4);
|
|
|
|
auto f = _out.write(compressed.get(), compressed.size());
|
|
return f.then([compressed = std::move(compressed)] {});
|
|
}
|
|
public:
|
|
virtual future<> put(std::span<temporary_buffer<char>> bufs) override {
|
|
return data_sink_impl::fallback_put(bufs, [this] (temporary_buffer<char>&& buf) {
|
|
return do_put(std::move(buf));
|
|
});
|
|
}
|
|
|
|
virtual future<> close() override {
|
|
return _out.close();
|
|
}
|
|
|
|
virtual size_t buffer_size() const noexcept override {
|
|
return _compression_metadata->uncompressed_chunk_length();
|
|
}
|
|
};
|
|
|
|
template <typename ChecksumType, compressed_checksum_mode mode>
|
|
requires ChecksumUtils<ChecksumType>
|
|
class compressed_file_data_sink : public data_sink {
|
|
public:
|
|
compressed_file_data_sink(output_stream<char> out, sstables::compression* cm)
|
|
: data_sink(std::make_unique<compressed_file_data_sink_impl<ChecksumType, mode>>(
|
|
std::move(out), cm)) {}
|
|
};
|
|
|
|
template <typename ChecksumType, compressed_checksum_mode mode>
|
|
requires ChecksumUtils<ChecksumType>
|
|
inline output_stream<char> make_compressed_file_output_stream(output_stream<char> out,
|
|
sstables::compression* cm,
|
|
const compression_parameters& cp,
|
|
compressor_ptr p) {
|
|
cm->set_compressor(std::move(p));
|
|
// buffer of output stream is set to chunk length, because flush must
|
|
// happen every time a chunk was filled up.
|
|
cm->set_uncompressed_chunk_length(cp.chunk_length());
|
|
// FIXME: crc_check_chance can be configured by the user.
|
|
// probability to verify the checksum of a compressed chunk we read.
|
|
// defaults to 1.0.
|
|
cm->options.elements.push_back({{"crc_check_chance"}, {"1.0"}});
|
|
|
|
return output_stream<char>(compressed_file_data_sink<ChecksumType, mode>(std::move(out), cm));
|
|
}
|
|
|
|
// Input stream factory for the 'ka'/'la' sstable formats: adler32 chunk
// checksums, with only the chunks contributing to the full checksum.
input_stream<char> sstables::make_compressed_file_k_l_format_input_stream(stream_creator_fn stream_creator,
        sstables::compression* cm, uint64_t offset, size_t len,
        class file_input_stream_options options, reader_permit permit,
        std::optional<uint32_t> digest)
{
    constexpr auto checksum_mode = compressed_checksum_mode::checksum_chunks_only;
    return make_compressed_file_input_stream<adler32_utils, checksum_mode>(
            std::move(stream_creator), cm, offset, len, std::move(options), std::move(permit), digest);
}
|
|
|
|
// Input stream factory for the 'mc' sstable format: crc32 chunk checksums,
// with the stored checksums included in the full checksum.
input_stream<char> sstables::make_compressed_file_m_format_input_stream(stream_creator_fn stream_creator,
        sstables::compression *cm, uint64_t offset, size_t len,
        class file_input_stream_options options, reader_permit permit,
        std::optional<uint32_t> digest) {
    constexpr auto checksum_mode = compressed_checksum_mode::checksum_all;
    return make_compressed_file_input_stream<crc32_utils, checksum_mode>(
            std::move(stream_creator), cm, offset, len, std::move(options), std::move(permit), digest);
}
|
|
|
|
// Output stream factory for the 'mc' sstable format: crc32 chunk checksums,
// with the stored checksums included in the full checksum.
output_stream<char> sstables::make_compressed_file_m_format_output_stream(output_stream<char> out,
        sstables::compression* cm,
        const compression_parameters& cp,
        compressor_ptr p) {
    constexpr auto checksum_mode = compressed_checksum_mode::checksum_all;
    return make_compressed_file_output_stream<crc32_utils, checksum_mode>(
            std::move(out), cm, cp, std::move(p));
}
|
|
|
|
// Creates an input stream that yields the compressed chunks of a file as
// stored on disk (no decompression). Digest checking is compiled in only
// when a digest is supplied.
input_stream<char> sstables::make_compressed_raw_file_input_stream(sstables::stream_creator_fn stream_creator, sstables::compression *cm,
        file_input_stream_options options, reader_permit permit, std::optional<uint32_t> digest)
{
    if (!digest) [[likely]] {
        return input_stream<char>(compressed_raw_file_data_source<false>(
                std::move(stream_creator), cm, std::move(options), std::move(permit), digest));
    }
    return input_stream<char>(compressed_raw_file_data_source<true>(
            std::move(stream_creator), cm, std::move(options), std::move(permit), digest));
}
|