Files
scylladb/zstd.cc
Kefu Chai 4a268362b9 compress: fix compressor initialization order by making namespace_prefix a function
Fixes a race condition where COMPRESSOR_NAME in zstd.cc could be
initialized before compressor::namespace_prefix due to undefined
global variable initialization order across translation units. This
was causing ZstdCompressor to be unregistered in release builds,
making it impossible to create tables with Zstd compression.

Replace the global namespace_prefix variable with a function that
returns the fully qualified compressor name. This ensures proper
initialization order and fixes the registration of the ZstdCompressor.

Fixes scylladb/scylladb#22444
Signed-off-by: Kefu Chai <kefu.chai@scylladb.com>

Closes scylladb/scylladb#22451
2025-01-26 13:43:02 +02:00

157 lines
6.3 KiB
C++

/*
* Copyright (C) 2019-present ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <seastar/core/aligned_buffer.hh>
// We need to use experimental features of the zstd library (to allocate compression/decompression context),
// which are available only when the library is linked statically.
#define ZSTD_STATIC_LINKING_ONLY
#include <zstd.h>
#include "compress.hh"
#include "exceptions/exceptions.hh"
#include "utils/class_registrator.hh"
#include "utils/reusable_buffer.hh"
#include <concepts>
// Key of the only option specific to this compressor. The constructor reads
// it, and option_names()/options() report it back.
static const sstring COMPRESSION_LEVEL = "compression_level";
// Name passed to the `compressor` base class and to the class registrator at
// the bottom of this file. It is built by calling compressor::make_name()
// rather than by concatenating with a global prefix variable, so that this
// initializer does not depend on the (unspecified) initialization order of
// globals defined in other translation units.
static const sstring COMPRESSOR_NAME = compressor::make_name("ZstdCompressor");
// Size, in bytes, of the workspace handed to ZSTD_initStaticDCtx() in
// zstd_processor::with_dctx().
static const size_t DCTX_SIZE = ZSTD_estimateDCtxSize();
class zstd_processor : public compressor {
int _compression_level = 3;
size_t _cctx_size;
static auto with_dctx(std::invocable<ZSTD_DCtx*> auto f) {
// The decompression context has a fixed size of ~128 KiB,
// so we don't bother ever resizing it the way we do with
// the compression context.
static thread_local std::unique_ptr<char[]> buf = std::invoke([&] {
auto ptr = std::unique_ptr<char[]>(new char[DCTX_SIZE]);
auto dctx = ZSTD_initStaticDCtx(ptr.get(), DCTX_SIZE);
if (!dctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD decompression context");
}
return ptr;
});
return f(reinterpret_cast<ZSTD_DCtx*>(buf.get()));
}
static auto with_cctx(size_t cctx_size, std::invocable<ZSTD_CCtx*> auto f) {
// See the comments to reusable_buffer for a rationale of using it for compression.
static thread_local utils::reusable_buffer<lowres_clock> buf(std::chrono::seconds(600));
static thread_local size_t last_seen_reallocs = buf.reallocs();
auto guard = utils::reusable_buffer_guard(buf);
// Note that the compression context isn't initialized with a particular
// compression config, but only with a particular size. As long as
// it is big enough, we can reuse a context initialized by an
// unrelated instance of zstd_processor without reinitializing it.
//
// If the existing context isn't big enough, the reusable buffer will
// be resized by the next line, and the following `if` will notice that
// and reinitialize the context.
auto view = guard.get_temporary_buffer(cctx_size);
if (last_seen_reallocs != buf.reallocs()) {
// Either the buffer just grew because we requested a buffer bigger
// than its last capacity, or it was shrunk some time ago by a timer.
// Either way, the resize destroyed the contents of the buffer and
// we have to initialize the context anew.
auto cctx = ZSTD_initStaticCCtx(view.data(), buf.size());
if (!cctx) {
// Barring a bug, this should never happen.
throw std::runtime_error("Unable to initialize ZSTD compression context");
}
last_seen_reallocs = buf.reallocs();
}
return f(reinterpret_cast<ZSTD_CCtx*>(view.data()));
}
public:
zstd_processor(const opt_getter&);
size_t uncompress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
size_t compress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
size_t compress_max_size(size_t input_len) const override;
std::set<sstring> option_names() const override;
std::map<sstring, sstring> options() const override;
};
// Builds a processor from the options exposed by `opts`.
//
// - "compression_level" (optional): parsed with std::stoi. Throws
//   exceptions::syntax_exception if parsing fails, and
//   exceptions::configuration_exception if the value lies outside
//   [ZSTD_minCLevel(), ZSTD_maxCLevel()].
// - chunk length: taken from CHUNK_LENGTH_KB, then from CHUNK_LENGTH_KB_ERR,
//   then from DEFAULT_CHUNK_LENGTH. It is only used to size the compression
//   context.
zstd_processor::zstd_processor(const opt_getter& opts)
    : compressor(COMPRESSOR_NAME) {
    if (auto requested_level = opts(COMPRESSION_LEVEL)) {
        try {
            _compression_level = std::stoi(*requested_level);
        } catch (const std::exception&) {
            throw exceptions::syntax_exception(
                format("Invalid integer value {} for {}", *requested_level, COMPRESSION_LEVEL));
        }

        const auto lowest = ZSTD_minCLevel();
        const auto highest = ZSTD_maxCLevel();
        const bool in_range = lowest <= _compression_level && _compression_level <= highest;
        if (!in_range) {
            throw exceptions::configuration_exception(
                format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, lowest, highest, _compression_level));
        }
    }

    auto chunk_kb = opts(compression_parameters::CHUNK_LENGTH_KB);
    if (!chunk_kb) {
        chunk_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR);
    }
    // The chunk length option has already been validated by the time we get here.
    const auto max_input_len = chunk_kb
            ? std::stoi(*chunk_kb) * 1024
            : compression_parameters::DEFAULT_CHUNK_LENGTH;

    // We assume that the uncompressed input length is always <= max_input_len.
    _cctx_size = ZSTD_estimateCCtxSize_usingCParams(
            ZSTD_getCParams(_compression_level, max_input_len, 0));
}
// Decompresses `input_len` bytes at `input` into the `output_len`-byte buffer
// at `output`, using the thread-local decompression context.
// Returns the value produced by ZSTD_decompressDCtx(); throws
// std::runtime_error if that value is a zstd error code.
size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
    const auto status = with_dctx([input, input_len, output, output_len] (ZSTD_DCtx* dctx) {
        return ZSTD_decompressDCtx(dctx, output, output_len, input, input_len);
    });
    if (!ZSTD_isError(status)) {
        return status;
    }
    throw std::runtime_error(format("ZSTD decompression failure: {}", ZSTD_getErrorName(status)));
}
size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
auto ret = with_cctx(_cctx_size, [&] (ZSTD_CCtx* cctx) {
return ZSTD_compressCCtx(cctx, output, output_len, input, input_len, _compression_level);
});
if (ZSTD_isError(ret)) {
throw std::runtime_error( format("ZSTD compression failure: {}", ZSTD_getErrorName(ret)));
}
return ret;
}
// Returns ZSTD_compressBound() for an input of `input_len` bytes.
size_t zstd_processor::compress_max_size(size_t input_len) const {
    const size_t bound = ZSTD_compressBound(input_len);
    return bound;
}
std::set<sstring> zstd_processor::option_names() const {
return {COMPRESSION_LEVEL};
}
std::map<sstring, sstring> zstd_processor::options() const {
return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}};
}
// Registers zstd_processor as a `compressor` implementation under
// COMPRESSOR_NAME, constructible from a `const compressor::opt_getter&`.
// The registration happens as a side effect of constructing this file-level
// object. COMPRESSOR_NAME is defined earlier in this translation unit, so it
// is already initialized when this initializer runs.
static const class_registrator<compressor, zstd_processor, const compressor::opt_getter&>
registrator(COMPRESSOR_NAME);