Files
scylladb/zstd.cc
Michał Chojnowski f4ca94d13b compress.hh: switch compressor::name() from an instance member to a virtual call
Before this patch, `compressor` is designed to be a proper abstract
class, where the creator of a compressor doesn't even know
what he's creating -- he passes a name, and it gets turned into a
`compressor` behind the scenes.

But later, when creation of compressors will involve looking up
dictionaries, this abstraction will only get in the way.
So we give up on keeping `compressor` abstract, and instead of
using "opaque" names we turn to an explicit enum of possible compressor types.

The main point of this patch is to add the `algorithm` enum and the `algorithm_to_name()`
function. The rest of the patch switches the `compressor::name()` function
to use `algorithm_to_name()` instead of the passed-by-constructor
`compressor::_name`, to keep a single source of truth for the names.
2025-04-01 00:07:27 +02:00

160 lines
6.3 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/aligned_buffer.hh>
// We need to use experimental features of the zstd library (to allocate compression/decompression context),
// which are available only when the library is linked statically.
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include "compress.hh"
#include "exceptions/exceptions.hh"
#include "utils/class_registrator.hh"
#include "utils/reusable_buffer.hh"
#include <concepts>
namespace {

// Key of the option that carries the zstd compression level. It is looked up
// through the opt_getter in the zstd_processor constructor and reported back
// by option_names() / options().
const sstring COMPRESSION_LEVEL("compression_level");

// Size in bytes of the workspace used for the static zstd decompression
// context, as estimated by ZSTD_estimateDCtxSize(). Evaluated once, during
// static initialization of this translation unit.
const size_t DCTX_SIZE(ZSTD_estimateDCtxSize());

} // anonymous namespace
// `compressor` implementation backed by the zstd library.
//
// Both compression and decompression run inside "static" zstd contexts
// (ZSTD_initStaticCCtx / ZSTD_initStaticDCtx) that are emplaced into
// thread-local buffers owned by the helper functions below, rather than
// being allocated by zstd itself.
class zstd_processor : public compressor {
// Level passed to ZSTD_compressCCtx(). Stays at 3 unless the constructor
// finds a COMPRESSION_LEVEL option.
int _compression_level = 3;
// Workspace size requested for the compression context. Set by the
// constructor from ZSTD_estimateCCtxSize_usingCParams().
size_t _cctx_size;
// Invokes `f` with a pointer to a thread-local decompression context and
// returns whatever `f` returns.
//
// The backing buffer (DCTX_SIZE bytes) is allocated and the context is
// initialized the first time this function runs on a given thread; later
// calls on that thread reuse it. Throws std::runtime_error if zstd refuses
// to initialize the context.
//
// Note that this is a function template (abbreviated `auto` parameter), so
// the function-local thread_local buffer is per instantiation, i.e. per
// distinct type of `f`.
static auto with_dctx(std::invocable<ZSTD_DCtx*> auto f) {
// The decompression context has a fixed size of ~128 KiB,
// so we don't bother ever resizing it the way we do with
// the compression context.
static thread_local std::unique_ptr<char[]> buf = std::invoke([&] {
auto ptr = std::unique_ptr<char[]>(new char[DCTX_SIZE]);
// The returned pointer is only used as a success indicator here.
auto dctx = ZSTD_initStaticDCtx(ptr.get(), DCTX_SIZE);
if (!dctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD decompression context");
}
return ptr;
});
// The context pointer is recovered by casting the start of the buffer,
// i.e. the code assumes zstd places the context at the beginning of the
// workspace it was given.
return f(reinterpret_cast<ZSTD_DCtx*>(buf.get()));
}
// Invokes `f` with a pointer to a thread-local compression context whose
// workspace is at least `cctx_size` bytes, and returns whatever `f` returns.
//
// The workspace lives in a thread-local reusable_buffer. The context is
// (re)initialized only when the buffer's realloc counter has changed since
// the last initialization. Throws std::runtime_error if zstd refuses to
// initialize the context.
//
// As with with_dctx(), this is a function template, so the thread_local
// statics below are per instantiation (per distinct type of `f`).
static auto with_cctx(size_t cctx_size, std::invocable<ZSTD_CCtx*> auto f) {
// See the comments to reusable_buffer for a rationale of using it for compression.
// NOTE(review): the 600 s argument is presumably the period of the shrink
// timer mentioned below -- confirm in utils/reusable_buffer.hh.
static thread_local utils::reusable_buffer<lowres_clock> buf(std::chrono::seconds(600));
// Value of buf.reallocs() at the time the context was last initialized.
// It starts equal to the counter of the freshly constructed buffer, so the
// first initialization is triggered only if the first request below changes
// that counter.
static thread_local size_t last_seen_reallocs = buf.reallocs();
auto guard = utils::reusable_buffer_guard(buf);
// Note that the compression context isn't initialized with a particular
// compression config, but only with a particular size. As long as
// it is big enough, we can reuse a context initialized by an
// unrelated instance of zstd_processor without reinitializing it.
//
// If the existing context isn't big enough, the reusable buffer will
// be resized by the next line, and the following `if` will notice that
// and reinitialize the context.
auto view = guard.get_temporary_buffer(cctx_size);
if (last_seen_reallocs != buf.reallocs()) {
// Either the buffer just grew because we requested a buffer bigger
// than its last capacity, or it was shrunk some time ago by a timer.
// Either way, the resize destroyed the contents of the buffer and
// we have to initialize the context anew.
//
// The context is given buf.size() bytes (the buffer's own size), not just
// the `cctx_size` bytes that were requested.
auto cctx = ZSTD_initStaticCCtx(view.data(), buf.size());
if (!cctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD compression context");
}
// Only record the counter after a successful initialization, so that a
// failed attempt is retried on the next call.
last_seen_reallocs = buf.reallocs();
}
// As in with_dctx(), the context is assumed to start at the beginning of
// the workspace.
return f(reinterpret_cast<ZSTD_CCtx*>(view.data()));
}
public:
// Reads the compression level and the chunk length through the given
// option getter and derives _cctx_size from them.
zstd_processor(const opt_getter&);
// Decompresses `input` into `output` using the thread-local decompression
// context; throws std::runtime_error when zstd reports an error.
size_t uncompress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
// Compresses `input` into `output` at _compression_level using the
// thread-local compression context; throws std::runtime_error when zstd
// reports an error.
size_t compress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
// Returns ZSTD_compressBound(input_len).
size_t compress_max_size(size_t input_len) const override;
// Returns the set of option names specific to this compressor
// (just COMPRESSION_LEVEL).
std::set<sstring> option_names() const override;
// Returns the current value of each option listed by option_names().
std::map<sstring, sstring> options() const override;
// Returns algorithm::zstd.
algorithm get_algorithm() const override;
};
// Builds a zstd compressor from user-supplied options.
//
// `opts` is queried for:
//   - COMPRESSION_LEVEL: optional; when present it must parse as an integer
//     (otherwise exceptions::syntax_exception is thrown) and lie within
//     [ZSTD_minCLevel(), ZSTD_maxCLevel()] (otherwise
//     exceptions::configuration_exception is thrown). When absent, the
//     in-class default level is kept.
//   - compression_parameters::CHUNK_LENGTH_KB, falling back to
//     compression_parameters::CHUNK_LENGTH_KB_ERR: optional; when neither is
//     present, compression_parameters::DEFAULT_CHUNK_LENGTH is used.
//
// The level and the chunk length are then used to estimate the size of the
// compression context workspace, which is stored in _cctx_size.
zstd_processor::zstd_processor(const opt_getter& opts) {
    if (auto requested_level = opts(COMPRESSION_LEVEL); requested_level) {
        try {
            _compression_level = std::stoi(*requested_level);
        } catch (const std::exception&) {
            throw exceptions::syntax_exception(
                format("Invalid integer value {} for {}", *requested_level, COMPRESSION_LEVEL));
        }

        auto lowest = ZSTD_minCLevel();
        auto highest = ZSTD_maxCLevel();
        const bool within_bounds = lowest <= _compression_level && _compression_level <= highest;
        if (!within_bounds) {
            throw exceptions::configuration_exception(
                format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, lowest, highest, _compression_level));
        }
    }

    // Prefer the regular chunk-length key; only consult the alternative key
    // when the regular one is not set.
    auto chunk_kb = opts(compression_parameters::CHUNK_LENGTH_KB);
    if (!chunk_kb) {
        chunk_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR);
    }

    // The chunk length option has already been validated, so the conversion
    // below is not guarded.
    auto chunk_bytes = chunk_kb
        ? std::stoi(*chunk_kb) * 1024
        : compression_parameters::DEFAULT_CHUNK_LENGTH;

    // The uncompressed input is assumed never to exceed one chunk, so the
    // chunk length is passed to zstd as the source-size hint.
    _cctx_size = ZSTD_estimateCCtxSize_usingCParams(
        ZSTD_getCParams(_compression_level, chunk_bytes, 0));
}
// Decompresses `input_len` bytes at `input` into the `output_len`-byte buffer
// at `output`, using the thread-local decompression context.
//
// Returns the value produced by ZSTD_decompressDCtx() (per the zstd API, the
// number of bytes written to `output`).
// Throws std::runtime_error carrying zstd's error name if zstd reports a failure.
size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
    auto result = with_dctx([&] (ZSTD_DCtx* ctx) {
        return ZSTD_decompressDCtx(ctx, output, output_len, input, input_len);
    });
    if (!ZSTD_isError(result)) {
        return result;
    }
    throw std::runtime_error(format("ZSTD decompression failure: {}", ZSTD_getErrorName(result)));
}
// Compresses `input_len` bytes at `input` into the `output_len`-byte buffer
// at `output`, at this instance's compression level, using the thread-local
// compression context sized for this instance.
//
// Returns the value produced by ZSTD_compressCCtx() (per the zstd API, the
// number of compressed bytes written to `output`).
// Throws std::runtime_error carrying zstd's error name if zstd reports a failure.
size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
    auto result = with_cctx(_cctx_size, [&] (ZSTD_CCtx* ctx) {
        return ZSTD_compressCCtx(ctx, output, output_len, input, input_len, _compression_level);
    });
    if (!ZSTD_isError(result)) {
        return result;
    }
    throw std::runtime_error(format("ZSTD compression failure: {}", ZSTD_getErrorName(result)));
}
// Returns zstd's bound (ZSTD_compressBound) on the compressed size of an
// input of `input_len` bytes.
size_t zstd_processor::compress_max_size(size_t input_len) const {
    const size_t bound = ZSTD_compressBound(input_len);
    return bound;
}
std::set<sstring> zstd_processor::option_names() const {
return {COMPRESSION_LEVEL};
}
std::map<sstring, sstring> zstd_processor::options() const {
return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}};
}
auto zstd_processor::get_algorithm() const -> algorithm {
return algorithm::zstd;
}
// File-scope registration object, constructed during static initialization.
// It ties zstd_processor (as a `compressor` constructible from a
// `const compressor::opt_getter&`) to the name that
// compression_parameters::algorithm_to_name() returns for algorithm::zstd,
// so the name string is not duplicated here.
// NOTE(review): presumably this is what allows the compressor to be created
// by name through the class registry -- confirm in utils/class_registrator.hh.
static const class_registrator<compressor, zstd_processor, const compressor::opt_getter&>
registrator(sstring(compression_parameters::algorithm_to_name(compressor::algorithm::zstd)));