Add ZStandard compression

This adds the option to compress sstables using the Zstandard algorithm
(https://facebook.github.io/zstd/).
To use, pass 'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'
to the 'compression' argument when creating a table.
You can also specify a 'compression_level'. See Zstd documentation for the available
compression levels.
Resolves #2613.

Signed-off-by: Kamil Braun <kbraun@scylladb.com>
This commit is contained in:
Kamil Braun
2019-07-22 15:00:43 +02:00
parent 7a61bcb021
commit f14e6e73bb
6 changed files with 203 additions and 3 deletions

3
.gitmodules vendored
View File

@@ -12,3 +12,6 @@
[submodule "libdeflate"]
path = libdeflate
url = ../libdeflate
[submodule "zstd"]
path = zstd
url = ../zstd

View File

@@ -495,6 +495,7 @@ scylla_core = (['database.cc',
'keys.cc',
'counters.cc',
'compress.cc',
'zstd.cc',
'sstables/mp_row_consumer.cc',
'sstables/sstables.cc',
'sstables/sstables_manager.cc',
@@ -1088,6 +1089,7 @@ seastar_flags += ['--compiler', args.cxx, '--c-compiler', args.cc, '--cflags=%s'
'--c++-dialect=gnu++17', '--use-std-optional-variant-stringview=1', '--optflags=%s' % (modes['release']['cxx_ld_flags']), ]
libdeflate_cflags = seastar_cflags
zstd_cflags = seastar_cflags + ' -Wno-implicit-fallthrough'
status = subprocess.call([args.python, './configure.py'] + seastar_flags, cwd='seastar')
@@ -1117,6 +1119,27 @@ for mode in build_modes:
modes[mode]['seastar_cflags'] = seastar_cflags
modes[mode]['seastar_libs'] = seastar_libs
MODE_TO_CMAKE_BUILD_TYPE = {'release' : 'RelWithDebInfo', 'debug' : 'Debug', 'dev' : 'Dev', 'sanitize' : 'Sanitize' }
# We need to use experimental features of the zstd library (to use our own allocators for the (de)compression context),
# which are available only when the library is linked statically.
def configure_zstd(build_dir, mode):
zstd_build_dir = os.path.join(build_dir, mode, 'zstd')
zstd_cmake_args = [
'-DCMAKE_BUILD_TYPE={}'.format(MODE_TO_CMAKE_BUILD_TYPE[mode]),
'-DCMAKE_C_COMPILER={}'.format(args.cc),
'-DCMAKE_CXX_COMPILER={}'.format(args.cxx),
'-DCMAKE_C_FLAGS={}'.format(zstd_cflags),
'-DZSTD_BUILD_PROGRAMS=OFF'
]
zstd_cmd = ['cmake', '-G', 'Ninja', os.path.relpath('zstd/build/cmake', zstd_build_dir)] + zstd_cmake_args
print(zstd_cmd)
os.makedirs(zstd_build_dir, exist_ok=True)
subprocess.check_call(zstd_cmd, shell=False, cwd=zstd_build_dir)
args.user_cflags += " " + pkg_config('jsoncpp', '--cflags')
args.user_cflags += ' -march=' + args.target
libs = ' '.join([maybe_static(args.staticyamlcpp, '-lyaml-cpp'), '-latomic', '-llz4', '-lz', '-lsnappy', pkg_config('jsoncpp', '--libs'),
@@ -1167,6 +1190,9 @@ if args.antlr3_exec:
else:
antlr3_exec = "antlr3"
for mode in build_modes:
configure_zstd(outdir, mode)
# configure.py may run automatically from an already-existing build.ninja.
# If the user interrupts configure.py in the middle, we need build.ninja
# to remain in a valid state. So we write our output to a temporary
@@ -1283,7 +1309,8 @@ with open(buildfile_tmp, 'w') as f:
f.write('build $builddir/{}/{}: ar.{} {}\n'.format(mode, binary, mode, str.join(' ', objs)))
else:
objs.extend(['$builddir/' + mode + '/' + artifact for artifact in [
'libdeflate/libdeflate.a'
'libdeflate/libdeflate.a',
'zstd/lib/libzstd.a',
]])
objs.append('$builddir/' + mode + '/gen/utils/gz/crc_combine_table.o')
if binary.startswith('tests/'):
@@ -1397,6 +1424,9 @@ with open(buildfile_tmp, 'w') as f:
f.write('rule libdeflate.{mode}\n'.format(**locals()))
f.write(' command = make -C libdeflate BUILD_DIR=../build/{mode}/libdeflate/ CFLAGS="{libdeflate_cflags}" CC={args.cc} ../build/{mode}/libdeflate//libdeflate.a\n'.format(**locals()))
f.write('build build/{mode}/libdeflate/libdeflate.a: libdeflate.{mode}\n'.format(**locals()))
f.write('build build/{mode}/zstd/lib/libzstd.a: ninja\n'.format(**locals()))
f.write(' subdir = build/{mode}/zstd\n'.format(**locals()))
f.write(' target = libzstd.a\n'.format(**locals()))
mode = 'dev' if 'dev' in modes else modes[0]
f.write('build checkheaders: phony || {}\n'.format(' '.join(['$builddir/{}/{}.o'.format(mode, hh) for hh in headers])))

30
licenses/zstd-license.txt Normal file
View File

@@ -0,0 +1,30 @@
BSD License
For Zstandard software
Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name Facebook nor the names of its contributors may be used to
endorse or promote products derived from this software without specific
prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@@ -33,8 +33,8 @@
// a "compression_metadata" object, which also contains additional information
// needed from decompression - such as the chunk size and compressor type.
//
// Cassandra supports three different compression algorithms for the chunks,
// LZ4, Snappy, and Deflate - the default (and therefore most important) is
// Cassandra supports four different compression algorithms for the chunks,
// LZ4, Snappy, Deflate, and Zstd - the default (and therefore most important) is
// LZ4. Each compressor is an implementation of the "compressor" class.
//
// Each compressed chunk is followed by a 4-byte checksum of the compressed

1
zstd Submodule

Submodule zstd added at ff304e9e65

136
zstd.cc Normal file
View File

@@ -0,0 +1,136 @@
/*
* Copyright (C) 2019 ScyllaDB
*/
/*
* This file is part of Scylla.
*
* Scylla is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Scylla is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Scylla. If not, see <http://www.gnu.org/licenses/>.
*/
#include <seastar/core/aligned_buffer.hh>
// We need to use experimental features of the zstd library (to allocate compression/decompression context),
// which are available only when the library is linked statically.
#define ZSTD_STATIC_LINKING_ONLY
#include "zstd/lib/zstd.h"
#include "compress.hh"
#include "utils/class_registrator.hh"
static const sstring COMPRESSION_LEVEL = "compression_level";
static const sstring COMPRESSOR_NAME = compressor::namespace_prefix + "ZstdCompressor";
class zstd_processor : public compressor {
int _compression_level = 3;
// Manages memory for the compression context.
std::unique_ptr<char[], free_deleter> _cctx_raw;
// Compression context. Observer of _cctx_raw.
ZSTD_CCtx* _cctx;
// Manages memory for the decompression context.
std::unique_ptr<char[], free_deleter> _dctx_raw;
// Decompression context. Observer of _dctx_raw.
ZSTD_DCtx* _dctx;
public:
zstd_processor(const opt_getter&);
size_t uncompress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
size_t compress(const char* input, size_t input_len, char* output,
size_t output_len) const override;
size_t compress_max_size(size_t input_len) const override;
std::set<sstring> option_names() const override;
std::map<sstring, sstring> options() const override;
};
zstd_processor::zstd_processor(const opt_getter& opts)
: compressor(COMPRESSOR_NAME) {
auto level = opts(COMPRESSION_LEVEL);
if (level) {
try {
_compression_level = std::stoi(*level);
} catch (const std::exception& e) {
throw exceptions::syntax_exception(
format("Invalid integer value {} for {}", *level, COMPRESSION_LEVEL));
}
auto min_level = ZSTD_minCLevel();
auto max_level = ZSTD_maxCLevel();
if (min_level > _compression_level || _compression_level > max_level) {
throw exceptions::configuration_exception(
format("{} must be between {} and {}, got {}", COMPRESSION_LEVEL, min_level, max_level, _compression_level));
}
}
auto chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB);
if (!chunk_len_kb) {
chunk_len_kb = opts(compression_parameters::CHUNK_LENGTH_KB_ERR);
}
auto chunk_len = chunk_len_kb
// This parameter has already been validated.
? std::stoi(*chunk_len_kb) * 1024
: compression_parameters::DEFAULT_CHUNK_LENGTH;
// We assume that the uncompressed input length is always <= chunk_len.
auto cparams = ZSTD_getCParams(_compression_level, chunk_len, 0);
auto cctx_size = ZSTD_estimateCCtxSize_usingCParams(cparams);
// According to the ZSTD documentation, pointer to the context buffer must be 8-bytes aligned.
_cctx_raw = allocate_aligned_buffer<char>(cctx_size, 8);
_cctx = ZSTD_initStaticCCtx(_cctx_raw.get(), cctx_size);
if (!_cctx) {
throw std::runtime_error("Unable to initialize ZSTD compression context");
}
auto dctx_size = ZSTD_estimateDCtxSize();
_dctx_raw = allocate_aligned_buffer<char>(dctx_size, 8);
_dctx = ZSTD_initStaticDCtx(_dctx_raw.get(), dctx_size);
if (!_cctx) {
throw std::runtime_error("Unable to initialize ZSTD decompression context");
}
}
size_t zstd_processor::uncompress(const char* input, size_t input_len, char* output, size_t output_len) const {
auto ret = ZSTD_decompressDCtx(_dctx, output, output_len, input, input_len);
if (ZSTD_isError(ret)) {
throw std::runtime_error( format("ZSTD decompression failure: {}", ZSTD_getErrorName(ret)));
}
return ret;
}
size_t zstd_processor::compress(const char* input, size_t input_len, char* output, size_t output_len) const {
auto ret = ZSTD_compressCCtx(_cctx, output, output_len, input, input_len, _compression_level);
if (ZSTD_isError(ret)) {
throw std::runtime_error( format("ZSTD compression failure: {}", ZSTD_getErrorName(ret)));
}
return ret;
}
size_t zstd_processor::compress_max_size(size_t input_len) const {
return ZSTD_compressBound(input_len);
}
std::set<sstring> zstd_processor::option_names() const {
return {COMPRESSION_LEVEL};
}
std::map<sstring, sstring> zstd_processor::options() const {
return {{COMPRESSION_LEVEL, std::to_string(_compression_level)}};
}
static const class_registrator<compressor_ptr, zstd_processor, const compressor::opt_getter&>
registrator(COMPRESSOR_NAME);