Optimise the storage of compression chunk offsets

To reduce the memory footprint of compression-info, n offsets are
grouped together into segments, where each segment stores a base
absolute offset into the file, the other offsets in the segment being
relative offsets (and thus of reduced size). Also offsets are
allocated only just enough bits to store their maximum value. The
offsets are thus packed in a buffer like so:
     arrrarrrarrr...
where n is 4, a is an absolute offset and r are offsets relative to a.

The optimal value of n can be calculated for a given file_size (f) and
chunk_size (c), by finding the minima of the following function:

f(n) = (f/c)/n * (log2(f) + (n - 1)*log2((n-1)*(c + 64)))

This is done in an empirical way, using a script (see below).

Furthermore segments are stored in buckets, where each bucket has its
own base offset. Each bucket therefore can address an equal chunk of the
file and furthermore each segment in a bucket can address an equal
sub-chunk of this area.
The value of a given offset i is thus:
    bucket_base_offset_for(i) + segment_base_offset_for(i) + offset(i)

To account for the bucketed storage we calculate a local_f, which is
optimized so that a bucketful of segmented offsets can address the
largest possible chunk of f. As value of this local_f only depends on
the bucket_size (b) and c the value of n can be made independent of f
and therefore only depend on one dynamic value, c. This makes life much
simpler as we don't need to know the size of the file up-front, we can
just append buckets to the storage on demand, while the required storage
is still less than a third [1] of the original storage requirements
(std::deque<uint64>).

The table with the minima(f(n)) for different f and c values is
pre-computed by gen_segmented_compress_params.py and
stored in sstables/segmented_compress_params.hh. This script also
creates a table with the best values of local_f for the given
bucket_size. At runtime we only select the best params based on c.

[1] This was calculated for c=4K and b=4K
This commit is contained in:
Botond Dénes
2017-08-15 14:59:52 +03:00
parent eae33a1f19
commit 028c7a0888
6 changed files with 1249 additions and 22 deletions

View File

