Add ZStandard compression
This adds the option to compress sstables using the Zstandard algorithm (https://facebook.github.io/zstd/). To use, pass 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor' to the 'compression' argument when creating a table. You can also specify a 'compression_level'. See Zstd documentation for the available compression levels. Resolves #2613. Signed-off-by: Kamil Braun <kbraun@scylladb.com>
This commit is contained in:
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -12,3 +12,6 @@
|
||||
[submodule "libdeflate"]
|
||||
path = libdeflate
|
||||
url = ../libdeflate
|
||||
[submodule "zstd"]
|
||||
path = zstd
|
||||
url = ../zstd
|
||||
|
||||
32
configure.py
32
configure.py
@@ -495,6 +495,7 @@ scylla_core = (['database.cc',
|
||||
'keys.cc',
|
||||
'counters.cc',
|
||||
'compress.cc',
|
||||
'zstd.cc',
|
||||
'sstables/mp_row_consumer.cc',
|
||||
'sstables/sstables.cc',
|
||||
'sstables/sstables_manager.cc',
|
||||
@@ -1088,6 +1089,7 @@ seastar_flags += ['--compiler', args.cxx, '--c-compiler', args.cc, '--cflags=%s'
|
||||
'--c++-dialect=gnu++17', '--use-std-optional-variant-stringview=1', '--optflags=%s' % (modes['release']['cxx_ld_flags']), ]
|
||||
|
||||
libdeflate_cflags = seastar_cflags
|
||||
zstd_cflags = seastar_cflags + ' -Wno-implicit-fallthrough'
|
||||
|
||||
status = subprocess.call([args.python, './configure.py'] + seastar_flags, cwd='seastar')
|
||||
|
||||
@@ -1117,6 +1119,27 @@ for mode in build_modes:
|
||||
modes[mode]['seastar_cflags'] = seastar_cflags
|
||||
modes[mode]['seastar_libs'] = seastar_libs
|
||||
|
||||
MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' }
|
||||
|
||||
# We need to use experimental features of the zstd library (to use our own allocators for the (de)compression context),
|
||||
# which are available only when the library is linked statically.
|
||||
def configure_zstd(build_dir, mode):
|
||||
zstd_build_dir = os.path.join(build_dir, mode, 'zstd')
|
||||
|
||||
zstd_cmake_args = [
|
||||
'-DCMAKE_BUILD_TYPE={}'.format(MODE_TO_CMAKE_BUILD_TYPE[mode]),
|
||||
'-DCMAKE_C_COMPILER={}'.format(args.cc),
|
||||
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
|
||||
'-DCMAKE_C_FLAGS={}'.format(zstd_cflags),
|
||||
'-DZSTD_BUILD_PROGRAMS=OFF'
|
||||
]
|
||||
|
||||
zstd_cmd = ['cmake', '-G', 'Ninja', os.path.relpath('zstd/build/cmake', zstd_build_dir)] + zstd_cmake_args
|
||||
|
||||
print(zstd_cmd)
|
||||
os.makedirs(zstd_build_dir, exist_ok=True)
|
||||
subprocess.check_call(zstd_cmd, shell=False, cwd=zstd_build_dir)
|
||||
|
||||
args.user_cflags += " " + pkg_config('jsoncpp', '--cflags')
|
||||
args.user_cflags += ' -march=' + args.target
|
||||
libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-llz4', '-lz', '-lsnappy', pkg_config('jsoncpp', '--libs'),
|
||||
@@ -1167,6 +1190,9 @@ if args.antlr3_exec:
|
||||
else:
|
||||
antlr3_exec = "antlr3"
|
||||
|
||||
for mode in build_modes:
|
||||
configure_zstd(outdir, mode)
|
||||
|
||||
# configure.py may run automatically from an already-existing build.ninja.
|
||||
# If the user interrupts configure.py in the middle, we need build.ninja
|
||||
# to remain in a valid state. So we write our output to a temporary
|
||||
@@ -1283,7 +1309,8 @@ with open(buildfile_tmp, 'w') as f:
|
||||
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
|
||||
else:
|
||||
objs.extend(['$builddir/' + mode + '/' + artifact for artifact in [
|
||||
'libdeflate/libdeflate.a'
|
||||
'libdeflate/libdeflate.a',
|
||||
'zstd/lib/libzstd.a',
|
||||
]])
|
||||
objs.append('$builddir/' + mode + '/gen/utils/gz/crc_combine_table.o')
|
||||
if binary.startswith('tests/'):
|
||||
@@ -1397,6 +1424,9 @@ with open(buildfile_tmp, 'w') as f:
|
||||
f.write('rule libdeflate.{mode}\n'.format(**locals()))
|
||||
f.write(' command = make -C libdeflate BUILD_DIR=../build/{mode}/libdeflate/ CFLAGS="{libdeflate_cflags}" CC={args.cc} ../build/{mode}/libdeflate//libdeflate.a\n'.format(**locals()))
|
||||
f.write('build build/{mode}/libdeflate/libdeflate.a: libdeflate.{mode}\n'.format(**locals()))
|
||||
f.write('build build/{mode}/zstd/lib/libzstd.a: ninja\n'.format(**locals()))
|
||||
f.write(' subdir = build/{mode}/zstd\n'.format(**locals()))
|
||||
f.write(' target = libzstd.a\n'.format(**locals()))
|
||||
|
||||
mode = 'dev' if 'dev' in modes else modes[0]
|
||||
f.write('build checkheaders: phony || {}\n'.format(' '.join(['$builddir/{}/{}.o'.format(mode, hh) for hh in headers])))
|
||||
|
||||
30
licenses/zstd-license.txt
Normal file
30
licenses/zstd-license.txt
Normal file
@@ -0,0 +1,30 @@
|
||||
BSD License
|
||||
|
||||
For Zstandard software
|
||||
|
||||
Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without modification,
|
||||
are permitted provided that the following conditions are met:
|
||||
|
||||
* Redistributions of source code must retain the above copyright notice, this
|
||||
list of conditions and the following disclaimer.
|
||||
|
||||
* Redistributions in binary form must reproduce the above copyright notice,
|
||||
this list of conditions and the following disclaimer in the documentation
|
||||
and/or other materials provided with the distribution.
|
||||
|
||||
* Neither the name Facebook nor the names of its contributors may be used to
|
||||
endorse or promote products derived from this software without specific
|
||||
prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
|
||||
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
|
||||
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
@@ -33,8 +33,8 @@
|
||||
// a "compression_metadata" object, which also contains additional information
|
||||
// needed from decompression - such as the chunk size and compressor type.
|
||||
//
|
||||
// Cassandra supports three different compression algorithms for the chunks,
|
||||
// LZ4, Snappy, and Deflate - the default (and therefore most important) is
|
||||
// Cassandra supports four different compression algorithms for the chunks,
|
||||
// LZ4, Snappy, Deflate, and Zstd - the default (and therefore most important) is
|
||||
// LZ4. Each compressor is an implementation of the "compressor" class.
|
||||
//
|
||||
// Each compressed chunk is followed by a 4-byte checksum of the compressed
|
||||
|
||||
1
zstd
Submodule
1
zstd
Submodule
Submodule zstd added at ff304e9e65
136
zstd.cc
Normal file
136
zstd.cc
Normal file
@@ -0,0 +1,136 @@
|
||||
/*
|
||||
* Copyright (C) 2019 ScyllaDB
|
||||
*/
|
||||
|
||||
/*
|
||||
* This file is part of Scylla.
|
||||
*
|
||||
* Scylla is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* Scylla is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <seastar/core/aligned_buffer.hh>
|
||||
|
||||
// We need to use experimental features of the zstd library (to allocate compression/decompression context),
|
||||
// which are available only when the library is linked statically.
|
||||
#define ZSTD_STATIC_LINKING_ONLY
|
||||
#include "zstd/lib/zstd.h"
|
||||
|
||||
#include "compress.hh"
|
||||
#include "utils/class_registrator.hh"
|
||||
|
||||
static const sstring COMPRESSION_LEVEL = "compression_level";
|
||||
static const sstring COMPRESSOR_NAME = compressor::namespace_prefix + "ZstdCompressor";
|
||||
|
||||
class zstd_processor : public compressor {
|
||||
int _compression_level = 3;
|
||||
|
||||
// Manages memory for the compression context.
|
||||
std::unique_ptr<char[], free_deleter> _cctx_raw;
|
||||
// Compression context. Observer of _cctx_raw.
|
||||
ZSTD_CCtx* _cctx;
|
||||
|
||||
// Manages memory for the decompression context.
|
||||
std::unique_ptr<char[], free_deleter> _dctx_raw;
|
||||
// Decompression context. Observer of _dctx_raw.
|
||||
ZSTD_DCtx* _dctx;
|
||||
public:
|
||||
zstd_processor(const opt_getter&);
|
||||
|
||||
size_t uncompress(const char* input, size_t input_len, char* output,
|
||||
size_t output_len) const override;
|
||||
size_t compress(const char* input, size_t input_len, char* output,
|
||||
size_t output_len) const override;
|
||||
size_t compress_max_size(size_t input_len) const override;
|
||||
|
||||
std::set<sstring> option_names() const override;
|
||||
std::map<sstring, sstring> options() const override;
|
||||
};
|
||||
|
||||
zstd_processor::zstd_processor(const opt_getter& opts)
|
||||
: compressor(COMPRESSOR_NAME) {
|
||||
auto level = opts(COMPRESSION_LEVEL);
|
||||
if (level) {
|
||||
try {
|
||||
_compression_level = std::stoi(*level);
|
||||
} catch (const std::exception& e) {
|
||||
throw exceptions::syntax_exception(
|
||||
format("Invalid integer value {} for {}", *level, COMPRESSION_LEVEL));
|
||||
}
|
||||
|
||||
auto min_level = ZSTD_minCLevel();
|
||||
auto max_level = ZSTD_maxCLevel();
|
||||
if (min_level > _compression_level || _compression_level > max_level) {
|
||||
throw exceptions::configuration_exception(
|
||||
format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, min_level, max_level, _compression_level));
|
||||
}
|
||||
}
|
||||
|
||||
auto chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB);
|
||||
if (!chunk_len_kb) {
|
||||
chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR);
|
||||
}
|
||||
auto chunk_len = chunk_len_kb
|
||||
// This parameter has already been validated.
|
||||
? std::stoi(*chunk_len_kb) * 1024
|
||||
: compression_parameters::DEFAULT_CHUNK_LENGTH;
|
||||
|
||||
// We assume that the uncompressed input length is always <= chunk_len.
|
||||
auto cparams = ZSTD_getCParams(_compression_level, chunk_len, 0);
|
||||
auto cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams);
|
||||
// According to the ZSTD documentation, pointer to the context buffer must be 8-bytes aligned.
|
||||
_cctx_raw = allocate_aligned_buffer<char>(cctx_size, 8);
|
||||
_cctx = ZSTD_initStaticCCtx(_cctx_raw.get(), cctx_size);
|
||||
if (!_cctx) {
|
||||
throw std::runtime_error("Unable to initialize ZSTD compression context");
|
||||
}
|
||||
|
||||
auto dctx_size = ZSTD_estimateDCtxSize();
|
||||
_dctx_raw = allocate_aligned_buffer<char>(dctx_size, 8);
|
||||
_dctx = ZSTD_initStaticDCtx(_dctx_raw.get(), dctx_size);
|
||||
if (!_cctx) {
|
||||
throw std::runtime_error("Unable to initialize ZSTD decompression context");
|
||||
}
|
||||
}
|
||||
|
||||
size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
|
||||
auto ret = ZSTD_decompressDCtx(_dctx, output, output_len, input, input_len);
|
||||
if (ZSTD_isError(ret)) {
|
||||
throw std::runtime_error( format("ZSTD decompression failure: {}", ZSTD_getErrorName(ret)));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
|
||||
auto ret = ZSTD_compressCCtx(_cctx, output, output_len, input, input_len, _compression_level);
|
||||
if (ZSTD_isError(ret)) {
|
||||
throw std::runtime_error( format("ZSTD compression failure: {}", ZSTD_getErrorName(ret)));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
size_t zstd_processor::compress_max_size(size_t input_len) const {
|
||||
return ZSTD_compressBound(input_len);
|
||||
}
|
||||
|
||||
std::set<sstring> zstd_processor::option_names() const {
|
||||
return {COMPRESSION_LEVEL};
|
||||
}
|
||||
|
||||
std::map<sstring, sstring> zstd_processor::options() const {
|
||||
return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}};
|
||||
}
|
||||
|
||||
static const class_registrator<compressor_ptr, zstd_processor, const compressor::opt_getter&>
|
||||
registrator(COMPRESSOR_NAME);
|
||||
Reference in New Issue
Block a user