The directory utils/ is supposed to contain general-purpose utility
classes and functions, which are either already used across the project,
or are designed to be used across the project.
This patch moves 8 files out of utils/:
utils/advanced_rpc_compressor.hh
utils/advanced_rpc_compressor.cc
utils/advanced_rpc_compressor_protocol.hh
utils/stream_compressor.hh
utils/stream_compressor.cc
utils/dict_trainer.cc
utils/dict_trainer.hh
utils/shared_dict.hh
These 8 files together implement the compression feature of RPC.
None of them are used by any other Scylla component (e.g., sstables have
a different compression), or are ready to be used by another component,
so this patch moves all of them into message/, where RPC is implemented.
Theoretically, we may want in the future to use this cluster of classes
for some other component, but even then, we shouldn't just have these
files individually in utils/ - these are not useful stand-alone
utilities. One cannot use "shared_dict.hh" assuming it is some sort of
general-purpose shared hash table or something - it is completely
specific to compression and zstd, and specifically to its use in those
other classes.
Beyond moving these 8 files, this patch also contains changes to:
1. Fix includes to the 5 moved header files (.hh).
2. Fix configure.py, utils/CMakeLists.txt and message/CMakeLists.txt
for the three moved source files (.cc).
3. In the moved files, change from the "utils::" namespace, to the
"netw::" namespace used by RPC. Also needed to change a bunch
of callers for the new namespace. Also, had to add "utils::"
explicitly in several places which previously assumed the
current namespace is "utils::".
Signed-off-by: Nadav Har'El <nyh@scylladb.com>
Closes scylladb/scylladb#25149
566 lines
24 KiB
C++
/*
|
|
* Copyright (C) 2023-present ScyllaDB
|
|
*/
|
|
|
|
/*
|
|
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
|
|
*/
|
|
|
|
#include "stream_compressor.hh"
|
|
|
|
#include <array>
|
|
#include <memory>
|
|
#include <bit>
|
|
#include <seastar/core/byteorder.hh>
|
|
#include <seastar/rpc/rpc_types.hh>
|
|
#include <seastar/core/memory.hh>
|
|
#include "utils/small_vector.hh"
|
|
#include "seastarx.hh"
|
|
#include "utils/crc.hh"
|
|
|
|
#define ZSTD_STATIC_LINKING_ONLY
|
|
#include <zstd.h>
|
|
|
|
namespace netw {
|
|
|
|
namespace {
|
|
|
|
size_t varint_length(uint8_t first_byte) {
|
|
return 1 + std::countr_zero(first_byte);
|
|
}
|
|
|
|
uint32_t varint_decode(uint64_t all_bytes) {
|
|
size_t n_bytes = 1 + std::countr_zero(all_bytes);
|
|
size_t ignored_msb = (64 - 8 * n_bytes);
|
|
return ((all_bytes << ignored_msb) >> ignored_msb) >> n_bytes;
|
|
}
|
|
|
|
std::pair<size_t, uint64_t> varint_encode(uint32_t x) {
|
|
size_t n_bytes = (std::max(std::bit_width(x), 1) + 6) / 7;
|
|
return {n_bytes, ((x << 1) | 1) << (n_bytes - 1)};
|
|
}
|
|
|
|
} // namespace
|
|
|
|
// Moves as many bytes as possible from `in` to `out`, limited by the space
// remaining in each buffer, advancing both positions. Always returns 0, to
// match the stream contract's "no data left in internal buffers" meaning —
// raw_stream buffers nothing internally.
size_t raw_stream::copy(ZSTD_outBuffer* out, ZSTD_inBuffer* in) {
    const size_t out_room = out->size - out->pos;
    const size_t in_avail = in->size - in->pos;
    const size_t n = std::min(out_room, in_avail);
    std::memcpy(static_cast<char*>(out->dst) + out->pos, static_cast<const char*>(in->src) + in->pos, n);
    out->pos += n;
    in->pos += n;
    return 0;
}
|
|
|
|
// Pass-through "decompression": raw_stream frames carry the payload
// verbatim, so decompressing is just a copy. The end-of-frame flag is
// irrelevant here, hence the unnamed parameter.
void raw_stream::decompress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, bool) {
    copy(out, in);
}
|
|
|
|
// Pass-through "compression": the payload is copied verbatim.
// Always returns 0 (copy() returns 0): raw_stream keeps no internal
// buffers; any unconsumed input remains visible via in->pos < in->size.
size_t raw_stream::compress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, ZSTD_EndDirective end) {
    return copy(out, in);
}
|
|
|
|
// raw_stream keeps no state across messages, so there is nothing to reset.
void raw_stream::reset() noexcept {
}
|
|
|
|
// Throw if ret is an ZSTD error code.
|
|
static void check_zstd(size_t ret, const char* text) {
|
|
if (ZSTD_isError(ret)) {
|
|
throw std::runtime_error(fmt::format("{} error: {}", text, ZSTD_getErrorName(ret)));
|
|
}
|
|
}
|
|
|
|
// IMPORTANT: with this constant, we are hardcoding a max window size in the decoder.
//
// We do this to make the allocator usage by ZSTD more predictable.
// (If you let zstd alloc and free memory on its own,
// it has some "helpful" heuristics which sometimes reallocate the buffers
// even if the context is reused, and we want to avoid that.
// See https://github.com/facebook/zstd/commit/3d523c741be041f17c28e43b89ab6dfcaee281d2)
//
// But the hardcoding means that the decoder won't be able to handle frames with a window
// size bigger than this. (The decoding will return an error that there's not enough memory).
//
// That means that the window size becomes a part of the protocol:
// the compressor mustn't send messages with a greater window size.
//
// If you wish to enlarge the window size for whatever reason,
// you have to ensure that the new version of the compressor isn't talking
// to a decompressor with a window size hardcoded to something too small.
//
// (In case of RPC compression, you can do this by bumping the COMPRESSOR_NAME
// in advanced_rpc_compressor.cc when releasing the new Scylla version).
//
// 2**17 == 128 kiB window.
constexpr size_t ZSTD_WINDOW_SIZE_LOG = 17;
|
|
|
|
// Builds a zstd decompression stream backed by a single statically-sized
// workspace allocation, so zstd never (re)allocates behind our back.
zstd_dstream::zstd_dstream() {
    // IMPORTANT: window size set in the decompressor can't be smaller
    // must be bigger than the window size of the compressor
    // talking to us.
    const auto window_size = 1 << ZSTD_WINDOW_SIZE_LOG;
    const size_t workspace_size = ZSTD_estimateDStreamSize(window_size);
    {
        // zstd needs a large contiguous allocation, it's unavoidable.
        const memory::scoped_large_allocation_warning_threshold slawt{1024*1024+1};
        _ctx.reset(static_cast<ZSTD_DStream*>(malloc(workspace_size)));
    }
    if (!_ctx) {
        throw std::bad_alloc();
    }
    // ZSTD_initStaticDStream() returns a pointer into the provided workspace
    // (so it equals _ctx), or NULL when the workspace is too small.
    if (!ZSTD_initStaticDStream(_ctx.get(), workspace_size)) {
        // Fixed: the message used to (incorrectly) name the CStream init.
        throw std::runtime_error("ZSTD_initStaticDStream() failed");
    }

    // IMPORTANT: this must match the compressor, which emits magicless frames.
    // (Error text fixed: this is the D-side call, not ZSTD_CCtx_setParameter.)
    check_zstd(ZSTD_DCtx_setParameter(_ctx.get(), ZSTD_d_format, ZSTD_f_zstd1_magicless), "ZSTD_DCtx_setParameter(.., ZSTD_d_format, ZSTD_f_zstd1_magicless)");
}
|
|
|
|
// Aborts any in-progress frame so the next decompress() starts fresh.
// Session-only reset: parameters (and the attached dictionary) are kept.
void zstd_dstream::reset() noexcept {
    ZSTD_DCtx_reset(_ctx.get(), ZSTD_reset_session_only);
    _in_progress = false;
}
|
|
|
|
// Decompresses as much of `in` into `out` as fits.
// `end_of_frame` is an out-of-band signal that `in` holds the final bytes
// of the current frame; it is used here only to detect truncated frames.
void zstd_dstream::decompress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, bool end_of_frame) {
    if (!_in_progress && in->pos == in->size) {
        // Without this early return, we would start the decompression of the next frame.
        // ZSTD_decompressStream() would return something positive, and our truncation
        // checking logic could wrongly claim truncation.
        return;
    }
    size_t ret = ZSTD_decompressStream(_ctx.get(), out, in);
    check_zstd(ret, "ZSTD_decompressStream");
    // A nonzero return means zstd still expects more input for this frame.
    _in_progress = bool(ret);
    if (in->pos == in->size && end_of_frame && _in_progress) {
        // Should never happen in practice, barring a bug/corruption.
        // It's not the compressor's job to verify data integrity,
        // so we could as well not check for this.
        //
        // But we do it as a good practice.
        // To an extent, it keeps the effects of a corruption localized to the corrupted message.
        // This could make some possible debugging easier.
        // (If we didn't check for truncation, a bad message could leave leftovers in the compressor,
        // which would cause weirdness when decompressing the next -- perhaps correct -- message).
        throw std::runtime_error("truncated ZSTD frame");
    }
}
|
|
// The passed dict must live until it is unset by another set_dict(). (Or until the decompressor is destroyed).
// The pointer can be null, this will unset the current dict.
void zstd_dstream::set_dict(const ZSTD_DDict* dict) {
    _dict = dict;
    ZSTD_DCtx_refDDict(_ctx.get(), _dict);
}
|
|
|
|
// Builds a zstd compression stream backed by a single statically-sized
// workspace allocation (no zstd-internal reallocations), configured for
// magicless frames with no content-size/checksum/dictID fields, to keep
// the per-message overhead minimal.
zstd_cstream::zstd_cstream() {
    // RAII deleter for the temporary parameter object below.
    struct params_deleter {
        void operator()(ZSTD_CCtx_params* params) const noexcept {
            ZSTD_freeCCtxParams(params);
        }
    };

    std::unique_ptr<ZSTD_CCtx_params, params_deleter> params(ZSTD_createCCtxParams());
    // For now, we hardcode a 128 kiB window and the lowest compression level here.
    // We don't need more for RPC compression (or rather: we value lower CPU
    // usage over mildly stronger compression).
    auto compression_level = 1;
    check_zstd(ZSTD_CCtxParams_init(params.get(), compression_level), "ZSTD_Cctx_params_init(.., 1)");
    // IMPORTANT: this must match the decompressor.
    check_zstd(ZSTD_CCtxParams_setParameter(params.get(), ZSTD_c_format, ZSTD_f_zstd1_magicless), "ZSTD_CCTxParams_setParameter(.., ZSTD_c_format, ZSTD_f_zstd1_magicless)");
    // Strip optional frame-header fields (content size, checksum, dictID)
    // to shave bytes off every message.
    check_zstd(ZSTD_CCtxParams_setParameter(params.get(), ZSTD_c_contentSizeFlag, 0), "ZSTD_CCTxParams_setParameter(.., ZSTD_c_contentSizeFlag, 0)");
    check_zstd(ZSTD_CCtxParams_setParameter(params.get(), ZSTD_c_checksumFlag, 0), "ZSTD_CCTxParams_setParameter(.., ZSTD_c_checksumFlag, 0)");
    check_zstd(ZSTD_CCtxParams_setParameter(params.get(), ZSTD_c_dictIDFlag, 0), "ZSTD_CCTxParams_setParameter(.., ZSTD_c_dictIDFlag, 0)");
    // IMPORTANT: window size in compressor mustn't be greater than
    // the max window size handlable by the decompressor.
    check_zstd(ZSTD_CCtxParams_setParameter(params.get(), ZSTD_c_windowLog, ZSTD_WINDOW_SIZE_LOG), "ZSTD_CCtx_setParameter(.., ZSTD_c_windowLog, 17)");

    // Size of the one-shot workspace for a static CStream with these params.
    const size_t workspace_size = ZSTD_estimateCStreamSize_usingCCtxParams(params.get());
    {
        // zstd needs a large contiguous allocation, it's unavoidable.
        const memory::scoped_large_allocation_warning_threshold slawt{1024*1024+1};
        _ctx.reset(static_cast<ZSTD_CStream*>(malloc(workspace_size)));
    }
    if (!_ctx) {
        throw std::bad_alloc();
    }
    // ZSTD_initStaticCStream() returns a pointer into the provided workspace
    // (so it equals _ctx), or NULL when the workspace is too small.
    if (!ZSTD_initStaticCStream(_ctx.get(), workspace_size)) {
        throw std::runtime_error("ZSTD_initStaticCStream() failed");
    }

    check_zstd(ZSTD_CCtx_setParametersUsingCCtxParams(_ctx.get(), params.get()), "ZSTD_CCtx_setParametersUsingCCtxParams(..)");
}
|
|
|
|
// Feeds `in` through the zstd stream into `out`. The return value is
// zstd's: nonzero while data remains in internal buffers (or more input
// is expected), 0 once the requested flush/end completed.
size_t zstd_cstream::compress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, ZSTD_EndDirective end) {
    const size_t remaining = ZSTD_compressStream2(_ctx.get(), out, in, end);
    check_zstd(remaining, "ZSTD_compressStream2");
    return remaining;
}
|
|
|
|
// Aborts any in-progress frame. Session-only reset: parameters (and the
// attached dictionary) are kept.
void zstd_cstream::reset() noexcept {
    ZSTD_CCtx_reset(_ctx.get(), ZSTD_reset_session_only);
}
|
|
// The passed dict must live until it is unset by another set_dict(). (Or until the compressor is destroyed).
// The pointer can be null, this will unset the current dict.
void zstd_cstream::set_dict(const ZSTD_CDict* dict) {
    _dict = dict;
    ZSTD_CCtx_refCDict(_ctx.get(), _dict);
}
|
|
|
|
// Used as an intermediary buffer by lz4_cstream and lz4_dstream;
// Since this buffer is shared, lz4_cstream and lz4_dstream must be used synchronously.
// That is, when the processing of a frame (the string of data between `reset()`s) starts,
// it has to be finished before another compressor instance attempts to use it.
//
// If there ever is a need to process multiple frames concurrently, the dependency on this
// buffer has to be eliminated, and each concurrent stream must be given a separate buffer.
//
// This makes for a bad/dangerous API, but it's only used in this file, so it's okay for now.
//
// Sized for the worst-case compressed block (LZ4_COMPRESSBOUND of the max
// window) plus sizeof(uint64_t) bytes of scratch for the block-length header.
static thread_local std::array<char, LZ4_COMPRESSBOUND(max_lz4_window_size) + sizeof(uint64_t)> _lz4_scratch;
|
|
|
|
// `window_size` is the size of the history ring buffer (_buf).
// NOTE(review): it presumably must not exceed max_lz4_window_size, which
// sizes the shared _lz4_scratch — confirm callers enforce this.
lz4_cstream::lz4_cstream(size_t window_size)
    : _buf(window_size)
{
    reset();
}
|
|
|
|
// Full reinitialization of the LZ4 context and all buffer cursors, then
// re-attaches the current dictionary (a null _dict unsets it).
void lz4_cstream::reset() noexcept {
    _scratch_beg = _scratch_end = _buf_pos = 0;
    LZ4_initStream(&_ctx, sizeof(_ctx));
    LZ4_attach_dictionary(&_ctx, _dict);
}
|
|
|
|
// Like reset(), but uses LZ4's cheap context-reuse path
// (LZ4_resetStream_fast), which is only valid on a context that was
// already fully initialized before.
void lz4_cstream::resetFast() noexcept {
    _scratch_beg = _scratch_end = _buf_pos = 0;
    LZ4_resetStream_fast(&_ctx);
    LZ4_attach_dictionary(&_ctx, _dict);
}
|
|
|
|
// When new data arrives in `in`, we copy an arbitrary amount of it to `_buf`,
// (the amount is arbitrary, but it has to fit contiguously in `_buf`),
// compress the new block from `_buf` to `_lz4_scratch`,
// then we copy everything from `_lz4_scratch` to `out`.
// Repeat until `in` is empty.
//
// Returns non-zero while compressed data still lingers in `_lz4_scratch`.
size_t lz4_cstream::compress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, ZSTD_EndDirective end) {
    if (_scratch_end == _scratch_beg && in->size != in->pos) {
        // If we already copied everything we compressed to `out`,
        // and there is still some data in `in`,
        // we are going to compress a new block from `in` to `_lz4_scratch`.

        if (_buf_pos == _buf.size()) {
            // The ring buffer wraps around here.
            _buf_pos = 0;
        }
        // We will compress the biggest prefix of `in` that fits contiguously inside `buf`.
        // In principle, this is slightly suboptimal -- ideally, if `in` is smaller than the contiguous space in `buf`,
        // we should only copy `in` to `buf` and wait with the compressor call until future `in`s
        // fill the contiguous space entirely, or `end` is `ZSTD_e_flush` or `ZSTD_e_end`.
        // But for streaming LZ4 it doesn't really make a difference.
        size_t n = std::min(_buf.size() - _buf_pos, in->size - in->pos);
        // The compressed block mustn't fill the entire ring buffer.
        // It must be at least 1 byte shorter. It's a dumb quirk of lz4. Perhaps it should be called a bug.
        n = std::min(max_lz4_window_size - 1, n);
        std::memcpy(_buf.data() + _buf_pos, static_cast<const char*>(in->src) + in->pos, n);
        in->pos += n;
        // The first header_size bytes contain the length of the compressed block, so we compress to _lz4_scratch.data() + sizeof(uint64_t).
        // NOTE(review): the capacity passed is the full scratch size, though writing
        // starts at offset sizeof(uint64_t); the scratch is sized COMPRESSBOUND + 8,
        // so the bound still holds — but passing size - sizeof(uint64_t) would be tidier.
        int x = LZ4_compress_fast_continue(&_ctx, _buf.data() + _buf_pos, _lz4_scratch.data() + sizeof(uint64_t), n, _lz4_scratch.size(), 1);
        if (x < 0) {
            throw std::runtime_error(fmt::format(
                "LZ4_compress_fast_continue failed with negative return value {}. "
                "LZ4 shouldn't fail. This indicates a bug or a corruption.",
                x));
        }
        auto [header_size, header_value] = varint_encode(x);
        _scratch_end = sizeof(uint64_t) + x;
        // The varint header occupies the last header_size bytes before the payload.
        _scratch_beg = sizeof(uint64_t) - header_size;
        _buf_pos += n;
        // We prepend to every block its compressed length.
        // This is necessary, because LZ4 needs to know it, but it doesn't store it by itself.
        seastar::write_le<uint64_t>(_lz4_scratch.data(), header_value << (8 * (sizeof(uint64_t) - header_size)));
    }
    if (_scratch_end > _scratch_beg) {
        // If we compressed a block to _lz4_scratch, but we still haven't copied it all to `out`,
        // we copy as much as possible to `out`.
        size_t n = std::min(_scratch_end - _scratch_beg, out->size - out->pos);
        std::memcpy(static_cast<char*>(out->dst) + out->pos, _lz4_scratch.data() + _scratch_beg, n);
        _scratch_beg += n;
        out->pos += n;
    }
    if (_scratch_beg == _scratch_end && in->size == in->pos && end == ZSTD_e_end) {
        // If we have no more data in `_lz4_scratch`, there is no more data in `in`, and `end`
        // says that this is the last block in the stream, we reset the stream state.
        resetFast();
        // Note that the we currently don't emit anything to mark end of stream.
        // We rely on the outer layer to notify the decompressor about it manually.
        // This is inconsistent with what zstd's API does. We could fix that for elegance points.
    };
    // Return a non-zero value if there is still some data lingering in the compressor's
    // internal buffers (`_lz4_scratch`), which has to be copied out.
    // This will prompt the caller to call us again, even if `in` is empty.
    return _scratch_beg != _scratch_end;
}
|
|
// The passed dict must live until it is unset by another set_dict(). (Or until the compressor is destroyed).
// The pointer can be null, this will unset the current dict.
// resetFast() re-attaches the new dict to the LZ4 context.
void lz4_cstream::set_dict(const LZ4_stream_t* dict) {
    _dict = dict;
    resetFast();
}
|
|
|
|
// `window_size` sizes the decompressed-history ring buffer (_buf).
// NOTE(review): presumably it must match (or exceed) the window used by the
// peer's compressor — confirm against the protocol.
lz4_dstream::lz4_dstream(size_t window_size)
    : _buf(window_size)
{}
|
|
|
|
// Resets all buffer cursors and re-registers the current dictionary (an
// empty _dict span passes a null pointer, i.e. no dictionary).
void lz4_dstream::reset() noexcept {
    _scratch_pos = _buf_end = _buf_beg = 0;
    LZ4_setStreamDecode(&_ctx, reinterpret_cast<const char*>(_dict.size() ? _dict.data() : nullptr), _dict.size());
}
|
|
|
|
// Decompresses blocks produced by lz4_cstream::compress: each block is a
// varint (its compressed length) followed by the LZ4-compressed payload.
// Input may arrive in arbitrary slices; partial headers/blocks are staged in
// the shared `_lz4_scratch` until a full block is contiguous.
// `end_of_frame` tells us `in` holds the last bytes of the frame; it is used
// to detect truncation and to reset stream state between frames.
void lz4_dstream::decompress(ZSTD_outBuffer* out, ZSTD_inBuffer* in, bool end_of_frame) {
    if (_buf_beg == _buf_end) {
        // If we have no decompressed data that wasn't yet output,
        // we will decompress a new block.

        if (_buf_end == _buf.size()) {
            // The ring buffer wraps around here.
            _buf_end = _buf_beg = 0;
        }
        // First, we have to ingest the first few bytes of input data, which contain
        // the post-compression size of the following block.
        size_t header_size = 1;
        if (_scratch_pos < header_size) {
            size_t n = std::min(header_size - _scratch_pos, in->size - in->pos);
            std::memcpy(_lz4_scratch.data() + _scratch_pos, static_cast<const char*>(in->src) + in->pos, n);
            _scratch_pos += n;
            in->pos += n;
        }
        // The first byte tells us the real header length; ingest the rest of it.
        if (_scratch_pos >= header_size) {
            header_size = varint_length(static_cast<uint8_t>(_lz4_scratch[0]));
        }
        if (_scratch_pos < header_size) {
            size_t n = std::min(header_size - _scratch_pos, in->size - in->pos);
            std::memcpy(_lz4_scratch.data() + _scratch_pos, static_cast<const char*>(in->src) + in->pos, n);
            _scratch_pos += n;
            in->pos += n;
        }
        // If we know the first header_size bytes, we can read the compressed size and ingest that many bytes,
        // so we have a full compressed block in contiguous memory.
        if (_scratch_pos >= header_size) {
            // read_le<uint32_t> may read past the valid header bytes, but
            // varint_decode masks away bytes beyond the varint's length.
            auto x = varint_decode(seastar::read_le<uint32_t>(_lz4_scratch.data()));
            if (x + header_size > _lz4_scratch.size()) {
                throw std::runtime_error(fmt::format("Oversized LZ4 block: {} bytes.", x + header_size));
            }
            size_t n = std::min(x + header_size - _scratch_pos, in->size - in->pos);
            std::memcpy(_lz4_scratch.data() + _scratch_pos, static_cast<const char*>(in->src) + in->pos, n);
            _scratch_pos += n;
            in->pos += n;
            if (_scratch_pos == x + header_size) {
                // Now the full compressed block is in contiguous memory.
                // We can decompress it to `_buf` and clear `_lz4_scratch`.
                size_t n_buf = _buf.size() - _buf_end;
                int ret = LZ4_decompress_safe_continue(&_ctx, _lz4_scratch.data() + header_size, _buf.data() + _buf_end, x, n_buf);
                if (ret < 0) {
                    throw std::runtime_error(fmt::format("LZ4_decompress_safe_continue failed with negative return value {}. This indicates a corruption of the RPC connection.", ret));
                }
                _buf_end += ret;
                _scratch_pos = 0;
            }
        }
    }
    if (_buf_end > _buf_beg) {
        // If we have some decompressed data that wasn't yet output,
        // we output as much as possible.
        size_t n = std::min(_buf_end - _buf_beg, out->size - out->pos);
        std::memcpy(static_cast<char*>(out->dst) + out->pos, _buf.data() + _buf_beg, n);
        out->pos += n;
        _buf_beg += n;
    }
    if (end_of_frame && _buf_beg == _buf_end && in->size == in->pos) {
        // If there is no data in internal buffers to be output, there is no more data in `in`,
        // and `in` was the last block in the stream, we reset the stream state.
        if (_scratch_pos != 0) {
            // A partially-ingested header/block at end of frame means the peer's
            // frame was cut short.
            throw std::runtime_error(fmt::format("Truncated LZ4 frame."));
        }
        reset();
    }
    // Sanity check.
    assert(_buf_end >= _buf_beg);
}
|
|
// The passed dict must live until it is unset by another set_dict(). (Or until the decompressor is destroyed).
// The span can be empty, this will unset the current dict.
// reset() re-registers the dict with the LZ4 decode context.
void lz4_dstream::set_dict(std::span<const std::byte> dict) {
    _dict = dict;
    reset();
}
|
|
|
|
// Compresses an outgoing RPC buffer with `compressor`, reserving
// `head_space` untouched bytes at the start of the result (for the caller's
// header) and producing output in fragments of `chunk_size`.
// On any exception the compressor is reset so it can't carry corrupt state
// into the next message.
rpc::snd_buf compress_impl(size_t head_space, const rpc::snd_buf& data, stream_compressor& compressor, bool end_of_frame, size_t chunk_size) try {
    const auto size = data.size;

    // `bufs` is either one buffer or a vector of buffers; get a pointer to
    // the first fragment either way.
    auto src = std::get_if<temporary_buffer<char>>(&data.bufs);
    if (!src) {
        src = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
    }

    size_t size_left = size;
    size_t size_compressed = 0;

    ZSTD_inBuffer inbuf = {};
    ZSTD_outBuffer outbuf = {};

    utils::small_vector<temporary_buffer<char>, 16> dst_buffers;

    // Note: we always allocate chunk_size here, then we resize it to fit at the end.
    // Maybe that's a waste of cycles, and we should allocate a buffer that's about as big
    // as the input, and not shrink at the end?
    dst_buffers.emplace_back(std::max<size_t>(head_space, chunk_size));
    outbuf.dst = dst_buffers.back().get_write();
    outbuf.size = dst_buffers.back().size();
    outbuf.pos = head_space;

    // Stream compressors can handle frames of size 0,
    // but when a Zstd compressor is called multiple times (around 10, I think?)
    // with an empty input, it tries to be helpful and emits a "no progress" error.
    //
    // To avoid that, we special-case empty frames with the `if` below,
    // and we just return an empty buffer as the result,
    // without putting the empty message through the compressor.
    if (size > 0) {
        while (true) {
            if (size_left && inbuf.pos == inbuf.size) {
                // Feed the next input fragment to the compressor.
                size_left -= src->size();
                inbuf.src = src->get();
                inbuf.size = src->size();
                inbuf.pos = 0;
                ++src;
                continue;
            }
            if (outbuf.pos == outbuf.size) {
                // Output fragment full; account for it and open a new one.
                size_compressed += outbuf.pos;
                dst_buffers.emplace_back(chunk_size);
                outbuf.dst = dst_buffers.back().get_write();
                outbuf.size = dst_buffers.back().size();
                outbuf.pos = 0;
                continue;
            }
            size_t ret = compressor.compress(&outbuf, &inbuf, size_left ? ZSTD_e_continue : (end_of_frame ? ZSTD_e_end : ZSTD_e_flush));
            if (!size_left // No more input chunks.
                && inbuf.pos == inbuf.size // No more data in the last chunk.
                && ret == 0 // No data remaining in compressor's internal buffers.
            ) {
                break;
            }
        }
    }
    size_compressed += outbuf.pos;
    dst_buffers.back().trim(outbuf.pos);

    // In this routine, we always allocate fragments of size chunk_size, even
    // if the message is tiny.
    // Hence, at this point, dst_buffers contains up to (128 kiB) unused memory.
    // When there are many concurrent RPC messages, this waste might add up to
    // a considerable overhead.
    // Let's pay some cycles to shrink the underlying allocation to fit, to
    // avoid any problems with that unseen memory usage.
    dst_buffers.back() = dst_buffers.back().clone();

    if (dst_buffers.size() == 1) {
        return rpc::snd_buf(std::move(dst_buffers.front()));
    }
    return rpc::snd_buf({std::make_move_iterator(dst_buffers.begin()), std::make_move_iterator(dst_buffers.end())}, size_compressed);
} catch (...) {
    // Don't let a half-written frame poison the shared compressor state.
    compressor.reset();
    throw;
}
|
|
|
|
template <typename T>
|
|
requires std::same_as<T, rpc::rcv_buf> || std::same_as<T, rpc::snd_buf>
|
|
uint32_t crc_impl_generic(const T& data) noexcept {
|
|
auto size = data.size;
|
|
|
|
auto it = std::get_if<temporary_buffer<char>>(&data.bufs);
|
|
if (!it) {
|
|
it = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
|
|
}
|
|
|
|
utils::crc32 crc;
|
|
while (size > 0) {
|
|
crc.process(reinterpret_cast<const uint8_t*>(it->get()), it->size());
|
|
size -= it->size();
|
|
++it;
|
|
}
|
|
return crc.get();
|
|
}
|
|
|
|
// CRC32 of the payload of an outgoing (send-side) RPC buffer.
uint32_t crc_impl(const rpc::snd_buf& data) noexcept {
    return crc_impl_generic(data);
}
|
|
|
|
// CRC32 of the payload of an incoming (receive-side) RPC buffer.
uint32_t crc_impl(const rpc::rcv_buf& data) noexcept {
    return crc_impl_generic(data);
}
|
|
|
|
// Decompresses an incoming RPC buffer with `decompressor`, producing output
// in fragments of `chunk_size`. On any exception the decompressor is reset
// so it can't carry corrupt state into the next message.
rpc::rcv_buf decompress_impl(const seastar::rpc::rcv_buf& data, stream_decompressor& decompressor, bool end_of_frame, size_t chunk_size) try {
    // As a special-case, if we take empty input, we immediately return an empty output,
    // without going through the actual compressors.
    //
    // This is due to a quirk of Zstd, see the matching comment in `compress_impl`
    // for details.
    if (data.size == 0) {
        return rpc::rcv_buf();
    }

    size_t size_decompressed = 0;

    size_t input_remaining = data.size;
    // `bufs` is either one buffer or a vector of buffers; get a pointer to
    // the first fragment either way.
    auto it = std::get_if<temporary_buffer<char>>(&data.bufs);
    if (!it) {
        it = std::get<std::vector<temporary_buffer<char>>>(data.bufs).data();
    }

    utils::small_vector<temporary_buffer<char>, 16> dst_buffers;

    ZSTD_inBuffer inbuf = {};
    ZSTD_outBuffer outbuf = {};
    while (true) {
        if (input_remaining && inbuf.pos == inbuf.size) {
            // Feed the next input fragment to the decompressor.
            inbuf.src = it->get();
            inbuf.size = it->size();
            input_remaining -= it->size();
            ++it;
            inbuf.pos = 0;
            continue;
        }
        if (outbuf.pos == outbuf.size) {
            // Output fragment full; account for it and open a new one.
            // (On the first iteration both are 0, so this allocates the
            // initial fragment.)
            size_decompressed += outbuf.pos;
            dst_buffers.emplace_back(chunk_size);
            outbuf.dst = dst_buffers.back().get_write();
            outbuf.size = dst_buffers.back().size();
            outbuf.pos = 0;
            continue;
        }
        decompressor.decompress(&outbuf, &inbuf, end_of_frame && !input_remaining);
        if (!input_remaining // No more input chunks.
            && inbuf.pos == inbuf.size // No more data in the last input chunk.
            && outbuf.pos < outbuf.size // No more data in decompressor's internal buffers.
        ) {
            break;
        }
    }

    size_decompressed += outbuf.pos;
    if (!dst_buffers.empty()) {
        dst_buffers.back().trim(outbuf.pos);
        // In this routine, we always allocate fragments of size chunk_size, even
        // if the message is tiny.
        // Hence, at this point, dst_buffers contains up to (128 kiB) unused memory.
        // When there are many concurrent RPC messages, this waste might add up to
        // a considerable overhead.
        // Let's pay some cycles to shrink the underlying allocation to fit, to
        // avoid any problems with that unseen memory usage.

        // We could avoid the CPU cost if we prepended the decompressed size to
        // the message (thus growing the messages by 1-2 bytes) and used that to
        // allocate exactly the right buffer.
        // But I don't know if it's worth the 2 bytes.
        dst_buffers.back() = dst_buffers.back().clone();
    }
    if (dst_buffers.size() == 1) {
        return rpc::rcv_buf(std::move(dst_buffers.front()));
    }
    return rpc::rcv_buf({std::make_move_iterator(dst_buffers.begin()), std::make_move_iterator(dst_buffers.end())}, size_decompressed);
} catch (...) {
    // Don't let a half-consumed frame poison the shared decompressor state.
    decompressor.reset();
    throw;
}
|
|
|
|
} // namespace netw
|