@@ -22,7 +22,9 @@
#include <stdexcept>
#include <cstdlib>
#include <boost/range/algorithm/find_if.hpp>
#include <seastar/core/align.hh>
#include <seastar/core/bitops.hh>
#include <seastar/core/byteorder.hh>
#include <seastar/core/fstream.hh>
@@ -34,9 +36,227 @@
#include "unimplemented.hh"
#include "stdx.hh"
#include "segmented_compress_params.hh"
namespace sstables {
static logging::logger sstlog("sstable");
enum class mask_type : uint8_t {
    set,
    clear
};

// Build a 64-bit mask covering `size_bits` bits starting at bit `offset`
// (mask_type::set), or its complement (mask_type::clear).
// Preconditions: size_bits < 64 and offset + size_bits <= 64.
static inline uint64_t make_mask(uint8_t size_bits, uint8_t offset, mask_type t) noexcept {
    // The 1 must be a 64-bit value: `1 << size_bits` shifts a plain int,
    // which is undefined behaviour for size_bits >= 31, and offset widths
    // used here go well beyond that (e.g. 40-bit segment base offsets).
    const uint64_t mask = ((uint64_t(1) << size_bits) - 1) << offset;
    return t == mask_type::set ? mask : ~mask;
}
/*
* ----> memory addresses
*
* Little Endian (e.g. x86)
*
* |1|2|3|4| | | | | CPU integer
* -------
* |
* +-+ << shift = prefix bits
* |
*
* | |1|2|3|4| | | | raw storage (unaligned)
* = ------- =====
* | |
* | +-> suffix bits
* +-> prefix bits
*
*
* Big Endian (e.g. PPC)
*
* | | | | |4|3|2|1| CPU integer
* -------
* |
* +-----+ << shift = suffix bits
* |
*
* | |4|3|2|1| | | | raw storage (unaligned)
* = ------- =====
* | |
* | +-> suffix bits
* +-> prefix bits
*
* |0|1|1|1|1|0|0|0| read/write mask
*/
// Describes where a bit-packed value sits inside the 64-bit word that is
// copied out of (or back into) the unaligned raw storage; see the
// endianness diagram above.
struct bit_displacement {
    uint64_t shift; // bit distance of the value from bit 0 of the CPU integer
    uint64_t mask;  // selects (mask_type::set) or clears (mask_type::clear) the value's bits
};
// Translate a value's bit offset inside the storage word ("prefix bits")
// into the shift to apply to the CPU-native integer. On little-endian
// machines the prefix maps directly to the low-order shift; on big-endian
// machines the layout is mirrored, so the shift equals the suffix size
// (64 - prefix - value size). See the diagram above.
inline uint64_t displacement_bits(uint64_t prefix_bits, uint8_t size_bits) {
// Works with gcc and clang
#if defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
    return prefix_bits;
#elif defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__
    return 64 - prefix_bits - size_bits;
#else
#error "Unsupported platform or compiler, cannot detect endianness"
#endif
}
// Compute both the shift and the read/write mask for a `size_bits`-wide
// value located `prefix_bits` bits into the storage word.
inline bit_displacement displacement_for(uint64_t prefix_bits, uint8_t size_bits, mask_type t) {
    const auto shift = displacement_bits(prefix_bits, size_bits);
    const auto mask = make_mask(size_bits, shift, t);
    return bit_displacement{shift, mask};
}
// Select the precomputed packing parameters (bucket layout + segment
// layout) for the given chunk size. Chunk sizes are looked up by their
// ceil(log2). Chunk sizes missing from the precomputed tables get a safe
// (but not optimal) fallback: ungrouped offsets (grouped_offsets == 1)
// with the smallest supported data size.
std::pair<bucket_info, segment_info> params_for_chunk_size(uint32_t chunk_size) {
    const uint8_t chunk_size_log2 = log2ceil(chunk_size);

    auto it = boost::find_if(bucket_infos, [&] (const bucket_info& bi) {
        return bi.chunk_size_log2 == chunk_size_log2;
    });

    // This scenario should be so rare that we only fall back to a safe
    // set of parameters, not optimal ones.
    if (it == bucket_infos.end()) {
        const uint8_t data_size = bucket_infos.front().best_data_size_log2;
        // Field orders: bucket_info is {chunk_size_log2, best_data_size_log2,
        // segments_per_bucket}; segment_info is {data_size_log2,
        // chunk_size_log2, grouped_offsets} -- note data size comes FIRST in
        // segment_info (the original initializer had these two swapped).
        return {{chunk_size_log2, data_size, (8 * bucket_size - 56) / data_size},
            {data_size, chunk_size_log2, uint8_t(1)}};
    }

    auto b = *it;

    // The generator emits an entry for every (best_data_size_log2,
    // chunk_size_log2) pair present in bucket_infos, so this find cannot
    // miss.
    auto s = *boost::find_if(segment_infos, [&] (const segment_info& si) {
        return si.data_size_log2 == b.best_data_size_log2 && si.chunk_size_log2 == b.chunk_size_log2;
    });

    return {std::move(b), std::move(s)};
}
// Extract a size_bits-wide value starting offset_bits bits into the given
// bucket's storage: copy out the containing 8 bytes (unaligned-safe),
// then mask and shift the value into place.
uint64_t compression::segmented_offsets::read(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits) const {
    const uint64_t offset_byte = offset_bits / 8;
    uint64_t value{0};
    // NOTE(review): this always reads 8 bytes; presumably the 56-bit
    // slack in params_for_chunk_size's segments_per_bucket computation
    // guarantees the read stays inside the bucket_size allocation --
    // verify against the generator.
    std::copy_n(_storage[bucket_index].storage.get() + offset_byte, sizeof(value), reinterpret_cast<char*>(&value));
    const auto displacement = displacement_for(offset_bits % 8, size_bits, mask_type::set);
    value &= displacement.mask;
    value >>= displacement.shift;
    return value;
}
// Store a size_bits-wide value offset_bits bits into the given bucket's
// storage, leaving all surrounding bits intact (read-modify-write of the
// containing 8 bytes). Throws std::invalid_argument if the value doesn't
// fit into size_bits.
void compression::segmented_offsets::write(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits, uint64_t value) {
    const uint64_t offset_byte = offset_bits / 8;
    uint64_t old_value{0};
    // Copy out the containing 8 bytes (unaligned-safe).
    std::copy_n(_storage[bucket_index].storage.get() + offset_byte, sizeof(old_value), reinterpret_cast<char*>(&old_value));
    const auto displacement = displacement_for(offset_bits % 8, size_bits, mask_type::clear);
    value <<= displacement.shift;
    // ~mask covers exactly the bits allocated to the value; any set bit
    // outside it means the value overflows its allocation.
    if ((~displacement.mask | value) != ~displacement.mask) {
        throw std::invalid_argument(sprint("{}: to-be-written value would overflow the allocated bits", __FUNCTION__));
    }
    // Clear the value's old bits, then merge in the new ones and write
    // the 8 bytes back.
    old_value &= displacement.mask;
    value |= old_value;
    std::copy_n(reinterpret_cast<char*>(&value), sizeof(value), _storage[bucket_index].storage.get() + offset_byte);
}
void compression::segmented_offsets::update_position_trackers(std::size_t index) const {
if (_current_index != index - 1) {
_current_index = index;
_current_bucket_segment_index = _current_index / _grouped_offsets;
_current_segment_relative_index = _current_index % _grouped_offsets;
_current_bucket_index = _current_bucket_segment_index / _segments_per_bucket;
_current_segment_offset_bits = (_current_bucket_segment_index % _segments_per_bucket) * _segment_size_bits;
} else {
++_current_index;
++_current_segment_relative_index;
// Crossed segment boundary.
if (_current_segment_relative_index == _grouped_offsets) {
++_current_bucket_segment_index;
_current_segment_relative_index = 0;
// Crossed bucket boundary.
if (_current_bucket_segment_index == _segments_per_bucket) {
++_current_bucket_index;
_current_bucket_segment_index = 0;
_current_segment_offset_bits = 0;
} else {
_current_segment_offset_bits += _segment_size_bits;
}
}
}
}
// Configure the packing parameters for the given chunk size. Must be
// called exactly once, before any other use of the container (see the
// class documentation in the header).
void compression::segmented_offsets::init(uint32_t chunk_size) {
    assert(chunk_size != 0);

    _chunk_size = chunk_size;

    const auto params = params_for_chunk_size(chunk_size);

    sstlog.trace(
            "{} {}(): chunk size {} (log2)",
            this,
            __FUNCTION__,
            static_cast<int>(params.first.chunk_size_log2));

    _grouped_offsets = params.second.grouped_offsets;
    _segment_base_offset_size_bits = params.second.data_size_log2;
    // With _grouped_offsets == 1 every offset is a segment base and no
    // relative offsets exist, so the relative-offset width is unused;
    // skip the computation in that case rather than evaluating
    // log2ceil(0), whose result is meaningless.
    _segmented_offset_size_bits = _grouped_offsets > 1
            ? static_cast<uint64_t>(log2ceil((_chunk_size + 64) * (_grouped_offsets - 1)))
            : 0;
    _segment_size_bits = _segment_base_offset_size_bits + (_grouped_offsets - 1) * _segmented_offset_size_bits;
    _segments_per_bucket = params.first.segments_per_bucket;
}
// Random-access read of offset i, reassembled as:
//   bucket base offset + segment base offset [+ relative offset]
// The relative part is omitted for the first offset of a segment, which
// *is* the segment base. Throws std::out_of_range for i >= size().
// Sequential access is the cheapest pattern (see
// update_position_trackers()).
uint64_t compression::segmented_offsets::at(std::size_t i) const {
    if (i >= _size) {
        throw std::out_of_range(sprint("{}: index {} is out of range", __FUNCTION__, i));
    }

    update_position_trackers(i);

    const uint64_t bucket_base_offset = _storage[_current_bucket_index].base_offset;
    const uint64_t segment_base_offset = bucket_base_offset + read(_current_bucket_index, _current_segment_offset_bits, _segment_base_offset_size_bits);

    // First offset of the segment: the segment base itself.
    if (_current_segment_relative_index == 0) {
        return segment_base_offset;
    }

    // Skip the segment base field and the preceding relative offsets to
    // reach this offset's bit position inside the segment.
    return segment_base_offset
        + read(_current_bucket_index,
                _current_segment_offset_bits + _segment_base_offset_size_bits + (_current_segment_relative_index - 1) * _segmented_offset_size_bits,
                _segmented_offset_size_bits);
}
// Append a new offset, allocating a fresh bucket on demand when the
// current one is full. A new bucket's base is the last written offset,
// keeping the in-bucket (relative) values small -- which presumably
// assumes offsets are pushed in monotonically increasing order (TODO:
// confirm; callers append chunk start positions, which do increase).
void compression::segmented_offsets::push_back(uint64_t offset) {
    update_position_trackers(_size);

    // Crossed into a not-yet-allocated bucket.
    if (_current_bucket_index == _storage.size()) {
        _storage.push_back(bucket{_last_written_offset, std::unique_ptr<char[]>(new char[bucket_size])});
    }

    const uint64_t bucket_base_offset = _storage[_current_bucket_index].base_offset;

    if (_current_segment_relative_index == 0) {
        // First offset of a segment: store it as the segment's base,
        // relative to the bucket base.
        write(_current_bucket_index, _current_segment_offset_bits, _segment_base_offset_size_bits, offset - bucket_base_offset);
    } else {
        // Subsequent offsets: stored relative to the segment base.
        const uint64_t segment_base_offset = bucket_base_offset + read(_current_bucket_index, _current_segment_offset_bits, _segment_base_offset_size_bits);
        write(_current_bucket_index,
                _current_segment_offset_bits + _segment_base_offset_size_bits + (_current_segment_relative_index - 1) * _segmented_offset_size_bits,
                _segmented_offset_size_bits,
                offset - segment_base_offset);
    }

    _last_written_offset = offset;
    ++_size;
}
void compression::update(uint64_t compressed_file_length) {
// FIXME: also process _compression.options (just for crc-check frequency)
if (name.value == "LZ4Compressor") {
@@ -81,10 +301,10 @@ compression::locate(uint64_t position) const {
auto ucl = uncompressed_chunk_length();
auto chunk_index = position / ucl;
decltype(ucl) chunk_offset = position % ucl;
auto chunk_start = offsets.elements.at(chunk_index);
auto chunk_end = (chunk_index + 1 == offsets.elements.size())
auto chunk_start = offsets.at(chunk_index);
auto chunk_end = (chunk_index + 1 == offsets.size())
? _compressed_file_length
: offsets.elements.at(chunk_index + 1);
: offsets.at(chunk_index + 1);
return { chunk_start, chunk_end - chunk_start, chunk_offset };
}
@@ -241,18 +461,18 @@ public:
: _compression_metadata(cm)
{
_beg_pos = pos;
if (pos > _compression_metadata->data_len) {
if (pos > _compression_metadata->uncompressed_file_length()) {
throw std::runtime_error("attempt to uncompress beyond end");
}
if (len == 0 || pos == _compression_metadata->data_len) {
if (len == 0 || pos == _compression_metadata->uncompressed_file_length()) {
// Nothing to read
_end_pos = _pos = _beg_pos;
return;
}
if (len <= _compression_metadata->data_len - pos) {
if (len <= _compression_metadata->uncompressed_file_length() - pos) {
_end_pos = pos + len;
} else {
_end_pos = _compression_metadata->data_len;
_end_pos = _compression_metadata->uncompressed_file_length();
}
// _beg_pos and _end_pos specify positions in the compressed stream.
// We need to translate them into a range of uncompressed chunks,

View File

@@ -48,6 +48,7 @@
#include <vector>
#include <cstdint>
#include <iterator>
#include <zlib.h>
#include "core/file.hh"
@@ -105,14 +106,204 @@ inline uint32_t checksum_adler32_combine(uint32_t adler1, uint32_t adler2, size_
namespace sstables {
struct compression {
// To reduce the memory footprint of compression-info, n offsets are grouped
// together into segments, where each segment stores a base absolute offset
// into the file, the other offsets in the segments being relative offsets
// (and thus of reduced size). Also offsets are allocated only just enough
// bits to store their maximum value. The offsets are thus packed in a
// buffer like so:
// arrrarrrarrr...
// where n is 4, a is an absolute offset and r are offsets relative to a.
// Segments are stored in buckets, where each bucket has its own base offset.
// Segments in a buckets are optimized to address as large of a chunk of the
// data as possible for a given chunk size and bucket size.
//
// This is not a general purpose container. There are limitations:
// * Can't be used before init() is called.
// * at() is best called incrementally, although random lookups are
// perfectly valid as well.
// * The iterator and at() can't provide references to the elements.
// * No point insert is available.
class segmented_offsets {
    // A bucket is a fixed-size (bucket_size bytes) blob of bit-packed
    // segments; every value stored in it is ultimately relative to
    // base_offset.
    struct bucket {
        uint64_t base_offset;
        std::unique_ptr<char[]> storage;
    };

    uint32_t _chunk_size{0};
    uint8_t _segment_base_offset_size_bits{0};
    uint8_t _segmented_offset_size_bits{0};
    uint16_t _segment_size_bits{0};
    uint32_t _segments_per_bucket{0};
    uint8_t _grouped_offsets{0};

    // Cursor caching the decomposition of the last accessed index;
    // mutable so at() can remain const.
    mutable std::size_t _current_index{0};
    mutable std::size_t _current_bucket_index{0};
    mutable uint64_t _current_bucket_segment_index{0};
    mutable uint64_t _current_segment_relative_index{0};
    mutable uint64_t _current_segment_offset_bits{0};

    uint64_t _last_written_offset{0};

    std::size_t _size{0};
    std::deque<bucket> _storage;

    uint64_t read(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits) const;
    void write(uint64_t bucket_index, uint64_t offset_bits, uint64_t size_bits, uint64_t value);
    void update_position_trackers(std::size_t index) const;

public:
    // Read-only random-access iterator over the stored offsets. Yields
    // values (not references), as elements are unpacked on the fly.
    class const_iterator : public std::iterator<std::random_access_iterator_tag, const uint64_t> {
        friend class segmented_offsets;
        struct end_tag {};

        const segmented_offsets& _offsets;
        std::size_t _index;

        const_iterator(const segmented_offsets& offsets)
            : _offsets(offsets)
            , _index(0) {
        }

        const_iterator(const segmented_offsets& offsets, end_tag)
            : _offsets(offsets)
            , _index(_offsets.size()) {
        }

    public:
        const_iterator(const const_iterator& other) = default;

        // The reference member cannot be reseated: only iterators of the
        // same container may be assigned to each other.
        const_iterator& operator=(const const_iterator& other) {
            assert(&_offsets == &other._offsets);
            _index = other._index;
            // Fixed: the original omitted this return from a
            // reference-returning function (undefined behaviour).
            return *this;
        }

        // Post-increment: advance *this, return the previous position.
        // (The original incremented only a copy, leaving *this behind.)
        const_iterator operator++(int) {
            const_iterator it{*this};
            ++(*this);
            return it;
        }

        const_iterator& operator++() {
            *this += 1;
            return *this;
        }

        const_iterator operator+(ssize_t i) const {
            const_iterator it{*this};
            it += i;
            return it;
        }

        const_iterator& operator+=(ssize_t i) {
            _index += i;
            return *this;
        }

        // Post-decrement: step *this back, return the previous position.
        const_iterator operator--(int) {
            const_iterator it{*this};
            --(*this);
            return it;
        }

        const_iterator& operator--() {
            *this -= 1;
            return *this;
        }

        const_iterator operator-(ssize_t i) const {
            const_iterator it{*this};
            it -= i;
            return it;
        }

        const_iterator& operator-=(ssize_t i) {
            _index -= i;
            return *this;
        }

        value_type operator*() const {
            return _offsets.at(_index);
        }

        value_type operator[](ssize_t i) const {
            return _offsets.at(_index + i);
        }

        bool operator==(const const_iterator& other) const {
            return _index == other._index;
        }

        bool operator!=(const const_iterator& other) const {
            return !(*this == other);
        }

        bool operator<(const const_iterator& other) const {
            return _index < other._index;
        }

        bool operator<=(const const_iterator& other) const {
            return _index <= other._index;
        }

        bool operator>(const const_iterator& other) const {
            return _index > other._index;
        }

        bool operator>=(const const_iterator& other) const {
            return _index >= other._index;
        }
    };

    segmented_offsets() = default;

    segmented_offsets(const segmented_offsets&) = delete;
    segmented_offsets& operator=(const segmented_offsets&) = delete;

    segmented_offsets(segmented_offsets&&) = default;
    segmented_offsets& operator=(segmented_offsets&&) = default;

    // Has to be called before using the class. Doing otherwise
    // results in undefined behaviour! Don't call more than once!
    // TODO: fold into constructor, once the parse() et. al. code
    // allows it.
    void init(uint32_t chunk_size);

    uint32_t chunk_size() const noexcept {
        return _chunk_size;
    }

    std::size_t size() const noexcept {
        return _size;
    }

    // Unpack and return offset i; throws std::out_of_range for i >= size().
    uint64_t at(std::size_t i) const;

    // Append an offset; see the .cc definition for ordering requirements.
    void push_back(uint64_t offset);

    const_iterator begin() const {
        return const_iterator(*this);
    }

    const_iterator end() const {
        return const_iterator(*this, const_iterator::end_tag{});
    }

    const_iterator cbegin() const {
        return const_iterator(*this);
    }

    const_iterator cend() const {
        return const_iterator(*this, const_iterator::end_tag{});
    }
};
disk_string<uint16_t> name;
disk_array<uint32_t, option> options;
uint32_t chunk_len;
uint64_t data_len;
disk_array<uint32_t, uint64_t> offsets;
template <typename Describer>
auto describe_type(Describer f) { return f(name, options, chunk_len, data_len, offsets); }
segmented_offsets offsets;
private:
// Variables determined from the above deserialized values, held for convenience:
@@ -150,10 +341,21 @@ public:
unsigned uncompressed_chunk_length() const noexcept {
return chunk_len;
}
uint64_t uncompressed_file_length() const {
void set_uncompressed_chunk_length(uint32_t cl) {
chunk_len = cl;
offsets.init(chunk_len);
}
uint64_t uncompressed_file_length() const noexcept {
return data_len;
}
void set_uncompressed_file_length(uint64_t fl) {
data_len = fl;
}
uint64_t compressed_file_length() const {
return _compressed_file_length;
}

View File

@@ -0,0 +1,757 @@
/*
* Copyright (C) 2017 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
/*
* This file was autogenerated by gen_segmented_compress_params.py.
*/
#include "compress.hh"
#include <array>
namespace sstables {
const uint64_t bucket_size{4096};
// Describes the bucket-level packing layout for one chunk size
// (identified by its log2).
struct bucket_info {
    uint64_t chunk_size_log2;     // log2 of the chunk size this entry applies to
    uint64_t best_data_size_log2; // log2 of the data area a bucketful of offsets can address
    uint64_t segments_per_bucket; // number of segments packed into one bucket
};
// The largest data chunk from the file a bucketful of offsets can
// cover, precalculated for different chunk sizes, plus the number
// of segments that are needed to address the whole area.
// Entries are ordered by chunk_size_log2 (4..30). Autogenerated by
// gen_segmented_compress_params.py -- do not edit by hand.
const std::array<bucket_info, 27> bucket_infos{{
    {4, 16, 817 /*out of the max of 1024*/},
    {5, 17, 527 /*out of the max of 683*/},
    {6, 18, 605 /*out of the max of 820*/},
    {7, 19, 474 /*out of the max of 683*/},
    {8, 20, 380 /*out of the max of 586*/},
    {9, 21, 311 /*out of the max of 512*/},
    {10, 22, 289 /*out of the max of 512*/},
    {11, 23, 270 /*out of the max of 512*/},
    {12, 23, 255 /*out of the max of 256*/},
    {13, 24, 240 /*out of the max of 256*/},
    {14, 25, 227 /*out of the max of 256*/},
    {15, 26, 215 /*out of the max of 256*/},
    {16, 27, 204 /*out of the max of 256*/},
    {17, 28, 194 /*out of the max of 256*/},
    {18, 29, 185 /*out of the max of 256*/},
    {19, 30, 177 /*out of the max of 256*/},
    {20, 31, 170 /*out of the max of 256*/},
    {21, 32, 163 /*out of the max of 256*/},
    {22, 33, 157 /*out of the max of 256*/},
    {23, 34, 151 /*out of the max of 256*/},
    {24, 35, 146 /*out of the max of 256*/},
    {25, 36, 141 /*out of the max of 256*/},
    {26, 37, 136 /*out of the max of 256*/},
    {27, 38, 131 /*out of the max of 256*/},
    {28, 38, 128 /*out of the max of 128*/},
    {29, 39, 124 /*out of the max of 128*/},
    {30, 40, 120 /*out of the max of 128*/}}};
// Describes the segment-level packing layout for one (data size,
// chunk size) combination.
struct segment_info {
    uint8_t data_size_log2;  // log2 of the addressable data area (matches bucket_info::best_data_size_log2)
    uint8_t chunk_size_log2; // log2 of the chunk size this entry applies to
    uint8_t grouped_offsets; // offsets per segment: 1 absolute base + (n - 1) relative
};
// Precomputed optimal segment information for different data and chunk sizes.
const std::array<segment_info, 675> segment_infos{{
{16, 4, 4},
{16, 5, 6},
{16, 6, 5},
{16, 7, 6},
{16, 8, 4},
{16, 9, 4},
{16, 10, 4},
{16, 11, 4},
{16, 12, 2},
{16, 13, 2},
{16, 14, 2},
{16, 15, 1},
{16, 16, 1},
{16, 17, 1},
{16, 18, 1},
{16, 19, 1},
{16, 20, 1},
{16, 21, 1},
{16, 22, 1},
{16, 23, 1},
{16, 24, 1},
{16, 25, 1},
{16, 26, 1},
{16, 27, 1},
{16, 28, 1},
{16, 29, 1},
{16, 30, 1},
{17, 4, 7},
{17, 5, 6},
{17, 6, 5},
{17, 7, 6},
{17, 8, 4},
{17, 9, 4},
{17, 10, 4},
{17, 11, 4},
{17, 12, 4},
{17, 13, 2},
{17, 14, 2},
{17, 15, 2},
{17, 16, 1},
{17, 17, 1},
{17, 18, 1},
{17, 19, 1},
{17, 20, 1},
{17, 21, 1},
{17, 22, 1},
{17, 23, 1},
{17, 24, 1},
{17, 25, 1},
{17, 26, 1},
{17, 27, 1},
{17, 28, 1},
{17, 29, 1},
{17, 30, 1},
{18, 4, 7},
{18, 5, 6},
{18, 6, 5},
{18, 7, 6},
{18, 8, 4},
{18, 9, 4},
{18, 10, 4},
{18, 11, 4},
{18, 12, 4},
{18, 13, 4},
{18, 14, 2},
{18, 15, 2},
{18, 16, 2},
{18, 17, 1},
{18, 18, 1},
{18, 19, 1},
{18, 20, 1},
{18, 21, 1},
{18, 22, 1},
{18, 23, 1},
{18, 24, 1},
{18, 25, 1},
{18, 26, 1},
{18, 27, 1},
{18, 28, 1},
{18, 29, 1},
{18, 30, 1},
{19, 4, 7},
{19, 5, 6},
{19, 6, 5},
{19, 7, 6},
{19, 8, 7},
{19, 9, 8},
{19, 10, 4},
{19, 11, 4},
{19, 12, 4},
{19, 13, 4},
{19, 14, 4},
{19, 15, 2},
{19, 16, 2},
{19, 17, 2},
{19, 18, 1},
{19, 19, 1},
{19, 20, 1},
{19, 21, 1},
{19, 22, 1},
{19, 23, 1},
{19, 24, 1},
{19, 25, 1},
{19, 26, 1},
{19, 27, 1},
{19, 28, 1},
{19, 29, 1},
{19, 30, 1},
{20, 4, 7},
{20, 5, 6},
{20, 6, 9},
{20, 7, 6},
{20, 8, 7},
{20, 9, 8},
{20, 10, 8},
{20, 11, 4},
{20, 12, 4},
{20, 13, 4},
{20, 14, 4},
{20, 15, 4},
{20, 16, 2},
{20, 17, 2},
{20, 18, 2},
{20, 19, 1},
{20, 20, 1},
{20, 21, 1},
{20, 22, 1},
{20, 23, 1},
{20, 24, 1},
{20, 25, 1},
{20, 26, 1},
{20, 27, 1},
{20, 28, 1},
{20, 29, 1},
{20, 30, 1},
{21, 4, 7},
{21, 5, 6},
{21, 6, 9},
{21, 7, 6},
{21, 8, 7},
{21, 9, 8},
{21, 10, 8},
{21, 11, 8},
{21, 12, 4},
{21, 13, 4},
{21, 14, 4},
{21, 15, 4},
{21, 16, 4},
{21, 17, 2},
{21, 18, 2},
{21, 19, 2},
{21, 20, 1},
{21, 21, 1},
{21, 22, 1},
{21, 23, 1},
{21, 24, 1},
{21, 25, 1},
{21, 26, 1},
{21, 27, 1},
{21, 28, 1},
{21, 29, 1},
{21, 30, 1},
{22, 4, 7},
{22, 5, 11},
{22, 6, 9},
{22, 7, 11},
{22, 8, 7},
{22, 9, 8},
{22, 10, 8},
{22, 11, 8},
{22, 12, 8},
{22, 13, 4},
{22, 14, 4},
{22, 15, 4},
{22, 16, 4},
{22, 17, 4},
{22, 18, 2},
{22, 19, 2},
{22, 20, 2},
{22, 21, 1},
{22, 22, 1},
{22, 23, 1},
{22, 24, 1},
{22, 25, 1},
{22, 26, 1},
{22, 27, 1},
{22, 28, 1},
{22, 29, 1},
{22, 30, 1},
{23, 4, 13},
{23, 5, 11},
{23, 6, 9},
{23, 7, 11},
{23, 8, 7},
{23, 9, 8},
{23, 10, 8},
{23, 11, 8},
{23, 12, 8},
{23, 13, 8},
{23, 14, 4},
{23, 15, 4},
{23, 16, 4},
{23, 17, 4},
{23, 18, 4},
{23, 19, 2},
{23, 20, 2},
{23, 21, 2},
{23, 22, 1},
{23, 23, 1},
{23, 24, 1},
{23, 25, 1},
{23, 26, 1},
{23, 27, 1},
{23, 28, 1},
{23, 29, 1},
{23, 30, 1},
{24, 4, 13},
{24, 5, 11},
{24, 6, 9},
{24, 7, 11},
{24, 8, 7},
{24, 9, 8},
{24, 10, 8},
{24, 11, 8},
{24, 12, 8},
{24, 13, 8},
{24, 14, 8},
{24, 15, 4},
{24, 16, 4},
{24, 17, 4},
{24, 18, 4},
{24, 19, 4},
{24, 20, 2},
{24, 21, 2},
{24, 22, 2},
{24, 23, 1},
{24, 24, 1},
{24, 25, 1},
{24, 26, 1},
{24, 27, 1},
{24, 28, 1},
{24, 29, 1},
{24, 30, 1},
{25, 4, 13},
{25, 5, 11},
{25, 6, 9},
{25, 7, 11},
{25, 8, 7},
{25, 9, 8},
{25, 10, 8},
{25, 11, 8},
{25, 12, 8},
{25, 13, 8},
{25, 14, 8},
{25, 15, 8},
{25, 16, 4},
{25, 17, 4},
{25, 18, 4},
{25, 19, 4},
{25, 20, 4},
{25, 21, 2},
{25, 22, 2},
{25, 23, 2},
{25, 24, 1},
{25, 25, 1},
{25, 26, 1},
{25, 27, 1},
{25, 28, 1},
{25, 29, 1},
{25, 30, 1},
{26, 4, 13},
{26, 5, 11},
{26, 6, 9},
{26, 7, 11},
{26, 8, 13},
{26, 9, 8},
{26, 10, 8},
{26, 11, 8},
{26, 12, 8},
{26, 13, 8},
{26, 14, 8},
{26, 15, 8},
{26, 16, 8},
{26, 17, 4},
{26, 18, 4},
{26, 19, 4},
{26, 20, 4},
{26, 21, 4},
{26, 22, 2},
{26, 23, 2},
{26, 24, 2},
{26, 25, 1},
{26, 26, 1},
{26, 27, 1},
{26, 28, 1},
{26, 29, 1},
{26, 30, 1},
{27, 4, 13},
{27, 5, 11},
{27, 6, 9},
{27, 7, 11},
{27, 8, 13},
{27, 9, 8},
{27, 10, 8},
{27, 11, 8},
{27, 12, 8},
{27, 13, 8},
{27, 14, 8},
{27, 15, 8},
{27, 16, 8},
{27, 17, 8},
{27, 18, 4},
{27, 19, 4},
{27, 20, 4},
{27, 21, 4},
{27, 22, 4},
{27, 23, 2},
{27, 24, 2},
{27, 25, 2},
{27, 26, 1},
{27, 27, 1},
{27, 28, 1},
{27, 29, 1},
{27, 30, 1},
{28, 4, 13},
{28, 5, 11},
{28, 6, 9},
{28, 7, 11},
{28, 8, 13},
{28, 9, 8},
{28, 10, 8},
{28, 11, 8},
{28, 12, 8},
{28, 13, 8},
{28, 14, 8},
{28, 15, 8},
{28, 16, 8},
{28, 17, 8},
{28, 18, 8},
{28, 19, 4},
{28, 20, 4},
{28, 21, 4},
{28, 22, 4},
{28, 23, 4},
{28, 24, 2},
{28, 25, 2},
{28, 26, 2},
{28, 27, 1},
{28, 28, 1},
{28, 29, 1},
{28, 30, 1},
{29, 4, 13},
{29, 5, 11},
{29, 6, 17},
{29, 7, 11},
{29, 8, 13},
{29, 9, 15},
{29, 10, 16},
{29, 11, 8},
{29, 12, 8},
{29, 13, 8},
{29, 14, 8},
{29, 15, 8},
{29, 16, 8},
{29, 17, 8},
{29, 18, 8},
{29, 19, 8},
{29, 20, 4},
{29, 21, 4},
{29, 22, 4},
{29, 23, 4},
{29, 24, 4},
{29, 25, 2},
{29, 26, 2},
{29, 27, 2},
{29, 28, 1},
{29, 29, 1},
{29, 30, 1},
{30, 4, 13},
{30, 5, 11},
{30, 6, 17},
{30, 7, 11},
{30, 8, 13},
{30, 9, 15},
{30, 10, 16},
{30, 11, 16},
{30, 12, 8},
{30, 13, 8},
{30, 14, 8},
{30, 15, 8},
{30, 16, 8},
{30, 17, 8},
{30, 18, 8},
{30, 19, 8},
{30, 20, 8},
{30, 21, 4},
{30, 22, 4},
{30, 23, 4},
{30, 24, 4},
{30, 25, 4},
{30, 26, 2},
{30, 27, 2},
{30, 28, 2},
{30, 29, 1},
{30, 30, 1},
{31, 4, 13},
{31, 5, 11},
{31, 6, 17},
{31, 7, 11},
{31, 8, 13},
{31, 9, 15},
{31, 10, 16},
{31, 11, 16},
{31, 12, 16},
{31, 13, 8},
{31, 14, 8},
{31, 15, 8},
{31, 16, 8},
{31, 17, 8},
{31, 18, 8},
{31, 19, 8},
{31, 20, 8},
{31, 21, 8},
{31, 22, 4},
{31, 23, 4},
{31, 24, 4},
{31, 25, 4},
{31, 26, 4},
{31, 27, 2},
{31, 28, 2},
{31, 29, 2},
{31, 30, 1},
{32, 4, 13},
{32, 5, 22},
{32, 6, 17},
{32, 7, 11},
{32, 8, 13},
{32, 9, 15},
{32, 10, 16},
{32, 11, 16},
{32, 12, 16},
{32, 13, 16},
{32, 14, 8},
{32, 15, 8},
{32, 16, 8},
{32, 17, 8},
{32, 18, 8},
{32, 19, 8},
{32, 20, 8},
{32, 21, 8},
{32, 22, 8},
{32, 23, 4},
{32, 24, 4},
{32, 25, 4},
{32, 26, 4},
{32, 27, 4},
{32, 28, 2},
{32, 29, 2},
{32, 30, 2},
{33, 4, 13},
{33, 5, 22},
{33, 6, 17},
{33, 7, 22},
{33, 8, 13},
{33, 9, 15},
{33, 10, 16},
{33, 11, 16},
{33, 12, 16},
{33, 13, 16},
{33, 14, 16},
{33, 15, 8},
{33, 16, 8},
{33, 17, 8},
{33, 18, 8},
{33, 19, 8},
{33, 20, 8},
{33, 21, 8},
{33, 22, 8},
{33, 23, 8},
{33, 24, 4},
{33, 25, 4},
{33, 26, 4},
{33, 27, 4},
{33, 28, 4},
{33, 29, 2},
{33, 30, 2},
{34, 4, 13},
{34, 5, 22},
{34, 6, 17},
{34, 7, 22},
{34, 8, 13},
{34, 9, 15},
{34, 10, 16},
{34, 11, 16},
{34, 12, 16},
{34, 13, 16},
{34, 14, 16},
{34, 15, 16},
{34, 16, 8},
{34, 17, 8},
{34, 18, 8},
{34, 19, 8},
{34, 20, 8},
{34, 21, 8},
{34, 22, 8},
{34, 23, 8},
{34, 24, 8},
{34, 25, 4},
{34, 26, 4},
{34, 27, 4},
{34, 28, 4},
{34, 29, 4},
{34, 30, 2},
{35, 4, 13},
{35, 5, 22},
{35, 6, 17},
{35, 7, 22},
{35, 8, 13},
{35, 9, 15},
{35, 10, 16},
{35, 11, 16},
{35, 12, 16},
{35, 13, 16},
{35, 14, 16},
{35, 15, 16},
{35, 16, 16},
{35, 17, 8},
{35, 18, 8},
{35, 19, 8},
{35, 20, 8},
{35, 21, 8},
{35, 22, 8},
{35, 23, 8},
{35, 24, 8},
{35, 25, 8},
{35, 26, 4},
{35, 27, 4},
{35, 28, 4},
{35, 29, 4},
{35, 30, 4},
{36, 4, 26},
{36, 5, 22},
{36, 6, 17},
{36, 7, 22},
{36, 8, 13},
{36, 9, 15},
{36, 10, 16},
{36, 11, 16},
{36, 12, 16},
{36, 13, 16},
{36, 14, 16},
{36, 15, 16},
{36, 16, 16},
{36, 17, 16},
{36, 18, 8},
{36, 19, 8},
{36, 20, 8},
{36, 21, 8},
{36, 22, 8},
{36, 23, 8},
{36, 24, 8},
{36, 25, 8},
{36, 26, 8},
{36, 27, 4},
{36, 28, 4},
{36, 29, 4},
{36, 30, 4},
{37, 4, 26},
{37, 5, 22},
{37, 6, 17},
{37, 7, 22},
{37, 8, 13},
{37, 9, 15},
{37, 10, 16},
{37, 11, 16},
{37, 12, 16},
{37, 13, 16},
{37, 14, 16},
{37, 15, 16},
{37, 16, 16},
{37, 17, 16},
{37, 18, 16},
{37, 19, 8},
{37, 20, 8},
{37, 21, 8},
{37, 22, 8},
{37, 23, 8},
{37, 24, 8},
{37, 25, 8},
{37, 26, 8},
{37, 27, 8},
{37, 28, 4},
{37, 29, 4},
{37, 30, 4},
{38, 4, 26},
{38, 5, 22},
{38, 6, 17},
{38, 7, 22},
{38, 8, 26},
{38, 9, 15},
{38, 10, 16},
{38, 11, 16},
{38, 12, 16},
{38, 13, 16},
{38, 14, 16},
{38, 15, 16},
{38, 16, 16},
{38, 17, 16},
{38, 18, 16},
{38, 19, 16},
{38, 20, 8},
{38, 21, 8},
{38, 22, 8},
{38, 23, 8},
{38, 24, 8},
{38, 25, 8},
{38, 26, 8},
{38, 27, 8},
{38, 28, 8},
{38, 29, 4},
{38, 30, 4},
{39, 4, 26},
{39, 5, 22},
{39, 6, 17},
{39, 7, 22},
{39, 8, 26},
{39, 9, 15},
{39, 10, 16},
{39, 11, 16},
{39, 12, 16},
{39, 13, 16},
{39, 14, 16},
{39, 15, 16},
{39, 16, 16},
{39, 17, 16},
{39, 18, 16},
{39, 19, 16},
{39, 20, 16},
{39, 21, 8},
{39, 22, 8},
{39, 23, 8},
{39, 24, 8},
{39, 25, 8},
{39, 26, 8},
{39, 27, 8},
{39, 28, 8},
{39, 29, 8},
{39, 30, 4},
{40, 4, 26},
{40, 5, 22},
{40, 6, 17},
{40, 7, 22},
{40, 8, 26},
{40, 9, 15},
{40, 10, 16},
{40, 11, 16},
{40, 12, 16},
{40, 13, 16},
{40, 14, 16},
{40, 15, 16},
{40, 16, 16},
{40, 17, 16},
{40, 18, 16},
{40, 19, 16},
{40, 20, 16},
{40, 21, 16},
{40, 22, 8},
{40, 23, 8},
{40, 24, 8},
{40, 25, 8},
{40, 26, 8},
{40, 27, 8},
{40, 28, 8},
{40, 29, 8},
{40, 30, 8}}};
} // namespace sstables

View File

@@ -849,6 +849,56 @@ inline void write(file_writer& out, const utils::streaming_histogram& sh) {
write(out, max_bin_size, a);
}
// Deserialize a compression-info component into `c`, streaming the
// on-disk (big-endian) offset array into the segmented_offsets store in
// ~100KB batches.
future<> parse(random_access_reader& in, compression& c) {
    auto data_len_ptr = make_lw_shared<uint64_t>(0);
    auto chunk_len_ptr = make_lw_shared<uint32_t>(0);

    return parse(in, c.name, c.options, *chunk_len_ptr, *data_len_ptr).then([&in, &c, chunk_len_ptr, data_len_ptr] {
        // Set the chunk length first: it initializes the offset storage
        // parameters, which must be in place before any push_back().
        c.set_uncompressed_chunk_length(*chunk_len_ptr);
        c.set_uncompressed_file_length(*data_len_ptr);

        auto len = make_lw_shared<uint32_t>();
        return parse(in, *len).then([&in, &c, len] {
            auto eoarr = [&c, len] { return c.offsets.size() == *len; };

            return do_until(eoarr, [&in, &c, len] {
                auto now = std::min(*len - c.offsets.size(), 100000 / sizeof(uint64_t));
                return in.read_exactly(now * sizeof(uint64_t)).then([&c, now] (auto buf) {
                    uint64_t value;
                    for (size_t i = 0; i < now; ++i) {
                        // The destination must be a char*: with &value
                        // (uint64_t*) as destination, copy_n would write
                        // sizeof(uint64_t) whole uint64_t elements --
                        // 64 bytes -- clobbering the stack past `value`,
                        // instead of copying 8 bytes into it.
                        std::copy_n(buf.get() + i * sizeof(uint64_t), sizeof(uint64_t), reinterpret_cast<char*>(&value));
                        c.offsets.push_back(net::ntoh(value));
                    }
                });
            });
        });
    });
}
// Serialize a compression-info component: header fields, the offset
// count, then the offsets themselves in big-endian, in ~100KB batches.
void write(file_writer& out, const compression& c) {
    write(out, c.name, c.options, c.uncompressed_chunk_length(), c.uncompressed_file_length());
    write(out, static_cast<uint32_t>(c.offsets.size()));

    constexpr size_t batch_capacity = 100000 / sizeof(uint64_t);
    std::vector<uint64_t> batch(batch_capacity);

    const size_t total = c.offsets.size();
    for (size_t written = 0; written != total; ) {
        const auto count = std::min(total - written, batch_capacity);
        auto src = c.offsets.begin() + written;
        // Convert each offset to its big-endian on-disk representation.
        for (size_t i = 0; i != count; ++i) {
            batch[i] = net::hton(src[i]);
        }
        out.write(reinterpret_cast<const char*>(batch.data()), count * sizeof(uint64_t)).get();
        written += count;
    }
}
// This is small enough, and well-defined. Easier to just read it all
// at once
future<> sstable::read_toc() {
@@ -1794,8 +1844,7 @@ static void seal_summary(summary& s,
static void prepare_compression(compression& c, const schema& schema) {
const auto& cp = schema.get_compressor_params();
c.set_compressor(cp.get_compressor());
c.chunk_len = cp.chunk_length();
c.data_len = 0;
c.set_uncompressed_chunk_length(cp.chunk_length());
// FIXME: crc_check_chance can be configured by the user.
// probability to verify the checksum of a compressed chunk we read.
// defaults to 1.0.
@@ -2290,7 +2339,7 @@ future<> sstable::generate_summary(const io_priority_class& pc) {
uint64_t sstable::data_size() const {
if (has_component(sstable::component_type::CompressionInfo)) {
return _components->compression.data_len;
return _components->compression.uncompressed_file_length();
}
return _data_file_size;
}

View File

@@ -210,12 +210,12 @@ public:
throw std::runtime_error("possible overflow during compression");
}
_compression_metadata->offsets.elements.push_back(_pos);
_compression_metadata->offsets.push_back(_pos);
// account compressed data + 32-bit checksum.
_pos += len + 4;
_compression_metadata->set_compressed_file_length(_pos);
// total length of the uncompressed data.
_compression_metadata->data_len += buf.size();
_compression_metadata->set_uncompressed_file_length(_compression_metadata->uncompressed_file_length() + buf.size());
// compute 32-bit checksum for compressed data.
uint32_t per_chunk_checksum = checksum_adler32(compressed.get(), len);
@@ -229,7 +229,7 @@ public:
auto f = _out.write(compressed.get(), compressed.size());
return f.then([compressed = std::move(compressed)] {});
}
virtual future<> close() {
virtual future<> close() override {
return _out.close();
}
};

View File

@@ -1270,14 +1270,13 @@ SEASTAR_TEST_CASE(test_skipping_in_compressed_stream) {
sstables::compression c;
c.set_compressor(compressor::lz4);
c.chunk_len = opts.buffer_size;
c.data_len = 0;
c.set_uncompressed_chunk_length(opts.buffer_size);
c.init_full_checksum();
// Make sure that amount of written data is a multiple of chunk_len so that we hit #2143.
temporary_buffer<char> buf1(c.chunk_len);
temporary_buffer<char> buf1(c.uncompressed_chunk_length());
strcpy(buf1.get_write(), "buf1");
temporary_buffer<char> buf2(c.chunk_len);
temporary_buffer<char> buf2(c.uncompressed_chunk_length());
strcpy(buf2.get_write(), "buf2");
size_t uncompressed_size = 0;