Merge 'db/config: Add SSTable compression options for user tables' from Nikos Dragazis

ScyllaDB offers the `compression` DDL property for configuring compression per user table (compression algorithm and chunk size). If not specified, the default compression algorithm is the LZ4Compressor with a 4KiB chunk size. The same default applies to system tables as well.

This series introduces a new configuration option to allow customizing the default for user tables. It also adds some tests for the new functionality.

Fixes #25195.

Closes scylladb/scylladb#26003

* github.com:scylladb/scylladb:
  test/cluster: Add tests for invalid SSTable compression options
  test/boost: Add tests for SSTable compression config options
  main: Validate SSTable compression options from config
  db/config: Add SSTable compression options for user tables
  db/config: Prepare compression_parameters for config system
  compressor: Validate presence of sstable_compression in parameters
  compressor: Add missing space in exception message
This commit is contained in:
Avi Kivity
2025-09-28 20:23:22 +03:00
10 changed files with 355 additions and 3 deletions

View File

@@ -1577,6 +1577,7 @@ deps['test/boost/combined_tests'] += [
'test/boost/sessions_test.cc',
'test/boost/sstable_compaction_test.cc',
'test/boost/sstable_compressor_factory_test.cc',
'test/boost/sstable_compression_config_test.cc',
'test/boost/sstable_directory_test.cc',
'test/boost/sstable_set_test.cc',
'test/boost/statement_restrictions_test.cc',

View File

@@ -31,6 +31,8 @@
#include "db/config.hh"
#include "compaction/time_window_compaction_strategy.hh"
bool is_internal_keyspace(std::string_view name);
namespace cql3 {
namespace statements {
@@ -122,6 +124,10 @@ void create_table_statement::apply_properties_to(schema_builder& builder, const
addColumnMetadataFromAliases(cfmd, Collections.singletonList(valueAlias), defaultValidator, ColumnDefinition.Kind.COMPACT_VALUE);
#endif
if (!_properties->get_compression_options() && !is_internal_keyspace(keyspace())) {
builder.set_compressor_params(db.get_config().sstable_compression_user_table_options());
}
_properties->apply_to_builder(builder, _properties->make_schema_extensions(db.extensions()), db, keyspace());
}

View File

@@ -32,11 +32,15 @@
#include "db/tags/extension.hh"
#include "config.hh"
#include "extensions.hh"
#include "sstables/compressor.hh"
#include "utils/log.hh"
#include "service/tablet_allocator_fwd.hh"
#include "utils/config_file_impl.hh"
#include "exceptions/exceptions.hh"
#include <seastar/core/metrics_api.hh>
#include <seastar/core/relabel_config.hh>
static logging::logger cfglogger("config");
#include <seastar/util/file.hh>
namespace utils {
@@ -117,6 +121,12 @@ error_injection_list_to_json(const std::vector<db::config::error_injection_at_st
return value_to_json("error_injection_list");
}
// JSON serializer for a compression_parameters config value: the value is
// rendered as the option map returned by get_options().
static
json::json_return_type
compression_parameters_to_json(const compression_parameters& cp) {
    const auto option_map = cp.get_options();
    return value_to_json(option_map);
}
template <>
bool
config_from_string(std::string_view value) {
@@ -306,6 +316,12 @@ const config_type& config_type_for<db::config::UUID>() {
return ct;
}
// Config type descriptor for compression_parameters values; created once on
// first use and serialized with compression_parameters_to_json.
template <>
const config_type& config_type_for<compression_parameters>() {
    static const config_type compression_parameters_type("compression parameters", compression_parameters_to_json);
    return compression_parameters_type;
}
}
namespace YAML {
@@ -519,6 +535,34 @@ struct convert<utils::UUID> {
}
};
template<>
struct convert<compression_parameters> {
static bool decode(const Node& node, compression_parameters& cp) {
if (!node.IsMap()) {
return false;
}
std::map<sstring, sstring> options;
for (const auto& kv : node) {
options[kv.first.as<sstring>()] = kv.second.as<sstring>();
}
try {
cp = compression_parameters(options);
return true;
} catch (const exceptions::syntax_exception& e) {
cfglogger.error("Invalid compression parameters syntax: {}", e.what());
return false;
} catch (const exceptions::configuration_exception& e) {
cfglogger.error("Invalid compression parameters configuration: {}", e.what());
return false;
} catch (const std::runtime_error& e) {
cfglogger.error("Error parsing compression parameters: {}", e.what());
return false;
}
}
};
}
#if defined(DEBUG)
@@ -1274,6 +1318,14 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, enable_sstables_mc_format(this, "enable_sstables_mc_format", value_status::Unused, true, "Enable SSTables 'mc' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, enable_sstables_md_format(this, "enable_sstables_md_format", value_status::Unused, true, "Enable SSTables 'md' format to be used as the default file format. Deprecated, please use \"sstable_format\" instead.")
, sstable_format(this, "sstable_format", liveness::LiveUpdate, value_status::Used, "me", "Default sstable file format", {"md", "me"})
// Default compression options applied to user tables that do not specify
// their own. Adjacent string literals are concatenated verbatim, so each
// sentence fragment that continues on the next line must end with a space.
, sstable_compression_user_table_options(this, "sstable_compression_user_table_options", value_status::Used, compression_parameters{},
"Server-global user table compression options. If enabled, all user tables "
"will be compressed using the provided options, unless overridden "
"by compression options in the table schema. The available options are:\n"
"* sstable_compression: The compression algorithm to use. Supported values: LZ4Compressor (default), LZ4WithDictsCompressor, SnappyCompressor, DeflateCompressor, ZstdCompressor, ZstdWithDictsCompressor, '' (empty string; disables compression).\n"
"* chunk_length_in_kb: (Default: 4) The size of chunks to compress in kilobytes. Allowed values are powers of two between 1 and 128.\n"
"* crc_check_chance: (Default: 1.0) Not implemented (option value is ignored).\n"
"* compression_level: (Default: 3) Compression level for ZstdCompressor and ZstdWithDictsCompressor. Higher levels provide better compression ratios at the cost of speed. Allowed values are integers between 1 and 22.")
, sstable_compression_dictionaries_allow_in_ddl(this, "sstable_compression_dictionaries_allow_in_ddl", liveness::LiveUpdate, value_status::Used, true,
"Allows for configuring tables to use SSTable compression with shared dictionaries. "
"If the option is disabled, Scylla will reject CREATE and ALTER statements which try to set dictionary-based sstable compressors.\n"

View File

@@ -26,6 +26,7 @@
#include "utils/dict_trainer.hh"
#include "utils/advanced_rpc_compressor.hh"
#include "db/tri_mode_restriction.hh"
#include "sstables/compressor.hh"
namespace boost::program_options {
@@ -431,6 +432,7 @@ public:
named_value<bool> enable_sstables_mc_format;
named_value<bool> enable_sstables_md_format;
named_value<sstring> sstable_format;
named_value<compression_parameters> sstable_compression_user_table_options;
named_value<bool> sstable_compression_dictionaries_allow_in_ddl;
named_value<bool> sstable_compression_dictionaries_enable_writing;
named_value<float> sstable_compression_dictionaries_memory_budget_fraction;

14
main.cc
View File

@@ -2208,6 +2208,20 @@ sharded<locator::shared_token_metadata> token_metadata;
startlog.info("Verifying that all of the keyspaces are RF-rack-valid");
db.local().check_rf_rack_validity(cfg->rf_rack_valid_keyspaces(), token_metadata.local().get());
// Semantic validation of sstable compression parameters from config.
// Adding here (i.e., after `join_cluster`) to ensure that the
// required SSTABLE_COMPRESSION_DICTS cluster feature has been negotiated.
try {
const auto& dicts_feature_enabled = feature_service.local().sstable_compression_dicts;
const auto& dicts_usage_allowed = cfg->sstable_compression_dictionaries_allow_in_ddl();
cfg->sstable_compression_user_table_options().validate(
compression_parameters::dicts_feature_enabled(bool(dicts_feature_enabled)),
compression_parameters::dicts_usage_allowed(dicts_usage_allowed));
} catch (const std::exception& e) {
startlog.error("Invalid sstable_compression_user_table_options: {}", e.what());
throw bad_configuration_error();
}
dictionary_service dict_service(
dict_sampler,
sys_ks.local(),

View File

@@ -24,6 +24,7 @@
#include "sstables/sstable_compressor_factory.hh"
#include "compressor.hh"
#include "exceptions/exceptions.hh"
#include "utils/config_file_impl.hh"
#include "utils/class_registrator.hh"
#include "gms/feature_service.hh"
@@ -488,6 +489,8 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
if (auto v = get_option(SSTABLE_COMPRESSION)) {
_algorithm = name_to_algorithm(*v);
} else if (!options.empty()) {
throw exceptions::configuration_exception(seastar::format("Missing compression option '{}'", SSTABLE_COMPRESSION));
} else {
_algorithm = algorithm::none;
}
@@ -511,7 +514,7 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
try {
_crc_check_chance = std::stod(*v);
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sstring("Invalid double value ") + *v + "for " + CRC_CHECK_CHANCE);
throw exceptions::syntax_exception(sstring("Invalid double value ") + *v + " for " + CRC_CHECK_CHANCE);
}
}
@@ -536,7 +539,7 @@ compression_parameters::compression_parameters(const std::map<sstring, sstring>&
}
}
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) {
void compression_parameters::validate(dicts_feature_enabled dicts_enabled, dicts_usage_allowed dicts_allowed) const {
if (_algorithm == algorithm::zstd_with_dicts || _algorithm == algorithm::lz4_with_dicts) {
if (!dicts_enabled) {
throw std::runtime_error(std::format("sstable_compression {} can't be used before "
@@ -593,6 +596,13 @@ std::map<sstring, sstring> compression_parameters::get_options() const {
return opts;
}
std::istream& operator>>(std::istream& is, compression_parameters& cp) {
std::unordered_map<sstring, sstring> options_map;
is >> options_map;
cp = compression_parameters(options_map | std::ranges::to<std::map>());
return is;
}
lz4_processor::lz4_processor(cdict_ptr cdict, ddict_ptr ddict)
: _cdict(std::move(cdict))
, _ddict(std::move(ddict))

View File

@@ -107,7 +107,7 @@ public:
using dicts_feature_enabled = bool_class<struct dicts_feature_enabled_tag>;
using dicts_usage_allowed = bool_class<struct dicts_usage_allowed_tag>;
void validate(dicts_feature_enabled, dicts_usage_allowed);
void validate(dicts_feature_enabled, dicts_usage_allowed) const;
std::map<sstring, sstring> get_options() const;
@@ -124,3 +124,13 @@ private:
static void validate_options(const std::map<sstring, sstring>&);
static algorithm name_to_algorithm(std::string_view name);
};
// Stream operator for boost::program_options support
std::istream& operator>>(std::istream& is, compression_parameters& cp);
// Formats compression_parameters as its option map (see get_options()).
template <>
struct fmt::formatter<compression_parameters> : fmt::formatter<std::string_view> {
    auto format(const compression_parameters& cp, fmt::format_context& ctx) const -> decltype(ctx.out()) {
        const std::map<sstring, sstring> option_map = cp.get_options();
        return fmt::format_to(ctx.out(), "{}", option_map);
    }
};

View File

@@ -358,6 +358,7 @@ add_scylla_test(combined_tests
sessions_test.cc
sstable_compaction_test.cc
sstable_compressor_factory_test.cc
sstable_compression_config_test.cc
sstable_directory_test.cc
sstable_set_test.cc
statement_restrictions_test.cc

View File

@@ -0,0 +1,148 @@
/*
* Copyright (C) 2025 ScyllaDB
*/
/*
* SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
*/
#include <boost/test/unit_test.hpp>
#include <seastar/core/format.hh>
#undef SEASTAR_TESTING_MAIN
#include <seastar/testing/test_case.hh>
#include "db/config.hh"
#include "test/lib/log.hh"
#include "test/lib/tmpdir.hh"
#include "test/lib/cql_test_env.hh"
#include "test/lib/cql_assertions.hh"
#include "transport/messages/result_message.hh"
BOOST_AUTO_TEST_SUITE(sstable_compression_config_test)
// Reads the `compression` column of system_schema.tables for the given table
// and rebuilds a compression_parameters object from the stored option map.
// Requires the query to return exactly one row with a non-null value.
static compression_parameters get_table_compression_options(cql_test_env& env, const sstring& keyspace, const sstring& table) {
    const auto select_stmt = seastar::format("SELECT compression FROM system_schema.tables WHERE keyspace_name='{}' AND table_name='{}'", keyspace, table);
    auto msg = cquery_nofail(env, select_stmt);
    assert_that(msg).is_rows().with_size(1);

    auto rows_msg = dynamic_pointer_cast<cql_transport::messages::result_message::rows>(msg);
    auto& first_row = rows_msg->rs().result_set().rows()[0];
    BOOST_REQUIRE_EQUAL(first_row.size(), 1);

    auto& serialized_options = first_row[0];
    BOOST_REQUIRE(serialized_options.has_value());

    // The column is a map of text to text; decode each entry into plain strings.
    std::map<sstring, sstring> decoded_options;
    for (const auto& [raw_key, raw_value] : partially_deserialize_map(managed_bytes_view(*serialized_options))) {
        auto option_name = value_cast<sstring>(utf8_type->deserialize(raw_key));
        auto option_value = value_cast<sstring>(utf8_type->deserialize(raw_value));
        decoded_options[option_name] = option_value;
    }
    return compression_parameters{decoded_options};
}
// Test interaction of SSTable compression configuration with CREATE TABLE and
// ALTER TABLE statements.
//
// The config is loaded from YAML with sstable_compression_user_table_options
// set to SnappyCompressor and chunk_length_in_kb 32, and the CQL environment
// is started with that config. After each statement, the stored table options
// are read back from system_schema.tables and compared with the expectation.
SEASTAR_TEST_CASE(test_compression_with_yaml_config) {
// NOTE(review): `tmp` is not referenced anywhere below — confirm it is needed.
tmpdir tmp;
sstring alg = "SnappyCompressor";
sstring chunk_kb = "32";
auto cfg = seastar::make_shared<db::config>();
auto yaml = seastar::format(R"foo(
sstable_compression_user_table_options:
sstable_compression: {}
chunk_length_in_kb: {}
)foo", alg, chunk_kb);
cfg->read_from_yaml(yaml);
// The steps below build on each other (t1 is created, then altered three
// times), so their order matters.
co_await do_with_cql_env_thread([&] (cql_test_env& env) {
testlog.info("Testing that CREATE TABLE inherits compression options from configuration");
compression_parameters expected_options = std::map<sstring, sstring>{{"sstable_compression", alg}, {"chunk_length_in_kb", chunk_kb}};
cquery_nofail(env, "CREATE TABLE ks.t1 (pk int PRIMARY KEY)");
BOOST_REQUIRE(expected_options == get_table_compression_options(env, "ks", "t1"));
testlog.info("Testing that ALTER TABLE's compression properties override config settings");
expected_options = std::map<sstring, sstring>{{"sstable_compression", "DeflateCompressor"}, {"chunk_length_in_kb", "64"}};
cquery_nofail(env, "ALTER TABLE ks.t1 WITH compression = {'sstable_compression': 'DeflateCompressor', 'chunk_length_in_kb': '64'}");
BOOST_REQUIRE(expected_options == get_table_compression_options(env, "ks", "t1"));
testlog.info("Testing that a table retains its compression properties after ALTER TABLE on a different property");
// expected_options is intentionally left unchanged: altering the comment
// must not touch the compression settings from the previous step.
cquery_nofail(env, "ALTER TABLE ks.t1 WITH comment = 'Test comment'");
BOOST_REQUIRE(expected_options == get_table_compression_options(env, "ks", "t1"));
testlog.info("Testing that ALTER TABLE can disable compression despite config settings");
expected_options = compression_parameters::no_compression();
cquery_nofail(env, "ALTER TABLE ks.t1 WITH compression = {'sstable_compression': ''}");
BOOST_REQUIRE(expected_options == get_table_compression_options(env, "ks", "t1"));
testlog.info("Testing that CREATE TABLE's compression properties override config settings");
expected_options = std::map<sstring, sstring>{{"sstable_compression", "LZ4Compressor"}, {"chunk_length_in_kb", "128"}};
cquery_nofail(env, "CREATE TABLE ks.t2 (pk int PRIMARY KEY) WITH compression = {'sstable_compression': 'LZ4Compressor', 'chunk_length_in_kb': '128'}");
BOOST_REQUIRE(expected_options == get_table_compression_options(env, "ks", "t2"));
}, cfg);
}
// Test that syntax errors are properly detected.
// (based on sanity checks in the `compression_parameters` constructor)
//
// NOTE: Ideally, the following tests should be conducted with a full ScyllaDB
// instance, in Python. However, ScyllaDB currently does not exit on such
// errors; it only emits error logs and falls back to the defaults. That's
// because of the suppressive error handler passed from `scylla_main()` to `read_from_file()`.
// There is an open issue about this: https://github.com/scylladb/scylladb/issues/9469
SEASTAR_TEST_CASE(test_syntax_errors_in_yaml_config) {
    // Each entry pairs the log line announcing a scenario with a YAML document
    // that read_from_yaml() is expected to reject with std::invalid_argument.
    struct rejected_config {
        const char* log_line;
        const char* yaml;
    };
    static const rejected_config rejected_configs[] = {
        {"Testing non-empty compression options with absent sstable_compression", R"foo(
sstable_compression_user_table_options:
chunk_length_in_kb: 4
)foo"},
        {"Testing invalid sstable_compression", R"foo(
sstable_compression_user_table_options:
sstable_compression: InvalidCompressor
)foo"},
        {"Testing invalid chunk_length_in_kb", R"foo(
sstable_compression_user_table_options:
sstable_compression: LZ4Compressor
chunk_length_in_kb: four
)foo"},
        {"Testing invalid crc_check_chance", R"foo(
sstable_compression_user_table_options:
sstable_compression: LZ4Compressor
crc_check_chance: zero
)foo"},
        {"Testing invalid compression_level", R"foo(
sstable_compression_user_table_options:
sstable_compression: ZstdCompressor
compression_level: ten
)foo"},
        {"Testing invalid option name", R"foo(
sstable_compression_user_table_options:
sstable_compression: ZstdCompressor
invalid_option_name: foo
)foo"},
        {"Testing compression level with non-ZSTD algorithm", R"foo(
sstable_compression_user_table_options:
sstable_compression: LZ4Compressor
compression_level: 10
)foo"},
    };

    // The same config object is reused for every scenario, in listed order.
    auto cfg = seastar::make_shared<db::config>();
    for (const auto& scenario : rejected_configs) {
        testlog.info("{}", scenario.log_line);
        BOOST_REQUIRE_THROW(cfg->read_from_yaml(scenario.yaml), std::invalid_argument);
    }
    return make_ready_future<>();
}
BOOST_AUTO_TEST_SUITE_END()

View File

@@ -0,0 +1,108 @@
#
# Copyright (C) 2025-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
import pytest
import logging
from test.pylib.manager_client import ManagerClient
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def yaml_to_cmdline(config):
    """Flatten a config mapping into a list of command-line arguments.

    Each key becomes a ``--option-name`` flag (underscores replaced by
    dashes) followed by its value as a string. A dict value is rendered as
    comma-separated ``key=value`` pairs.
    """
    def render(value):
        # Nested mappings are passed as a single "k1=v1,k2=v2" argument.
        if isinstance(value, dict):
            return ','.join(f'{name}={item}' for name, item in value.items())
        return str(value)

    args = []
    for option, value in config.items():
        args.extend(['--' + option.replace('_', '-'), render(value)])
    return args
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_dict_compression_not_allowed(manager: ManagerClient, cfg_source: str):
    """Add a server whose default user-table compression uses a dictionary
    compressor while dictionary usage in DDL is disabled, passing the error
    message that is expected for this combination.

    The options are supplied either as server config ('yaml') or as
    command-line arguments ('cmdline').
    """
    user_table_options = {
        'sstable_compression': 'ZstdWithDictsCompressor',
        'chunk_length_in_kb': 4,
        'compression_level': 10
    }
    config = {
        'sstable_compression_dictionaries_allow_in_ddl': False,
        'sstable_compression_user_table_options': user_table_options
    }
    expected_error = 'Invalid sstable_compression_user_table_options: sstable_compression ZstdWithDictsCompressor has been disabled by `sstable_compression_dictionaries_allow_in_ddl: false`'

    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(expected_error=expected_error, **source_kwargs)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_negative(manager: ManagerClient, cfg_source: str):
    """Add a server configured with a negative chunk_length_in_kb, passing
    the error message expected for it.

    The options are supplied either as server config ('yaml') or as
    command-line arguments ('cmdline').
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': -1
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: Invalid negative or null for chunk_length_in_kb/chunk_length_kb'

    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(expected_error=expected_error, **source_kwargs)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_beyond_max(manager: ManagerClient, cfg_source: str):
    """Add a server configured with chunk_length_in_kb of 256, passing the
    error message expected for a value above the 128 limit.

    The options are supplied either as server config ('yaml') or as
    command-line arguments ('cmdline').
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 256
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be 128 or less.'

    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(expected_error=expected_error, **source_kwargs)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_chunk_size_not_power_of_two(manager: ManagerClient, cfg_source: str):
    """Add a server configured with chunk_length_in_kb of 3, passing the
    error message expected for a value that is not a power of 2.

    The options are supplied either as server config ('yaml') or as
    command-line arguments ('cmdline').
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 3
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: chunk_length_in_kb/chunk_length_kb must be a power of 2.'

    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(expected_error=expected_error, **source_kwargs)
@pytest.mark.asyncio
@pytest.mark.parametrize('cfg_source', ['yaml', 'cmdline'])
async def test_crc_check_chance_out_of_bounds(manager: ManagerClient, cfg_source: str):
    """Add a server configured with crc_check_chance of 1.1, passing the
    error message expected for a value outside 0.0 to 1.0.

    The options are supplied either as server config ('yaml') or as
    command-line arguments ('cmdline').
    """
    user_table_options = {
        'sstable_compression': 'LZ4Compressor',
        'chunk_length_in_kb': 128,
        'crc_check_chance': 1.1
    }
    config = {'sstable_compression_user_table_options': user_table_options}
    expected_error = 'Invalid sstable_compression_user_table_options: crc_check_chance must be between 0.0 and 1.0.'

    if cfg_source == 'yaml':
        source_kwargs = {'config': config}
    else:
        source_kwargs = {'cmdline': yaml_to_cmdline(config)}
    await manager.server_add(expected_error=expected_error, **source_kwargs)